AssistantEric/APIsManager.py

Sat, 23 Dec 2023 15:48:44 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 23 Dec 2023 15:48:44 +0100
branch
eric7
changeset 210
6b1440b975df
parent 206
0e83bc0cc7fd
child 212
24d6bae09db6
permissions
-rw-r--r--

Updated copyright for 2024.

# -*- coding: utf-8 -*-

# Copyright (c) 2008 - 2024 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the APIsManager.
"""

import contextlib
import os

from PyQt6.QtCore import QDateTime, QFileInfo, QObject, Qt, QThread, QTimer, pyqtSignal

with contextlib.suppress(ImportError):
    from PyQt6.QtSql import QSqlDatabase, QSqlQuery

from eric7 import Globals, Preferences, Utilities
from eric7.DocumentationTools.APIGenerator import APIGenerator
from eric7.EricWidgets.EricApplication import ericApp
from eric7.QScintilla import Lexers
from eric7.QScintilla.Editor import Editor
from eric7.Utilities import ModuleParser

WorkerStatusStarted = 2001
WorkerStatusFinished = 2002
WorkerStatusAborted = 2003
WorkerStatusFile = 2004

ApisNameProject = "__Project__"


class DbAPIsWorker(QThread):
    """
    Class implementing a worker thread to prepare the API database.

    @signal processing(status, file) emitted to indicate the processing status
            (one of WorkerStatus..., string)
    """

    processing = pyqtSignal(int, str)

    populate_api_stmt = """
        INSERT INTO api (
        acWord, context, fullContext, signature, fileId, pictureId)
        VALUES (
        :acWord, :context, :fullContext, :signature, :fileId, :pictureId)
    """
    populate_del_api_stmt = """
        DELETE FROM api WHERE fileId = :fileId
    """
    populate_bases_stmt = """
        INSERT INTO bases (class, baseClasses, fileId)
        VALUES (:class, :baseClasses, :fileId)
    """
    populate_del_bases_stmt = """
        DELETE FROM bases WHERE fileId = :fileId
    """
    populate_file_stmt = """
        INSERT INTO file (file) VALUES (:file)
    """
    update_file_stmt = """
        UPDATE file SET lastRead = :lastRead WHERE file = :file
    """

    file_loaded_stmt = """
        SELECT lastRead from file WHERE file = :file
    """
    file_id_stmt = """
        SELECT id FROM file WHERE file = :file
    """
    file_delete_id_stmt = """
        DELETE FROM file WHERE id = :id
    """

    api_files_stmt = """
        SELECT file FROM file
    """

    def __init__(
        self,
        proxy,
        language,
        apiFiles,
        dbName,
        projectPath="",
        refresh=False,
        projectType="",
    ):
        """
        Constructor

        @param proxy reference to the object that is proxied
        @type DbAPIs
        @param language language of the APIs object
        @type str
        @param apiFiles list of API files to process
        @type list of str
        @param dbName path name of the database file
        @type str
        @param projectPath path of the project. Only needed, if the APIs
            are extracted out of the sources of a project.
        @type str
        @param refresh flag indicating a refresh of the APIs of one file
        @type bool
        @param projectType type of the project
        @type str
        """
        QThread.__init__(self)

        self.setTerminationEnabled(True)

        # Get the AC word separators for all of the languages that the editor
        # supports. This has to be before we create a new thread, because
        # access to GUI elements is not allowed from non-GUI threads.
        self.__wseps = {}
        for lang in Lexers.getSupportedLanguages():
            lexer = Lexers.getLexer(lang)
            if lexer is not None:
                self.__wseps[lang] = lexer.autoCompletionWordSeparators()

        self.__proxy = proxy
        self.__language = language
        self.__projectType = projectType
        self.__apiFiles = apiFiles[:]
        self.__aborted = False
        self.__projectPath = projectPath
        self.__refresh = refresh

        self.__databaseName = dbName

        if self.__projectType:
            self.__connectionName = "{0}_{1}_{2}".format(
                self.__language, self.__projectType, id(self)
            )
        else:
            self.__connectionName = "{0}_{1}".format(self.__language, id(self))

    def __autoCompletionWordSeparators(self, language):
        """
        Private method to get the word separator characters for a language.

        @param language language of the APIs object
        @type str
        @return word separator characters
        @rtype list of str
        """
        if language:
            return self.__wseps.get(language)
        else:
            return self.__proxy.autoCompletionWordSeparators()

    def abort(self):
        """
        Public method to ask the thread to stop.
        """
        self.__aborted = True

    def __loadApiFileIfNewer(self, apiFile):
        """
        Private method to load an API file, if it is newer than the one read
        into the database.

        @param apiFile filename of the raw API file
        @type str
        """
        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            query.prepare(self.file_loaded_stmt)
            query.bindValue(":file", apiFile)
            query.exec()
            loadTime = (
                QDateTime.fromString(query.value(0), Qt.DateFormat.ISODate)
                if query.next() and query.isValid()
                else
                # __IGNORE_WARNING_M513__
                QDateTime(1970, 1, 1, 0, 0)
            )
            query.finish()
            del query
        finally:
            db.commit()
        if self.__projectPath:
            modTime = QFileInfo(
                os.path.join(self.__projectPath, apiFile)
            ).lastModified()
        else:
            modTime = QFileInfo(apiFile).lastModified()
            basesFile = os.path.splitext(apiFile)[0] + ".bas"
            if os.path.exists(basesFile):
                modTimeBases = QFileInfo(basesFile).lastModified()
                if modTimeBases > modTime:
                    modTime = modTimeBases
        if loadTime < modTime:
            self.processing.emit(WorkerStatusFile, apiFile)
            self.__loadApiFile(apiFile)

    def __classesAttributesApi(self, module):
        """
        Private method to generate class api section for class attributes.

        @param module module object to get the info from
        @type Module
        @return API information
        @rtype list of str
        """
        api = []
        modulePath = module.name.split(".")
        moduleName = "{0}.".format(".".join(modulePath))

        for className in sorted(module.classes.keys()):
            _class = module.classes[className]
            classNameStr = "{0}{1}.".format(moduleName, className)
            for variable in sorted(_class.attributes.keys()):
                if not _class.attributes[variable].isPrivate():
                    if _class.attributes[variable].isPublic():
                        iconId = Editor.AttributeID
                    elif _class.attributes[variable].isProtected():
                        iconId = Editor.AttributeProtectedID
                    else:
                        iconId = Editor.AttributePrivateID
                    api.append("{0}{1}?{2:d}".format(classNameStr, variable, iconId))
        return api

    def __loadApiFile(self, apiFile):
        """
        Private method to read a raw API file into the database.

        @param apiFile filename of the raw API file
        @type str
        """
        apis = []
        bases = []

        if self.__language == ApisNameProject:
            with contextlib.suppress(OSError, ImportError):
                module = ModuleParser.readModule(
                    os.path.join(self.__projectPath, apiFile),
                    basename=self.__projectPath + os.sep,
                    caching=False,
                )
                language = module.getType()
                if language:
                    apiGenerator = APIGenerator(module)
                    try:
                        apis = apiGenerator.genAPI("", True)
                    except TypeError:
                        # backward compatibility for eric7 < 23.6
                        apis = apiGenerator.genAPI(True, "", True)
                    if os.path.basename(apiFile).startswith("Ui_"):
                        # it is a forms source file, extract public attributes
                        # as well
                        apis.extend(self.__classesAttributesApi(module))

                    basesDict = apiGenerator.genBases(True)
                    for baseEntry in basesDict:
                        if basesDict[baseEntry]:
                            bases.append(
                                "{0} {1}\n".format(
                                    baseEntry, " ".join(sorted(basesDict[baseEntry]))
                                )
                            )
        else:
            with contextlib.suppress(OSError, UnicodeError):
                apis = Utilities.readEncodedFile(apiFile)[0].splitlines(True)
            with contextlib.suppress(OSError, UnicodeError):
                basesFile = os.path.splitext(apiFile)[0] + ".bas"
                if os.path.exists(basesFile):
                    bases = Utilities.readEncodedFile(basesFile)[0].splitlines(True)
            language = None

        if len(apis) > 0:
            self.__storeApis(apis, bases, apiFile, language)
        else:
            # just store file info to avoid rereading it every time
            self.__storeFileInfoOnly(apiFile)

    def __storeFileInfoOnly(self, apiFile):
        """
        Private method to store file info only.

        Doing this avoids rereading the file whenever the API is initialized
        in case the given file doesn't contain API data.

        @param apiFile file name of the API file
        @type str
        """
        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            # step 1: create entry in file table
            query.prepare(self.populate_file_stmt)
            query.bindValue(":file", apiFile)
            query.exec()

            # step 2: update the file entry
            query.prepare(self.update_file_stmt)
            query.bindValue(":lastRead", QDateTime.currentDateTime())
            query.bindValue(":file", apiFile)
            query.exec()
        finally:
            query.finish()
            del query
            if self.__aborted:
                db.rollback()
            else:
                db.commit()

    def __storeApis(self, apis, bases, apiFile, language):
        """
        Private method to put the API entries into the database.

        @param apis list of api entries
        @type list of str
        @param bases list of base class entries
        @type list of str
        @param apiFile filename of the file read to get the APIs
        @type str
        @param language programming language of the file of the APIs
        @type str
        """
        wseps = self.__autoCompletionWordSeparators(language)
        if wseps is None:
            return

        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            # step 1: create entry in file table and get the ID
            query.prepare(self.populate_file_stmt)
            query.bindValue(":file", apiFile)
            if not query.exec():
                return
            query.prepare(self.file_id_stmt)
            query.bindValue(":file", apiFile)
            if not query.exec():
                return
            if not query.next():  # __IGNORE_WARNING_M513__
                return
            fileId = int(query.value(0))

            # step 2: delete all entries belonging to this file
            query.prepare(self.populate_del_api_stmt)
            query.bindValue(":fileId", fileId)
            query.exec()

            query.prepare(self.populate_del_bases_stmt)
            query.bindValue(":fileId", fileId)
            query.exec()

            # step 3: load the given API info
            query.prepare(self.populate_api_stmt)
            for api in apis:
                if self.__aborted:
                    break

                api = api.strip()
                if len(api) == 0:
                    continue

                b = api.find("(")
                if b == -1:
                    path = api
                    sig = ""
                else:
                    path = api[:b]
                    sig = api[b:]

                while len(path) > 0:
                    acWord = ""
                    context = ""
                    fullContext = ""
                    pictureId = ""

                    # search for word separators
                    index = len(path)
                    while index > 0:
                        index -= 1
                        found = False
                        for wsep in wseps:
                            if path[:index].endswith(wsep):
                                found = True
                                break
                        if found:
                            if acWord == "":
                                # completion found
                                acWord = path[index:]
                                path = path[: (index - len(wsep))]
                                index = len(path)
                                fullContext = path
                                context = path
                                with contextlib.suppress(ValueError):
                                    acWord, pictureId = acWord.split("?", 1)
                            else:
                                context = path[index:]
                                break
                    # none found?
                    if acWord == "":
                        acWord = path
                        path = ""

                    query.bindValue(":acWord", acWord)
                    query.bindValue(":context", context)
                    query.bindValue(":fullContext", fullContext)
                    query.bindValue(":signature", sig)
                    query.bindValue(":fileId", fileId)
                    query.bindValue(":pictureId", pictureId)
                    query.exec()

                    sig = ""

            # step 4: load the given base classes info
            query.prepare(self.populate_bases_stmt)
            for base in bases:
                if self.__aborted:
                    break

                base = base.strip()
                if len(base) == 0:
                    continue

                class_, baseClasses = base.split(" ", 1)
                query.bindValue(":class", class_)
                query.bindValue(":baseClasses", baseClasses)
                query.bindValue(":fileId", fileId)
                query.exec()

            if not self.__aborted:
                # step 5: update the file entry
                query.prepare(self.update_file_stmt)
                query.bindValue(":lastRead", QDateTime.currentDateTime())
                query.bindValue(":file", apiFile)
                query.exec()
        finally:
            query.finish()
            del query
            if self.__aborted:
                db.rollback()
            else:
                db.commit()

    def __deleteApiFile(self, apiFile):
        """
        Private method to delete all references to an api file.

        @param apiFile filename of the raw API file
        @type str
        """
        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)

            # step 1: get the ID belonging to the api file
            query.prepare(self.file_id_stmt)
            query.bindValue(":file", apiFile)
            query.exec()
            query.next()  # __IGNORE_WARNING_M513__
            fileId = int(query.value(0))

            # step 2: delete all API entries belonging to this file
            query.prepare(self.populate_del_api_stmt)
            query.bindValue(":fileId", fileId)
            query.exec()

            # step 3: delete all base classes entries belonging to this file
            query.prepare(self.populate_del_bases_stmt)
            query.bindValue(":fileId", fileId)
            query.exec()

            # step 4: delete the file entry
            query.prepare(self.file_delete_id_stmt)
            query.bindValue(":id", fileId)
            query.exec()
        finally:
            query.finish()
            del query
            db.commit()

    def __getApiFiles(self):
        """
        Private method to get a list of API files loaded into the database.

        @return list of API filenames
        @rtype list of str
        """
        apiFiles = []

        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            query.exec(self.api_files_stmt)
            while query.next():  # __IGNORE_WARNING_M513__
                apiFiles.append(query.value(0))
        finally:
            query.finish()
            del query
            db.commit()

        return apiFiles

    def run(self):
        """
        Public method to perform the threads work.
        """
        self.processing.emit(WorkerStatusStarted, "")

        if QSqlDatabase.contains(self.__connectionName):
            QSqlDatabase.removeDatabase(self.__connectionName)

        db = QSqlDatabase.addDatabase("QSQLITE", self.__connectionName)
        db.setDatabaseName(self.__databaseName)
        db.open()

        if db.isValid() and db.isOpen():
            # step 1: remove API files not wanted any longer
            if not self.__refresh:
                loadedApiFiles = self.__getApiFiles()
                for apiFile in loadedApiFiles:
                    if not self.__aborted and apiFile not in self.__apiFiles:
                        self.__deleteApiFile(apiFile)

            # step 2: (re-)load api files
            for apiFile in self.__apiFiles:
                if not self.__aborted:
                    self.__loadApiFileIfNewer(apiFile)

        db.close()

        if self.__aborted:
            self.processing.emit(WorkerStatusAborted, "")
        else:
            self.processing.emit(WorkerStatusFinished, "")


class DbAPIs(QObject):
    """
    Class implementing an API storage entity.

    @signal apiPreparationStatus(language, status, file) emitted to indicate
        the API preparation status for a language
    """

    apiPreparationStatus = pyqtSignal(str, int, str)

    DB_VERSION = 4

    create_mgmt_stmt = """
        CREATE TABLE mgmt
        (format INTEGER)
    """
    drop_mgmt_stmt = """DROP TABLE IF EXISTS mgmt"""

    create_api_stmt = """
        CREATE TABLE api
        (acWord TEXT,
         context TEXT,
         fullContext TEXT,
         signature TEXT,
         fileId INTEGER,
         pictureId INTEGER,
         UNIQUE(acWord, fullContext, signature) ON CONFLICT IGNORE
        )
    """
    drop_api_stmt = """DROP TABLE IF EXISTS api"""

    create_bases_stmt = """
        CREATE TABLE bases
        (class TEXT UNIQUE ON CONFLICT IGNORE,
         baseClasses TEXT,
         fileId INTEGER
        )
    """
    drop_bases_stmt = """DROP TABLE IF EXISTS bases"""

    create_file_stmt = """
        CREATE TABLE file
        (id INTEGER PRIMARY KEY AUTOINCREMENT,
         file TEXT UNIQUE ON CONFLICT IGNORE,
         lastRead TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """
    drop_file_stmt = """DROP TABLE IF EXISTS file"""

    create_acWord_idx = """CREATE INDEX acWord_idx on api (acWord)"""
    drop_acWord_idx = """DROP INDEX IF EXISTS acWord_idx"""

    create_context_idx = """CREATE INDEX context_idx on api (context)"""
    drop_context_idx = """DROP INDEX IF EXISTS context_idx"""

    create_fullContext_idx = """CREATE INDEX fullContext_idx on api (fullContext)"""
    drop_fullContext_idx = """DROP INDEX IF EXISTS fullContext_idx"""

    create_bases_idx = """CREATE INDEX base_idx on bases (class)"""
    drop_bases_idx = """DROP INDEX IF EXISTS base_idx"""

    create_file_idx = """CREATE INDEX file_idx on file (file)"""
    drop_file_idx = """DROP INDEX IF EXISTS file_idx"""

    api_files_stmt = """
        SELECT file FROM file
    """

    ac_stmt = """
        SELECT DISTINCT acWord, fullContext, pictureId FROM api
        WHERE acWord GLOB :acWord
        ORDER BY acWord
    """
    ac_context_stmt = """
        SELECT DISTINCT acWord, fullContext, pictureId FROM api
        WHERE context = :context
        ORDER BY acWord
    """
    ac_context_word_stmt = """
        SELECT DISTINCT acWord, fullContext, pictureId FROM api
        WHERE acWord GLOB :acWord AND context = :context
        ORDER BY acWord
    """
    bases_stmt = """
        SELECT baseClasses from bases
        WHERE class = :class
    """
    ct_stmt = """
        SELECT DISTINCT acWord, signature, fullContext FROM api
        WHERE acWord = :acWord
    """
    ct_context_stmt = """
        SELECT DISTINCT acWord, signature, fullContext FROM api
        WHERE acWord = :acWord
        AND context = :context
    """
    ct_fullContext_stmt = """
        SELECT DISTINCT acWord, signature, fullContext FROM api
        WHERE acWord = :acWord
        AND fullContext = :fullContext
    """
    format_select_stmt = """
        SELECT format FROM mgmt
    """
    mgmt_insert_stmt = """
        INSERT INTO mgmt (format) VALUES ({0:d})
        """.format(
        DB_VERSION
    )

    def __init__(self, language, projectType="", parent=None):
        """
        Constructor

        @param language language of the APIs object
        @type str
        @param projectType type of the project
        @type str
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent)
        if projectType:
            self.setObjectName("DbAPIs_{0}_{1}".format(language, projectType))
        else:
            self.setObjectName("DbAPIs_{0}".format(language))

        self.__inPreparation = False
        self.__worker = None
        self.__workerQueue = []
        self.__opened = False

        self.__projectType = projectType
        self.__language = language

        if self.__projectType:
            self.__connectionName = "{0}_{1}".format(
                self.__language, self.__projectType
            )
        else:
            self.__connectionName = self.__language

        if self.__language == ApisNameProject:
            self.__initAsProject()
        else:
            self.__initAsLanguage()

    def __initAsProject(self):
        """
        Private method to initialize as a project API object.
        """
        self.__lexer = None

        self.__project = ericApp().getObject("Project")
        self.__project.projectOpened.connect(self.__projectOpened)
        self.__project.newProject.connect(self.__projectOpened)
        self.__project.projectClosed.connect(self.__projectClosed)
        self.__project.projectChanged.connect(self.__projectChanged)
        try:
            self.__project.projectFileCompiled.connect(self.__projectFileCompiled)
        except AttributeError:
            # backward compatibility for eric < 22.12
            self.__project.projectFormCompiled.connect(self.__projectFileCompiled)

        if self.__project.isOpen():
            self.__projectOpened()

    def __initAsLanguage(self):
        """
        Private method to initialize as a language API object.
        """
        if self.__language == "Python3":
            self.__discardFirst = ["self", "cls"]
        else:
            self.__discardFirst = []
        self.__lexer = Lexers.getLexer(self.__language)
        try:
            self.__apifiles = Preferences.getEditorAPI(
                self.__language, projectType=self.__projectType
            )
        except TypeError:
            # older interface
            self.__apifiles = Preferences.getEditorAPI(self.__language)
        self.__apifiles.sort()
        if self.__lexer is not None:
            self.__openAPIs()

    def _apiDbName(self):
        """
        Protected method to determine the name of the database file.

        @return name of the database file
        @rtype str
        """
        if self.__language == ApisNameProject:
            return os.path.join(
                self.__project.getProjectManagementDir(), "project-apis.db"
            )
        else:
            apisDir = os.path.join(Globals.getConfigDir(), "APIs")
            if not os.path.exists(apisDir):
                os.makedirs(apisDir)
            if self.__projectType:
                filename = "{0}_{1}-api.db".format(self.__language, self.__projectType)
            else:
                filename = "{0}-api.db".format(self.__language)
            return os.path.join(apisDir, filename)

    def close(self):
        """
        Public method to close the database.
        """
        self.__workerQueue = []
        if self.__worker is not None:
            self.__worker.abort()
            if self.__worker is not None:
                self.__worker.wait(5000)
                if self.__worker is not None and not self.__worker.isFinished():
                    self.__worker.terminate()
                    if self.__worker is not None:
                        self.__worker.wait(5000)

        if QSqlDatabase and QSqlDatabase.database(self.__connectionName).isOpen():
            QSqlDatabase.database(self.__connectionName).close()
            QSqlDatabase.removeDatabase(self.__connectionName)

        self.__opened = False

    def __openApiDb(self):
        """
        Private method to open the API database.

        @return flag indicating the database status
        @rtype bool
        """
        db = QSqlDatabase.database(self.__connectionName, False)
        if not db.isValid():
            # the database connection is a new one
            db = QSqlDatabase.addDatabase("QSQLITE", self.__connectionName)
            dbName = self._apiDbName()
            if self.__language == ApisNameProject and not os.path.exists(
                self.__project.getProjectManagementDir()
            ):
                opened = False
            else:
                db.setDatabaseName(dbName)
                opened = db.open()
            if not opened:
                QSqlDatabase.removeDatabase(self.__connectionName)
        else:
            opened = True
        return opened

    def __createApiDB(self):
        """
        Private method to create an API database.
        """
        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            # step 1: drop old tables
            query.exec(self.drop_mgmt_stmt)
            query.exec(self.drop_api_stmt)
            query.exec(self.drop_bases_stmt)
            query.exec(self.drop_file_stmt)
            # step 2: drop old indices
            query.exec(self.drop_acWord_idx)
            query.exec(self.drop_context_idx)
            query.exec(self.drop_fullContext_idx)
            query.exec(self.drop_bases_idx)
            query.exec(self.drop_file_idx)
            # step 3: create tables
            query.exec(self.create_api_stmt)
            query.exec(self.create_bases_stmt)
            query.exec(self.create_file_stmt)
            query.exec(self.create_mgmt_stmt)
            query.exec(self.mgmt_insert_stmt)
            # step 4: create indices
            query.exec(self.create_acWord_idx)
            query.exec(self.create_context_idx)
            query.exec(self.create_fullContext_idx)
            query.exec(self.create_bases_idx)
            query.exec(self.create_file_idx)
        finally:
            query.finish()
            del query
            db.commit()

    def getApiFiles(self):
        """
        Public method to get a list of API files loaded into the database.

        @return list of API filenames
        @rtype list of str
        """
        apiFiles = []

        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            query.exec(self.api_files_stmt)
            while query.next():  # __IGNORE_WARNING_M513__
                apiFiles.append(query.value(0))
        finally:
            query.finish()
            del query
            db.commit()

        return apiFiles

    def __isPrepared(self):
        """
        Private method to check, if the database has been prepared.

        @return flag indicating the prepared status
        @rtype bool
        """
        db = QSqlDatabase.database(self.__connectionName)
        prepared = len(db.tables()) > 0
        if prepared:
            db.transaction()
            prepared = False
            try:
                query = QSqlQuery(db)
                ok = query.exec(self.format_select_stmt)
                if ok:
                    query.next()  # __IGNORE_WARNING_M513__
                    formatVersion = int(query.value(0))
                    if formatVersion >= self.DB_VERSION:
                        prepared = True
            finally:
                query.finish()
                del query
                db.commit()
        return prepared

    def getCompletions(self, start=None, context=None, followHierarchy=False):
        """
        Public method to determine the possible completions.

        @param start string giving the start of the word to be
            completed
        @type str
        @param context string giving the context (e.g. classname)
            to be completed
        @type str
        @param followHierarchy flag indicating to follow the hierarchy of
            base classes
        @type bool
        @return list of dictionaries with possible completions
            (key 'completion' contains the completion,
            key 'context' contains the context and
            key 'pictureId' contains the ID of the icon to be shown)
        @rtype list of dict
        """
        completions = []

        db = QSqlDatabase.database(self.__connectionName)
        if db.isOpen() and not self.__inPreparation:
            db.transaction()
            try:
                query = None

                if start is not None and context is not None:
                    query = QSqlQuery(db)
                    query.prepare(self.ac_context_word_stmt)
                    query.bindValue(":acWord", start + "*")
                    query.bindValue(":context", context)
                elif start is not None:
                    query = QSqlQuery(db)
                    query.prepare(self.ac_stmt)
                    query.bindValue(":acWord", start + "*")
                elif context is not None:
                    query = QSqlQuery(db)
                    query.prepare(self.ac_context_stmt)
                    query.bindValue(":context", context)

                if query is not None:
                    query.exec()
                    while query.next():  # __IGNORE_WARNING_M513__
                        completions.append(
                            {
                                "completion": query.value(0),
                                "context": query.value(1),
                                "pictureId": query.value(2),
                            }
                        )
                    query.finish()
                    del query
            finally:
                db.commit()

            if followHierarchy:
                query = QSqlQuery(db)
                query.prepare(self.bases_stmt)
                query.bindValue(":class", context)
                query.exec()
                if query.next():  # __IGNORE_WARNING_M513__
                    bases = query.value(0).split()
                else:
                    bases = []
                for base in bases:
                    completions.extend(
                        self.getCompletions(start, base, followHierarchy=True)
                    )
                query.finish()
                del query

        return completions

    def getCalltips(
        self,
        acWord,
        commas,
        context=None,
        fullContext=None,
        showContext=True,
        followHierarchy=False,
    ):
        """
        Public method to determine the calltips.

        @param acWord function to get calltips for
        @type str
        @param commas minimum number of commas contained in the calltip
        @type int
        @param context string giving the context (e.g. classname)
        @type str
        @param fullContext string giving the full context
        @type str
        @param showContext flag indicating to show the calltip context
        @type bool
        @param followHierarchy flag indicating to follow the hierarchy of
            base classes
        @type bool
        @return list of calltips
        @rtype list of str
        """
        calltips = []

        db = QSqlDatabase.database(self.__connectionName)
        if db.isOpen() and not self.__inPreparation:
            if self.autoCompletionWordSeparators():
                contextSeparator = self.autoCompletionWordSeparators()[0]
            else:
                contextSeparator = " "
            db.transaction()
            try:
                query = QSqlQuery(db)
                if fullContext:
                    query.prepare(self.ct_fullContext_stmt)
                    query.bindValue(":fullContext", fullContext)
                elif context:
                    query.prepare(self.ct_context_stmt)
                    query.bindValue(":context", context)
                else:
                    query.prepare(self.ct_stmt)
                query.bindValue(":acWord", acWord)
                query.exec()
                while query.next():  # __IGNORE_WARNING_M513__
                    word = query.value(0)
                    sig = query.value(1)
                    fullCtx = query.value(2)
                    if sig:
                        if self.__discardFirst:
                            sig = "({0}".format(sig[1:])
                            for discard in self.__discardFirst:
                                sig = sig.replace(discard, "", 1)
                            sig = sig.strip(", \t\r\n")
                        if self.__enoughCommas(sig, commas):
                            if showContext:
                                calltips.append(
                                    "{0}{1}{2}{3}".format(
                                        fullCtx,
                                        contextSeparator if fullCtx else "",
                                        word,
                                        sig,
                                    )
                                )
                            else:
                                calltips.append("{0}{1}".format(word, sig))
                query.finish()
                del query
            finally:
                db.commit()

            if followHierarchy:
                query = QSqlQuery(db)
                query.prepare(self.bases_stmt)
                query.bindValue(":class", context)
                query.exec()
                if query.next():  # __IGNORE_WARNING_M513__
                    bases = query.value(0).split()
                else:
                    bases = []
                for base in bases:
                    calltips.extend(
                        self.getCalltips(
                            acWord,
                            commas,
                            context=base,
                            showContext=showContext,
                            followHierarchy=True,
                        )
                    )
                query.finish()
                del query

            if context and len(calltips) == 0 and not followHierarchy:
                # nothing found, try without a context
                calltips = self.getCalltips(acWord, commas, showContext=showContext)

        return calltips

    def __enoughCommas(self, s, commas):
        """
        Private method to determine, if the given string contains enough
        commas.

        @param s string to check
        @type str
        @param commas number of commas to check for
        @type int
        @return flag indicating, that there are enough commas
        @rtype bool
        """
        end = s.find(")")

        if end < 0:
            return False

        w = s[:end]
        return w.count(",") >= commas

    def __openAPIs(self):
        """
        Private method to open the API database.
        """
        self.__opened = self.__openApiDb()
        if self.__opened:
            if not self.__isPrepared():
                self.__createApiDB()

            # prepare the database if neccessary
            self.prepareAPIs()

    def __getProjectFormSources(self, normalized=False):
        """
        Private method to get the source files for the project forms.

        @param normalized flag indicating a normalized filename is wanted
        @type bool
        @return list of project form sources
        @rtype list of str
        """
        if self.__project.getProjectLanguage() in ("Python3", "MicroPython", "Cython"):
            sourceExt = ".py"
        elif self.__project.getProjectLanguage() == "Ruby":
            sourceExt = ".rb"
        else:
            return []

        formsSources = []
        forms = self.__project.getProjectFiles("FORMS")
        for fn in forms:
            ofn = os.path.splitext(fn)[0]
            dirname, filename = os.path.split(ofn)
            formSource = os.path.join(dirname, "Ui_" + filename + sourceExt)
            if normalized:
                formSource = os.path.join(self.__project.getProjectPath(), formSource)
            formsSources.append(formSource)
        return formsSources

    def prepareAPIs(self, rawList=None):
        """
        Public method to prepare the APIs if neccessary.

        @param rawList list of raw API files
        @type list of str
        """
        if self.__inPreparation:
            return

        projectPath = ""
        if rawList:
            apiFiles = rawList[:]
        elif self.__language == ApisNameProject:
            apiFiles = self.__project.getSources()[:]
            apiFiles.extend(
                [f for f in self.__getProjectFormSources() if f not in apiFiles]
            )
            projectPath = self.__project.getProjectPath()
            projectType = ""
        else:
            apiFiles = Preferences.getEditorAPI(
                self.__language, projectType=self.__projectType
            )
            projectType = self.__projectType
        self.__worker = DbAPIsWorker(
            self,
            self.__language,
            apiFiles,
            self._apiDbName(),
            projectPath,
            projectType=projectType,
        )
        self.__worker.processing.connect(
            self.__processingStatus, Qt.ConnectionType.QueuedConnection
        )
        self.__worker.start()

    def __processQueue(self):
        """
        Private slot to process the queue of files to load.
        """
        if self.__worker is not None and self.__worker.isFinished():
            self.__worker.deleteLater()
            self.__worker = None

        if self.__worker is None and len(self.__workerQueue) > 0:
            apiFiles = [self.__workerQueue.pop(0)]
            if self.__language == ApisNameProject:
                projectPath = self.__project.getProjectPath()
                apiFiles = [apiFiles[0].replace(projectPath + os.sep, "")]
                projectType = ""
            else:
                projectPath = ""
                projectType = self.__projectType
            self.__worker = DbAPIsWorker(
                self,
                self.__language,
                apiFiles,
                self._apiDbName(),
                projectPath,
                projectType=projectType,
                refresh=True,
            )
            self.__worker.processing.connect(
                self.__processingStatus, Qt.ConnectionType.QueuedConnection
            )
            self.__worker.start()

    def getLexer(self):
        """
        Public method to return a reference to our lexer object.

        @return reference to the lexer object
        @rtype Lexer
        """
        return self.__lexer

    def autoCompletionWordSeparators(self):
        """
        Public method to get the word separator characters.

        @return word separator characters
        @rtype list of str
        """
        if self.__lexer:
            return self.__lexer.autoCompletionWordSeparators()
        return None

    def __processingStatus(self, status, filename):
        """
        Private slot handling the processing signal of the API preparation
        thread.

        @param status preparation status (one of WorkerStatus...)
        @type int
        @param filename name of the file being processed
        @type str
        """
        if status == WorkerStatusStarted:
            self.__inPreparation = True
            self.apiPreparationStatus.emit(self.__language, WorkerStatusStarted, "")
        elif status == WorkerStatusFinished:
            self.__inPreparation = False
            self.apiPreparationStatus.emit(self.__language, WorkerStatusFinished, "")
            QTimer.singleShot(0, self.__processQueue)
        elif status == WorkerStatusAborted:
            self.__inPreparation = False
            self.apiPreparationStatus.emit(self.__language, WorkerStatusAborted, "")
            QTimer.singleShot(0, self.__processQueue)
        elif status == WorkerStatusFile:
            self.apiPreparationStatus.emit(self.__language, WorkerStatusFile, filename)

    ########################################################
    ## project related stuff below
    ########################################################

    def __projectOpened(self):
        """
        Private slot to perform actions after a project has been opened.
        """
        if self.__project.getProjectLanguage() in ("Python3", "MicroPython", "Cython"):
            self.__discardFirst = ["self", "cls"]
        else:
            self.__discardFirst = []
        self.__lexer = Lexers.getLexer(self.__project.getProjectLanguage())
        self.__openAPIs()

    def __projectClosed(self):
        """
        Private slot to perform actions after a project has been closed.
        """
        self.close()

    def __projectFileCompiled(self, filename):
        """
        Private slot to handle the projectFormCompiled signal.

        @param filename name of the source file that was generated
        @type str
        """
        self.__workerQueue.append(filename)
        self.__processQueue()

    def __projectChanged(self):
        """
        Private slot to handle the projectChanged signal.
        """
        if self.__opened:
            self.__projectClosed()
            self.__projectOpened()

    def editorSaved(self, filename):
        """
        Public slot to handle the editorSaved signal.

        @param filename name of the file that was saved
        @type str
        """
        try:
            isProjectSource = self.__project.isProjectCategory(filename, "SOURCES")
        except AttributeError:
            # backward compatibility for eric < 22.12
            isProjectSource = self.__project.isProjectSource(filename)
        if isProjectSource:
            self.__workerQueue.append(filename)
            self.__processQueue()


