Mon, 24 Oct 2022 16:06:28 +0200
Adapted the import statements to the new structure.
# -*- coding: utf-8 -*-

# Copyright (c) 2008 - 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the APIsManager.
"""

import contextlib
import os

from PyQt6.QtCore import QTimer, QThread, QFileInfo, pyqtSignal, QDateTime, QObject, Qt

with contextlib.suppress(ImportError):
    from PyQt6.QtSql import QSqlDatabase, QSqlQuery

from eric7 import Globals, Preferences, Utilities
from eric7.EricWidgets.EricApplication import ericApp
from eric7.QScintilla import Lexers
from eric7.Utilities import ModuleParser

# status codes emitted by the worker thread via its 'processing' signal
WorkerStatusStarted = 2001
WorkerStatusFinished = 2002
WorkerStatusAborted = 2003
WorkerStatusFile = 2004

# pseudo language name used for the APIs extracted from the project sources
ApisNameProject = "__Project__"


class DbAPIsWorker(QThread):
    """
    Class implementing a worker thread to prepare the API database.

    @signal processing(status, file) emitted to indicate the processing
        status (one of WorkerStatus..., string)
    """

    processing = pyqtSignal(int, str)

    populate_api_stmt = """
        INSERT INTO api (
        acWord, context, fullContext, signature, fileId, pictureId)
        VALUES (
        :acWord, :context, :fullContext, :signature, :fileId, :pictureId)
    """
    populate_del_api_stmt = """
        DELETE FROM api WHERE fileId = :fileId
    """
    populate_bases_stmt = """
        INSERT INTO bases (class, baseClasses, fileId)
        VALUES (:class, :baseClasses, :fileId)
    """
    populate_del_bases_stmt = """
        DELETE FROM bases WHERE fileId = :fileId
    """
    populate_file_stmt = """
        INSERT INTO file (file) VALUES (:file)
    """
    update_file_stmt = """
        UPDATE file SET lastRead = :lastRead WHERE file = :file
    """

    file_loaded_stmt = """
        SELECT lastRead from file WHERE file = :file
    """
    file_id_stmt = """
        SELECT id FROM file WHERE file = :file
    """
    file_delete_id_stmt = """
        DELETE FROM file WHERE id = :id
    """

    api_files_stmt = """
        SELECT file FROM file
    """

    def __init__(
        self,
        proxy,
        language,
        apiFiles,
        dbName,
        projectPath="",
        refresh=False,
        projectType="",
    ):
        """
        Constructor

        @param proxy reference to the object that is proxied
        @type DbAPIs
        @param language language of the APIs object
        @type str
        @param apiFiles list of API files to process
        @type list of str
        @param dbName path name of the database file
        @type str
        @param projectPath path of the project. Only needed, if the APIs
            are extracted out of the sources of a project.
        @type str
        @param refresh flag indicating a refresh of the APIs of one file
        @type bool
        @param projectType type of the project
        @type str
        """
        QThread.__init__(self)

        self.setTerminationEnabled(True)

        # Get the AC word separators for all of the languages that the editor
        # supports. This has to be before we create a new thread, because
        # access to GUI elements is not allowed from non-GUI threads.
        self.__wseps = {}
        for lang in Lexers.getSupportedLanguages():
            lexer = Lexers.getLexer(lang)
            if lexer is not None:
                self.__wseps[lang] = lexer.autoCompletionWordSeparators()

        self.__proxy = proxy
        self.__language = language
        self.__projectType = projectType
        self.__apiFiles = apiFiles[:]
        self.__aborted = False
        self.__projectPath = projectPath
        self.__refresh = refresh

        self.__databaseName = dbName

        # the connection name includes id(self) so that each worker uses a
        # database connection of its own
        if self.__projectType:
            self.__connectionName = "{0}_{1}_{2}".format(
                self.__language, self.__projectType, id(self)
            )
        else:
            self.__connectionName = "{0}_{1}".format(self.__language, id(self))

    def __autoCompletionWordSeparators(self, language):
        """
        Private method to get the word separator characters for a language.

        @param language language of the APIs object
        @type str
        @return word separator characters (None, if the language is given
            but not known)
        @rtype list of str
        """
        if language:
            return self.__wseps.get(language, None)
        else:
            return self.__proxy.autoCompletionWordSeparators()

    def abort(self):
        """
        Public method to ask the thread to stop.
        """
        self.__aborted = True

    def __loadApiFileIfNewer(self, apiFile):
        """
        Private method to load an API file, if it is newer than the one read
        into the database.

        @param apiFile filename of the raw API file
        @type str
        """
        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            query.prepare(self.file_loaded_stmt)
            query.bindValue(":file", apiFile)
            query.exec()
            # a file not yet recorded in the database is treated as having
            # been read at the epoch, i.e. it will always be (re-)loaded
            loadTime = (
                QDateTime.fromString(query.value(0), Qt.DateFormat.ISODate)
                if query.next() and query.isValid()
                else
                # __IGNORE_WARNING_M513__
                QDateTime(1970, 1, 1, 0, 0)
            )
            query.finish()
            del query
        finally:
            db.commit()

        if self.__projectPath:
            modTime = QFileInfo(
                os.path.join(self.__projectPath, apiFile)
            ).lastModified()
        else:
            modTime = QFileInfo(apiFile).lastModified()
            # a companion '.bas' file counts as part of the API file
            basesFile = os.path.splitext(apiFile)[0] + ".bas"
            if os.path.exists(basesFile):
                modTimeBases = QFileInfo(basesFile).lastModified()
                if modTimeBases > modTime:
                    modTime = modTimeBases

        if loadTime < modTime:
            self.processing.emit(WorkerStatusFile, apiFile)
            self.__loadApiFile(apiFile)

    def __classesAttributesApi(self, module):
        """
        Private method to generate class api section for class attributes.

        @param module module object to get the info from
        @type Module
        @return API information
        @rtype list of str
        """
        api = []
        modulePath = module.name.split(".")
        moduleName = "{0}.".format(".".join(modulePath))

        for className in sorted(module.classes.keys()):
            _class = module.classes[className]
            classNameStr = "{0}{1}.".format(moduleName, className)
            for variable in sorted(_class.attributes.keys()):
                if not _class.attributes[variable].isPrivate():
                    from eric7.QScintilla.Editor import Editor

                    if _class.attributes[variable].isPublic():
                        iconId = Editor.AttributeID
                    elif _class.attributes[variable].isProtected():
                        iconId = Editor.AttributeProtectedID
                    else:
                        iconId = Editor.AttributePrivateID
                    api.append("{0}{1}?{2:d}".format(classNameStr, variable, iconId))
        return api

    def __loadApiFile(self, apiFile):
        """
        Private method to read a raw API file into the database.

        @param apiFile filename of the raw API file
        @type str
        """
        apis = []
        bases = []

        if self.__language == ApisNameProject:
            # project APIs are generated from the project source file
            with contextlib.suppress(OSError, ImportError):
                module = ModuleParser.readModule(
                    os.path.join(self.__projectPath, apiFile),
                    basename=self.__projectPath + os.sep,
                    caching=False,
                )
                language = module.getType()
                if language:
                    from eric7.DocumentationTools.APIGenerator import APIGenerator

                    apiGenerator = APIGenerator(module)
                    apis = apiGenerator.genAPI(True, "", True)
                    if os.path.basename(apiFile).startswith("Ui_"):
                        # it is a forms source file, extract public attributes
                        # as well
                        apis.extend(self.__classesAttributesApi(module))

                    basesDict = apiGenerator.genBases(True)
                    for baseEntry in basesDict:
                        if basesDict[baseEntry]:
                            bases.append(
                                "{0} {1}\n".format(
                                    baseEntry, " ".join(sorted(basesDict[baseEntry]))
                                )
                            )
        else:
            with contextlib.suppress(OSError, UnicodeError):
                apis = Utilities.readEncodedFile(apiFile)[0].splitlines(True)
            with contextlib.suppress(OSError, UnicodeError):
                basesFile = os.path.splitext(apiFile)[0] + ".bas"
                if os.path.exists(basesFile):
                    bases = Utilities.readEncodedFile(basesFile)[0].splitlines(True)
            language = None

        if len(apis) > 0:
            self.__storeApis(apis, bases, apiFile, language)
        else:
            # just store file info to avoid rereading it every time
            self.__storeFileInfoOnly(apiFile)

    def __storeFileInfoOnly(self, apiFile):
        """
        Private method to store file info only.

        Doing this avoids rereading the file whenever the API is initialized
        in case the given file doesn't contain API data.

        @param apiFile file name of the API file
        @type str
        """
        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)

            # step 1: create entry in file table
            query.prepare(self.populate_file_stmt)
            query.bindValue(":file", apiFile)
            query.exec()

            # step 2: update the file entry
            query.prepare(self.update_file_stmt)
            query.bindValue(":lastRead", QDateTime.currentDateTime())
            query.bindValue(":file", apiFile)
            query.exec()
        finally:
            query.finish()
            del query
            if self.__aborted:
                db.rollback()
            else:
                db.commit()

    def __storeApis(self, apis, bases, apiFile, language):
        """
        Private method to put the API entries into the database.

        @param apis list of api entries
        @type list of str
        @param bases list of base class entries
        @type list of str
        @param apiFile filename of the file read to get the APIs
        @type str
        @param language programming language of the file of the APIs
        @type str
        """
        wseps = self.__autoCompletionWordSeparators(language)
        if wseps is None:
            return

        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)

            # step 1: create entry in file table and get the ID
            query.prepare(self.populate_file_stmt)
            query.bindValue(":file", apiFile)
            if not query.exec():
                return
            query.prepare(self.file_id_stmt)
            query.bindValue(":file", apiFile)
            if not query.exec():
                return
            if not query.next():  # __IGNORE_WARNING_M513__
                return
            fileId = int(query.value(0))

            # step 2: delete all entries belonging to this file
            query.prepare(self.populate_del_api_stmt)
            query.bindValue(":fileId", fileId)
            query.exec()

            query.prepare(self.populate_del_bases_stmt)
            query.bindValue(":fileId", fileId)
            query.exec()

            # step 3: load the given API info
            query.prepare(self.populate_api_stmt)
            for api in apis:
                if self.__aborted:
                    break

                api = api.strip()
                if len(api) == 0:
                    continue

                # split the entry into the dotted path and the signature
                b = api.find("(")
                if b == -1:
                    path = api
                    sig = ""
                else:
                    path = api[:b]
                    sig = api[b:]

                # one row is stored per path component, working from the
                # rightmost component towards the left
                while len(path) > 0:
                    acWord = ""
                    context = ""
                    fullContext = ""
                    pictureId = ""

                    # search for word separators
                    index = len(path)
                    while index > 0:
                        index -= 1
                        found = False
                        for wsep in wseps:
                            if path[:index].endswith(wsep):
                                found = True
                                break

                        if found:
                            if acWord == "":
                                # completion found
                                acWord = path[index:]
                                path = path[: (index - len(wsep))]
                                index = len(path)
                                fullContext = path
                                context = path
                                with contextlib.suppress(ValueError):
                                    acWord, pictureId = acWord.split("?", 1)
                            else:
                                context = path[index:]
                                break

                    # none found?
                    if acWord == "":
                        acWord = path
                        path = ""

                    query.bindValue(":acWord", acWord)
                    query.bindValue(":context", context)
                    query.bindValue(":fullContext", fullContext)
                    query.bindValue(":signature", sig)
                    query.bindValue(":fileId", fileId)
                    query.bindValue(":pictureId", pictureId)
                    query.exec()

                    # the signature belongs to the rightmost component only
                    sig = ""

            # step 4: load the given base classes info
            query.prepare(self.populate_bases_stmt)
            for base in bases:
                if self.__aborted:
                    break

                base = base.strip()
                if len(base) == 0:
                    continue

                # an entry is '<class> <base classes>'; skip malformed lines
                # without base classes instead of failing the whole thread
                class_, separator, baseClasses = base.partition(" ")
                if not separator:
                    continue
                query.bindValue(":class", class_)
                query.bindValue(":baseClasses", baseClasses)
                query.bindValue(":fileId", fileId)
                query.exec()

            if not self.__aborted:
                # step 5: update the file entry
                query.prepare(self.update_file_stmt)
                query.bindValue(":lastRead", QDateTime.currentDateTime())
                query.bindValue(":file", apiFile)
                query.exec()
        finally:
            query.finish()
            del query
            if self.__aborted:
                db.rollback()
            else:
                db.commit()

    def __deleteApiFile(self, apiFile):
        """
        Private method to delete all references to an api file.

        @param apiFile filename of the raw API file
        @type str
        """
        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)

            # step 1: get the ID belonging to the api file
            query.prepare(self.file_id_stmt)
            query.bindValue(":file", apiFile)
            query.exec()
            # without a matching row there is no ID and nothing to delete
            if query.next():  # __IGNORE_WARNING_M513__
                fileId = int(query.value(0))

                # step 2: delete all API entries belonging to this file
                query.prepare(self.populate_del_api_stmt)
                query.bindValue(":fileId", fileId)
                query.exec()

                # step 3: delete all base classes entries belonging to this
                # file
                query.prepare(self.populate_del_bases_stmt)
                query.bindValue(":fileId", fileId)
                query.exec()

                # step 4: delete the file entry
                query.prepare(self.file_delete_id_stmt)
                query.bindValue(":id", fileId)
                query.exec()
        finally:
            query.finish()
            del query
            db.commit()

    def __getApiFiles(self):
        """
        Private method to get a list of API files loaded into the database.

        @return list of API filenames
        @rtype list of str
        """
        apiFiles = []

        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            query.exec(self.api_files_stmt)
            while query.next():  # __IGNORE_WARNING_M513__
                apiFiles.append(query.value(0))
        finally:
            query.finish()
            del query
            db.commit()

        return apiFiles

    def run(self):
        """
        Public method to perform the threads work.
        """
        self.processing.emit(WorkerStatusStarted, "")

        if QSqlDatabase.contains(self.__connectionName):
            QSqlDatabase.removeDatabase(self.__connectionName)

        db = QSqlDatabase.addDatabase("QSQLITE", self.__connectionName)
        db.setDatabaseName(self.__databaseName)
        db.open()

        if db.isValid() and db.isOpen():
            # step 1: remove API files not wanted any longer
            if not self.__refresh:
                loadedApiFiles = self.__getApiFiles()
                for apiFile in loadedApiFiles:
                    if not self.__aborted and apiFile not in self.__apiFiles:
                        self.__deleteApiFile(apiFile)

            # step 2: (re-)load api files
            for apiFile in self.__apiFiles:
                if not self.__aborted:
                    self.__loadApiFileIfNewer(apiFile)

        db.close()

        if self.__aborted:
            self.processing.emit(WorkerStatusAborted, "")
        else:
            self.processing.emit(WorkerStatusFinished, "")


class DbAPIs(QObject):
    """
    Class implementing an API storage entity.

    @signal apiPreparationStatus(language, status, file) emitted to indicate
        the API preparation status for a language
    """

    apiPreparationStatus = pyqtSignal(str, int, str)

    # version of the database layout; a database reporting a lower version
    # is recreated (see __isPrepared() and __openAPIs())
    DB_VERSION = 4

    create_mgmt_stmt = """
        CREATE TABLE mgmt
        (format INTEGER)
    """
    drop_mgmt_stmt = """DROP TABLE IF EXISTS mgmt"""

    create_api_stmt = """
        CREATE TABLE api
        (acWord TEXT,
         context TEXT,
         fullContext TEXT,
         signature TEXT,
         fileId INTEGER,
         pictureId INTEGER,
         UNIQUE(acWord, fullContext, signature) ON CONFLICT IGNORE
        )
    """
    drop_api_stmt = """DROP TABLE IF EXISTS api"""

    create_bases_stmt = """
        CREATE TABLE bases
        (class TEXT UNIQUE ON CONFLICT IGNORE,
         baseClasses TEXT,
         fileId INTEGER
        )
    """
    drop_bases_stmt = """DROP TABLE IF EXISTS bases"""

    create_file_stmt = """
        CREATE TABLE file
        (id INTEGER PRIMARY KEY AUTOINCREMENT,
         file TEXT UNIQUE ON CONFLICT IGNORE,
         lastRead TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """
    drop_file_stmt = """DROP TABLE IF EXISTS file"""

    create_acWord_idx = """CREATE INDEX acWord_idx on api (acWord)"""
    drop_acWord_idx = """DROP INDEX IF EXISTS acWord_idx"""

    create_context_idx = """CREATE INDEX context_idx on api (context)"""
    drop_context_idx = """DROP INDEX IF EXISTS context_idx"""

    create_fullContext_idx = """CREATE INDEX fullContext_idx on api (fullContext)"""
    drop_fullContext_idx = """DROP INDEX IF EXISTS fullContext_idx"""

    create_bases_idx = """CREATE INDEX base_idx on bases (class)"""
    drop_bases_idx = """DROP INDEX IF EXISTS base_idx"""

    create_file_idx = """CREATE INDEX file_idx on file (file)"""
    drop_file_idx = """DROP INDEX IF EXISTS file_idx"""

    api_files_stmt = """
        SELECT file FROM file
    """

    ac_stmt = """
        SELECT DISTINCT acWord, fullContext, pictureId FROM api
        WHERE acWord GLOB :acWord
        ORDER BY acWord
    """
    ac_context_stmt = """
        SELECT DISTINCT acWord, fullContext, pictureId FROM api
        WHERE context = :context
        ORDER BY acWord
    """
    ac_context_word_stmt = """
        SELECT DISTINCT acWord, fullContext, pictureId FROM api
        WHERE acWord GLOB :acWord AND context = :context
        ORDER BY acWord
    """
    bases_stmt = """
        SELECT baseClasses from bases
        WHERE class = :class
    """
    ct_stmt = """
        SELECT DISTINCT acWord, signature, fullContext FROM api
        WHERE acWord = :acWord
    """
    ct_context_stmt = """
        SELECT DISTINCT acWord, signature, fullContext FROM api
        WHERE acWord = :acWord AND context = :context
    """
    ct_fullContext_stmt = """
        SELECT DISTINCT acWord, signature, fullContext FROM api
        WHERE acWord = :acWord AND fullContext = :fullContext
    """
    format_select_stmt = """
        SELECT format FROM mgmt
    """
    mgmt_insert_stmt = """
        INSERT INTO mgmt (format) VALUES ({0:d})
    """.format(
        DB_VERSION
    )

    def __init__(self, language, projectType="", parent=None):
        """
        Constructor

        @param language language of the APIs object
        @type str
        @param projectType type of the project
        @type str
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent)
        if projectType:
            self.setObjectName("DbAPIs_{0}_{1}".format(language, projectType))
        else:
            self.setObjectName("DbAPIs_{0}".format(language))

        self.__inPreparation = False
        self.__worker = None
        self.__workerQueue = []
        self.__opened = False

        self.__projectType = projectType
        self.__language = language

        if self.__projectType:
            self.__connectionName = "{0}_{1}".format(
                self.__language, self.__projectType
            )
        else:
            self.__connectionName = self.__language

        if self.__language == ApisNameProject:
            self.__initAsProject()
        else:
            self.__initAsLanguage()

    def __initAsProject(self):
        """
        Private method to initialize as a project API object.
        """
        self.__lexer = None

        self.__project = ericApp().getObject("Project")
        self.__project.projectOpened.connect(self.__projectOpened)
        self.__project.newProject.connect(self.__projectOpened)
        self.__project.projectClosed.connect(self.__projectClosed)
        self.__project.projectFormCompiled.connect(self.__projectFormCompiled)
        self.__project.projectChanged.connect(self.__projectChanged)

        if self.__project.isOpen():
            self.__projectOpened()

    def __initAsLanguage(self):
        """
        Private method to initialize as a language API object.
        """
        if self.__language == "Python3":
            self.__discardFirst = ["self", "cls"]
        else:
            self.__discardFirst = []
        self.__lexer = Lexers.getLexer(self.__language)
        try:
            self.__apifiles = Preferences.getEditorAPI(
                self.__language, projectType=self.__projectType
            )
        except TypeError:
            # older interface
            self.__apifiles = Preferences.getEditorAPI(self.__language)
        self.__apifiles.sort()
        if self.__lexer is not None:
            self.__openAPIs()

    def _apiDbName(self):
        """
        Protected method to determine the name of the database file.

        @return name of the database file
        @rtype str
        """
        if self.__language == ApisNameProject:
            return os.path.join(
                self.__project.getProjectManagementDir(), "project-apis.db"
            )
        else:
            apisDir = os.path.join(Globals.getConfigDir(), "APIs")
            if not os.path.exists(apisDir):
                os.makedirs(apisDir)
            if self.__projectType:
                filename = "{0}_{1}-api.db".format(self.__language, self.__projectType)
            else:
                filename = "{0}-api.db".format(self.__language)
            return os.path.join(apisDir, filename)

    def close(self):
        """
        Public method to close the database.
        """
        self.__workerQueue = []
        if self.__worker is not None:
            # ask the worker to stop and give it some time to comply before
            # terminating it forcefully
            self.__worker.abort()
            if self.__worker is not None:
                self.__worker.wait(5000)
                if self.__worker is not None and not self.__worker.isFinished():
                    self.__worker.terminate()
                    if self.__worker is not None:
                        self.__worker.wait(5000)

        if QSqlDatabase and QSqlDatabase.database(self.__connectionName).isOpen():
            QSqlDatabase.database(self.__connectionName).close()
            QSqlDatabase.removeDatabase(self.__connectionName)

        self.__opened = False

    def __openApiDb(self):
        """
        Private method to open the API database.

        @return flag indicating the database status
        @rtype bool
        """
        db = QSqlDatabase.database(self.__connectionName, False)
        if not db.isValid():
            # the database connection is a new one
            db = QSqlDatabase.addDatabase("QSQLITE", self.__connectionName)
            dbName = self._apiDbName()
            if self.__language == ApisNameProject and not os.path.exists(
                self.__project.getProjectManagementDir()
            ):
                opened = False
            else:
                db.setDatabaseName(dbName)
                opened = db.open()
            if not opened:
                QSqlDatabase.removeDatabase(self.__connectionName)
        else:
            opened = True
        return opened

    def __createApiDB(self):
        """
        Private method to create an API database.
        """
        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            # step 1: drop old tables
            query.exec(self.drop_mgmt_stmt)
            query.exec(self.drop_api_stmt)
            query.exec(self.drop_bases_stmt)
            query.exec(self.drop_file_stmt)
            # step 2: drop old indices
            query.exec(self.drop_acWord_idx)
            query.exec(self.drop_context_idx)
            query.exec(self.drop_fullContext_idx)
            query.exec(self.drop_bases_idx)
            query.exec(self.drop_file_idx)
            # step 3: create tables
            query.exec(self.create_api_stmt)
            query.exec(self.create_bases_stmt)
            query.exec(self.create_file_stmt)
            query.exec(self.create_mgmt_stmt)
            query.exec(self.mgmt_insert_stmt)
            # step 4: create indices
            query.exec(self.create_acWord_idx)
            query.exec(self.create_context_idx)
            query.exec(self.create_fullContext_idx)
            query.exec(self.create_bases_idx)
            query.exec(self.create_file_idx)
        finally:
            query.finish()
            del query
            db.commit()

    def getApiFiles(self):
        """
        Public method to get a list of API files loaded into the database.

        @return list of API filenames
        @rtype list of str
        """
        apiFiles = []

        db = QSqlDatabase.database(self.__connectionName)
        db.transaction()
        try:
            query = QSqlQuery(db)
            query.exec(self.api_files_stmt)
            while query.next():  # __IGNORE_WARNING_M513__
                apiFiles.append(query.value(0))
        finally:
            query.finish()
            del query
            db.commit()

        return apiFiles

    def __isPrepared(self):
        """
        Private method to check, if the database has been prepared.

        @return flag indicating the prepared status
        @rtype bool
        """
        db = QSqlDatabase.database(self.__connectionName)
        prepared = len(db.tables()) > 0
        if prepared:
            db.transaction()
            prepared = False
            try:
                query = QSqlQuery(db)
                ok = query.exec(self.format_select_stmt)
                # a management table without a row carries no format version;
                # such a database is reported as not prepared
                if ok and query.next():  # __IGNORE_WARNING_M513__
                    formatVersion = int(query.value(0))
                    if formatVersion >= self.DB_VERSION:
                        prepared = True
            finally:
                query.finish()
                del query
                db.commit()
        return prepared

    def getCompletions(self, start=None, context=None, followHierarchy=False):
        """
        Public method to determine the possible completions.

        @param start string giving the start of the word to be
            completed
        @type str
        @param context string giving the context (e.g. classname)
            to be completed
        @type str
        @param followHierarchy flag indicating to follow the hierarchy of
            base classes
        @type bool
        @return list of dictionaries with possible completions
            (key 'completion' contains the completion,
            key 'context' contains the context and
            key 'pictureId' contains the ID of the icon to be shown)
        @rtype list of dict
        """
        completions = []

        db = QSqlDatabase.database(self.__connectionName)
        if db.isOpen() and not self.__inPreparation:
            db.transaction()
            try:
                query = None

                if start is not None and context is not None:
                    query = QSqlQuery(db)
                    query.prepare(self.ac_context_word_stmt)
                    query.bindValue(":acWord", start + "*")
                    query.bindValue(":context", context)
                elif start is not None:
                    query = QSqlQuery(db)
                    query.prepare(self.ac_stmt)
                    query.bindValue(":acWord", start + "*")
                elif context is not None:
                    query = QSqlQuery(db)
                    query.prepare(self.ac_context_stmt)
                    query.bindValue(":context", context)

                if query is not None:
                    query.exec()
                    while query.next():  # __IGNORE_WARNING_M513__
                        completions.append(
                            {
                                "completion": query.value(0),
                                "context": query.value(1),
                                "pictureId": query.value(2),
                            }
                        )
                    query.finish()
                    del query
            finally:
                db.commit()

            if followHierarchy:
                # collect the completions of all recorded base classes
                query = QSqlQuery(db)
                query.prepare(self.bases_stmt)
                query.bindValue(":class", context)
                query.exec()
                if query.next():  # __IGNORE_WARNING_M513__
                    bases = query.value(0).split()
                else:
                    bases = []
                for base in bases:
                    completions.extend(
                        self.getCompletions(start, base, followHierarchy=True)
                    )
                query.finish()
                del query

        return completions

    def getCalltips(
        self,
        acWord,
        commas,
        context=None,
        fullContext=None,
        showContext=True,
        followHierarchy=False,
    ):
        """
        Public method to determine the calltips.

        @param acWord function to get calltips for
        @type str
        @param commas minimum number of commas contained in the calltip
        @type int
        @param context string giving the context (e.g. classname)
        @type str
        @param fullContext string giving the full context
        @type str
        @param showContext flag indicating to show the calltip context
        @type bool
        @param followHierarchy flag indicating to follow the hierarchy of
            base classes
        @type bool
        @return list of calltips
        @rtype list of str
        """
        calltips = []

        db = QSqlDatabase.database(self.__connectionName)
        if db.isOpen() and not self.__inPreparation:
            if self.autoCompletionWordSeparators():
                contextSeparator = self.autoCompletionWordSeparators()[0]
            else:
                contextSeparator = " "

            db.transaction()
            try:
                query = QSqlQuery(db)
                if fullContext:
                    query.prepare(self.ct_fullContext_stmt)
                    query.bindValue(":fullContext", fullContext)
                elif context:
                    query.prepare(self.ct_context_stmt)
                    query.bindValue(":context", context)
                else:
                    query.prepare(self.ct_stmt)
                query.bindValue(":acWord", acWord)
                query.exec()
                while query.next():  # __IGNORE_WARNING_M513__
                    word = query.value(0)
                    sig = query.value(1)
                    fullCtx = query.value(2)
                    if sig:
                        if self.__discardFirst:
                            # Remove the implicit first argument from the
                            # text following the opening parenthesis and
                            # strip the separator left behind before the
                            # parenthesis is added again. Stripping after
                            # re-adding it would leave e.g. '(, a)'.
                            arguments = sig[1:]
                            for discard in self.__discardFirst:
                                arguments = arguments.replace(discard, "", 1)
                            sig = "({0}".format(arguments.strip(", \t\r\n"))
                        if self.__enoughCommas(sig, commas):
                            if showContext:
                                calltips.append(
                                    "{0}{1}{2}{3}".format(
                                        fullCtx,
                                        contextSeparator if fullCtx else "",
                                        word,
                                        sig,
                                    )
                                )
                            else:
                                calltips.append("{0}{1}".format(word, sig))
                query.finish()
                del query
            finally:
                db.commit()

            if followHierarchy:
                # collect the calltips of all recorded base classes
                query = QSqlQuery(db)
                query.prepare(self.bases_stmt)
                query.bindValue(":class", context)
                query.exec()
                if query.next():  # __IGNORE_WARNING_M513__
                    bases = query.value(0).split()
                else:
                    bases = []
                for base in bases:
                    calltips.extend(
                        self.getCalltips(
                            acWord,
                            commas,
                            context=base,
                            showContext=showContext,
                            followHierarchy=True,
                        )
                    )
                query.finish()
                del query

            if context and len(calltips) == 0 and not followHierarchy:
                # nothing found, try without a context
                calltips = self.getCalltips(acWord, commas, showContext=showContext)

        return calltips

    def __enoughCommas(self, s, commas):
        """
        Private method to determine, if the given string contains enough
        commas.

        @param s string to check
        @type str
        @param commas number of commas to check for
        @type int
        @return flag indicating, that there are enough commas
        @rtype bool
        """
        end = s.find(")")

        if end < 0:
            return False

        # only the text before the first closing parenthesis is counted
        w = s[:end]
        return w.count(",") >= commas

    def __openAPIs(self):
        """
        Private method to open the API database.
        """
        self.__opened = self.__openApiDb()
        if self.__opened:
            if not self.__isPrepared():
                self.__createApiDB()

            # prepare the database if necessary
            self.prepareAPIs()

    def __getProjectFormSources(self, normalized=False):
        """
        Private method to get the source files for the project forms.

        @param normalized flag indicating a normalized filename is wanted
        @type bool
        @return list of project form sources
        @rtype list of str
        """
        if self.__project.getProjectLanguage() in ("Python3", "MicroPython", "Cython"):
            sourceExt = ".py"
        elif self.__project.getProjectLanguage() == "Ruby":
            sourceExt = ".rb"
        else:
            return []

        formsSources = []
        forms = self.__project.getProjectFiles("FORMS")
        for fn in forms:
            ofn = os.path.splitext(fn)[0]
            dirname, filename = os.path.split(ofn)
            formSource = os.path.join(dirname, "Ui_" + filename + sourceExt)
            if normalized:
                formSource = os.path.join(self.__project.getProjectPath(), formSource)
            formsSources.append(formSource)
        return formsSources

    def prepareAPIs(self, rawList=None):
        """
        Public method to prepare the APIs if necessary.

        @param rawList list of raw API files
        @type list of str
        """
        if self.__inPreparation:
            return

        projectPath = ""
        # Determine the project type up front, so that it is defined for
        # every branch below (including the one using the given raw list).
        # This is the same rule as applied by __processQueue().
        projectType = "" if self.__language == ApisNameProject else self.__projectType
        if rawList:
            apiFiles = rawList[:]
        elif self.__language == ApisNameProject:
            apiFiles = self.__project.getSources()[:]
            apiFiles.extend(
                [f for f in self.__getProjectFormSources() if f not in apiFiles]
            )
            projectPath = self.__project.getProjectPath()
        else:
            apiFiles = Preferences.getEditorAPI(
                self.__language, projectType=self.__projectType
            )
        self.__worker = DbAPIsWorker(
            self,
            self.__language,
            apiFiles,
            self._apiDbName(),
            projectPath,
            projectType=projectType,
        )
        self.__worker.processing.connect(
            self.__processingStatus, Qt.ConnectionType.QueuedConnection
        )
        self.__worker.start()

    def __processQueue(self):
        """
        Private slot to process the queue of files to load.
        """
        if self.__worker is not None and self.__worker.isFinished():
            self.__worker.deleteLater()
            self.__worker = None

        if self.__worker is None and len(self.__workerQueue) > 0:
            apiFiles = [self.__workerQueue.pop(0)]
            if self.__language == ApisNameProject:
                projectPath = self.__project.getProjectPath()
                # project files are handled relative to the project path
                apiFiles = [apiFiles[0].replace(projectPath + os.sep, "")]
                projectType = ""
            else:
                projectPath = ""
                projectType = self.__projectType
            self.__worker = DbAPIsWorker(
                self,
                self.__language,
                apiFiles,
                self._apiDbName(),
                projectPath,
                projectType=projectType,
                refresh=True,
            )
            self.__worker.processing.connect(
                self.__processingStatus, Qt.ConnectionType.QueuedConnection
            )
            self.__worker.start()

    def getLexer(self):
        """
        Public method to return a reference to our lexer object.

        @return reference to the lexer object
        @rtype Lexer
        """
        return self.__lexer

    def autoCompletionWordSeparators(self):
        """
        Public method to get the word separator characters.

        @return word separator characters (None, if there is no lexer)
        @rtype list of str
        """
        if self.__lexer:
            return self.__lexer.autoCompletionWordSeparators()
        return None

    def __processingStatus(self, status, filename):
        """
        Private slot handling the processing signal of the API preparation
        thread.

        @param status preparation status (one of WorkerStatus...)
        @type int
        @param filename name of the file being processed
        @type str
        """
        if status == WorkerStatusStarted:
            self.__inPreparation = True
            self.apiPreparationStatus.emit(self.__language, WorkerStatusStarted, "")
        elif status == WorkerStatusFinished:
            self.__inPreparation = False
            self.apiPreparationStatus.emit(self.__language, WorkerStatusFinished, "")
            QTimer.singleShot(0, self.__processQueue)
        elif status == WorkerStatusAborted:
            self.__inPreparation = False
            self.apiPreparationStatus.emit(self.__language, WorkerStatusAborted, "")
            QTimer.singleShot(0, self.__processQueue)
        elif status == WorkerStatusFile:
            self.apiPreparationStatus.emit(self.__language, WorkerStatusFile, filename)

    ########################################################
    ## project related stuff below
    ########################################################

    def __projectOpened(self):
        """
        Private slot to perform actions after a project has been opened.
        """
        if self.__project.getProjectLanguage() in ("Python3", "MicroPython", "Cython"):
            self.__discardFirst = ["self", "cls"]
        else:
            self.__discardFirst = []
        self.__lexer = Lexers.getLexer(self.__project.getProjectLanguage())
        self.__openAPIs()

    def __projectClosed(self):
        """
        Private slot to perform actions after a project has been closed.
        """
        self.close()

    def __projectFormCompiled(self, filename):
        """
        Private slot to handle the projectFormCompiled signal.

        @param filename name of the form file that was compiled
        @type str
        """
        self.__workerQueue.append(filename)
        self.__processQueue()

    def __projectChanged(self):
        """
        Private slot to handle the projectChanged signal.
        """
        if self.__opened:
            self.__projectClosed()
            self.__projectOpened()

    def editorSaved(self, filename):
        """
        Public slot to handle the editorSaved signal.

        @param filename name of the file that was saved
        @type str
        """
        if self.__project.isProjectSource(filename):
            self.__workerQueue.append(filename)
            self.__processQueue()


