Sun, 25 Aug 2019 12:03:54 +0200
Added support for MicroPython (treated like Python3).
# -*- coding: utf-8 -*- # Copyright (c) 2008 - 2019 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the autocompletion interface to rope. """ from __future__ import unicode_literals import os import sys from PyQt5.QtCore import pyqtSlot, QCoreApplication, QTimer from E5Gui.E5Application import e5App from E5Gui import E5MessageBox from QScintilla.Editor import Editor from .JsonServer import JsonServer import Globals import Preferences import Utilities class CodeAssistServer(JsonServer): """ Class implementing the autocompletion interface to rope. """ IdProject = "Project" PictureIDs = { "class": "?{0}".format(Editor.ClassID), "_class": "?{0}".format(Editor.ClassProtectedID), "__class": "?{0}".format(Editor.ClassPrivateID), "instance": "?{0}".format(Editor.ClassID), "_instance": "?{0}".format(Editor.ClassProtectedID), "__instance": "?{0}".format(Editor.ClassPrivateID), "function": "?{0}".format(Editor.MethodID), "_function": "?{0}".format(Editor.MethodProtectedID), "__function": "?{0}".format(Editor.MethodPrivateID), "module": "?{0}".format(Editor.ModuleID), "_module": "?{0}".format(Editor.ModuleID), "__module": "?{0}".format(Editor.ModuleID), "None": "", } def __init__(self, plugin, parent=None): """ Constructor @param plugin reference to the plugin object @type RefactoringRopePlugin @param parent parent @type QObject """ super(CodeAssistServer, self).__init__( "CodeAssistServer", multiplex=True, parent=parent) self.__plugin = plugin self.__ui = parent self.__vm = e5App().getObject("ViewManager") self.__e5project = e5App().getObject("Project") self.__editorLanguageMapping = {} self.__clientConfigs = {} self.__editors = {} self.__asyncCompletions = False self.__documentationViewer = None # attributes to store the resuls of the client side self.__completions = None self.__calltips = None self.__methodMapping = { "Config": self.__setConfig, "CompletionsResult": self.__processCompletionsResult, "CallTipsResult": self.__processCallTipsResult, "DocumentationResult": self.__processDocumentationResult, "GotoDefinitionResult": self.__gotoDefinitionResult, "ClientException": self.__processClientException, } self.__typeMapping = { "staticmethod": self.tr("static method"), "classmethod": self.tr("class method"), "method": self.tr("method"), "function": self.tr("function"), "class": self.tr("class"), "module": self.tr("module"), "package": self.tr("package"), "object": self.tr("object"), "<unknown>": self.tr("not known"), } # Python 2 self.__ensureActive("Python2") # Python 3 self.__ensureActive("Python3") def setAsyncCompletions(self, asynchronous): """ Public method to set the asynchronous completions flag. @param asynchronous flag indicating asynchronous completions @type bool """ self.__asyncCompletions = asynchronous def __updateEditorLanguageMapping(self): """ Private method to update the editor language to connection mapping. """ self.__editorLanguageMapping = {} for name in self.connectionNames(): if name == "Python2": self.__editorLanguageMapping.update({ "Python": "Python2", "Python2": "Python2", "Pygments|Python": "Python2", }) elif name == "Python3": self.__editorLanguageMapping.update({ "Python3": "Python3", "Pygments|Python 3": "Python3", "MicroPython": "Python3", }) def isSupportedLanguage(self, language): """ Public method to check, if the given language is supported. @param language editor programming language to check @type str @return flag indicating the support status @rtype bool """ return language in self.__editorLanguageMapping def __idString(self, editor): """ Private method to determine the ID string for the back-end. @param editor reference to the editor to determine the ID string for @type QScintilla.Editor @return ID string @rtype str """ idString = "" language = editor.getLanguage() if self.__e5project.isOpen() and \ self.__e5project.getProjectLanguage() == language: filename = editor.getFileName() if self.__e5project.isProjectSource(filename): idString = CodeAssistServer.IdProject if not idString and language in self.__editorLanguageMapping: idString = self.__editorLanguageMapping[language] return idString def __getConfigs(self): """ Private method to get the configurations of all connected clients. """ for idString in self.connectionNames(): self.sendJson("getConfig", {}, idString=idString) def __setConfig(self, params): """ Private method to set the rope client configuration data. @param params dictionary containing the configuration data @type dict """ idString = params["Id"] ropeFolder = params["RopeFolderName"] self.__clientConfigs[idString] = ropeFolder def __ropeConfigFile(self, idString): """ Private method to get the name of the rope configuration file. @param idString id for which to get the configuration file @type str @return name of the rope configuration file @rtype str """ configfile = None if idString in self.__clientConfigs: ropedir = self.__clientConfigs[idString] if ropedir: configfile = os.path.join(ropedir, "config.py") if not os.path.exists(configfile): configfile = None return configfile def __configChanged(self, idString): """ Private slot called, when the rope config file has changed. @param idString id for which to get the configuration file @type str """ self.sendJson("configChanged", {}, idString=idString) def editConfig(self, idString): """ Public slot to open the rope configuration file in an editor. @param idString id for which to get the configuration file @type str """ configfile = self.__ropeConfigFile(idString) if configfile: if os.path.exists(configfile): from QScintilla.MiniEditor import MiniEditor editor = MiniEditor(configfile) editor.show() editor.editorSaved.connect( lambda: self.__configChanged(idString)) self.__editors[idString] = editor return else: E5MessageBox.critical( self.__ui, self.tr("Configure Rope"), self.tr("""The Rope configuration file '{0}' does""" """ not exist.""").format(configfile)) def getCompletions(self, editor, context): """ Public method to calculate the possible completions. Note: This is the synchronous variant for eric6 before 17.11. @param editor reference to the editor object, that called this method @type QScintilla.Editor.Editor @param context flag indicating to autocomplete a context @type bool @return list of possible completions @rtype list of str """ if not self.__plugin.getPreferences("CodeAssistEnabled"): return [] # reset the completions buffer self.__completions = None if not self.__idString(editor): return [] self.requestCompletions(editor, context, "") # emulate the synchronous behaviour timer = QTimer() timer.setSingleShot(True) timer.start(5000) # 5s timeout while self.__completions is None and timer.isActive(): QCoreApplication.processEvents() return [] if self.__completions is None else self.__completions def requestCompletions(self, editor, context, acText): """ Public method to request a list of possible completions. Note: This is part of the asynchronous variant for eric6 17.11 and later. @param editor reference to the editor object, that called this method @type QScintilla.Editor.Editor @param context flag indicating to autocomplete a context @type bool @param acText text to be completed @type str """ if not self.__plugin.getPreferences("CodeAssistEnabled"): return idString = self.__idString(editor) if not idString: return filename = editor.getFileName() line, index = editor.getCursorPosition() source = editor.text() offset = len("".join(source.splitlines(True)[:line])) + index maxfixes = self.__plugin.getPreferences("MaxFixes") self.__ensureActive(idString) self.sendJson("getCompletions", { "FileName": filename, "Source": source, "Offset": offset, "MaxFixes": maxfixes, "CompletionText": acText, "SysPath": sys.path, }, idString=idString) def __processCompletionsResult(self, result): """ Private method to process the completions sent by the client. @param result dictionary containing the result sent by the client @type dict """ names = [] for completion in result["Completions"]: name = completion['Name'] name += CodeAssistServer.PictureIDs.get( completion['CompletionType'], '') names.append(name) if self.__asyncCompletions: # asynchronous variant for eric6 17.11 and later if "Error" not in result: editor = self.__vm.getOpenEditor(result["FileName"]) if editor is not None: editor.completionsListReady(names, result["CompletionText"]) else: # synchronous variant for eric6 before 17.11 if "Error" in result: self.__completions = [] else: self.__completions = result["Completions"] def getCallTips(self, editor, pos, commas): """ Public method to calculate calltips. @param editor reference to the editor object, that called this method @type QScintilla.Editor.Editor @param pos position in the text for the calltip @type int @param commas minimum number of commas contained in the calltip @type int @return list of possible calltips @rtype list of str """ if not self.__plugin.getPreferences("CodeAssistCalltipsEnabled"): return [] # reset the calltips buffer self.__calltips = None idString = self.__idString(editor) if not idString: return [] filename = editor.getFileName() source = editor.text() line, index = editor.lineIndexFromPosition(pos) offset = len("".join(source.splitlines(True)[:line])) + index maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes") self.__ensureActive(idString) self.sendJson("getCallTips", { "FileName": filename, "Source": source, "Offset": offset, "MaxFixes": maxfixes, "SysPath": sys.path, }, idString=idString) # emulate the synchronous behaviour timer = QTimer() timer.setSingleShot(True) timer.start(5000) # 5s timeout while self.__calltips is None and timer.isActive(): QCoreApplication.processEvents() return [] if self.__calltips is None else self.__calltips def __processCallTipsResult(self, result): """ Private method to process the calltips sent by the client. @param result dictionary containing the result sent by the client @type dict """ if "Error" in result: self.__calltips = [] else: self.__calltips = result["CallTips"] def reportChanged(self, filename, oldSource): """ Public slot to report some changed sources. @param filename file name of the changed source @type str @param oldSource source code before the change @type str """ editor = self.__vm.getOpenEditor(filename) if editor is not None: idString = self.__idString(editor) if idString: self.__ensureActive(idString) self.sendJson("reportChanged", { "FileName": filename, "OldSource": oldSource, }, idString=idString) def requestCodeDocumentation(self, editor): """ Public method to request source code documentation for the given editor. @param editor reference to the editor to get source code documentation for @type QScintilla.Editor.Editor """ if self.__documentationViewer is None: return idString = self.__idString(editor) if not idString: language = editor.getLanguage() if Preferences.getDocuViewer("ShowInfoAsRichText"): warning = self.tr("Language <b>{0}</b> is not supported.")\ .format(language) else: warning = self.tr("Language '{0}' is not supported.")\ .format(language) self.__documentationViewer.documentationReady( warning, isWarning=True) return filename = editor.getFileName() source = editor.text() line, index = editor.getCursorPosition() offset = len("".join(source.splitlines(True)[:line])) + index maxfixes = self.__plugin.getPreferences("CalltipsMaxFixes") offset = editor.positionBefore(offset) if editor.charAt(offset) == "(": offset = editor.positionBefore(offset) self.__ensureActive(idString) self.sendJson("getDocumentation", { "FileName": filename, "Source": source, "Offset": offset, "MaxFixes": maxfixes, "SysPath": sys.path, }, idString=idString) def __processDocumentationResult(self, result): """ Private method to process the documentation sent by the client. @param result dictionary containing the result sent by the client @type dict with keys 'name', 'argspec', 'note', 'docstring', 'typ' """ if self.__documentationViewer is None: return docu = None if "Error" not in result: documentationDict = result["DocumentationDict"] if documentationDict: if "module" in documentationDict: if documentationDict["module"]: if Preferences.getDocuViewer("ShowInfoAsRichText"): documentationDict["note"] = \ self.tr("Present in <i>{0}</i> module")\ .format(documentationDict["module"]) else: documentationDict["note"] = \ self.tr("Present in '{0}' module")\ .format(documentationDict["module"]) del documentationDict["module"] if "typ" in documentationDict: if documentationDict["typ"] not in self.__typeMapping: del documentationDict["typ"] else: documentationDict["typ"] = \ self.__typeMapping[documentationDict["typ"]] if "note" not in documentationDict: documentationDict["note"] = "" docu = documentationDict if docu is None: msg = self.tr("No documentation available.") self.__documentationViewer.documentationReady( msg, isDocWarning=True) else: self.__documentationViewer.documentationReady(docu) def gotoDefinition(self, editor): """ Public slot to find the definition for the word at the cursor position and go to it. Note: This is executed upon a mouse click sequence. @param editor reference to the calling editor @type QScintilla.Editor.Editor """ if not self.__plugin.getPreferences("MouseClickEnabled"): return idString = self.__idString(editor) if not idString: return filename = editor.getFileName() source = editor.text() line, index = editor.getCursorPosition() offset = len("".join(source.splitlines(True)[:line])) + index self.__ensureActive(idString) self.sendJson("gotoDefinition", { "FileName": filename, "Offset": offset, "Source": editor.text(), "SysPath": sys.path, }, idString=idString) def __gotoDefinitionResult(self, result): """ Private method to handle the "Goto Definition" result sent by the client. @param result dictionary containing the result data @type dict """ if "Error" not in result: # ignore errors silently if "Location" in result: location = result["Location"] try: self.__vm.openSourceFile( location["ModulePath"], location["Line"], addNext=True) except TypeError: # backward compatibility; <= 17.03 self.__vm.openSourceFile( location["ModulePath"], location["Line"], next=True) else: e5App().getObject("UserInterface").statusBar().showMessage( self.tr('Code Assist: No definition found'), 5000) ####################################################################### ## Methods below handle the network connection ####################################################################### def handleCall(self, method, params): """ Public method to handle a method call from the client. @param method requested method name @type str @param params dictionary with method specific parameters @type dict """ self.__methodMapping[method](params) def __processClientException(self, params): """ Private method to handle exceptions of the refactoring client. @param params dictionary containing the exception data @type dict """ if params["ExceptionType"] == "ProtocolError": self.__ui.appendToStderr( self.tr("The data received from the code assist" " server could not be decoded. Please report" " this issue with the received data to the" " eric bugs email address.\n" "Error: {0}\n" "Data:\n{1}\n").format( params["ExceptionValue"], params["ProtocolData"])) else: self.__ui.appendToStderr( self.tr("An exception happened in the code assist" " client. Please report it to the eric bugs" " email address.\n" "Exception: {0}\n" "Value: {1}\n" "Traceback: {2}\n").format( params["ExceptionType"], params["ExceptionValue"], params["Traceback"])) def __startCodeAssistClient(self, interpreter, idString, clientEnv): """ Private method to start the code assist client with the given interpreter. @param interpreter interpreter to be used for the code assist client @type str @param idString id of the client to be started @type str @param clientEnv dictionary with environment variables to run the interpreter with @type dict @return flag indicating a successful start of the client @rtype bool """ ok = False if interpreter: if idString == CodeAssistServer.IdProject: configDir = self.__e5project.getProjectPath() else: configDir = os.path.join(Globals.getConfigDir(), "rope", idString) if not os.path.exists(configDir): os.makedirs(configDir) client = os.path.join(os.path.dirname(__file__), "CodeAssistClient.py") ok = self.startClient(interpreter, client, [configDir], idString=idString, environment=clientEnv) if not ok: self.__ui.appendToStderr(self.tr( "'{0}' is not supported because the configured interpreter" " could not be started.\n" ).format(idString)) else: self.__ui.appendToStderr(self.tr( "'{0}' is not supported because no suitable interpreter is" " configured.\n" ).format(idString)) return ok def __ensureActive(self, idString): """ Private method to ensure, that the requested client is active. A non-active client will be started. @param idString id of the client to be checked @type str @return flag indicating an active client @rtype bool """ ok = idString in self.connectionNames() if not ok: # client is not running if idString == CodeAssistServer.IdProject: interpreter, clientEnv = self.__interpreterForProject() else: interpreter = "" venvName = "" clientEnv = os.environ.copy() if "PATH" in clientEnv: try: clientEnv["PATH"] = self.__ui.getOriginalPathString() except AttributeError: # ignore for eric6 < 18.12 pass try: # new code using virtual environments venvManager = e5App().getObject("VirtualEnvManager") if idString == "Python2": # Python 2 venvName = Preferences.getDebugger("Python2VirtualEnv") if not venvName and sys.version_info[0] == 2: try: venvName, _ = \ venvManager.getDefaultEnvironment() except AttributeError: # ignore for eric6 < 18.10 pass elif idString == "Python3": # Python 3 venvName = Preferences.getDebugger("Python3VirtualEnv") if not venvName and sys.version_info[0] == 3: try: venvName, _ = \ venvManager.getDefaultEnvironment() except AttributeError: # ignore for eric6 < 18.10 pass if venvName: interpreter = \ venvManager.getVirtualenvInterpreter(venvName) try: execPath = \ venvManager.getVirtualenvExecPath(venvName) except AttributeError: # eric6 < 18.12 execPath = "" # build a suitable environment if execPath: if "PATH" in clientEnv: clientEnv["PATH"] = os.pathsep.join( [execPath, clientEnv["PATH"]]) else: clientEnv["PATH"] = execPath except KeyError: # backward compatibility (eric <18.07) if idString == "Python2": # Python 2 interpreter = \ Preferences.getDebugger("PythonInterpreter") elif idString == "Python3": # Python 3 interpreter = \ Preferences.getDebugger("Python3Interpreter") if interpreter: ok = self.__startCodeAssistClient(interpreter, idString, clientEnv) else: ok = False return ok def __interpreterForProject(self): """ Private method to determine the interpreter for the current project and the environment to run it. @return tuple containing the interpreter of the current project and the environment variables @rtype tuple of (str, dict) """ projectLanguage = self.__e5project.getProjectLanguage() interpreter = "" clientEnv = os.environ.copy() if "PATH" in clientEnv: try: clientEnv["PATH"] = self.__ui.getOriginalPathString() except AttributeError: # ignore for eric6 < 18.12 pass if projectLanguage.startswith("Python"): try: # new code using virtual environments venvManager = e5App().getObject("VirtualEnvManager") # get virtual environment from project first venvName = self.__e5project.getDebugProperty("VIRTUALENV") if not venvName: # get it from debugger settings next if projectLanguage == "Python2": # Python 2 venvName = Preferences.getDebugger("Python2VirtualEnv") if not venvName and sys.version_info[0] == 2: try: venvName, _ = \ venvManager.getDefaultEnvironment() except AttributeError: # ignore for eric6 < 18.10 pass elif projectLanguage == "Python3": # Python 3 venvName = Preferences.getDebugger("Python3VirtualEnv") if not venvName and sys.version_info[0] == 3: try: venvName, _ = \ venvManager.getDefaultEnvironment() except AttributeError: # ignore for eric6 < 18.10 pass else: venvName = "" if venvName: interpreter = \ venvManager.getVirtualenvInterpreter(venvName) try: execPath = venvManager.getVirtualenvExecPath(venvName) except AttributeError: # eric6 < 18.12 execPath = "" # build a suitable environment if execPath: if "PATH" in clientEnv: clientEnv["PATH"] = os.pathsep.join( [execPath, clientEnv["PATH"]]) else: clientEnv["PATH"] = execPath except KeyError: # backward compatibility (eric < 18.07) # get interpreter from project first interpreter = self.__e5project.getDebugProperty("INTERPRETER") if not interpreter or not Utilities.isinpath(interpreter): # get it from debugger settings second if projectLanguage == "Python2": interpreter = Preferences.getDebugger( "PythonInterpreter") elif projectLanguage == "Python3": interpreter = Preferences.getDebugger( "Python3Interpreter") return interpreter, clientEnv @pyqtSlot() def handleNewConnection(self): """ Public slot for new incoming connections from a client. """ super(CodeAssistServer, self).handleNewConnection() self.__updateEditorLanguageMapping() self.__getConfigs() def activate(self): """ Public method to activate the code assist server. """ try: self.__documentationViewer = self.__ui.documentationViewer() if self.__documentationViewer is not None: self.__documentationViewer.registerProvider( "rope", self.tr("Rope"), self.requestCodeDocumentation, self.isSupportedLanguage) except AttributeError: # eric6 before 17.11 doesn't have this pass self.__e5project.projectClosed.connect(self.__projectClosed) def deactivate(self): """ Public method to deactivate the code assist server. """ """ Public method to shut down the code assist server. """ if self.__documentationViewer is not None: self.__documentationViewer.unregisterProvider("rope") for idString in self.connectionNames(): self.sendJson("closeProject", {}, flush=True, idString=idString) try: self.__e5project.projectClosed.disconnect(self.__projectClosed) except TypeError: # ignore it, the signal may be disconnected already pass self.stopAllClients() @pyqtSlot() def __projectClosed(self): """ Private slot to handle the projectClosed signal. """ self.stopClient(idString=CodeAssistServer.IdProject)