Mon, 24 Oct 2022 19:31:03 +0200
Adapted the import statements to the new structure.
# -*- coding: utf-8 -*- # Copyright (c) 2008 - 2022 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the autocompletion interface to rope. """ import contextlib import os import shutil import sys import uuid from PyQt6.QtCore import pyqtSlot, QCoreApplication, QTimer from eric7 import Globals, Preferences from eric7.EricNetwork.EricJsonServer import EricJsonServer from eric7.EricWidgets.EricApplication import ericApp from eric7.EricWidgets import EricMessageBox from eric7.QScintilla.Editor import Editor class CodeAssistServer(EricJsonServer): """ Class implementing the autocompletion interface to rope. """ IdProject = "Project" PictureIDs = { "class": "?{0}".format(Editor.ClassID), "_class": "?{0}".format(Editor.ClassProtectedID), "__class": "?{0}".format(Editor.ClassPrivateID), "instance": "?{0}".format(Editor.ClassID), "_instance": "?{0}".format(Editor.ClassProtectedID), "__instance": "?{0}".format(Editor.ClassPrivateID), "function": "?{0}".format(Editor.MethodID), "_function": "?{0}".format(Editor.MethodProtectedID), "__function": "?{0}".format(Editor.MethodPrivateID), "module": "?{0}".format(Editor.ModuleID), "_module": "?{0}".format(Editor.ModuleID), "__module": "?{0}".format(Editor.ModuleID), "None": "", } def __init__(self, plugin, parent=None): """ Constructor @param plugin reference to the plugin object @type RefactoringRopePlugin @param parent parent @type QObject """ super().__init__("CodeAssistServer", multiplex=True, parent=parent) self.__plugin = plugin self.__ui = parent self.__vm = ericApp().getObject("ViewManager") self.__e5project = ericApp().getObject("Project") self.__editorLanguageMapping = {} self.__clientConfigs = {} self.__editors = {} self.__documentationViewer = None # attributes to store the resuls of the client side self.__completions = None self.__calltips = None self.__methodMapping = { "Config": self.__setConfig, "CompletionsResult": self.__processCompletionsResult, "CallTipsResult": self.__processCallTipsResult, "DocumentationResult": self.__processDocumentationResult, "GotoDefinitionResult": self.__gotoDefinitionResult, "GotoReferencesResult": self.__processGotoReferencesResult, "ClientException": self.__processClientException, } self.__typeMapping = { "staticmethod": self.tr("static method"), "classmethod": self.tr("class method"), "method": self.tr("method"), "function": self.tr("function"), "class": self.tr("class"), "module": self.tr("module"), "package": self.tr("package"), "object": self.tr("object"), "<unknown>": self.tr("not known"), } # temporary store for editor references indexed by Uuid self.__editors = {} # Python 3 self.__ensureActive("Python3") def __updateEditorLanguageMapping(self): """ Private method to update the editor language to connection mapping. """ self.__editorLanguageMapping = {} for name in self.connectionNames(): if name == "Python3": self.__editorLanguageMapping.update( { "Python3": "Python3", "MicroPython": "Python3", "Pygments|Python": "Python3", "Pygments|Python 2.x": "Python3", "Cython": "Python3", } ) def isSupportedLanguage(self, language): """ Public method to check, if the given language is supported. @param language editor programming language to check @type str @return flag indicating the support status @rtype bool """ return language in self.__editorLanguageMapping def __idString(self, editor): """ Private method to determine the ID string for the back-end. @param editor reference to the editor to determine the ID string for @type Editor @return ID string @rtype str """ idString = "" language = editor.getLanguage() if ( self.__e5project.isOpen() and self.__e5project.getProjectLanguage() == language ): filename = editor.getFileName() if self.__e5project.isProjectSource(filename): idString = CodeAssistServer.IdProject if not idString and language in self.__editorLanguageMapping: idString = self.__editorLanguageMapping[language] return idString def __getConfigs(self): """ Private method to get the configurations of all connected clients. """ for idString in self.connectionNames(): self.sendJson("getConfig", {}, idString=idString) def __setConfig(self, params): """ Private method to set the rope client configuration data. @param params dictionary containing the configuration data @type dict """ idString = params["Id"] ropeFolder = params["RopeFolderName"] self.__clientConfigs[idString] = ropeFolder def __ropeConfigFile(self, idString): """ Private method to get the name of the rope configuration file. @param idString id for which to get the configuration file @type str @return name of the rope configuration file @rtype str """ configfile = None if idString in self.__clientConfigs: ropedir = self.__clientConfigs[idString] if ropedir: configfile = os.path.join(ropedir, "config.py") return configfile def __configChanged(self, idString): """ Private slot called, when the rope config file has changed. @param idString id for which to get the configuration file @type str """ self.sendJson("configChanged", {}, idString=idString) def editConfig(self, idString): """ Public slot to open the rope configuration file in an editor. @param idString id for which to get the configuration file @type str """ configfile = self.__ropeConfigFile(idString) if configfile: if os.path.exists(configfile): from eric7.QScintilla.MiniEditor import MiniEditor editor = MiniEditor(configfile) editor.show() editor.editorSaved.connect(lambda: self.__configChanged(idString)) self.__editors[idString] = editor return else: EricMessageBox.critical( self.__ui, self.tr("Configure rope"), self.tr( """The rope configuration file '{0}' does not exist.""" ).format(configfile), ) else: EricMessageBox.critical( self.__ui, self.tr("Configure rope"), self.tr( "The path of the rope configuration file could not be determined." ).format(configfile), ) def createConfig(self, idString): """ Public slot to create a new default rope configuration file. @param idString id for which to create the configuration file @type str """ configfile = self.__ropeConfigFile(idString) if configfile: defaultConfigFile = os.path.join( os.path.dirname(__file__), "default_config.py" ) try: shutil.copyfile(defaultConfigFile, configfile) except OSError as err: EricMessageBox.critical( self.__ui, self.tr("Configure Rope"), self.tr( "<p>A new Rope default configuration could not be written to" " the file '{0}'.</p><p>Reason: {1}</p>" ).format(configfile, str(err)), ) else: EricMessageBox.critical( self.__ui, self.tr("Configure rope"), self.tr( "The path of the rope configuration file could not be determined." ).format(configfile), ) def requestCompletions(self, editor, context, acText): """ Public method to request a list of possible completions. @param editor reference to the editor object, that called this method @type Editor @param context flag indicating to autocomplete a context @type bool @param acText text to be completed @type str """ if not self.__plugin.getPreferences("CodeAssistEnabled"): return idString = self.__idString(editor) if not idString: return filename = editor.getFileName() line, index = editor.getCursorPosition() source = editor.text() offset = len("".join(source.splitlines(True)[:line])) + index maxfixes = self.__plugin.getPreferences("MaxFixes") self.__ensureActive(idString) self.sendJson( "getCompletions", { "FileName": filename, "Source": source, "Offset": offset, "MaxFixes": maxfixes, "CompletionText": acText, "SysPath": sys.path, }, idString=idString, ) def __processCompletionsResult(self, result): """ Private method to process the completions sent by the client. @param result dictionary containing the result sent by the client @type dict """ names = [] for completion in result["Completions"]: name = completion["Name"] name += CodeAssistServer.PictureIDs.get(completion["CompletionType"], "") names.append(name) if "Error" not in result: editor = self.__vm.getOpenEditor(result["FileName"]) if editor is not None: editor.completionsListReady(names, result["CompletionText"]) def getCallTips(self, editor, pos, commas): """ Public method to calculate calltips. @param editor reference to the editor object, that called this method @type Editor @param pos position in the text for the calltip @type int @param commas minimum number of commas contained in the calltip @type int @return list of possible calltips @rtype list of str """ if not self.__plugin.getPreferences("CodeAssistCalltipsEnabled"): return [] # reset the calltips buffer self.__calltips = None idString = self.__idString(editor) if not idString: return [] filename = editor.getFileName() source = editor.text() line, index = editor.lineIndexFromPosition(pos) offset = len("".join(source.splitlines(True)[:line])) + index maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes") self.__ensureActive(idString) self.sendJson( "getCallTips", { "FileName": filename, "Source": source, "Offset": offset, "MaxFixes": maxfixes, "SysPath": sys.path, }, idString=idString, ) # emulate the synchronous behaviour timer = QTimer() timer.setSingleShot(True) timer.start(5000) # 5s timeout while self.__calltips is None and timer.isActive(): QCoreApplication.processEvents() return [] if self.__calltips is None else self.__calltips def __processCallTipsResult(self, result): """ Private method to process the calltips sent by the client. @param result dictionary containing the result sent by the client @type dict """ if "Error" in result: self.__calltips = [] else: self.__calltips = result["CallTips"] def reportChanged(self, filename, oldSource): """ Public slot to report some changed sources. @param filename file name of the changed source @type str @param oldSource source code before the change @type str """ editor = self.__vm.getOpenEditor(filename) if editor is not None: idString = self.__idString(editor) if idString: self.__ensureActive(idString) self.sendJson( "reportChanged", { "FileName": filename, "OldSource": oldSource, }, idString=idString, ) def requestCodeDocumentation(self, editor): """ Public method to request source code documentation for the given editor. @param editor reference to the editor to get source code documentation for @type Editor """ if self.__documentationViewer is None: return idString = self.__idString(editor) if not idString: language = editor.getLanguage() warning = self.tr("Language <b>{0}</b> is not supported.").format(language) self.__documentationViewer.documentationReady(warning, isWarning=True) return filename = editor.getFileName() source = editor.text() line, index = editor.getCursorPosition() offset = len("".join(source.splitlines(True)[:line])) + index maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes") offset = editor.positionBefore(offset) if editor.charAt(offset) == "(": offset = editor.positionBefore(offset) self.__ensureActive(idString) self.sendJson( "getDocumentation", { "FileName": filename, "Source": source, "Offset": offset, "MaxFixes": maxfixes, "SysPath": sys.path, }, idString=idString, ) def __processDocumentationResult(self, result): """ Private method to process the documentation sent by the client. @param result dictionary containing the result sent by the client @type dict with keys 'name', 'argspec', 'note', 'docstring', 'typ' """ if self.__documentationViewer is None: return docu = None if "Error" not in result: documentationDict = result["DocumentationDict"] if documentationDict: if "module" in documentationDict: if documentationDict["module"]: documentationDict["note"] = self.tr( "Present in <i>{0}</i> module" ).format(documentationDict["module"]) del documentationDict["module"] if "typ" in documentationDict: if documentationDict["typ"] not in self.__typeMapping: del documentationDict["typ"] else: documentationDict["typ"] = self.__typeMapping[ documentationDict["typ"] ] if "note" not in documentationDict: documentationDict["note"] = "" docu = documentationDict if docu is None: msg = self.tr("No documentation available.") self.__documentationViewer.documentationReady(msg, isDocWarning=True) else: self.__documentationViewer.documentationReady(docu) def gotoDefinition(self, editor): """ Public slot to find the definition for the word at the cursor position and go to it. Note: This is executed upon a mouse click sequence. @param editor reference to the calling editor @type Editor """ if not self.__plugin.getPreferences("MouseClickEnabled"): return idString = self.__idString(editor) if not idString: return filename = editor.getFileName() source = editor.text() line, index = editor.getCursorPosition() offset = len("".join(source.splitlines(True)[:line])) + index self.__ensureActive(idString) euuid = str(uuid.uuid4()) self.__editors[euuid] = editor self.sendJson( "gotoDefinition", { "FileName": filename, "Offset": offset, "Source": source, "SysPath": sys.path, "Uuid": euuid, }, idString=idString, ) def __gotoDefinitionResult(self, result): """ Private method to handle the "Goto Definition" result sent by the client. @param result dictionary containing the result data @type dict """ if "Error" not in result: # ignore errors silently location = result.get("Location") euuid = result["Uuid"] if location: try: editor = self.__editors[euuid] except KeyError: editor = None if ( editor is not None and editor.getFileName() == location["ModulePath"] and editor.getCursorPosition()[0] + 1 == location["Line"] ): # this was a click onto the definition line # -> get references idString = self.__idString(editor) filename = editor.getFileName() source = editor.text() line, index = editor.getCursorPosition() offset = len("".join(source.splitlines(True)[:line])) + index self.sendJson( "gotoReferences", { "FileName": filename, "Offset": offset, "Line": line + 1, "SysPath": sys.path, "Uuid": euuid, }, idString=idString, ) return self.__vm.openSourceFile( location["ModulePath"], location["Line"], addNext=True ) else: ericApp().getObject("UserInterface").statusBar().showMessage( self.tr("Code Assist: No definition found"), 5000 ) with contextlib.suppress(KeyError): del self.__editors[euuid] def __processGotoReferencesResult(self, result): """ Private method callback for the goto references result. @param result dictionary containing the result data @type dict """ with contextlib.suppress(ImportError): from eric7.QScintilla.Editor import ReferenceItem if "Error" not in result: # ignore errors silently references = result["GotoReferencesList"] euuid = result["Uuid"] if references: try: editor = self.__editors[euuid] except KeyError: editor = None if editor is not None: referenceItemsList = [ ReferenceItem( modulePath=ref["ModulePath"], codeLine=ref["Code"], line=ref["Line"], column=0, ) for ref in references ] editor.gotoReferenceHandler(referenceItemsList) with contextlib.suppress(KeyError): del self.__editors[euuid] ####################################################################### ## Methods below handle the network connection ####################################################################### def handleCall(self, method, params): """ Public method to handle a method call from the client. @param method requested method name @type str @param params dictionary with method specific parameters @type dict """ self.__methodMapping[method](params) def __processClientException(self, params): """ Private method to handle exceptions of the refactoring client. @param params dictionary containing the exception data @type dict """ if params["ExceptionType"] == "ProtocolError": self.__ui.appendToStderr( self.tr( "The data received from the code assist" " server could not be decoded. Please report" " this issue with the received data to the" " eric bugs email address.\n" "Error: {0}\n" "Data:\n{1}\n" ).format(params["ExceptionValue"], params["ProtocolData"]) ) else: self.__ui.appendToStderr( self.tr( "An exception happened in the code assist" " client. Please report it to the eric bugs" " email address.\n" "Exception: {0}\n" "Value: {1}\n" "Traceback: {2}\n" ).format( params["ExceptionType"], params["ExceptionValue"], params["Traceback"], ) ) def __startCodeAssistClient(self, interpreter, idString, clientEnv): """ Private method to start the code assist client with the given interpreter. @param interpreter interpreter to be used for the code assist client @type str @param idString id of the client to be started @type str @param clientEnv dictionary with environment variables to run the interpreter with @type dict @return flag indicating a successful start of the client @rtype bool """ ok = False if interpreter: if idString == CodeAssistServer.IdProject: configDir = self.__e5project.getProjectPath() else: configDir = os.path.join(Globals.getConfigDir(), "rope", idString) if not os.path.exists(configDir): os.makedirs(configDir) client = os.path.join(os.path.dirname(__file__), "CodeAssistClient.py") ok, exitCode = self.startClient( interpreter, client, clientArgs=[configDir, Globals.getPythonLibraryDirectory()], idString=idString, environment=clientEnv, ) if not ok: if exitCode == 42: self.__ui.appendToStderr( "CodeAssistServer: " + self.tr("The rope refactoring library is not installed.\n") ) else: self.__ui.appendToStderr( "CodeAssistServer: " + self.tr( "'{0}' is not supported because the configured" " interpreter could not be started.\n" ).format(idString) ) else: self.__ui.appendToStderr( "CodeAssistServer: " + self.tr( "'{0}' is not supported because no suitable interpreter is" " configured.\n" ).format(idString) ) return ok def __ensureActive(self, idString): """ Private method to ensure, that the requested client is active. A non-active client will be started. @param idString id of the client to be checked @type str @return flag indicating an active client @rtype bool """ ok = idString in self.connectionNames() if not ok: # client is not running if idString == CodeAssistServer.IdProject: interpreter, clientEnv = self.__interpreterForProject() else: interpreter = "" venvName = "" clientEnv = os.environ.copy() if "PATH" in clientEnv: clientEnv["PATH"] = self.__ui.getOriginalPathString() venvManager = ericApp().getObject("VirtualEnvManager") if idString == "Python3": venvName = Preferences.getDebugger("Python3VirtualEnv") if not venvName: venvName, _ = venvManager.getDefaultEnvironment() if venvName: interpreter = venvManager.getVirtualenvInterpreter(venvName) execPath = venvManager.getVirtualenvExecPath(venvName) # build a suitable environment if execPath: if "PATH" in clientEnv: clientEnv["PATH"] = os.pathsep.join( [execPath, clientEnv["PATH"]] ) else: clientEnv["PATH"] = execPath if interpreter: ok = self.__startCodeAssistClient(interpreter, idString, clientEnv) else: ok = False return ok def __interpreterForProject(self): """ Private method to determine the interpreter for the current project and the environment to run it. @return tuple containing the interpreter of the current project and the environment variables @rtype tuple of (str, dict) """ projectLanguage = self.__e5project.getProjectLanguage() interpreter = "" clientEnv = os.environ.copy() if "PATH" in clientEnv: clientEnv["PATH"] = self.__ui.getOriginalPathString() if projectLanguage.startswith("Python"): venvManager = ericApp().getObject("VirtualEnvManager") # get virtual environment from project first venvName = self.__e5project.getDebugProperty("VIRTUALENV") if not venvName: # get it from debugger settings next if projectLanguage == "Python3": venvName = Preferences.getDebugger("Python3VirtualEnv") if not venvName: venvName, _ = venvManager.getDefaultEnvironment() else: venvName = "" if venvName: interpreter = venvManager.getVirtualenvInterpreter(venvName) if not interpreter: interpreter = Globals.getPythonExecutable() execPath = venvManager.getVirtualenvExecPath(venvName) # build a suitable environment if execPath: if "PATH" in clientEnv: clientEnv["PATH"] = os.pathsep.join( [execPath, clientEnv["PATH"]] ) else: clientEnv["PATH"] = execPath return interpreter, clientEnv @pyqtSlot() def handleNewConnection(self): """ Public slot for new incoming connections from a client. """ super().handleNewConnection() self.__updateEditorLanguageMapping() self.__getConfigs() def activate(self): """ Public method to activate the code assist server. """ self.__documentationViewer = self.__ui.documentationViewer() if self.__documentationViewer is not None: self.__documentationViewer.registerProvider( "rope", self.tr("Rope"), self.requestCodeDocumentation, self.isSupportedLanguage, ) self.__e5project.projectClosed.connect(self.__projectClosed) def deactivate(self): """ Public method to deactivate the code assist server. """ """ Public method to shut down the code assist server. """ if self.__documentationViewer is not None: self.__documentationViewer.unregisterProvider("rope") for idString in self.connectionNames(): self.sendJson("closeProject", {}, flush=True, idString=idString) with contextlib.suppress(TypeError): self.__e5project.projectClosed.disconnect(self.__projectClosed) self.stopAllClients() @pyqtSlot() def __projectClosed(self): """ Private slot to handle the projectClosed signal. """ self.stopClient(idString=CodeAssistServer.IdProject)