RefactoringRope/CodeAssistServer.py

Wed, 20 Jun 2018 18:52:38 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Wed, 20 Jun 2018 18:52:38 +0200
changeset 269
02bf946efbdc
parent 263
a2fee57f83cf
child 270
efc48da49a75
permissions
-rw-r--r--

CodeAssistServer, RefactoringServer: changed to support the use of Virtual Environments as of eric 18.07.

# -*- coding: utf-8 -*-

# Copyright (c) 2008 - 2018 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the autocompletion interface to rope.
"""

from __future__ import unicode_literals

import os
import sys

from PyQt5.QtCore import pyqtSlot, QCoreApplication, QTimer

from E5Gui.E5Application import e5App
from E5Gui import E5MessageBox

from .JsonServer import JsonServer

import Globals
import Preferences


class CodeAssistServer(JsonServer):
    """
    Class implementing the autocompletion interface to rope.
    """
    def __init__(self, plugin, parent=None):
        """
        Constructor
        
        Initializes the bookkeeping attributes and the dispatch table for
        messages sent by the clients and finally makes sure, that the
        clients for "Python2" and "Python3" are active.
        
        @param plugin reference to the plugin object
        @type RefactoringRopePlugin
        @param parent parent
        @type QObject
        """
        super(CodeAssistServer, self).__init__(
            "CodeAssistServer", multiplex=True, parent=parent)
        
        # references to the collaborating objects
        self.__plugin = plugin
        self.__ui = parent
        self.__vm = e5App().getObject("ViewManager")
        self.__documentationViewer = None
        
        # bookkeeping data keyed by editor language or client id
        self.__editorLanguageMapping = {}
        self.__clientConfigs = {}
        self.__editors = {}
        
        # flag selecting the asynchronous completions variant
        self.__asyncCompletions = False
        
        # attributes to store the results of the client side
        self.__completions = None
        self.__calltips = None
        
        # handlers for the methods a client may call (see handleCall())
        self.__methodMapping = {
            "Config": self.__setConfig,
            "CompletionsResult": self.__processCompletionsResult,
            "CallTipsResult": self.__processCallTipsResult,
            "DocumentationResult": self.__processDocumentationResult,
            
            "ClientException": self.__processClientException,
        }
        
        # translated display strings for the type names sent by the client
        self.__typeMapping = {
            "staticmethod": self.tr("static method"),
            "classmethod": self.tr("class method"),
            "method": self.tr("method"),
            "function": self.tr("function"),
            "class": self.tr("class"),
            "module": self.tr("module"),
            "package": self.tr("package"),
            "object": self.tr("object"),
            "<unknown>": self.tr("not known"),
        }
        
        # make sure the Python 2 and the Python 3 client are active
        for clientId in ("Python2", "Python3"):
            self.__ensureActive(clientId)
    
    def __updateEditorLanguageMapping(self):
        """
        Private method to rebuild the mapping of editor languages to the
        names of the connected clients.
        """
        # editor languages served by each known client
        languagesOfClient = {
            "Python2": ("Python", "Python2", "Pygments|Python"),
            "Python3": ("Python3", "Pygments|Python 3"),
        }
        
        self.__editorLanguageMapping = mapping = {}
        for clientId in self.connectionNames():
            # connections with other names contribute nothing
            for language in languagesOfClient.get(clientId, ()):
                mapping[language] = clientId
    
    def __getConfigs(self):
        """
        Private method to send a 'getConfig' request to each connected
        client.
        """
        for clientId in self.connectionNames():
            self.sendJson("getConfig", {}, idString=clientId)
    
    def __setConfig(self, params):
        """
        Private method to store the rope folder name reported by a client.
        
        @param params dictionary containing the configuration data (the
            keys 'Id' and 'RopeFolderName' are read)
        @type dict
        """
        clientId, ropeFolderName = params["Id"], params["RopeFolderName"]
        self.__clientConfigs[clientId] = ropeFolderName
    
    def __ropeConfigFile(self, idString):
        """
        Private method to determine the rope configuration file of a client.
        
        @param idString id for which to get the configuration file
        @type str
        @return name of the rope configuration file or None, if the client
            is not known, has no rope folder or the file does not exist
        @rtype str
        """
        if idString not in self.__clientConfigs:
            return None
        
        ropeFolder = self.__clientConfigs[idString]
        if not ropeFolder:
            return None
        
        candidate = os.path.join(ropeFolder, "config.py")
        return candidate if os.path.exists(candidate) else None
    
    def __configChanged(self, idString):
        """
        Private slot to notify a client about a changed rope config file.
        
        @param idString id of the client to be notified
        @type str
        """
        # the notification carries no parameters
        noParams = {}
        self.sendJson("configChanged", noParams, idString=idString)
    
    def editConfig(self, idString):
        """
        Public slot to open the rope configuration file in an editor.
        
        If the configuration file is not available, a message box naming
        the expected file is shown instead.
        
        @param idString id for which to get the configuration file
        @type str
        """
        configfile = self.__ropeConfigFile(idString)
        if configfile and os.path.exists(configfile):
            from QScintilla.MiniEditor import MiniEditor
            editor = MiniEditor(configfile)
            editor.show()
            # tell the client to reload its configuration after a save
            editor.editorSaved.connect(
                lambda: self.__configChanged(idString))
            # remember the editor for this client id
            self.__editors[idString] = editor
            return
        
        # __ropeConfigFile() yields None for a missing file; rebuild the
        # expected path so the message does not report 'None'
        if not configfile:
            ropedir = self.__clientConfigs.get(idString)
            configfile = os.path.join(ropedir, "config.py") if ropedir else ""
        E5MessageBox.critical(
            self.__ui,
            self.tr("Configure Rope"),
            self.tr("""The Rope configuration file '{0}' does"""
                    """ not exist.""").format(configfile))
    
    def isSupportedLanguage(self, language):
        """
        Public method to test, whether an editor language is contained in
        the editor language mapping.
        
        @param language editor programming language to check
        @type str
        @return flag indicating the support status
        @rtype bool
        """
        supported = language in self.__editorLanguageMapping
        return supported
    
    def getCompletions(self, editor, context):
        """
        Public method to calculate the possible completions.
        
        Note: This is the synchronous variant for eric6 before 17.11. It
        sends the request and processes events until the result arrived
        or a timeout of five seconds expired.
        
        @param editor reference to the editor object, that called this method
        @type QScintilla.Editor.Editor
        @param context flag indicating to autocomplete a context
        @type bool
        @return list of possible completions
        @rtype list of str
        """
        # forget the result of any previous request
        self.__completions = None
        
        if editor.getLanguage() not in self.__editorLanguageMapping:
            return []
        
        self.requestCompletions(editor, context, "")
        
        # emulate the synchronous behaviour
        timeout = QTimer()
        timeout.setSingleShot(True)
        timeout.start(5000)           # 5s timeout
        while True:
            if self.__completions is not None or not timeout.isActive():
                break
            QCoreApplication.processEvents()
        
        if self.__completions is None:
            # no answer within the timeout
            return []
        return self.__completions
    
    def requestCompletions(self, editor, context, acText):
        """
        Public method to send a completions request to the client
        responsible for the language of the editor.
        
        Note: This is part of the asynchronous variant for eric6 17.11 and
              later.
        
        @param editor reference to the editor object, that called this method
        @type QScintilla.Editor.Editor
        @param context flag indicating to autocomplete a context
        @type bool
        @param acText text to be completed
        @type str
        """
        language = editor.getLanguage()
        if language not in self.__editorLanguageMapping:
            return
        clientId = self.__editorLanguageMapping[language]
        
        filename = editor.getFileName()
        line, index = editor.getCursorPosition()
        source = editor.text()
        # offset = length of all lines before the cursor line (including
        # their line ends) plus the index within the cursor line
        precedingLines = source.splitlines(True)[:line]
        offset = sum(len(text) for text in precedingLines) + index
        
        request = {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": self.__plugin.getPreferences("MaxFixes"),
            "CompletionText": acText,
            "SysPath": sys.path,
        }
        
        self.__ensureActive(clientId)
        self.sendJson("getCompletions", request, idString=clientId)
    
    def __processCompletionsResult(self, result):
        """
        Private method to handle the completions answer of a client.
        
        In the synchronous variant the completions are stored for
        getCompletions(); in the asynchronous variant they are passed on to
        the editor showing the file.
        
        @param result dictionary containing the result sent by the client
        @type dict
        """
        failed = "Error" in result
        
        if not self.__asyncCompletions:
            # synchronous variant for eric6 before 17.11
            self.__completions = [] if failed else result["Completions"]
            return
        
        # asynchronous variant for eric6 17.11 and later
        if failed:
            return
        
        editor = self.__vm.getOpenEditor(result["FileName"])
        if editor is None:
            return
        editor.completionsListReady(result["Completions"],
                                    result["CompletionText"])
    
    def getCallTips(self, editor, pos, commas):
        """
        Public method to calculate calltips.
        
        The request is sent to the client and events are processed until
        the result arrived or a timeout of five seconds expired.
        
        @param editor reference to the editor object, that called this method
        @type QScintilla.Editor.Editor
        @param pos position in the text for the calltip
        @type int
        @param commas minimum number of commas contained in the calltip
            (not used by this method)
        @type int
        @return list of possible calltips
        @rtype list of str
        """
        # forget the result of any previous request
        self.__calltips = None
        
        language = editor.getLanguage()
        if language not in self.__editorLanguageMapping:
            return []
        clientId = self.__editorLanguageMapping[language]
        
        filename = editor.getFileName()
        source = editor.text()
        line, index = editor.lineIndexFromPosition(pos)
        # offset = length of all lines before the given line (including
        # their line ends) plus the index within that line
        precedingLines = source.splitlines(True)[:line]
        offset = sum(len(text) for text in precedingLines) + index
        
        request = {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": self.__plugin.getPreferences("CalltipsMaxFixes"),
            "SysPath": sys.path,
        }
        
        self.__ensureActive(clientId)
        self.sendJson("getCallTips", request, idString=clientId)
        
        # emulate the synchronous behaviour
        timeout = QTimer()
        timeout.setSingleShot(True)
        timeout.start(5000)           # 5s timeout
        while True:
            if self.__calltips is not None or not timeout.isActive():
                break
            QCoreApplication.processEvents()
        
        if self.__calltips is None:
            # no answer within the timeout
            return []
        return self.__calltips
    
    def __processCallTipsResult(self, result):
        """
        Private method to store the calltips answer of a client for
        getCallTips().
        
        @param result dictionary containing the result sent by the client
        @type dict
        """
        # an error reported by the client yields an empty list
        self.__calltips = [] if "Error" in result else result["CallTips"]
    
    def reportChanged(self, filename, oldSource):
        """
        Public slot to tell the responsible client about a changed source.
        
        Nothing is sent, if the file is not open in an editor or the
        language of the editor is not supported.
        
        @param filename file name of the changed source
        @type str
        @param oldSource source code before the change
        @type str
        """
        editor = self.__vm.getOpenEditor(filename)
        if editor is None:
            return
        
        language = editor.getLanguage()
        if language not in self.__editorLanguageMapping:
            return
        
        clientId = self.__editorLanguageMapping[language]
        self.__ensureActive(clientId)
        
        change = {
            "FileName": filename,
            "OldSource": oldSource,
        }
        self.sendJson("reportChanged", change, idString=clientId)
    
    def requestCodeDocumentation(self, editor):
        """
        Public method to request source code documentation for the given
        editor.
        
        For an unsupported editor language a warning is passed to the
        documentation viewer instead of sending a request.
        
        @param editor reference to the editor to get source code documentation
            for
        @type QScintilla.Editor.Editor
        """
        language = editor.getLanguage()
        if language not in self.__editorLanguageMapping:
            richText = Preferences.getDocuViewer("ShowInfoAsRichText")
            if richText:
                template = self.tr("Language <b>{0}</b> is not supported.")
            else:
                template = self.tr("Language '{0}' is not supported.")
            self.__documentationViewer.documentationReady(
                template.format(language), isWarning=True)
            return
        
        clientId = self.__editorLanguageMapping[language]
        
        filename = editor.getFileName()
        source = editor.text()
        line, index = editor.getCursorPosition()
        # offset = length of all lines before the cursor line (including
        # their line ends) plus the index within the cursor line
        precedingLines = source.splitlines(True)[:line]
        offset = sum(len(text) for text in precedingLines) + index
        maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes")
        
        # step back by one position and skip an opening parenthesis
        offset = editor.positionBefore(offset)
        if editor.charAt(offset) == "(":
            offset = editor.positionBefore(offset)
        
        request = {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": maxfixes,
            "SysPath": sys.path,
        }
        
        self.__ensureActive(clientId)
        self.sendJson("getDocumentation", request, idString=clientId)
    
    def __processDocumentationResult(self, result):
        """
        Private method to prepare the documentation sent by the client and
        to pass it to the documentation viewer.
        
        @param result dictionary containing the result sent by the client
        @type dict with keys 'name', 'argspec', 'note', 'docstring', 'typ'
        """
        if "Error" in result:
            info = None
        else:
            info = result["DocumentationDict"]
        
        if not info:
            # error reported or nothing found
            self.__documentationViewer.documentationReady(
                self.tr("No documentation available."), isDocWarning=True)
            return
        
        # replace the 'module' entry by a translated note
        moduleName = info.pop("module", None)
        if moduleName:
            if Preferences.getDocuViewer("ShowInfoAsRichText"):
                noteTemplate = self.tr("Present in <i>{0}</i> module")
            else:
                noteTemplate = self.tr("Present in '{0}' module")
            info["note"] = noteTemplate.format(moduleName)
        
        # translate a known type name, drop an unknown one
        if "typ" in info:
            typeName = info["typ"]
            if typeName in self.__typeMapping:
                info["typ"] = self.__typeMapping[typeName]
            else:
                del info["typ"]
        
        # make sure a 'note' entry is present
        info.setdefault("note", "")
        
        self.__documentationViewer.documentationReady(info)
    
    #######################################################################
    ## Methods below handle the network connection
    #######################################################################
    
    def handleCall(self, method, params):
        """
        Public method to dispatch a method call from the client to the
        handler registered for it in the method mapping.
        
        @param method requested method name
        @type str
        @param params dictionary with method specific parameters
        @type dict
        """
        handler = self.__methodMapping[method]
        handler(params)
    
    def __processClientException(self, params):
        """
        Private method to report an exception sent by the client via the
        stderr output of the user interface.
        
        @param params dictionary containing the exception data
        @type dict
        """
        if params["ExceptionType"] == "ProtocolError":
            # the received data could not be decoded
            message = self.tr(
                """The data received from the code assist"""
                """ server could not be decoded. Please report"""
                """ this issue with the received data to the"""
                """ eric bugs email address.\n"""
                """Error: {0}\n"""
                """Data: {1}\n""").format(
                params["ExceptionValue"],
                params["ProtocolData"])
        else:
            # any other exception raised inside the client
            message = self.tr(
                "An exception happened in the code assist"
                " client. Please report it to the eric bugs"
                " email address.\n"
                "Exception: {0}\n"
                "Value: {1}\n"
                "Traceback: {2}\n").format(
                params["ExceptionType"],
                params["ExceptionValue"],
                params["Traceback"])
        
        self.__ui.appendToStderr(message)
    
    def __startCodeAssistClient(self, interpreter, idString):
        """
        Private method to start the code assist client with the given
        interpreter.
        
        The client gets a client specific sub-directory of the 'rope'
        configuration directory, which is created if it does not exist.
        
        @param interpreter interpreter to be used for the code assist client
        @type str
        @param idString id of the client to be started
        @type str
        @return flag indicating a successful start of the client
        @rtype bool
        """
        if not interpreter:
            self.__ui.appendToStderr(self.tr(
                "'{0}' is not supported because no suitable interpreter is"
                " configured.\n"
            ).format(idString))
            return False
        
        configDir = os.path.join(Globals.getConfigDir(), "rope", idString)
        if not os.path.exists(configDir):
            os.makedirs(configDir)
        
        # the client script is located next to this module
        clientScript = os.path.join(os.path.dirname(__file__),
                                    "CodeAssistClient.py")
        started = self.startClient(interpreter, clientScript, [configDir],
                                   idString=idString)
        if not started:
            self.__ui.appendToStderr(self.tr(
                "'{0}' is not supported because the configured interpreter"
                " could not be started.\n"
            ).format(idString))
        
        return started
    
    def __ensureActive(self, idString):
        """
        Private method to ensure, that the requested client is active.
        
        A non-active client will be started, provided an interpreter could
        be determined for it.
        
        @param idString id of the client to be checked
        @type str
        @return flag indicating an active client
        @rtype bool
        """
        ok = idString in self.connectionNames()
        if not ok:
            # client is not running
            
            # Default to 'no interpreter'. Without this the name would be
            # unbound (UnboundLocalError below), if no virtual environment
            # is configured for the client.
            interpreter = ""
            try:
                # new code using virtual environments
                if idString == "Python2":
                    # Python 2
                    venvName = Preferences.getDebugger("Python2VirtualEnv")
                elif idString == "Python3":
                    # Python 3
                    venvName = Preferences.getDebugger("Python3VirtualEnv")
                else:
                    venvName = ""
                if venvName:
                    interpreter = e5App().getObject("VirtualEnvManager")\
                        .getVirtualenvInterpreter(venvName)
            except KeyError:
                # backward compatibility (eric <18.07)
                if idString == "Python2":
                    # Python 2
                    interpreter = Preferences.getDebugger("PythonInterpreter")
                elif idString == "Python3":
                    # Python 3
                    interpreter = Preferences.getDebugger("Python3Interpreter")
                else:
                    interpreter = ""
            if interpreter:
                ok = self.__startCodeAssistClient(interpreter, idString)
        return ok
    
    @pyqtSlot()
    def handleNewConnection(self):
        """
        Public slot to handle a new incoming connection from a client.
        
        After the base class handled the connection, the editor language
        mapping is rebuilt and the configurations of all connected clients
        are requested.
        """
        super(CodeAssistServer, self).handleNewConnection()
        
        # the set of connected clients may have changed
        self.__updateEditorLanguageMapping()
        
        # ask the connected clients for their configuration
        self.__getConfigs()
    
    def activate(self):
        """
        Public method to activate the code assist server.
        
        Registers 'rope' as a provider of the documentation viewer of the
        user interface, if available.
        """
        try:
            viewer = self.__ui.documentationViewer()
            self.__documentationViewer = viewer
            viewer.registerProvider(
                "rope", self.tr("Rope"), self.requestCodeDocumentation,
                self.isSupportedLanguage)
        except AttributeError:
            # eric6 before 17.11 doesn't have this
            pass
    
    def deactivate(self):
        """
        Public method to deactivate (shut down) the code assist server.
        
        The provider is unregistered from the documentation viewer (if one
        was obtained), each connected client is sent a 'closeProject'
        request and all clients are stopped.
        """
        if self.__documentationViewer is not None:
            self.__documentationViewer.unregisterProvider("rope")
        
        for idString in self.connectionNames():
            self.sendJson("closeProject", {}, flush=True, idString=idString)
        
        self.stopAllClients()
    
    #######################################################################
    ## Methods below handle setting/unsetting the hook methods
    #######################################################################
    
    def connectEditor(self, editor):
        """
        Public method to connect an editor.
        
        For a supported editor language the enabled hooks are set, unless
        they are set already. An editor with an unsupported language gets
        disconnected.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        if not self.isSupportedLanguage(editor.getLanguage()):
            self.disconnectEditor(editor)
            return
        
        if self.__plugin.getPreferences("CodeAssistEnabled"):
            if editor.getCompletionListHook("rope") is None:
                self.__setAutoCompletionHook(editor)
        
        if self.__plugin.getPreferences("CodeAssistCalltipsEnabled"):
            if editor.getCallTipHook("rope") is None:
                self.__setCalltipsHook(editor)
    
    def disconnectEditor(self, editor):
        """
        Public method to disconnect an editor by removing the 'rope' hooks
        it has set.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        # auto-completion hook
        if editor.getCompletionListHook("rope"):
            editor.removeCompletionListHook("rope")
        
        # calltips hook
        if editor.getCallTipHook("rope"):
            editor.removeCallTipHook("rope")
    
    def __setAutoCompletionHook(self, editor):
        """
        Private method to set the auto-completion hook.
        
        The asynchronous variant is tried first; if the editor rejects the
        additional argument, the synchronous variant is registered.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        try:
            editor.addCompletionListHook("rope", self.requestCompletions, True)
        except TypeError:
            # backward compatibility for eric6 before 17.11
            editor.addCompletionListHook("rope", self.getCompletions)
            self.__asyncCompletions = False
        else:
            self.__asyncCompletions = True
    
    def __unsetAutoCompletionHook(self, editor):
        """
        Private method to remove the 'rope' auto-completion hook from an
        editor.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        hookName = "rope"
        editor.removeCompletionListHook(hookName)
    
    def __setCalltipsHook(self, editor):
        """
        Private method to register getCallTips() as the 'rope' calltip hook
        of an editor.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        hookName = "rope"
        editor.addCallTipHook(hookName, self.getCallTips)
    
    def __unsetCalltipsHook(self, editor):
        """
        Private method to remove the 'rope' calltip hook from an editor.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        hookName = "rope"
        editor.removeCallTipHook(hookName)

eric ide

mercurial