RefactoringRope/CodeAssistClient.py

Tue, 10 Dec 2024 15:49:01 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Tue, 10 Dec 2024 15:49:01 +0100
branch
eric7
changeset 426
7592a1c052e8
parent 413
a4cba20ad7ab
permissions
-rw-r--r--

Updated copyright for 2025.

# -*- coding: utf-8 -*-

# Copyright (c) 2017 - 2025 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the code assist client interface to rope.
"""

import contextlib
import sys

modulePath = sys.argv[-1]  # it is always the last parameter
sys.path.append(modulePath)

try:
    import rope.base.libutils
    import rope.base.project
    import rope.contrib.codeassist
    import rope.contrib.findit

    from rope.base import pynames, pyobjects, pyobjectsdef
    from rope.base.exceptions import BadIdentifierError, ModuleSyntaxError
    from rope.contrib import fixsyntax
except ImportError:
    sys.exit(42)

from eric7.EricNetwork.EricJsonClient import EricJsonClient


class CodeAssistClient(EricJsonClient):
    """
    Class implementing the code assist client interface to rope.
    """

    IdProject = "Project"

    def __init__(self, host, port, idString, projectPath):
        """
        Constructor

        @param host ip address the background service is listening
        @type str
        @param port port of the background service
        @type int
        @param idString assigned client id to be sent back to the server in
            order to identify the connection
        @type str
        @param projectPath path to the project
        @type str
        """
        from FileSystemCommands import RefactoringClientFileSystemCommands  # noqa: I102

        super().__init__(host, port, idString)

        self.__methodMapping = {
            "getConfig": self.__getConfig,
            "configChanged": self.__configChanged,
            "closeProject": self.__closeProject,
            "getCompletions": self.__getCompletions,
            "getCallTips": self.__getCallTips,
            "getDocumentation": self.__getDocumentation,
            "gotoDefinition": self.__gotoDefinition,
            "gotoReferences": self.__getReferences,
            "reportChanged": self.__reportChanged,
        }

        self.__fsCommands = RefactoringClientFileSystemCommands(self)

        self.__projectpath = projectPath
        self.__project = rope.base.project.Project(
            self.__projectpath, fscommands=self.__fsCommands
        )
        self.__project.validate(self.__project.root)

        self.__id = idString

    def handleCall(self, method, params):
        """
        Public method to handle a method call from the server.

        @param method requested method name
        @type str
        @param params dictionary with method specific parameters
        @type dict
        """
        self.__methodMapping[method](params)

    def __handleRopeError(self, err):
        """
        Private method to process a rope error.

        @param err rope exception object
        @type Exception
        @return dictionary containing the error information
        @rtype dict
        """
        ropeError = str(type(err)).split()[-1]
        ropeError = ropeError[1:-2].split(".")[-1]
        errorDict = {
            "Error": ropeError,
            "ErrorString": str(err),
        }
        if ropeError == "ModuleSyntaxError":
            errorDict["ErrorFile"] = err.filename
            errorDict["ErrorLine"] = err.lineno

        return errorDict

    def __getConfig(self, params):  # noqa: U100
        """
        Private method to send some configuration data to the server.

        @param params dictionary containing the method parameters sent by
            the server
        @type dict
        """
        result = {
            "RopeFolderName": self.__project.ropefolder.real_path,
            "Id": self.__id,
        }

        self.sendJson("Config", result)

    def __configChanged(self, params):  # noqa: U100
        """
        Private method to handle a change of the configuration file.

        @param params dictionary containing the method parameters sent by
            the server
        @type dict
        """
        self.__project.close()
        self.__project = rope.base.project.Project(
            self.__projectpath, fscommands=self.__fsCommands
        )
        self.__project.validate(self.__project.root)

    def __closeProject(self, params):  # noqa: U100
        """
        Private slot to validate the project.

        @param params dictionary containing the method parameters sent by
            the server
        @type dict
        """
        self.__project.close()

    def __getCompletions(self, params):
        """
        Private method to calculate possible completions.

        @param params dictionary containing the method parameters
        @type dict
        """
        filename = params["FileName"]
        source = params["Source"]
        offset = params["Offset"]
        maxfixes = params["MaxFixes"]

        self.__project.prefs.set("python_path", params["SysPath"])
        resource = (
            rope.base.libutils.path_to_resource(self.__project, filename)
            if filename
            else None
        )

        errorDict = {}
        completions = []

        try:
            proposals = rope.contrib.codeassist.code_assist(
                self.__project, source, offset, resource, maxfixes=maxfixes
            )
            for proposal in proposals:
                proposalType = proposal.type
                if proposal.name.startswith("__"):
                    proposalType = "__" + proposalType
                elif proposal.name.startswith("_"):
                    proposalType = "_" + proposalType
                completions.append(
                    {
                        "Name": proposal.name,
                        "CompletionType": proposalType,
                    }
                )
        except Exception as err:
            errorDict = self.__handleRopeError(err)

        result = {
            "Completions": completions,
            "CompletionText": params["CompletionText"],
            "FileName": filename,
        }
        result.update(errorDict)

        self.sendJson("CompletionsResult", result)

    def __getCallTips(self, params):
        """
        Private method to calculate possible calltips.

        @param params dictionary containing the method parameters
        @type dict
        """
        filename = params["FileName"]
        source = params["Source"]
        offset = params["Offset"]
        maxfixes = params["MaxFixes"]

        self.__project.prefs.set("python_path", params["SysPath"])
        resource = (
            rope.base.libutils.path_to_resource(self.__project, filename)
            if filename
            else None
        )

        errorDict = {}
        calltips = []

        try:
            cts = rope.contrib.codeassist.get_calltip(
                self.__project,
                source,
                offset,
                resource,
                maxfixes=maxfixes,
                remove_self=True,
            )
            if cts is not None:
                calltips = [cts]
        except Exception as err:
            errorDict = self.__handleRopeError(err)

        result = {
            "CallTips": calltips,
        }
        result.update(errorDict)

        self.sendJson("CallTipsResult", result)

    def __getDocumentation(self, params):
        """
        Private method to get some source code documentation.

        @param params dictionary containing the method parameters
        @type dict
        """
        filename = params["FileName"]
        source = params["Source"]
        offset = params["Offset"]
        maxfixes = params["MaxFixes"]

        if self.__id != CodeAssistClient.IdProject:
            self.__project.prefs.set("python_path", params["SysPath"])
        resource = (
            rope.base.libutils.path_to_resource(self.__project, filename)
            if filename
            else None
        )

        errorDict = {}
        documentation = ""
        cts = None

        with contextlib.suppress(Exception):
            cts = rope.contrib.codeassist.get_calltip(
                self.__project,
                source,
                offset,
                resource,
                maxfixes=maxfixes,
                remove_self=True,
            )

        if cts is not None:
            while ".." in cts:
                cts = cts.replace("..", ".")
            if "(.)" in cts:
                cts = cts.replace("(.)", "(...)")

        try:
            documentation = rope.contrib.codeassist.get_doc(
                self.__project, source, offset, resource, maxfixes=maxfixes
            )
        except Exception as err:
            errorDict = self.__handleRopeError(err)

        typeName = self.__getObjectTypeAndName(
            self.__project, source, offset, resource, maxfixes=maxfixes
        )

        documentationDict = self.__processDocumentation(cts, documentation, typeName)
        result = {
            "DocumentationDict": documentationDict,
        }
        result.update(errorDict)

        self.sendJson("DocumentationResult", result)

    def __processDocumentation(self, cts, documentation, typeName):
        """
        Private method to process the call-tips and documentation.

        @param cts call-tips
        @type str
        @param documentation extracted source code documentation
        @type str
        @param typeName type and name of the object
        @type tuple of (str, str)
        @return dictionary containing document information
        @rtype dictionary with keys "name", "argspec", "module" and
            "docstring"
        """
        objectFullname = ""
        calltip = ""
        argspec = ""
        module = ""

        if cts:
            cts = cts.replace(".__init__", "")
            parenthesisPos = cts.find("(")
            if parenthesisPos != -1:
                objectFullname = cts[:parenthesisPos]
                objectName = objectFullname.split(".")[-1]
                cts = cts.replace(objectFullname, objectName)
                calltip = cts
            else:
                objectFullname = cts

        if objectFullname and not objectFullname.startswith("self."):
            if calltip:
                argspecStart = calltip.find("(")
                argspec = calltip[argspecStart:]

            moduleEnd = objectFullname.rfind(".")
            module = objectFullname[:moduleEnd]

        if not objectFullname and typeName[1] not in ["", "<unknown>"]:
            objectFullname = typeName[1]

        return {
            "name": objectFullname,
            "argspec": argspec,
            "module": module,
            "docstring": documentation,
            "typ": typeName[0],
        }

    def __getObjectTypeAndName(
        self, project, sourceCode, offset, resource=None, maxfixes=1
    ):
        """
        Private method to determine an object type and name for the given
        location.

        @param project reference to the rope project object
        @type rope.base.project.Project
        @param sourceCode source code
        @type str
        @param offset offset to base the calculation on
        @type int
        @param resource reference to the rope resource object
        @type rope.base.resources.Resource
        @param maxfixes number of fixes to be done
        @type int
        @return tuple containing the object type and name
        @rtype tuple of (str, str)
        """
        try:
            fixer = fixsyntax.FixSyntax(project, sourceCode, resource, maxfixes)
            pyname = fixer.pyname_at(offset)
        except BadIdentifierError:
            pyname = None
        except ModuleSyntaxError:
            pyname = None
        except IndexError:
            pyname = None
        if pyname is None:
            return "<unknown>", "<unknown>"

        pyobject = pyname.get_object()
        if isinstance(pyobject, pyobjectsdef.PyPackage):
            typ = "package"
            if isinstance(pyname, pynames.ImportedModule):
                name = pyname.module_name
            else:
                name = "<unknown>"
        elif isinstance(pyobject, pyobjectsdef.PyModule):
            typ = "module"
            name = pyobject.get_name()
        elif isinstance(pyobject, pyobjectsdef.PyClass):
            typ = "class"
            name = pyobject.get_name()
        elif isinstance(pyobject, pyobjectsdef.PyFunction):
            typ = pyobject.get_kind()
            name = pyobject.get_name()
        elif isinstance(pyobject, pyobjects.PyObject):
            typ = "object"
            name = ""
        else:
            typ = ""
            name = ""

        return typ, name

    def __gotoDefinition(self, params):
        """
        Private method to handle the Goto Definition action.

        @param params dictionary containing the method parameters sent by
            the server
        @type dict
        """
        filename = params["FileName"]
        offset = params["Offset"]
        source = params["Source"]
        uid = params["Uuid"]

        self.__project.prefs.set("python_path", params["SysPath"])
        resource = (
            rope.base.libutils.path_to_resource(self.__project, filename)
            if filename
            else None
        )

        errorDict = {}
        result = {}

        try:
            location = rope.contrib.findit.find_definition(
                self.__project, source, offset, resource
            )
        except Exception as err:
            location = None
            errorDict = self.__handleRopeError(err)

        if location is not None:
            result["Location"] = {
                "ModulePath": location.resource.real_path,
                "Line": location.lineno,
            }
        result["Uuid"] = uid
        result.update(errorDict)

        self.sendJson("GotoDefinitionResult", result)

    def __getReferences(self, params):
        """
        Private method to get the places a parameter is referenced.

        @param params dictionary containing the method parameters sent by
            the server
        @type dict
        """
        filename = params["FileName"]
        offset = params["Offset"]
        line = params["Line"]
        uid = params["Uuid"]

        self.__project.prefs.set("python_path", params["SysPath"])
        resource = (
            rope.base.libutils.path_to_resource(self.__project, filename)
            if filename
            else None
        )

        errorDict = {}
        gotoReferences = []

        try:
            occurrences = rope.contrib.findit.find_occurrences(
                self.__project, resource, offset, in_hierarchy=True
            )
            for occurrence in occurrences:
                if (
                    occurrence.lineno == line
                    and occurrence.resource.real_path == filename
                ):
                    continue
                gotoReferences.append(
                    {
                        "ModulePath": occurrence.resource.real_path,
                        "Line": occurrence.lineno,
                        "Code": occurrence.resource.read().splitlines()[
                            occurrence.lineno - 1
                        ],
                    }
                )
        except Exception as err:
            errorDict = self.__handleRopeError(err)

        result = {
            "GotoReferencesList": gotoReferences,
            "Uuid": uid,
        }
        result.update(errorDict)

        self.sendJson("GotoReferencesResult", result)

    def __reportChanged(self, params):
        """
        Private method to register some changed sources.

        @param params dictionary containing the method parameters sent by
            the server
        @type dict
        """
        filename = params["FileName"]
        oldSource = params["OldSource"]

        with contextlib.suppress(Exception):
            rope.base.libutils.report_change(self.__project, filename, oldSource)


if __name__ == "__main__":
    if len(sys.argv) != 6:
        print(
            "Host, port, id, project path and module path parameters are"
            " missing. Abort."
        )
        sys.exit(1)

    host, port, idString, projectPath = sys.argv[1:-1]

    # Create a Qt6 application object in order to allow the processing of
    # modules containing Qt stuff.
    try:
        from PyQt6.QtCore import QCoreApplication
    except (ImportError, RuntimeError):
        QCoreApplication = None
    if QCoreApplication is not None:
        app = QCoreApplication([])

    client = CodeAssistClient(host, int(port), idString, projectPath)
    # Start the main loop
    client.run()

    sys.exit(0)

#
# eflag: noqa = M801

eric ide

mercurial