--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/eric7/JediInterface/JediServer.py Sat Sep 11 19:47:02 2021 +0200 @@ -0,0 +1,735 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 - 2021 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing the autocompletion interface to jedi. +""" + +import contextlib +import os +import uuid + +from PyQt6.QtCore import pyqtSlot, QCoreApplication, QTimer + +from EricWidgets.EricApplication import ericApp + +from EricNetwork.EricJsonServer import EricJsonServer + +from QScintilla.Editor import Editor + +import Preferences +import Globals + + +class JediServer(EricJsonServer): + """ + Class implementing the interface to the jedi library. + """ + IdProject = "Project" + + PictureIDs = { + "class": "?{0}".format(Editor.ClassID), + "_class": "?{0}".format(Editor.ClassProtectedID), + "__class": "?{0}".format(Editor.ClassPrivateID), + "instance": "?{0}".format(Editor.ClassID), + "_instance": "?{0}".format(Editor.ClassProtectedID), + "__instance": "?{0}".format(Editor.ClassPrivateID), + "function": "?{0}".format(Editor.MethodID), + "_function": "?{0}".format(Editor.MethodProtectedID), + "__function": "?{0}".format(Editor.MethodPrivateID), + "module": "?{0}".format(Editor.ModuleID), + "_module": "?{0}".format(Editor.ModuleID), + "__module": "?{0}".format(Editor.ModuleID), + "param": "?{0}".format(Editor.AttributeID), + "_param": "?{0}".format(Editor.AttributeProtectedID), + "__param": "?{0}".format(Editor.AttributePrivateID), + "statement": "?{0}".format(Editor.AttributeID), + "_statement": "?{0}".format(Editor.AttributeProtectedID), + "__statement": "?{0}".format(Editor.AttributePrivateID), + "import": "", + "None": "", + } + + def __init__(self, viewManager, project, ui): + """ + Constructor + + @param viewManager reference to the viewmanager object + @type ViewManager + @param project reference to the project object + @type Project + @param ui reference to the user interface + @type UserInterface + """ + super().__init__( + "JediServer", multiplex=True, parent=ui) + + self.__ui = ui + self.__vm = viewManager + self.__ericProject = project + + self.__editorLanguageMapping = {} + + self.__documentationViewer = None + + # attributes to store the resuls of the client side + self.__completions = None + self.__calltips = None + + self.__methodMapping = { + "CompletionsResult": self.__processCompletionsResult, + "CallTipsResult": self.__processCallTipsResult, + "DocumentationResult": self.__processDocumentationResult, + "HoverHelpResult": self.__processHoverHelpResult, + "GotoDefinitionResult": self.__processGotoDefinitionResult, + "GotoReferencesResult": self.__processGotoReferencesResult, + + "ClientException": self.__processClientException, + } + + # temporary store for editor references indexed by Uuid + self.__editors = {} + + # Python 3 + self.__ensureActive("Python3") + + def __updateEditorLanguageMapping(self): + """ + Private method to update the editor language to connection mapping. + """ + self.__editorLanguageMapping = {} + for name in self.connectionNames(): + if name == "Python3": + self.__editorLanguageMapping.update({ + "Python3": "Python3", + "MicroPython": "Python3", + "Pygments|Python": "Python3", + "Pygments|Python 2.x": "Python3", + "Cython": "Python3", + }) + + def isSupportedLanguage(self, language): + """ + Public method to check, if the given language is supported. + + @param language editor programming language to check + @type str + @return flag indicating the support status + @rtype bool + """ + return language in self.__editorLanguageMapping + + def __idString(self, editor): + """ + Private method to determine the ID string for the back-end. + + @param editor reference to the editor to determine the ID string for + @type Editor + @return ID string + @rtype str + """ + idString = "" + + language = editor.getLanguage() + if ( + self.__ericProject.isOpen() and + self.__ericProject.getProjectLanguage() == language + ): + filename = editor.getFileName() + if self.__ericProject.isProjectSource(filename): + idString = JediServer.IdProject + + if not idString and language in self.__editorLanguageMapping: + idString = self.__editorLanguageMapping[language] + + return idString + + def __prepareData(self, editor): + """ + Private method to gather data about current cursor position. + + @param editor reference to the editor object, that called this method + @type Editor + @return tuple of filename, line, index, source + @rtype tuple (str, int, int, str) + """ + filename = editor.getFileName() + line, index = editor.getCursorPosition() + line += 1 # jedi line numbers are 1 based + source = editor.text() + return filename, line, index, source + + def requestCompletions(self, editor, context, acText): + """ + Public method to request a list of possible completions. + + @param editor reference to the editor object, that called this method + @type Editor + @param context flag indicating to autocomplete a context + @type bool + @param acText text to be completed + @type str + """ + if not Preferences.getJedi("JediCompletionsEnabled"): + return + + idString = self.__idString(editor) + if not idString: + return + + filename, line, index, source = self.__prepareData(editor) + fuzzy = Preferences.getJedi("JediFuzzyCompletionsEnabled") + + self.__ensureActive(idString) + + self.sendJson("getCompletions", { + "FileName": filename, + "Source": source, + "Line": line, + "Index": index, + "Fuzzy": fuzzy, + "CompletionText": acText, + }, idString=idString) + + def __processCompletionsResult(self, result): + """ + Private method to process the completions sent by the client. + + @param result dictionary containing the result sent by the client + @type dict + """ + names = [] + for completion in result["Completions"]: + name = completion['Name'] + context = completion['FullName'] + if context.endswith(".{0}".format(name)): + context = context.rsplit(".", 1)[0] + if context: + name = "{0} ({1})".format(name, context) + + name += JediServer.PictureIDs.get(completion['CompletionType'], '') + names.append(name) + + if "Error" not in result: + editor = self.__vm.getOpenEditor(result["FileName"]) + if editor is not None: + editor.completionsListReady(names, + result["CompletionText"]) + + def getCallTips(self, editor, pos, commas): + """ + Public method to calculate calltips. + + @param editor reference to the editor object, that called this method + @type Editor + @param pos position in the text for the calltip + @type int + @param commas minimum number of commas contained in the calltip + @type int + @return list of possible calltips + @rtype list of str + """ + if not Preferences.getJedi("JediCalltipsEnabled"): + return [] + + # reset the calltips buffer + self.__calltips = None + + idString = self.__idString(editor) + if not idString: + return [] + + filename, line, index, source = self.__prepareData(editor) + + self.__ensureActive(idString) + self.sendJson("getCallTips", { + "FileName": filename, + "Source": source, + "Line": line, + "Index": index, + }, idString=idString) + + # emulate the synchronous behaviour + timer = QTimer() + timer.setSingleShot(True) + timer.start(5000) # 5s timeout + while self.__calltips is None and timer.isActive(): + QCoreApplication.processEvents() + + return [] if self.__calltips is None else self.__calltips + + def __processCallTipsResult(self, result): + """ + Private method to process the calltips sent by the client. + + @param result dictionary containing the result sent by the client + @type dict + """ + if "Error" in result: + self.__calltips = [] + else: + self.__calltips = result["CallTips"] + + def requestCodeDocumentation(self, editor): + """ + Public method to request source code documentation for the given + editor. + + @param editor reference to the editor to get source code documentation + for + @type Editor + """ + if self.__documentationViewer is None: + return + + idString = self.__idString(editor) + + if not idString: + language = editor.getLanguage() + warning = ( + self.tr("Language <b>{0}</b> is not supported.") + .format(language) + ) + self.__documentationViewer.documentationReady( + warning, isWarning=True) + return + + filename, line, index, source = self.__prepareData(editor) + sourceLines = source.splitlines() + # Correct index if cursor is standing after an opening bracket + if line > 0 and index > 0 and sourceLines[line - 1][index - 1] == '(': + index -= 1 + + self.__ensureActive(idString) + self.sendJson("getDocumentation", { + "FileName": filename, + "Source": source, + "Line": line, + "Index": index, + }, idString=idString) + + def __processDocumentationResult(self, result): + """ + Private method to process the documentation sent by the client. + + @param result dictionary containing the result sent by the client + @type dict with keys 'name', 'module', 'argspec', 'docstring' + """ + if self.__documentationViewer is None: + return + + docu = None + + if "Error" not in result: + docu = result["DocumentationDict"] + docu["note"] = ( + self.tr("Present in <i>{0}</i> module") + .format(docu["module"])) + + if docu is None: + msg = self.tr("No documentation available.") + self.__documentationViewer.documentationReady( + msg, isDocWarning=True) + else: + self.__documentationViewer.documentationReady(docu) + + def gotoDefinition(self, editor): + """ + Public slot to find the definition for the word at the cursor position + and go to it. + + Note: This is executed upon a mouse click sequence. + + @param editor reference to the calling editor + @type Editor + """ + if not Preferences.getJedi("MouseClickEnabled"): + return + + idString = self.__idString(editor) + if not idString: + return + + filename, line, index, source = self.__prepareData(editor) + + self.__ensureActive(idString) + + euuid = str(uuid.uuid4()) + self.__editors[euuid] = editor + + self.sendJson("gotoDefinition", { + "FileName": filename, + "Source": source, + "Line": line, + "Index": index, + "Uuid": euuid, + }, idString=idString) + + def __processGotoDefinitionResult(self, result): + """ + Private method callback for the goto definition result. + + @param result dictionary containing the result data + @type dict + """ + euuid = result["Uuid"] + if "Error" not in result: + # ignore errors silently + location = result["GotoDefinitionDict"] + if location: + try: + editor = self.__editors[euuid] + except KeyError: + editor = None + + if ( + editor is not None and + editor.getFileName() == location["ModulePath"] and + editor.getCursorPosition()[0] + 1 == location["Line"] + ): + # this was a click onto the definition line + # -> get references + idString = self.__idString(editor) + filename, line, index, source = self.__prepareData(editor) + self.sendJson("gotoReferences", { + "FileName": filename, + "Source": source, + "Line": line, + "Index": index, + "Uuid": euuid, + }, idString=idString) + return + + self.__vm.openSourceFile(location["ModulePath"], + location["Line"], + addNext=True) + else: + ericApp().getObject("UserInterface").statusBar().showMessage( + self.tr('Jedi: No definition found'), 5000) + + with contextlib.suppress(KeyError): + del self.__editors[euuid] + + def __processGotoReferencesResult(self, result): + """ + Private method callback for the goto references result. + + @param result dictionary containing the result data + @type dict + """ + euuid = result["Uuid"] + with contextlib.suppress(ImportError): + from QScintilla.Editor import ReferenceItem + + if "Error" not in result: + # ignore errors silently + references = result["GotoReferencesList"] + if references: + try: + editor = self.__editors[euuid] + except KeyError: + editor = None + if editor is not None: + referenceItemsList = [ + ReferenceItem( + modulePath=ref["ModulePath"], + codeLine=ref["Code"], + line=ref["Line"], + column=ref["Column"], + ) for ref in references + ] + editor.gotoReferenceHandler(referenceItemsList) + + with contextlib.suppress(KeyError): + del self.__editors[euuid] + + def hoverHelp(self, editor, line, index): + """ + Public method to initiate the display of mouse hover help. + + @param editor reference to the calling editor + @type Editor + @param line line number (zero based) + @type int + @param index index within the line (zero based) + @type int + """ + idString = self.__idString(editor) + if not idString: + return + + filename = editor.getFileName() + line += 1 # jedi line numbers are 1 based + source = editor.text() + + self.__ensureActive(idString) + + euuid = str(uuid.uuid4()) + self.__editors[euuid] = editor + + self.sendJson("hoverHelp", { + "FileName": filename, + "Source": source, + "Line": line, + "Index": index, + "Uuid": euuid, + }, idString=idString) + + def __processHoverHelpResult(self, result): + """ + Private method callback for the goto definition result. + + @param result dictionary containing the result data + @type dict + """ + euuid = result["Uuid"] + if "Error" not in result: + # ignore errors silently + helpText = result["HoverHelp"] + if helpText: + with contextlib.suppress(KeyError): + self.__editors[euuid].showMouseHoverHelpData( + result["Line"] - 1, + result["Index"], + helpText + ) + else: + ericApp().getObject("UserInterface").statusBar().showMessage( + self.tr('Jedi: No mouse hover help found'), 5000) + + with contextlib.suppress(KeyError): + del self.__editors[euuid] + + ####################################################################### + ## Methods below handle the network connection + ####################################################################### + + def handleCall(self, method, params): + """ + Public method to handle a method call from the client. + + @param method requested method name + @type str + @param params dictionary with method specific parameters + @type dict + """ + self.__methodMapping[method](params) + + def __processClientException(self, params): + """ + Private method to handle exceptions of the refactoring client. + + @param params dictionary containing the exception data + @type dict + """ + if params["ExceptionType"] == "ProtocolError": + self.__ui.appendToStderr( + self.tr("The data received from the Jedi server could not be" + " decoded. Please report this issue with the received" + " data to the eric bugs email address.\n" + "Error: {0}\n" + "Data:\n{1}\n").format( + params["ExceptionValue"], + params["ProtocolData"])) + else: + self.__ui.appendToStderr( + self.tr("An exception happened in the Jedi client. Please" + " report it to the eric bugs email address.\n" + "Exception: {0}\n" + "Value: {1}\n" + "Traceback: {2}\n").format( + params["ExceptionType"], + params["ExceptionValue"], + params["Traceback"])) + + def __startJediClient(self, interpreter, idString, clientEnv): + """ + Private method to start the Jedi client with the given interpreter. + + @param interpreter interpreter to be used for the Jedi client + @type str + @param idString id of the client to be started + @type str + @param clientEnv dictionary with environment variables to run the + interpreter with + @type dict + @return flag indicating a successful start of the client + @rtype bool + """ + ok = False + + if interpreter: + client = os.path.join(os.path.dirname(__file__), + "JediClient.py") + ok, exitCode = self.startClient( + interpreter, client, + [Globals.getPythonLibraryDirectory()], + idString=idString, environment=clientEnv) + if not ok: + if exitCode == 42: + self.__ui.appendToStderr("JediServer: " + self.tr( + "The jedi and/or parso library is not installed.\n" + )) + else: + self.__ui.appendToStderr("JediServer: " + self.tr( + "'{0}' is not supported because the configured" + " interpreter could not be started.\n" + ).format(idString)) + else: + self.__ui.appendToStderr("JediServer: " + self.tr( + "'{0}' is not supported because no suitable interpreter is" + " configured.\n" + ).format(idString)) + + return ok + + def __ensureActive(self, idString): + """ + Private method to ensure, that the requested client is active. + + A non-active client will be started. + + @param idString id of the client to be checked + @type str + @return flag indicating an active client + @rtype bool + """ + ok = idString in self.connectionNames() + if not ok: + # client is not running + if idString == JediServer.IdProject: + interpreter, clientEnv = self.__interpreterForProject() + else: + interpreter = "" + venvName = "" + clientEnv = os.environ.copy() + if "PATH" in clientEnv: + clientEnv["PATH"] = self.__ui.getOriginalPathString() + # new code using virtual environments + venvManager = ericApp().getObject("VirtualEnvManager") + if idString == "Python3": + venvName = Preferences.getDebugger("Python3VirtualEnv") + if not venvName: + venvName, _ = venvManager.getDefaultEnvironment() + if venvName: + interpreter = venvManager.getVirtualenvInterpreter( + venvName) + execPath = venvManager.getVirtualenvExecPath(venvName) + + # build a suitable environment + if execPath: + if "PATH" in clientEnv: + clientEnv["PATH"] = os.pathsep.join( + [execPath, clientEnv["PATH"]]) + else: + clientEnv["PATH"] = execPath + if interpreter: + ok = self.__startJediClient(interpreter, idString, clientEnv) + else: + ok = False + return ok + + def __interpreterForProject(self): + """ + Private method to determine the interpreter for the current project and + the environment to run it. + + @return tuple containing the interpreter of the current project and the + environment variables + @rtype tuple of (str, dict) + """ + projectLanguage = self.__ericProject.getProjectLanguage() + interpreter = "" + clientEnv = os.environ.copy() + if "PATH" in clientEnv: + clientEnv["PATH"] = self.__ui.getOriginalPathString() + + if (projectLanguage.startswith("Python") or + projectLanguage == "MicroPython"): + # new code using virtual environments + venvManager = ericApp().getObject("VirtualEnvManager") + + # get virtual environment from project first + venvName = self.__ericProject.getDebugProperty("VIRTUALENV") + if not venvName: + # get it from debugger settings next + if projectLanguage in ("Python3", "MicroPython", "Cython"): + venvName = Preferences.getDebugger("Python3VirtualEnv") + if not venvName: + venvName, _ = venvManager.getDefaultEnvironment() + else: + venvName = "" + if venvName: + interpreter = venvManager.getVirtualenvInterpreter( + venvName) + execPath = venvManager.getVirtualenvExecPath(venvName) + + # build a suitable environment + if execPath: + if "PATH" in clientEnv: + clientEnv["PATH"] = os.pathsep.join( + [execPath, clientEnv["PATH"]]) + else: + clientEnv["PATH"] = execPath + + return interpreter, clientEnv + + @pyqtSlot() + def handleNewConnection(self): + """ + Public slot for new incoming connections from a client. + """ + super().handleNewConnection() + + self.__updateEditorLanguageMapping() + + def activate(self): + """ + Public method to activate the Jedi server. + """ + self.__documentationViewer = self.__ui.documentationViewer() + if self.__documentationViewer is not None: + self.__documentationViewer.registerProvider( + "jedi", self.tr("Jedi"), self.requestCodeDocumentation, + self.isSupportedLanguage) + + self.__ericProject.projectOpened.connect(self.__projectOpened) + self.__ericProject.projectClosed.connect(self.__projectClosed) + + def deactivate(self): + """ + Public method to deactivate the code assist server. + """ + """ + Public method to shut down the code assist server. + """ + if self.__documentationViewer is not None: + self.__documentationViewer.unregisterProvider("jedi") + + with contextlib.suppress(TypeError): + self.__ericProject.projectOpened.disconnect(self.__projectOpened) + self.__ericProject.projectClosed.disconnect(self.__projectClosed) + + self.stopAllClients() + + @pyqtSlot() + def __projectOpened(self): + """ + Private slot to handle the projectOpened signal. + """ + self.__ensureActive(JediServer.IdProject) + self.sendJson("openProject", { + "ProjectPath": self.__ericProject.getProjectPath(), + }, idString=JediServer.IdProject) + + @pyqtSlot() + def __projectClosed(self): + """ + Private slot to handle the projectClosed signal. + """ + self.__ensureActive(JediServer.IdProject) + self.sendJson("closeProject", {}, idString=JediServer.IdProject) + + self.stopClient(idString=JediServer.IdProject)