RefactoringRope/CodeAssistServer.py

Sun, 04 Nov 2018 18:41:51 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sun, 04 Nov 2018 18:41:51 +0100
changeset 287
09afe26b734c
parent 285
247d62c682dc
child 291
da88cb84ae30
permissions
-rw-r--r--

Changes to start the backend clients with a clean PATH setting.

# -*- coding: utf-8 -*-

# Copyright (c) 2008 - 2018 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the autocompletion interface to rope.
"""

from __future__ import unicode_literals

import os
import sys

from PyQt5.QtCore import pyqtSlot, QCoreApplication, QTimer

from E5Gui.E5Application import e5App
from E5Gui import E5MessageBox

from .JsonServer import JsonServer

import Globals
import Preferences
import Utilities


class CodeAssistServer(JsonServer):
    """
    Class implementing the autocompletion interface to rope.
    """
    IdProject = "Project"
    
    def __init__(self, plugin, parent=None):
        """
        Constructor

        Sets up the internal state and the dispatch tables and finally
        tries to start the clients for Python 2 and Python 3.

        @param plugin reference to the plugin object
        @type RefactoringRopePlugin
        @param parent parent
        @type QObject
        """
        super(CodeAssistServer, self).__init__(
            "CodeAssistServer", multiplex=True, parent=parent)

        # references to collaborating objects
        self.__plugin = plugin
        self.__ui = parent
        self.__vm = e5App().getObject("ViewManager")
        self.__e5project = e5App().getObject("Project")

        # book keeping containers
        self.__editorLanguageMapping = {}
        self.__clientConfigs = {}
        self.__editors = {}

        self.__asyncCompletions = False
        self.__documentationViewer = None

        # attributes to store the results of the client side
        self.__completions = None
        self.__calltips = None

        # dispatch table: method name sent by a client => handler
        self.__methodMapping = {
            "Config": self.__setConfig,
            "CompletionsResult": self.__processCompletionsResult,
            "CallTipsResult": self.__processCallTipsResult,
            "DocumentationResult": self.__processDocumentationResult,

            "ClientException": self.__processClientException,
        }

        # type name reported by a client => translated display text
        self.__typeMapping = {
            "staticmethod": self.tr("static method"),
            "classmethod": self.tr("class method"),
            "method": self.tr("method"),
            "function": self.tr("function"),
            "class": self.tr("class"),
            "module": self.tr("module"),
            "package": self.tr("package"),
            "object": self.tr("object"),
            "<unknown>": self.tr("not known"),
        }

        # make sure the clients for Python 2 and Python 3 are running
        for clientId in ("Python2", "Python3"):
            self.__ensureActive(clientId)
    
    def __updateEditorLanguageMapping(self):
        """
        Private method to update the editor language to connection mapping.

        The mapping is rebuilt from scratch using the names of the
        connections known at the time of the call.
        """
        # editor languages served by each of the known clients
        languagesByClient = {
            "Python2": {
                "Python": "Python2",
                "Python2": "Python2",
                "Pygments|Python": "Python2",
            },
            "Python3": {
                "Python3": "Python3",
                "Pygments|Python 3": "Python3",
            },
        }

        mapping = self.__editorLanguageMapping = {}
        for connectionName in self.connectionNames():
            mapping.update(languagesByClient.get(connectionName, {}))
    
    def isSupportedLanguage(self, language):
        """
        Public method to check, if the given language is supported.
        
        A language is supported, if it is a key of the editor language to
        connection mapping.
        
        @param language editor programming language to check
        @type str
        @return flag indicating the support status
        @rtype bool
        """
        return language in self.__editorLanguageMapping
    
    def __idString(self, editor):
        """
        Private method to determine the ID string for the back-end.

        An editor showing a source file of the open project, whose language
        is the project language, is served by the project client. Every
        other editor is served by the client mapped to its language.

        @param editor reference to the editor to determine the ID string for
        @type QScintilla.Editor
        @return ID string (empty string, if no client is responsible)
        @rtype str
        """
        language = editor.getLanguage()
        project = self.__e5project

        if project.isOpen() and project.getProjectLanguage() == language:
            if project.isProjectSource(editor.getFileName()):
                return CodeAssistServer.IdProject

        return self.__editorLanguageMapping.get(language, "")
    
    def __getConfigs(self):
        """
        Private method to get the configurations of all connected clients.
        
        A 'getConfig' command without parameters is sent to each connection.
        The answers are handled by the method registered for 'Config'.
        """
        for idString in self.connectionNames():
            self.sendJson("getConfig", {}, idString=idString)
    
    def __setConfig(self, params):
        """
        Private method to set the rope client configuration data.
        
        The rope folder name is remembered per client id.
        
        @param params dictionary containing the configuration data (keys
            'Id' and 'RopeFolderName' are read)
        @type dict
        """
        idString = params["Id"]
        ropeFolder = params["RopeFolderName"]
        
        self.__clientConfigs[idString] = ropeFolder
    
    def __ropeConfigFile(self, idString):
        """
        Private method to get the name of the rope configuration file.

        @param idString id for which to get the configuration file
        @type str
        @return name of the rope configuration file or None, if the rope
            folder of the client is not known or the file does not exist
        @rtype str or None
        """
        ropeFolder = self.__clientConfigs.get(idString)
        if not ropeFolder:
            # unknown client or client without a rope folder
            return None

        candidate = os.path.join(ropeFolder, "config.py")
        return candidate if os.path.exists(candidate) else None
    
    def __configChanged(self, idString):
        """
        Private slot called, when the rope config file has changed.
        
        The client is informed by sending it a 'configChanged' command.
        
        @param idString id of the client whose configuration file changed
        @type str
        """
        self.sendJson("configChanged", {}, idString=idString)
    
    def editConfig(self, idString):
        """
        Public slot to open the rope configuration file in an editor.

        If the configuration file does not exist, an error message showing
        the expected file name is presented instead.

        @param idString id for which to get the configuration file
        @type str
        """
        configfile = self.__ropeConfigFile(idString)
        if configfile and os.path.exists(configfile):
            from QScintilla.MiniEditor import MiniEditor
            editor = MiniEditor(configfile)
            editor.show()
            # tell the client to reload its configuration after a save
            editor.editorSaved.connect(
                lambda: self.__configChanged(idString))
            # remember the editor per client (presumably to keep the window
            # object referenced while it is shown)
            self.__editors[idString] = editor
            return

        # The lookup above yields None for a missing file. Rebuild the
        # expected name, so that the message does not show 'None'.
        ropedir = self.__clientConfigs.get(idString)
        if ropedir:
            expectedFile = os.path.join(ropedir, "config.py")
        else:
            expectedFile = "config.py"

        E5MessageBox.critical(
            self.__ui,
            self.tr("Configure Rope"),
            self.tr("""The Rope configuration file '{0}' does"""
                    """ not exist.""").format(expectedFile))
    
    def getCompletions(self, editor, context):
        """
        Public method to calculate the possible completions.

        Note: This is the synchronous variant for eric6 before 17.11. It
        issues an asynchronous request and processes events until the
        result arrived or a timeout of five seconds expired.

        @param editor reference to the editor object, that called this method
        @type QScintilla.Editor.Editor
        @param context flag indicating to autocomplete a context
        @type bool
        @return list of possible completions
        @rtype list of str
        """
        # forget the result of any previous request
        self.__completions = None

        if not self.__idString(editor):
            return []

        self.requestCompletions(editor, context, "")

        # emulate the synchronous behaviour
        timeout = QTimer()
        timeout.setSingleShot(True)
        timeout.start(5000)           # 5s timeout
        while True:
            if self.__completions is not None or not timeout.isActive():
                break
            QCoreApplication.processEvents()

        completions = self.__completions
        if completions is None:
            # timed out without an answer
            return []
        return completions
    
    def requestCompletions(self, editor, context, acText):
        """
        Public method to request a list of possible completions.

        Note: This is part of the asynchronous variant for eric6 17.11 and
              later. The answer is handled by the method registered for
              'CompletionsResult'.

        @param editor reference to the editor object, that called this method
        @type QScintilla.Editor.Editor
        @param context flag indicating to autocomplete a context (not used
            by this method)
        @type bool
        @param acText text to be completed
        @type str
        """
        idString = self.__idString(editor)
        if not idString:
            return

        filename = editor.getFileName()
        line, index = editor.getCursorPosition()
        source = editor.text()
        # offset = length of all lines before the cursor line (including
        # their line ends) plus the index within the cursor line
        precedingLines = source.splitlines(True)[:line]
        offset = sum(len(text) for text in precedingLines) + index
        maxfixes = self.__plugin.getPreferences("MaxFixes")

        request = {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": maxfixes,
            "CompletionText": acText,
            "SysPath": sys.path,
        }

        self.__ensureActive(idString)
        self.sendJson("getCompletions", request, idString=idString)
    
    def __processCompletionsResult(self, result):
        """
        Private method to process the completions sent by the client.

        In synchronous mode the completions are stored for the waiting
        caller; in asynchronous mode they are handed to the editor showing
        the file the result belongs to.

        @param result dictionary containing the result sent by the client
        @type dict
        """
        failed = "Error" in result

        if not self.__asyncCompletions:
            # synchronous variant for eric6 before 17.11
            self.__completions = [] if failed else result["Completions"]
            return

        # asynchronous variant for eric6 17.11 and later
        if failed:
            return

        editor = self.__vm.getOpenEditor(result["FileName"])
        if editor is not None:
            editor.completionsListReady(result["Completions"],
                                        result["CompletionText"])
    
    def getCallTips(self, editor, pos, commas):
        """
        Public method to calculate calltips.

        The request is sent to the responsible client and events are
        processed until the answer arrived or a timeout of five seconds
        expired. If the client is not running and cannot be started, an
        empty list is returned right away.

        @param editor reference to the editor object, that called this method
        @type QScintilla.Editor.Editor
        @param pos position in the text for the calltip
        @type int
        @param commas minimum number of commas contained in the calltip
            (not used by this method)
        @type int
        @return list of possible calltips
        @rtype list of str
        """
        # reset the calltips buffer
        self.__calltips = None

        idString = self.__idString(editor)
        if not idString:
            return []

        if not self.__ensureActive(idString):
            # No client is running and none could be started, so no answer
            # can arrive. Don't block for the whole timeout period.
            return []

        filename = editor.getFileName()
        source = editor.text()
        line, index = editor.lineIndexFromPosition(pos)
        # offset = length of all lines before the given line (including
        # their line ends) plus the index within that line
        offset = len("".join(source.splitlines(True)[:line])) + index
        maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes")

        self.sendJson("getCallTips", {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": maxfixes,
            "SysPath": sys.path,
        }, idString=idString)

        # emulate the synchronous behaviour
        timer = QTimer()
        timer.setSingleShot(True)
        timer.start(5000)           # 5s timeout
        while self.__calltips is None and timer.isActive():
            QCoreApplication.processEvents()

        return [] if self.__calltips is None else self.__calltips
    
    def __processCallTipsResult(self, result):
        """
        Private method to process the calltips sent by the client.

        The calltips are stored for the waiting caller. A result flagged
        as an error yields an empty list.

        @param result dictionary containing the result sent by the client
        @type dict
        """
        self.__calltips = [] if "Error" in result else result["CallTips"]
    
    def reportChanged(self, filename, oldSource):
        """
        Public slot to report some changed sources.

        Nothing is reported, if the file is not open in an editor or no
        client is responsible for the editor.

        @param filename file name of the changed source
        @type str
        @param oldSource source code before the change
        @type str
        """
        editor = self.__vm.getOpenEditor(filename)
        if editor is None:
            return

        idString = self.__idString(editor)
        if not idString:
            return

        self.__ensureActive(idString)
        change = {
            "FileName": filename,
            "OldSource": oldSource,
        }
        self.sendJson("reportChanged", change, idString=idString)
    
    def requestCodeDocumentation(self, editor):
        """
        Public method to request source code documentation for the given
        editor.

        For an unsupported editor language a warning is shown in the
        documentation viewer. Otherwise the request is sent to the client;
        the answer is handled by the method registered for
        'DocumentationResult'.

        @param editor reference to the editor to get source code documentation
            for
        @type QScintilla.Editor.Editor
        """
        viewer = self.__documentationViewer
        if viewer is None:
            return

        language = editor.getLanguage()
        if not self.isSupportedLanguage(language):
            if Preferences.getDocuViewer("ShowInfoAsRichText"):
                template = self.tr("Language <b>{0}</b> is not supported.")
            else:
                template = self.tr("Language '{0}' is not supported.")
            viewer.documentationReady(
                template.format(language), isWarning=True)
            return

        idString = self.__idString(editor)
        if not idString:
            return

        filename = editor.getFileName()
        source = editor.text()
        line, index = editor.getCursorPosition()
        precedingLines = source.splitlines(True)[:line]
        offset = sum(len(text) for text in precedingLines) + index
        maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes")

        # step back one position and skip an opening parenthesis
        # NOTE(review): offset is counted in characters, while the editor
        # position methods may expect byte positions - confirm
        offset = editor.positionBefore(offset)
        if editor.charAt(offset) == "(":
            offset = editor.positionBefore(offset)

        request = {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": maxfixes,
            "SysPath": sys.path,
        }

        self.__ensureActive(idString)
        self.sendJson("getDocumentation", request, idString=idString)
    
    def __processDocumentationResult(self, result):
        """
        Private method to process the documentation sent by the client.

        The documentation dictionary is prepared for the documentation
        viewer: a non-empty 'module' entry is converted to a 'note' entry,
        the 'typ' entry is replaced by its translated text (or removed, if
        it is not known) and a 'note' entry is ensured to exist.

        @param result dictionary containing the result sent by the client
            (key 'DocumentationDict' is read unless key 'Error' is present)
        @type dict
        """
        viewer = self.__documentationViewer
        if viewer is None:
            return

        info = None
        if "Error" not in result:
            # an empty documentation dictionary counts as 'no documentation'
            info = result["DocumentationDict"] or None

        if info is None:
            viewer.documentationReady(
                self.tr("No documentation available."), isDocWarning=True)
            return

        if "module" in info:
            moduleName = info["module"]
            if moduleName:
                if Preferences.getDocuViewer("ShowInfoAsRichText"):
                    info["note"] = \
                        self.tr("Present in <i>{0}</i> module")\
                        .format(moduleName)
                else:
                    info["note"] = \
                        self.tr("Present in '{0}' module")\
                        .format(moduleName)
            del info["module"]

        if "typ" in info:
            if info["typ"] in self.__typeMapping:
                info["typ"] = self.__typeMapping[info["typ"]]
            else:
                del info["typ"]

        info.setdefault("note", "")

        viewer.documentationReady(info)
    
    #######################################################################
    ## Methods below handle the network connection
    #######################################################################
    
    def handleCall(self, method, params):
        """
        Public method to handle a method call from the client.
        
        The call is dispatched via the method mapping built by the
        constructor. A method name not contained in that mapping raises a
        KeyError.
        
        @param method requested method name
        @type str
        @param params dictionary with method specific parameters
        @type dict
        """
        self.__methodMapping[method](params)
    
    def __processClientException(self, params):
        """
        Private method to handle exceptions of the refactoring client.

        A message describing the exception is appended to the stderr
        output of the user interface.

        @param params dictionary containing the exception data
        @type dict
        """
        if params["ExceptionType"] == "ProtocolError":
            # the client could not decode the data it received
            message = self.tr(
                """The data received from the code assist"""
                """ server could not be decoded. Please report"""
                """ this issue with the received data to the"""
                """ eric bugs email address.\n"""
                """Error: {0}\n"""
                """Data: {1}\n""").format(
                params["ExceptionValue"],
                params["ProtocolData"])
        else:
            # any other exception raised within the client
            message = self.tr(
                "An exception happened in the code assist"
                " client. Please report it to the eric bugs"
                " email address.\n"
                "Exception: {0}\n"
                "Value: {1}\n"
                "Traceback: {2}\n").format(
                params["ExceptionType"],
                params["ExceptionValue"],
                params["Traceback"])

        self.__ui.appendToStderr(message)
    
    def __startCodeAssistClient(self, interpreter, idString, clientEnv):
        """
        Private method to start the code assist client with the given
        interpreter.

        The client is given a configuration directory: the project path for
        the project client, a directory below the eric configuration
        directory otherwise. A missing directory is created.

        @param interpreter interpreter to be used for the code assist client
        @type str
        @param idString id of the client to be started
        @type str
        @param clientEnv dictionary with environment variables to run the
            interpreter with
        @type dict
        @return flag indicating a successful start of the client
        @rtype bool
        """
        if not interpreter:
            self.__ui.appendToStderr(self.tr(
                "'{0}' is not supported because no suitable interpreter is"
                " configured.\n"
            ).format(idString))
            return False

        if idString == CodeAssistServer.IdProject:
            configDir = self.__e5project.getProjectPath()
        else:
            configDir = os.path.join(
                Globals.getConfigDir(), "rope", idString)
        if not os.path.exists(configDir):
            os.makedirs(configDir)

        # the client script lives next to this module
        clientScript = os.path.join(
            os.path.dirname(__file__), "CodeAssistClient.py")
        started = self.startClient(
            interpreter, clientScript, [configDir],
            idString=idString, environment=clientEnv)
        if not started:
            self.__ui.appendToStderr(self.tr(
                "'{0}' is not supported because the configured interpreter"
                " could not be started.\n"
            ).format(idString))

        return started
    
    def __ensureActive(self, idString):
        """
        Private method to ensure, that the requested client is active.

        A non-active client will be started. The interpreter is taken from
        the project for the project client and from the debugger settings
        (virtual environment or, for old eric versions, interpreter entry)
        for the Python 2 and Python 3 clients.

        @param idString id of the client to be checked
        @type str
        @return flag indicating an active client
        @rtype bool
        """
        if idString in self.connectionNames():
            # the client is connected already
            return True

        if idString == CodeAssistServer.IdProject:
            interpreter, clientEnv = self.__interpreterForProject()
        else:
            interpreter = ""
            venvName = ""

            # start with a copy of our environment with the original PATH
            clientEnv = os.environ.copy()
            if "PATH" in clientEnv:
                try:
                    clientEnv["PATH"] = self.__ui.getOriginalPathString()
                except AttributeError:
                    # ignore for eric6 < 18.12
                    pass

            # per client: (virtual environment key, interpreter key,
            # major Python version); None for any other client id
            settings = {
                "Python2": ("Python2VirtualEnv", "PythonInterpreter", 2),
                "Python3": ("Python3VirtualEnv", "Python3Interpreter", 3),
            }.get(idString)

            try:
                # new code using virtual environments
                venvManager = e5App().getObject("VirtualEnvManager")
                if settings is not None:
                    venvName = Preferences.getDebugger(settings[0])
                    if not venvName and sys.version_info[0] == settings[2]:
                        try:
                            venvName, _ = \
                                venvManager.getDefaultEnvironment()
                        except AttributeError:
                            # ignore for eric6 < 18.10
                            pass
                if venvName:
                    interpreter = \
                        venvManager.getVirtualenvInterpreter(venvName)

                    try:
                        execPath = \
                            venvManager.getVirtualenvExecPath(venvName)
                    except AttributeError:
                        # eric6 < 18.12
                        execPath = ""

                    # build a suitable environment
                    if execPath:
                        if "PATH" in clientEnv:
                            clientEnv["PATH"] = os.pathsep.join(
                                [execPath, clientEnv["PATH"]])
                        else:
                            clientEnv["PATH"] = execPath
            except KeyError:
                # backward compatibility (eric <18.07)
                if settings is not None:
                    interpreter = Preferences.getDebugger(settings[1])

        if not interpreter:
            return False

        return self.__startCodeAssistClient(interpreter, idString, clientEnv)
    
    def __interpreterForProject(self):
        """
        Private method to determine the interpreter for the current project and
        the environment to run it.

        The virtual environment is taken from the project first and from
        the debugger settings next. For old eric versions the interpreter
        entries of the project and the debugger settings are used instead.

        @return tuple containing the interpreter of the current project and the
            environment variables
        @rtype tuple of (str, dict)
        """
        project = self.__e5project
        projectLanguage = project.getProjectLanguage()
        interpreter = ""

        # start with a copy of our environment with the original PATH
        clientEnv = os.environ.copy()
        if "PATH" in clientEnv:
            try:
                clientEnv["PATH"] = self.__ui.getOriginalPathString()
            except AttributeError:
                # ignore for eric6 < 18.12
                pass

        if not projectLanguage.startswith("Python"):
            return interpreter, clientEnv

        # per language: (virtual environment key, interpreter key,
        # major Python version); None for any other Python variant
        settings = {
            "Python2": ("Python2VirtualEnv", "PythonInterpreter", 2),
            "Python3": ("Python3VirtualEnv", "Python3Interpreter", 3),
        }.get(projectLanguage)

        try:
            # new code using virtual environments
            venvManager = e5App().getObject("VirtualEnvManager")

            # get virtual environment from project first
            venvName = project.getDebugProperty("VIRTUALENV")
            if not venvName:
                # get it from debugger settings next
                if settings is None:
                    venvName = ""
                else:
                    venvName = Preferences.getDebugger(settings[0])
                    if not venvName and sys.version_info[0] == settings[2]:
                        try:
                            venvName, _ = \
                                venvManager.getDefaultEnvironment()
                        except AttributeError:
                            # ignore for eric6 < 18.10
                            pass
            if venvName:
                interpreter = \
                    venvManager.getVirtualenvInterpreter(venvName)

                try:
                    execPath = venvManager.getVirtualenvExecPath(venvName)
                except AttributeError:
                    # eric6 < 18.12
                    execPath = ""

                # build a suitable environment
                if execPath:
                    if "PATH" in clientEnv:
                        clientEnv["PATH"] = os.pathsep.join(
                            [execPath, clientEnv["PATH"]])
                    else:
                        clientEnv["PATH"] = execPath
        except KeyError:
            # backward compatibility (eric < 18.07)
            # get interpreter from project first
            interpreter = project.getDebugProperty("INTERPRETER")
            if not interpreter or not Utilities.isinpath(interpreter):
                # get it from debugger settings second
                if settings is not None:
                    interpreter = Preferences.getDebugger(settings[1])

        return interpreter, clientEnv
    
    @pyqtSlot()
    def handleNewConnection(self):
        """
        Public slot for new incoming connections from a client.
        
        After the base class handled the connection, the editor language
        mapping is rebuilt and the configurations of all connected clients
        are requested.
        """
        super(CodeAssistServer, self).handleNewConnection()
        
        # the set of connections may have changed
        self.__updateEditorLanguageMapping()
        
        self.__getConfigs()
    
    def activate(self):
        """
        Public method to activate the code assist server.

        Registers this object as the 'rope' provider of the documentation
        viewer (if there is one) and connects to the projectClosed signal
        of the project object.
        """
        try:
            viewer = self.__ui.documentationViewer()
            self.__documentationViewer = viewer
            if viewer is not None:
                viewer.registerProvider(
                    "rope", self.tr("Rope"), self.requestCodeDocumentation,
                    self.isSupportedLanguage)
        except AttributeError:
            # eric6 before 17.11 doesn't have this
            pass

        self.__e5project.projectClosed.connect(self.__projectClosed)
    
    def deactivate(self):
        """
        Public method to deactivate (shut down) the code assist server.

        The 'rope' documentation provider is unregistered, a 'closeProject'
        command is sent to all connected clients, the projectClosed signal
        is disconnected and finally all clients are stopped.
        """
        if self.__documentationViewer is not None:
            self.__documentationViewer.unregisterProvider("rope")

        for idString in self.connectionNames():
            self.sendJson("closeProject", {}, flush=True, idString=idString)

        try:
            self.__e5project.projectClosed.disconnect(self.__projectClosed)
        except TypeError:
            # ignore it, the signal may be disconnected already
            pass

        self.stopAllClients()
    
    @pyqtSlot()
    def __projectClosed(self):
        """
        Private slot to handle the projectClosed signal.
        
        The client serving the project is stopped.
        """
        self.stopClient(idString=CodeAssistServer.IdProject)
    
    #######################################################################
    ## Methods below handle setting/unsetting the hook methods
    #######################################################################
    
    def connectEditor(self, editor):
        """
        Public method to connect an editor.

        For an editor with a supported language the completion and calltip
        hooks are set, if they are enabled in the plug-in preferences and
        not set already. An editor with an unsupported language is
        disconnected.

        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        if not self.isSupportedLanguage(editor.getLanguage()):
            self.disconnectEditor(editor)
            return

        preference = self.__plugin.getPreferences

        if preference("CodeAssistEnabled") and \
           editor.getCompletionListHook("rope") is None:
            self.__setAutoCompletionHook(editor)

        if preference("CodeAssistCalltipsEnabled") and \
           editor.getCallTipHook("rope") is None:
            self.__setCalltipsHook(editor)
    
    def disconnectEditor(self, editor):
        """
        Public method to disconnect an editor.
        
        The 'rope' completion and calltip hooks are removed from the editor,
        if they are set.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        if editor.getCompletionListHook("rope"):
            self.__unsetAutoCompletionHook(editor)
        if editor.getCallTipHook("rope"):
            self.__unsetCalltipsHook(editor)
    
    def __setAutoCompletionHook(self, editor):
        """
        Private method to set the auto-completion hook.

        The asynchronous hook is tried first. If the editor does not accept
        the additional argument, the synchronous hook is set instead. The
        variant in use is remembered for processing the results.

        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        try:
            editor.addCompletionListHook("rope", self.requestCompletions, True)
        except TypeError:
            # backward compatibility for eric6 before 17.11
            editor.addCompletionListHook("rope", self.getCompletions)
            self.__asyncCompletions = False
        else:
            self.__asyncCompletions = True
    
    def __unsetAutoCompletionHook(self, editor):
        """
        Private method to unset the auto-completion hook.
        
        The completion list hook registered as 'rope' is removed.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        editor.removeCompletionListHook("rope")
    
    def __setCalltipsHook(self, editor):
        """
        Private method to set the calltip hook.
        
        The getCallTips method is registered as the 'rope' calltip hook.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        editor.addCallTipHook("rope", self.getCallTips)
    
    def __unsetCalltipsHook(self, editor):
        """
        Private method to unset the calltip hook.
        
        The calltip hook registered as 'rope' is removed.
        
        @param editor reference to the editor
        @type QScintilla.Editor.Editor
        """
        editor.removeCallTipHook("rope")

eric ide

mercurial