Fri, 30 Jul 2021 16:38:13 +0200
Added code to jump to references when clicked on a definition.
# -*- coding: utf-8 -*-

# Copyright (c) 2008 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the autocompletion interface to rope.
"""

import contextlib
import os
import sys
import uuid

from PyQt6.QtCore import (
    pyqtSlot, QCoreApplication, QTimer
)

from EricWidgets.EricApplication import ericApp
from EricWidgets import EricMessageBox

from EricNetwork.EricJsonServer import EricJsonServer

from QScintilla.Editor import Editor

import Globals
import Preferences


class CodeAssistServer(EricJsonServer):
    """
    Class implementing the autocompletion interface to rope.

    The server talks to one code assist client per connection name
    (e.g. "Python3" or the project specific "Project" client) and
    dispatches the results sent back by these clients.
    """
    IdProject = "Project"

    # mapping of completion types to the picture IDs appended to the
    # completion names
    PictureIDs = {
        "class": "?{0}".format(Editor.ClassID),
        "_class": "?{0}".format(Editor.ClassProtectedID),
        "__class": "?{0}".format(Editor.ClassPrivateID),
        "instance": "?{0}".format(Editor.ClassID),
        "_instance": "?{0}".format(Editor.ClassProtectedID),
        "__instance": "?{0}".format(Editor.ClassPrivateID),
        "function": "?{0}".format(Editor.MethodID),
        "_function": "?{0}".format(Editor.MethodProtectedID),
        "__function": "?{0}".format(Editor.MethodPrivateID),
        "module": "?{0}".format(Editor.ModuleID),
        "_module": "?{0}".format(Editor.ModuleID),
        "__module": "?{0}".format(Editor.ModuleID),
        "None": "",
    }

    def __init__(self, plugin, parent=None):
        """
        Constructor

        @param plugin reference to the plugin object
        @type RefactoringRopePlugin
        @param parent parent
        @type QObject
        """
        super().__init__(
            "CodeAssistServer", multiplex=True, parent=parent)

        self.__plugin = plugin
        self.__ui = parent
        self.__vm = ericApp().getObject("ViewManager")
        self.__e5project = ericApp().getObject("Project")

        self.__editorLanguageMapping = {}
        self.__clientConfigs = {}
        # store for editor references; entries are indexed by the request
        # Uuid (see gotoDefinition()) or by the client id (see editConfig())
        self.__editors = {}

        self.__documentationViewer = None

        # attributes to store the results of the client side
        self.__completions = None
        self.__calltips = None

        # dispatch table for the methods called by the clients
        self.__methodMapping = {
            "Config": self.__setConfig,
            "CompletionsResult": self.__processCompletionsResult,
            "CallTipsResult": self.__processCallTipsResult,
            "DocumentationResult": self.__processDocumentationResult,
            "GotoDefinitionResult": self.__gotoDefinitionResult,
            "GotoReferencesResult": self.__processGotoReferencesResult,

            "ClientException": self.__processClientException,
        }

        self.__typeMapping = {
            "staticmethod": self.tr("static method"),
            "classmethod": self.tr("class method"),
            "method": self.tr("method"),
            "function": self.tr("function"),
            "class": self.tr("class"),
            "module": self.tr("module"),
            "package": self.tr("package"),
            "object": self.tr("object"),
            "<unknown>": self.tr("not known"),
        }

        # Python 3
        self.__ensureActive("Python3")

    def __updateEditorLanguageMapping(self):
        """
        Private method to update the editor language to connection mapping.
        """
        self.__editorLanguageMapping = {}
        for name in self.connectionNames():
            if name == "Python3":
                self.__editorLanguageMapping.update({
                    "Python3": "Python3",
                    "MicroPython": "Python3",
                    "Pygments|Python": "Python3",
                    "Pygments|Python 2.x": "Python3",
                    "Cython": "Python3",
                })

    def isSupportedLanguage(self, language):
        """
        Public method to check, if the given language is supported.

        @param language editor programming language to check
        @type str
        @return flag indicating the support status
        @rtype bool
        """
        return language in self.__editorLanguageMapping

    def __idString(self, editor):
        """
        Private method to determine the ID string for the back-end.

        @param editor reference to the editor to determine the ID string for
        @type Editor
        @return ID string
        @rtype str
        """
        idString = ""

        language = editor.getLanguage()
        if (
            self.__e5project.isOpen() and
            self.__e5project.getProjectLanguage() == language
        ):
            # sources belonging to the open project use the project client
            filename = editor.getFileName()
            if self.__e5project.isProjectSource(filename):
                idString = CodeAssistServer.IdProject

        if not idString and language in self.__editorLanguageMapping:
            idString = self.__editorLanguageMapping[language]

        return idString

    def __getConfigs(self):
        """
        Private method to get the configurations of all connected clients.
        """
        for idString in self.connectionNames():
            self.sendJson("getConfig", {}, idString=idString)

    def __setConfig(self, params):
        """
        Private method to set the rope client configuration data.

        @param params dictionary containing the configuration data
        @type dict
        """
        idString = params["Id"]
        ropeFolder = params["RopeFolderName"]

        self.__clientConfigs[idString] = ropeFolder

    def __ropeConfigFile(self, idString):
        """
        Private method to get the name of the rope configuration file.

        @param idString id for which to get the configuration file
        @type str
        @return name of the rope configuration file (None, if the client
            is not known or the file does not exist)
        @rtype str
        """
        configfile = None
        if idString in self.__clientConfigs:
            ropedir = self.__clientConfigs[idString]
            if ropedir:
                configfile = os.path.join(ropedir, "config.py")
                if not os.path.exists(configfile):
                    configfile = None
        return configfile

    def __configChanged(self, idString):
        """
        Private slot called, when the rope config file has changed.

        @param idString id for which to get the configuration file
        @type str
        """
        self.sendJson("configChanged", {}, idString=idString)

    def editConfig(self, idString):
        """
        Public slot to open the rope configuration file in an editor.

        @param idString id for which to get the configuration file
        @type str
        """
        configfile = self.__ropeConfigFile(idString)
        if configfile:
            if os.path.exists(configfile):
                from QScintilla.MiniEditor import MiniEditor
                editor = MiniEditor(configfile)
                editor.show()
                editor.editorSaved.connect(
                    lambda: self.__configChanged(idString))
                # keep a reference to the editor window
                self.__editors[idString] = editor
                return
        else:
            EricMessageBox.critical(
                self.__ui,
                self.tr("Configure Rope"),
                self.tr("""The Rope configuration file '{0}' does"""
                        """ not exist.""").format(configfile))

    def requestCompletions(self, editor, context, acText):
        """
        Public method to request a list of possible completions.

        The result is delivered asynchronously via the
        'CompletionsResult' client call.

        @param editor reference to the editor object, that called this method
        @type Editor
        @param context flag indicating to autocomplete a context
        @type bool
        @param acText text to be completed
        @type str
        """
        if not self.__plugin.getPreferences("CodeAssistEnabled"):
            return

        idString = self.__idString(editor)
        if not idString:
            return

        filename = editor.getFileName()
        line, index = editor.getCursorPosition()
        source = editor.text()
        # character offset of the cursor within the complete source text
        offset = len("".join(source.splitlines(True)[:line])) + index
        maxfixes = self.__plugin.getPreferences("MaxFixes")

        self.__ensureActive(idString)
        self.sendJson("getCompletions", {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": maxfixes,
            "CompletionText": acText,
            "SysPath": sys.path,
        }, idString=idString)

    def __processCompletionsResult(self, result):
        """
        Private method to process the completions sent by the client.

        Results containing an error and results for editors closed in the
        meantime are ignored.

        @param result dictionary containing the result sent by the client
        @type dict
        """
        if "Error" in result:
            return

        editor = self.__vm.getOpenEditor(result["FileName"])
        if editor is None:
            return

        # append the picture ID belonging to the completion type
        names = [
            completion['Name'] + CodeAssistServer.PictureIDs.get(
                completion['CompletionType'], '')
            for completion in result["Completions"]
        ]
        editor.completionsListReady(names, result["CompletionText"])

    def getCallTips(self, editor, pos, commas):
        """
        Public method to calculate calltips.

        @param editor reference to the editor object, that called this method
        @type Editor
        @param pos position in the text for the calltip
        @type int
        @param commas minimum number of commas contained in the calltip
        @type int
        @return list of possible calltips
        @rtype list of str
        """
        if not self.__plugin.getPreferences("CodeAssistCalltipsEnabled"):
            return []

        # reset the calltips buffer
        self.__calltips = None

        idString = self.__idString(editor)
        if not idString:
            return []

        filename = editor.getFileName()
        source = editor.text()
        line, index = editor.lineIndexFromPosition(pos)
        # character offset of the position within the complete source text
        offset = len("".join(source.splitlines(True)[:line])) + index
        maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes")

        self.__ensureActive(idString)
        self.sendJson("getCallTips", {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": maxfixes,
            "SysPath": sys.path,
        }, idString=idString)

        # emulate the synchronous behaviour
        timer = QTimer()
        timer.setSingleShot(True)
        timer.start(5000)           # 5s timeout
        while self.__calltips is None and timer.isActive():
            QCoreApplication.processEvents()

        return [] if self.__calltips is None else self.__calltips

    def __processCallTipsResult(self, result):
        """
        Private method to process the calltips sent by the client.

        @param result dictionary containing the result sent by the client
        @type dict
        """
        if "Error" in result:
            self.__calltips = []
        else:
            self.__calltips = result["CallTips"]

    def reportChanged(self, filename, oldSource):
        """
        Public slot to report some changed sources.

        @param filename file name of the changed source
        @type str
        @param oldSource source code before the change
        @type str
        """
        editor = self.__vm.getOpenEditor(filename)
        if editor is not None:
            idString = self.__idString(editor)
            if idString:
                self.__ensureActive(idString)
                self.sendJson("reportChanged", {
                    "FileName": filename,
                    "OldSource": oldSource,
                }, idString=idString)

    def requestCodeDocumentation(self, editor):
        """
        Public method to request source code documentation for the given
        editor.

        @param editor reference to the editor to get source code documentation
            for
        @type Editor
        """
        if self.__documentationViewer is None:
            return

        idString = self.__idString(editor)

        if not idString:
            language = editor.getLanguage()
            warning = self.tr(
                "Language <b>{0}</b> is not supported."
            ).format(language)
            self.__documentationViewer.documentationReady(
                warning, isWarning=True)
            return

        filename = editor.getFileName()
        source = editor.text()
        line, index = editor.getCursorPosition()
        offset = len("".join(source.splitlines(True)[:line])) + index
        maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes")

        # step back one position and skip over an opening parenthesis
        offset = editor.positionBefore(offset)
        if editor.charAt(offset) == "(":
            offset = editor.positionBefore(offset)

        self.__ensureActive(idString)
        self.sendJson("getDocumentation", {
            "FileName": filename,
            "Source": source,
            "Offset": offset,
            "MaxFixes": maxfixes,
            "SysPath": sys.path,
        }, idString=idString)

    def __processDocumentationResult(self, result):
        """
        Private method to process the documentation sent by the client.

        @param result dictionary containing the result sent by the client
        @type dict with keys 'name', 'argspec', 'note', 'docstring', 'typ'
        """
        if self.__documentationViewer is None:
            return

        docu = None

        if "Error" not in result:
            documentationDict = result["DocumentationDict"]
            if documentationDict:
                if "module" in documentationDict:
                    # convert the module name into a note
                    if documentationDict["module"]:
                        documentationDict["note"] = self.tr(
                            "Present in <i>{0}</i> module"
                        ).format(documentationDict["module"])
                    del documentationDict["module"]

                if "typ" in documentationDict:
                    # translate known types and drop unknown ones
                    if documentationDict["typ"] not in self.__typeMapping:
                        del documentationDict["typ"]
                    else:
                        documentationDict["typ"] = self.__typeMapping[
                            documentationDict["typ"]]

                if "note" not in documentationDict:
                    documentationDict["note"] = ""

                docu = documentationDict

        if docu is None:
            msg = self.tr("No documentation available.")
            self.__documentationViewer.documentationReady(
                msg, isDocWarning=True)
        else:
            self.__documentationViewer.documentationReady(docu)

    def gotoDefinition(self, editor):
        """
        Public slot to find the definition for the word at the cursor position
        and go to it.

        Note: This is executed upon a mouse click sequence.

        @param editor reference to the calling editor
        @type Editor
        """
        if not self.__plugin.getPreferences("MouseClickEnabled"):
            return

        idString = self.__idString(editor)
        if not idString:
            return

        filename = editor.getFileName()
        source = editor.text()
        line, index = editor.getCursorPosition()
        offset = len("".join(source.splitlines(True)[:line])) + index

        self.__ensureActive(idString)

        # remember the requesting editor until the result has been processed
        euuid = str(uuid.uuid4())
        self.__editors[euuid] = editor

        self.sendJson("gotoDefinition", {
            "FileName": filename,
            "Offset": offset,
            "Source": source,
            "SysPath": sys.path,
            "Uuid": euuid,
        }, idString=idString)

    def __gotoDefinitionResult(self, result):
        """
        Private method to handle the "Goto Definition" result sent by
        the client.

        The editor reference stored for the request is released once the
        request is finished. It is kept only, if a "gotoReferences" request
        is sent as a follow-up; it is released by the handler of that result.

        @param result dictionary containing the result data
        @type dict
        """
        euuid = result.get("Uuid")

        if "Error" in result:
            # ignore errors silently but don't keep the editor reference
            self.__editors.pop(euuid, None)
            return

        location = result["Location"]
        if location:
            editor = self.__editors.get(euuid)

            if (
                editor is not None and
                editor.getFileName() == location["ModulePath"] and
                editor.getCursorPosition()[0] + 1 == location["Line"]
            ):
                # this was a click onto the definition line
                # -> get references
                idString = self.__idString(editor)
                filename = editor.getFileName()
                source = editor.text()
                line, index = editor.getCursorPosition()
                offset = (
                    len("".join(source.splitlines(True)[:line])) +
                    index
                )
                self.sendJson("gotoReferences", {
                    "FileName": filename,
                    "Offset": offset,
                    "Line": line + 1,
                    "SysPath": sys.path,
                    "Uuid": euuid,
                }, idString=idString)
                # the editor reference is still needed for the references
                # result
                return

            self.__vm.openSourceFile(location["ModulePath"],
                                     location["Line"],
                                     addNext=True)
        else:
            ericApp().getObject("UserInterface").statusBar().showMessage(
                self.tr('Code Assist: No definition found'), 5000)

        self.__editors.pop(euuid, None)

    def __processGotoReferencesResult(self, result):
        """
        Private method callback for the goto references result.

        @param result dictionary containing the result data
        @type dict
        """
        # The request is finished with this result in any case, so the
        # stored editor reference is released unconditionally.
        editor = self.__editors.pop(result.get("Uuid"), None)

        if "Error" in result or editor is None:
            # ignore errors silently
            return

        references = result["GotoReferencesList"]
        if not references:
            return

        try:
            from QScintilla.Editor import ReferenceItem
        except ImportError:
            # reference items are not available; nothing can be shown
            return

        referenceItemsList = [
            ReferenceItem(
                modulePath=ref["ModulePath"],
                codeLine=ref["Code"],
                line=ref["Line"],
                column=0,
            ) for ref in references
        ]
        editor.gotoReferenceHandler(referenceItemsList)

    #######################################################################
    ## Methods below handle the network connection
    #######################################################################

    def handleCall(self, method, params):
        """
        Public method to handle a method call from the client.

        @param method requested method name
        @type str
        @param params dictionary with method specific parameters
        @type dict
        """
        self.__methodMapping[method](params)

    def __processClientException(self, params):
        """
        Private method to handle exceptions of the refactoring client.

        @param params dictionary containing the exception data
        @type dict
        """
        if params["ExceptionType"] == "ProtocolError":
            self.__ui.appendToStderr(
                self.tr("The data received from the code assist"
                        " server could not be decoded. Please report"
                        " this issue with the received data to the"
                        " eric bugs email address.\n"
                        "Error: {0}\n"
                        "Data:\n{1}\n").format(
                    params["ExceptionValue"],
                    params["ProtocolData"]))
        else:
            self.__ui.appendToStderr(
                self.tr("An exception happened in the code assist"
                        " client. Please report it to the eric bugs"
                        " email address.\n"
                        "Exception: {0}\n"
                        "Value: {1}\n"
                        "Traceback: {2}\n").format(
                    params["ExceptionType"],
                    params["ExceptionValue"],
                    params["Traceback"]))

    def __startCodeAssistClient(self, interpreter, idString, clientEnv):
        """
        Private method to start the code assist client with the given
        interpreter.

        @param interpreter interpreter to be used for the code assist client
        @type str
        @param idString id of the client to be started
        @type str
        @param clientEnv dictionary with environment variables to run the
            interpreter with
        @type dict
        @return flag indicating a successful start of the client
        @rtype bool
        """
        ok = False

        if interpreter:
            if idString == CodeAssistServer.IdProject:
                configDir = self.__e5project.getProjectPath()
            else:
                configDir = os.path.join(
                    Globals.getConfigDir(), "rope", idString)
            if not os.path.exists(configDir):
                os.makedirs(configDir)

            client = os.path.join(os.path.dirname(__file__),
                                  "CodeAssistClient.py")
            ok, exitCode = self.startClient(
                interpreter, client,
                clientArgs=[configDir, Globals.getPythonLibraryDirectory()],
                idString=idString, environment=clientEnv)
            if not ok:
                if exitCode == 42:
                    self.__ui.appendToStderr("CodeAssistServer: " + self.tr(
                        "The rope refactoring library is not installed.\n"
                    ))
                else:
                    self.__ui.appendToStderr("CodeAssistServer: " + self.tr(
                        "'{0}' is not supported because the configured"
                        " interpreter could not be started.\n"
                    ).format(idString))
        else:
            self.__ui.appendToStderr("CodeAssistServer: " + self.tr(
                "'{0}' is not supported because no suitable interpreter is"
                " configured.\n"
            ).format(idString))

        return ok

    def __ensureActive(self, idString):
        """
        Private method to ensure, that the requested client is active.

        A non-active client will be started.

        @param idString id of the client to be checked
        @type str
        @return flag indicating an active client
        @rtype bool
        """
        ok = idString in self.connectionNames()
        if not ok:
            # client is not running
            if idString == CodeAssistServer.IdProject:
                interpreter, clientEnv = self.__interpreterForProject()
            else:
                interpreter = ""
                venvName = ""
                clientEnv = os.environ.copy()
                if "PATH" in clientEnv:
                    clientEnv["PATH"] = self.__ui.getOriginalPathString()
                venvManager = ericApp().getObject("VirtualEnvManager")
                if idString == "Python3":
                    venvName = Preferences.getDebugger("Python3VirtualEnv")
                    if not venvName:
                        venvName, _ = venvManager.getDefaultEnvironment()
                if venvName:
                    interpreter = venvManager.getVirtualenvInterpreter(
                        venvName)
                    execPath = venvManager.getVirtualenvExecPath(venvName)

                    # build a suitable environment
                    if execPath:
                        if "PATH" in clientEnv:
                            clientEnv["PATH"] = os.pathsep.join(
                                [execPath, clientEnv["PATH"]])
                        else:
                            clientEnv["PATH"] = execPath
            if interpreter:
                ok = self.__startCodeAssistClient(
                    interpreter, idString, clientEnv)
            else:
                ok = False
        return ok

    def __interpreterForProject(self):
        """
        Private method to determine the interpreter for the current project and
        the environment to run it.

        @return tuple containing the interpreter of the current project and the
            environment variables
        @rtype tuple of (str, dict)
        """
        projectLanguage = self.__e5project.getProjectLanguage()
        interpreter = ""
        clientEnv = os.environ.copy()
        if "PATH" in clientEnv:
            clientEnv["PATH"] = self.__ui.getOriginalPathString()

        if projectLanguage.startswith("Python"):
            venvManager = ericApp().getObject("VirtualEnvManager")

            # get virtual environment from project first
            venvName = self.__e5project.getDebugProperty("VIRTUALENV")
            if not venvName:
                # get it from debugger settings next
                if projectLanguage == "Python3":
                    venvName = Preferences.getDebugger("Python3VirtualEnv")
                    if not venvName:
                        venvName, _ = venvManager.getDefaultEnvironment()
                else:
                    venvName = ""
            if venvName:
                interpreter = venvManager.getVirtualenvInterpreter(
                    venvName
                )
                execPath = venvManager.getVirtualenvExecPath(venvName)

                # build a suitable environment
                if execPath:
                    if "PATH" in clientEnv:
                        clientEnv["PATH"] = os.pathsep.join(
                            [execPath, clientEnv["PATH"]])
                    else:
                        clientEnv["PATH"] = execPath

        return interpreter, clientEnv

    @pyqtSlot()
    def handleNewConnection(self):
        """
        Public slot for new incoming connections from a client.
        """
        super().handleNewConnection()

        self.__updateEditorLanguageMapping()

        self.__getConfigs()

    def activate(self):
        """
        Public method to activate the code assist server.
        """
        self.__documentationViewer = self.__ui.documentationViewer()
        if self.__documentationViewer is not None:
            self.__documentationViewer.registerProvider(
                "rope", self.tr("Rope"), self.requestCodeDocumentation,
                self.isSupportedLanguage)

        self.__e5project.projectClosed.connect(self.__projectClosed)

    def deactivate(self):
        """
        Public method to deactivate and shut down the code assist server.
        """
        if self.__documentationViewer is not None:
            self.__documentationViewer.unregisterProvider("rope")

        for idString in self.connectionNames():
            self.sendJson("closeProject", {}, flush=True,
                          idString=idString)

        # disconnecting a signal that is not connected raises a TypeError
        with contextlib.suppress(TypeError):
            self.__e5project.projectClosed.disconnect(
                self.__projectClosed)

        self.stopAllClients()

    @pyqtSlot()
    def __projectClosed(self):
        """
        Private slot to handle the projectClosed signal.
        """
        self.stopClient(idString=CodeAssistServer.IdProject)