RefactoringRope/CodeAssistServer.py

Sat, 17 Jul 2021 15:39:01 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 17 Jul 2021 15:39:01 +0200
branch
eric7
changeset 368
c206d08c28e7
parent 365
f740b50380df
child 370
9d246420f284
permissions
-rw-r--r--

Removed some obsolete code and obsolete files.

# -*- coding: utf-8 -*-

# Copyright (c) 2008 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the autocompletion interface to rope.
"""

import os
import sys
import contextlib

from PyQt6.QtCore import (
    pyqtSlot, QCoreApplication, QTimer
)

from EricWidgets.EricApplication import ericApp
from EricWidgets import EricMessageBox

from EricNetwork.EricJsonServer import EricJsonServer

from QScintilla.Editor import Editor

import Globals
import Preferences


class CodeAssistServer(EricJsonServer):
    """
    Class implementing the autocompletion interface to rope.
    """
    # connection id used for the client serving sources of the open project
    IdProject = "Project"
    
    # Mapping of the completion type reported by the client to the picture
    # marker ("?<id>") that is appended to a completion name. Keys with one
    # or two leading underscores select the protected or private picture
    # variant; all module variants share the same picture. "None" maps to
    # no picture at all.
    PictureIDs = {
        "class": "?{0}".format(Editor.ClassID),
        "_class": "?{0}".format(Editor.ClassProtectedID),
        "__class": "?{0}".format(Editor.ClassPrivateID),
        "instance": "?{0}".format(Editor.ClassID),
        "_instance": "?{0}".format(Editor.ClassProtectedID),
        "__instance": "?{0}".format(Editor.ClassPrivateID),
        "function": "?{0}".format(Editor.MethodID),
        "_function": "?{0}".format(Editor.MethodProtectedID),
        "__function": "?{0}".format(Editor.MethodPrivateID),
        "module": "?{0}".format(Editor.ModuleID),
        "_module": "?{0}".format(Editor.ModuleID),
        "__module": "?{0}".format(Editor.ModuleID),
        "None": "",
    }
    
    def __init__(self, plugin, parent=None):
        """
        Constructor
        
        Initializes the JSON server in multiplexing mode, sets up the
        bookkeeping attributes and the dispatch table for client replies and
        finally tries to start the "Python3" client.
        
        @param plugin reference to the plugin object
        @type RefactoringRopePlugin
        @param parent parent
        @type QObject
        """
        super().__init__(
            "CodeAssistServer", multiplex=True, parent=parent)
        
        self.__plugin = plugin
        # the parent is also used as the user interface object (stderr
        # output, parent of message boxes, access to the documentation viewer)
        self.__ui = parent
        self.__vm = ericApp().getObject("ViewManager")
        self.__e5project = ericApp().getObject("Project")
        
        # editor language => client id (filled when a client connects)
        self.__editorLanguageMapping = {}
        # client id => rope folder name reported by that client
        self.__clientConfigs = {}
        # client id => editor showing the rope configuration file
        self.__editors = {}
        
        # set by activate()
        self.__documentationViewer = None
        
        # attributes to store the results of the client side
        self.__completions = None
        self.__calltips = None
        
        # method name sent by a client => handler (see handleCall())
        self.__methodMapping = {
            "Config": self.__setConfig,
            "CompletionsResult": self.__processCompletionsResult,
            "CallTipsResult": self.__processCallTipsResult,
            "DocumentationResult": self.__processDocumentationResult,
            "GotoDefinitionResult": self.__gotoDefinitionResult,
            
            "ClientException": self.__processClientException,
        }
        
        # type name reported by the client => translated display text
        self.__typeMapping = {
            "staticmethod": self.tr("static method"),
            "classmethod": self.tr("class method"),
            "method": self.tr("method"),
            "function": self.tr("function"),
            "class": self.tr("class"),
            "module": self.tr("module"),
            "package": self.tr("package"),
            "object": self.tr("object"),
            "<unknown>": self.tr("not known"),
        }
        
        # Python 3
        # (must come last because starting the client uses the attributes
        # initialized above)
        self.__ensureActive("Python3")
    
    def __updateEditorLanguageMapping(self):
        """
        Private method to rebuild the mapping of editor languages to client
        connections.
        
        The mapping is emptied first; the Python dialects are entered only
        if a client named "Python3" is connected.
        """
        pythonClient = "Python3"
        pythonDialects = (
            "Python3",
            "MicroPython",
            "Pygments|Python",
            "Pygments|Python 2.x",
            "Cython",
        )
        
        self.__editorLanguageMapping = {}
        if pythonClient in self.connectionNames():
            # every dialect is handled by the one Python3 client
            self.__editorLanguageMapping = dict.fromkeys(
                pythonDialects, pythonClient)
    
    def isSupportedLanguage(self, language):
        """
        Public method to check, if the given language is supported.
        
        A language is supported, if it is contained in the editor language to
        connection mapping. That mapping is rebuilt whenever a client
        connects, so the answer depends on the connected clients.
        
        @param language editor programming language to check
        @type str
        @return flag indicating the support status
        @rtype bool
        """
        return language in self.__editorLanguageMapping
    
    def __idString(self, editor):
        """
        Private method to determine the ID string for the back-end.
        
        Sources belonging to the open project (and written in the project
        language) are served by the project client. All other sources are
        served by the client registered for the editor language. An empty
        string is returned, if no client is responsible.
        
        @param editor reference to the editor to determine the ID string for
        @type Editor
        @return ID string
        @rtype str
        """
        language = editor.getLanguage()
        project = self.__e5project
        
        if (
            project.isOpen() and
            project.getProjectLanguage() == language and
            project.isProjectSource(editor.getFileName())
        ):
            return CodeAssistServer.IdProject
        
        # fall back to the language specific client (if there is one)
        return self.__editorLanguageMapping.get(language, "")
    
    def __getConfigs(self):
        """
        Private method to get the configurations of all connected clients.
        
        A "getConfig" request without parameters is sent to each connection.
        The answers are not awaited here (presumably they arrive as "Config"
        calls, which are dispatched to the configuration setter).
        """
        for idString in self.connectionNames():
            self.sendJson("getConfig", {}, idString=idString)
    
    def __setConfig(self, params):
        """
        Private method to store the rope configuration data of a client.
        
        The rope folder name is remembered under the id of the sending
        client.
        
        @param params dictionary containing the configuration data (keys
            "Id" and "RopeFolderName")
        @type dict
        """
        clientId = params["Id"]
        self.__clientConfigs[clientId] = params["RopeFolderName"]
    
    def __ropeConfigFile(self, idString):
        """
        Private method to get the name of the rope configuration file.
        
        @param idString id for which to get the configuration file
        @type str
        @return name of the rope configuration file or None, if the rope
            folder of the client is not known or the file does not exist
        @rtype str
        """
        ropeFolder = self.__clientConfigs.get(idString)
        if not ropeFolder:
            # unknown client or client without a rope folder
            return None
        
        candidate = os.path.join(ropeFolder, "config.py")
        return candidate if os.path.exists(candidate) else None
    
    def __configChanged(self, idString):
        """
        Private slot called, when the rope config file has changed.
        
        It notifies the given client by sending it a "configChanged" request
        without parameters.
        
        @param idString id of the client to be notified
        @type str
        """
        self.sendJson("configChanged", {}, idString=idString)
    
    def editConfig(self, idString):
        """
        Public slot to open the rope configuration file in an editor.
        
        If the configuration file is not available, an error message naming
        the expected file is shown instead.
        
        @param idString id for which to get the configuration file
        @type str
        """
        configfile = self.__ropeConfigFile(idString)
        if configfile and os.path.exists(configfile):
            from QScintilla.MiniEditor import MiniEditor
            editor = MiniEditor(configfile)
            editor.show()
            # tell the client to reload its configuration after each save
            editor.editorSaved.connect(
                lambda: self.__configChanged(idString))
            # remember the editor per client id (presumably this keeps the
            # editor window alive)
            self.__editors[idString] = editor
            return
        
        # The lookup above yields None for a missing file. Report the file
        # name that was expected instead of showing 'None' to the user.
        ropedir = self.__clientConfigs.get(idString)
        expected = (
            os.path.join(ropedir, "config.py") if ropedir else "config.py"
        )
        EricMessageBox.critical(
            self.__ui,
            self.tr("Configure Rope"),
            self.tr("""The Rope configuration file '{0}' does"""
                    """ not exist.""").format(expected))
    
    def requestCompletions(self, editor, context, acText):
        """
        Public method to request a list of possible completions.
        
        The request is sent to the client responsible for the editor; the
        completions are delivered asynchronously. Nothing is done, if code
        assist is disabled or no client is responsible for the editor.
        
        @param editor reference to the editor object, that called this method
        @type Editor
        @param context flag indicating to autocomplete a context (not used
            by this implementation)
        @type bool
        @param acText text to be completed
        @type str
        """
        if not self.__plugin.getPreferences("CodeAssistEnabled"):
            return
        
        clientId = self.__idString(editor)
        if not clientId:
            return
        
        fileName = editor.getFileName()
        cursorLine, cursorIndex = editor.getCursorPosition()
        text = editor.text()
        # offset = length of all lines before the cursor line (including
        # their line ends) plus the index within the cursor line
        precedingLines = text.splitlines(True)[:cursorLine]
        textOffset = sum(len(textLine) for textLine in precedingLines)
        textOffset += cursorIndex
        maxFixes = self.__plugin.getPreferences("MaxFixes")
        
        self.__ensureActive(clientId)
        request = {
            "FileName": fileName,
            "Source": text,
            "Offset": textOffset,
            "MaxFixes": maxFixes,
            "CompletionText": acText,
            "SysPath": sys.path,
        }
        self.sendJson("getCompletions", request, idString=clientId)
    
    def __processCompletionsResult(self, result):
        """
        Private method to process the completions sent by the client.
        
        Results flagged with an "Error" entry are ignored without looking
        at the remaining entries. Otherwise each completion name gets the
        picture marker for its completion type appended and the list is
        handed to the editor showing the file (if it is still open).
        
        @param result dictionary containing the result sent by the client
            (keys "Completions", "CompletionText" and "FileName")
        @type dict
        """
        if "Error" in result:
            # check first: an error reply is not processed any further
            return
        
        names = [
            completion['Name'] + CodeAssistServer.PictureIDs.get(
                completion['CompletionType'], '')
            for completion in result["Completions"]
        ]
        
        editor = self.__vm.getOpenEditor(result["FileName"])
        if editor is not None:
            editor.completionsListReady(names, result["CompletionText"])
    
    def getCallTips(self, editor, pos, commas):
        """
        Public method to calculate calltips.
        
        The request is sent to the client responsible for the editor and the
        method waits (while processing events) up to five seconds for the
        answer. An empty list is returned, if calltips are disabled, no
        client is responsible or could be started, the client reported an
        error or the answer did not arrive in time.
        
        @param editor reference to the editor object, that called this method
        @type Editor
        @param pos position in the text for the calltip
        @type int
        @param commas minimum number of commas contained in the calltip (not
            used by this implementation)
        @type int
        @return list of possible calltips
        @rtype list of str
        """
        if not self.__plugin.getPreferences("CodeAssistCalltipsEnabled"):
            return []
        
        # reset the calltips buffer
        self.__calltips = None
        
        idString = self.__idString(editor)
        if not idString:
            return []
        
        if not self.__ensureActive(idString):
            # Without a running client no answer will ever arrive; do not
            # spin in the event loop until the timeout expires.
            return []
        
        filename = editor.getFileName()
        source = editor.text()
        line, index = editor.lineIndexFromPosition(pos)
        # offset = length of all lines before the given line (including
        # their line ends) plus the index within that line
        offset = len("".join(source.splitlines(True)[:line])) + index
        maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes")
        
        self.sendJson("getCallTips", {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": maxfixes,
            "SysPath": sys.path,
        }, idString=idString)
        
        # emulate the synchronous behaviour
        # (the calltips buffer is filled by the result handler, which is
        # called while events are processed)
        timer = QTimer()
        timer.setSingleShot(True)
        timer.start(5000)           # 5s timeout
        while self.__calltips is None and timer.isActive():
            QCoreApplication.processEvents()
        
        return [] if self.__calltips is None else self.__calltips
    
    def __processCallTipsResult(self, result):
        """
        Private method to store the calltips sent by the client.
        
        The calltips buffer is set to an empty list, if the client reported
        an error, and to the received calltips otherwise.
        
        @param result dictionary containing the result sent by the client
        @type dict
        """
        self.__calltips = [] if "Error" in result else result["CallTips"]
    
    def reportChanged(self, filename, oldSource):
        """
        Public slot to report some changed sources.
        
        The change is reported to the client responsible for the editor
        showing the file. Nothing is done, if the file is not open in an
        editor or no client is responsible for it.
        
        @param filename file name of the changed source
        @type str
        @param oldSource source code before the change
        @type str
        """
        editor = self.__vm.getOpenEditor(filename)
        if editor is None:
            return
        
        clientId = self.__idString(editor)
        if not clientId:
            return
        
        self.__ensureActive(clientId)
        changeData = {
            "FileName": filename,
            "OldSource": oldSource,
        }
        self.sendJson("reportChanged", changeData, idString=clientId)
    
    def requestCodeDocumentation(self, editor):
        """
        Public method to request source code documentation for the given
        editor.
        
        Nothing is done without a documentation viewer. If no client is
        responsible for the editor, a warning is shown in the viewer.
        Otherwise the request is sent to the client and the documentation is
        delivered asynchronously.
        
        @param editor reference to the editor to get source code documentation
            for
        @type Editor
        """
        viewer = self.__documentationViewer
        if viewer is None:
            return
        
        clientId = self.__idString(editor)
        if not clientId:
            message = self.tr(
                "Language <b>{0}</b> is not supported."
            ).format(editor.getLanguage())
            viewer.documentationReady(message, isWarning=True)
            return
        
        fileName = editor.getFileName()
        text = editor.text()
        cursorLine, cursorIndex = editor.getCursorPosition()
        # offset = length of all lines before the cursor line (including
        # their line ends) plus the index within the cursor line
        precedingLines = text.splitlines(True)[:cursorLine]
        textOffset = sum(len(textLine) for textLine in precedingLines)
        textOffset += cursorIndex
        maxFixes = self.__plugin.getPreferences("CalltipsMaxFixes")
        
        # step back one position and once more, if that position holds an
        # opening parenthesis (presumably to land on the name in front of it)
        textOffset = editor.positionBefore(textOffset)
        if editor.charAt(textOffset) == "(":
            textOffset = editor.positionBefore(textOffset)
        
        self.__ensureActive(clientId)
        request = {
            "FileName": fileName,
            "Source": text,
            "Offset": textOffset,
            "MaxFixes": maxFixes,
            "SysPath": sys.path,
        }
        self.sendJson("getDocumentation", request, idString=clientId)
    
    def __processDocumentationResult(self, result):
        """
        Private method to process the documentation sent by the client.
        
        The received dictionary is prepared for display: a "module" entry is
        converted to a note, the "typ" entry is replaced by its translated
        text (or removed, if it is not known) and a "note" entry is
        guaranteed to exist. A warning is shown, if the client reported an
        error or did not send any documentation.
        
        @param result dictionary containing the result sent by the client
        @type dict with keys 'name', 'argspec', 'note', 'docstring', 'typ'
        """
        viewer = self.__documentationViewer
        if viewer is None:
            return
        
        info = None if "Error" in result else result["DocumentationDict"]
        if not info:
            viewer.documentationReady(
                self.tr("No documentation available."), isDocWarning=True)
            return
        
        # the module name is shown as a note; the entry itself is dropped
        moduleName = info.pop("module", None)
        if moduleName:
            info["note"] = self.tr(
                "Present in <i>{0}</i> module"
            ).format(moduleName)
        
        if "typ" in info:
            typeName = info["typ"]
            if typeName in self.__typeMapping:
                info["typ"] = self.__typeMapping[typeName]
            else:
                del info["typ"]
        
        info.setdefault("note", "")
        
        viewer.documentationReady(info)
    
    def gotoDefinition(self, editor):
        """
        Public slot to find the definition for the word at the cursor position
        and go to it.
        
        Note: This is executed upon a mouse click sequence.
        
        The request is sent to the client responsible for the editor; the
        answer is handled asynchronously. Nothing is done, if the mouse click
        handling is disabled or no client is responsible for the editor.
        
        @param editor reference to the calling editor
        @type Editor
        """
        if not self.__plugin.getPreferences("MouseClickEnabled"):
            return
        
        idString = self.__idString(editor)
        if not idString:
            return
        
        filename = editor.getFileName()
        source = editor.text()
        line, index = editor.getCursorPosition()
        # offset = length of all lines before the cursor line (including
        # their line ends) plus the index within the cursor line
        offset = len("".join(source.splitlines(True)[:line])) + index
        
        self.__ensureActive(idString)
        self.sendJson("gotoDefinition", {
            "FileName": filename,
            "Offset": offset,
            # reuse the text the offset was calculated for instead of
            # fetching it from the editor a second time
            "Source": source,
            "SysPath": sys.path,
        }, idString=idString)
    
    def __gotoDefinitionResult(self, result):
        """
        Private method to handle the "Goto Definition" result sent by
        the client.
        
        The reported location is opened in an editor. A status bar message
        is shown for five seconds, if no location was reported.
        
        @param result dictionary containing the result data
        @type dict
        """
        if "Error" in result:
            # ignore errors silently
            return
        
        try:
            target = result["Location"]
        except KeyError:
            statusBar = ericApp().getObject("UserInterface").statusBar()
            statusBar.showMessage(
                self.tr('Code Assist: No definition found'), 5000)
        else:
            self.__vm.openSourceFile(
                target["ModulePath"], target["Line"], addNext=True)
    
    #######################################################################
    ## Methods below handle the network connection
    #######################################################################
    
    def handleCall(self, method, params):
        """
        Public method to handle a method call from the client.
        
        The method name is looked up in the dispatch table built by the
        constructor and the associated handler is called with the given
        parameters. A method name not contained in the table raises a
        KeyError.
        
        @param method requested method name
        @type str
        @param params dictionary with method specific parameters
        @type dict
        """
        self.__methodMapping[method](params)
    
    def __processClientException(self, params):
        """
        Private method to report an exception of the code assist client.
        
        The report is appended to the stderr output of the user interface.
        Protocol errors are reported together with the received data, all
        other exceptions together with their traceback.
        
        @param params dictionary containing the exception data
        @type dict
        """
        if params["ExceptionType"] == "ProtocolError":
            report = self.tr("The data received from the code assist"
                             " server could not be decoded. Please report"
                             " this issue with the received data to the"
                             " eric bugs email address.\n"
                             "Error: {0}\n"
                             "Data:\n{1}\n").format(
                params["ExceptionValue"],
                params["ProtocolData"])
        else:
            report = self.tr("An exception happened in the code assist"
                             " client. Please report it to the eric bugs"
                             " email address.\n"
                             "Exception: {0}\n"
                             "Value: {1}\n"
                             "Traceback: {2}\n").format(
                params["ExceptionType"],
                params["ExceptionValue"],
                params["Traceback"])
        
        self.__ui.appendToStderr(report)
    
    def __startCodeAssistClient(self, interpreter, idString, clientEnv):
        """
        Private method to start the code assist client with the given
        interpreter.
        
        The client is given a configuration directory (the project path for
        the project client, a client specific directory below the eric
        configuration directory otherwise), which is created if needed.
        Failures are reported via the stderr output of the user interface.
        
        @param interpreter interpreter to be used for the code assist client
        @type str
        @param idString id of the client to be started
        @type str
        @param clientEnv dictionary with environment variables to run the
            interpreter with
        @type dict
        @return flag indicating a successful start of the client
        @rtype bool
        """
        prefix = "CodeAssistServer: "
        
        if not interpreter:
            self.__ui.appendToStderr(prefix + self.tr(
                "'{0}' is not supported because no suitable interpreter is"
                " configured.\n"
            ).format(idString))
            return False
        
        configDir = (
            self.__e5project.getProjectPath()
            if idString == CodeAssistServer.IdProject else
            os.path.join(Globals.getConfigDir(), "rope", idString)
        )
        if not os.path.exists(configDir):
            os.makedirs(configDir)
        
        clientScript = os.path.join(os.path.dirname(__file__),
                                    "CodeAssistClient.py")
        started, exitCode = self.startClient(
            interpreter, clientScript,
            clientArgs=[configDir, Globals.getPythonLibraryDirectory()],
            idString=idString, environment=clientEnv)
        
        if not started:
            # exit code 42 is reported as "rope is not installed"
            if exitCode == 42:
                reason = self.tr(
                    "The rope refactoring library is not installed.\n"
                )
            else:
                reason = self.tr(
                    "'{0}' is not supported because the configured"
                    " interpreter could not be started.\n"
                ).format(idString)
            self.__ui.appendToStderr(prefix + reason)
        
        return started
    
    def __ensureActive(self, idString):
        """
        Private method to ensure, that the requested client is active.
        
        A non-active client will be started. The project client uses the
        interpreter and environment of the project. The "Python3" client
        uses the virtual environment configured for the debugger or the
        default environment. Other ids have no interpreter and are not
        started.
        
        @param idString id of the client to be checked
        @type str
        @return flag indicating an active client
        @rtype bool
        """
        if idString in self.connectionNames():
            # client is running already
            return True
        
        if idString == CodeAssistServer.IdProject:
            interpreter, clientEnv = self.__interpreterForProject()
        else:
            interpreter = ""
            clientEnv = os.environ.copy()
            if "PATH" in clientEnv:
                clientEnv["PATH"] = self.__ui.getOriginalPathString()
            
            venvManager = ericApp().getObject("VirtualEnvManager")
            venvName = ""
            if idString == "Python3":
                venvName = Preferences.getDebugger("Python3VirtualEnv")
                if not venvName:
                    venvName, _ = venvManager.getDefaultEnvironment()
            
            if venvName:
                interpreter = venvManager.getVirtualenvInterpreter(venvName)
                execPath = venvManager.getVirtualenvExecPath(venvName)
                
                # build a suitable environment: the executable path of the
                # virtual environment is searched first
                if execPath:
                    searchPath = [execPath]
                    if "PATH" in clientEnv:
                        searchPath.append(clientEnv["PATH"])
                    clientEnv["PATH"] = os.pathsep.join(searchPath)
        
        if not interpreter:
            return False
        
        return self.__startCodeAssistClient(interpreter, idString, clientEnv)
    
    def __interpreterForProject(self):
        """
        Private method to determine the interpreter for the current project and
        the environment to run it.
        
        The virtual environment is taken from the project's debug properties.
        For "Python3" projects the environment configured for the debugger
        and then the default environment are used as fallbacks. The
        interpreter is an empty string, if the project is not a Python
        project or no virtual environment could be determined.
        
        @return tuple containing the interpreter of the current project and the
            environment variables
        @rtype tuple of (str, dict)
        """
        language = self.__e5project.getProjectLanguage()
        environment = os.environ.copy()
        if "PATH" in environment:
            environment["PATH"] = self.__ui.getOriginalPathString()
        
        if not language.startswith("Python"):
            return "", environment
        
        venvManager = ericApp().getObject("VirtualEnvManager")
        
        # get virtual environment from project first
        venvName = self.__e5project.getDebugProperty("VIRTUALENV")
        if not venvName:
            if language == "Python3":
                # get it from debugger settings next
                venvName = Preferences.getDebugger("Python3VirtualEnv")
                if not venvName:
                    venvName, _ = venvManager.getDefaultEnvironment()
            else:
                venvName = ""
        
        if not venvName:
            return "", environment
        
        interpreter = venvManager.getVirtualenvInterpreter(venvName)
        execPath = venvManager.getVirtualenvExecPath(venvName)
        
        # build a suitable environment: the executable path of the virtual
        # environment is searched first
        if execPath:
            searchPath = [execPath]
            if "PATH" in environment:
                searchPath.append(environment["PATH"])
            environment["PATH"] = os.pathsep.join(searchPath)
        
        return interpreter, environment
    
    @pyqtSlot()
    def handleNewConnection(self):
        """
        Public slot for new incoming connections from a client.
        
        After the base class has handled the connection, the editor language
        mapping is rebuilt and the configurations of all connected clients
        are requested.
        """
        super().handleNewConnection()
        
        # the set of connected clients may have changed
        self.__updateEditorLanguageMapping()
        
        self.__getConfigs()
    
    def activate(self):
        """
        Public method to activate the code assist server.
        
        The server registers itself as the "rope" documentation provider
        (if the user interface has a documentation viewer) and starts
        listening for the project being closed.
        """
        viewer = self.__ui.documentationViewer()
        self.__documentationViewer = viewer
        if viewer is not None:
            viewer.registerProvider(
                "rope",
                self.tr("Rope"),
                self.requestCodeDocumentation,
                self.isSupportedLanguage,
            )
        
        self.__e5project.projectClosed.connect(self.__projectClosed)
    
    def deactivate(self):
        """
        Public method to deactivate (shut down) the code assist server.
        
        The "rope" documentation provider is unregistered, all connected
        clients are told to close their project, the projectClosed signal is
        disconnected and all clients are stopped.
        """
        if self.__documentationViewer is not None:
            self.__documentationViewer.unregisterProvider("rope")
        
        for idString in self.connectionNames():
            self.sendJson("closeProject", {}, flush=True, idString=idString)
        
        # the TypeError of the disconnect (presumably raised, if the slot is
        # not connected) is ignored deliberately
        with contextlib.suppress(TypeError):
            self.__e5project.projectClosed.disconnect(self.__projectClosed)
        
        self.stopAllClients()
    
    @pyqtSlot()
    def __projectClosed(self):
        """
        Private slot to handle the projectClosed signal.
        
        It stops the client serving the project sources.
        """
        self.stopClient(idString=CodeAssistServer.IdProject)

eric ide

mercurial