class APIsManager(QObject):
    """
    Class implementing the APIsManager class, which is the central store for
    API information used by autocompletion and calltips.
    """

    def __init__(self, mainWindow, parent=None):
        """
        Constructor

        @param mainWindow reference to the main eric7 window
        @type QMainWindow
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent)
        self.setObjectName("Assistant_APIsManager")

        self.__mw = mainWindow

        # initialize the apis dictionary
        self.__apis = {}

    def reloadAPIs(self):
        """
        Public slot to reload the api information.
        """
        for api in list(self.__apis.values()):
            api and api.prepareAPIs()

    def getAPIs(self, language, projectType=""):
        """
        Public method to get an apis object for autocompletion/calltips.

        This method creates and loads an APIs object dynamically upon request.
        This saves memory for languages, that might not be needed at the
        moment.

        @param language language of the requested APIs object
        @type str
        @param projectType type of the project
        @type str
        @return reference to the APIs object
        @rtype APIs
        """
        try:
            return self.__apis[(language, projectType)]
        except KeyError:
            if (
                language in Lexers.getSupportedApiLanguages()
                or language == ApisNameProject
            ):
                # create the api object
                api = DbAPIs(language, projectType=projectType)
                api.apiPreparationStatus.connect(self.__apiPreparationStatus)
                self.__apis[(language, projectType)] = api
                return self.__apis[(language, projectType)]
            else:
                return None

    def deactivate(self):
        """
        Public method to perform actions upon deactivation.
        """
        for apiLang in self.__apis:
            self.__apis[apiLang].close()
            self.__apis[apiLang].deleteLater()
            self.__apis[apiLang] = None

    def __showMessage(self, msg):
        """
        Private message to show a message in the main windows status bar.

        @param msg message to be shown
        @type str
        """
        if msg:
            self.__mw.statusBar().showMessage(msg, 2000)

    def __apiPreparationStatus(self, language, status, filename):
        """
        Private slot handling the preparation status signal of an API object.

        @param language language of the API
        @type str
        @param status preparation status (one of WorkerStatus...)
        @type int
        @param filename name of the file being processed
        @type str
        """
        if language == ApisNameProject:
            language = self.tr("Project")

        if status == WorkerStatusStarted:
            self.__showMessage(
                self.tr("Preparation of '{0}' APIs started.").format(language)
            )
        elif status == WorkerStatusFile:
            self.__showMessage(
                self.tr("'{0}' APIs: Processing '{1}'").format(
                    language, os.path.basename(filename)
                )
            )
        elif status == WorkerStatusFinished:
            self.__showMessage(
                self.tr("Preparation of '{0}' APIs finished.").format(language)
            )
        elif status == WorkerStatusAborted:
            self.__showMessage(
                self.tr("Preparation of '{0}' APIs cancelled.").format(language)
            )


#
# eflag: noqa = M523, M834, S608

eric ide

mercurial