class APIsManager(QObject):
    """
    Class implementing the APIsManager class, which is the central store for
    API information used by autocompletion and calltips.
    """

    def __init__(self, mainWindow, parent=None):
        """
        Constructor

        @param mainWindow reference to the main eric7 window
        @type QMainWindow
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent)
        self.setObjectName("Assistant_APIsManager")

        self.__mw = mainWindow

        # initialize the apis dictionary
        self.__apis = {}

    def reloadAPIs(self):
        """
        Public slot to reload the api information.
        """
        for api in list(self.__apis.values()):
            api and api.prepareAPIs()

    def getAPIs(self, language, projectType=""):
        """
        Public method to get an apis object for autocompletion/calltips.

        This method creates and loads an APIs object dynamically upon request.
        This saves memory for languages, that might not be needed at the
        moment.

        @param language language of the requested APIs object
        @type str
        @param projectType type of the project
        @type str
        @return reference to the APIs object
        @rtype APIs
        """
        try:
            return self.__apis[(language, projectType)]
        except KeyError:
            if (
                language in Lexers.getSupportedApiLanguages()
                or language == ApisNameProject
            ):
                # create the api object
                api = DbAPIs(language, projectType=projectType)
                api.apiPreparationStatus.connect(self.__apiPreparationStatus)
                self.__apis[(language, projectType)] = api
                return self.__apis[(language, projectType)]
            else:
                return None

    def deactivate(self):
        """
        Public method to perform actions upon deactivation.
        """
        for apiLang in self.__apis:
            api = self.__apis[apiLang]
            # entries released by an earlier deactivation are None
            if api is not None:
                api.close()
                api.deleteLater()
                self.__apis[apiLang] = None

    def __showMessage(self, msg):
        """
        Private method to show a message in the main windows status bar.

        @param msg message to be shown
        @type str
        """
        if msg:
            self.__mw.statusBar().showMessage(msg, 2000)

    def __apiPreparationStatus(self, language, status, filename):
        """
        Private slot handling the preparation status signal of an API object.

        @param language language of the API
        @type str
        @param status preparation status (one of WorkerStatus...)
        @type int
        @param filename name of the file being processed
        @type str
        """
        if language == ApisNameProject:
            language = self.tr("Project")

        if status == WorkerStatusStarted:
            self.__showMessage(
                self.tr("Preparation of '{0}' APIs started.").format(language)
            )
        elif status == WorkerStatusFile:
            self.__showMessage(
                self.tr("'{0}' APIs: Processing '{1}'").format(
                    language, os.path.basename(filename)
                )
            )
        elif status == WorkerStatusFinished:
            self.__showMessage(
                self.tr("Preparation of '{0}' APIs finished.").format(language)
            )
        elif status == WorkerStatusAborted:
            self.__showMessage(
                self.tr("Preparation of '{0}' APIs cancelled.").format(language)
            )


#
# eflag: noqa = M523, M834, S608