Wed, 21 Dec 2022 08:52:36 +0100
Adapted some import statements to eric 23.1 and newer.
# -*- coding: utf-8 -*-

# Copyright (c) 2015 - 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the maintainability index service.
"""

import multiprocessing
import queue


def initService():
    """
    Initialize the service and return the entry point.

    @return the entry point for the background client (function)
    """
    return maintainabilityIndex


def initBatchService():
    """
    Initialize the batch service and return the entry point.

    @return the entry point for the background client (function)
    """
    return batchMaintainabilityIndex


def maintainabilityIndex(file, text=""):
    """
    Function to calculate the maintainability index of one file.

    @param file source filename
    @type str
    @param text source text
    @type str
    @return tuple containing the result dictionary
    @rtype (tuple of dict)
    """
    return __maintainabilityIndex(file, text)


def batchMaintainabilityIndex(argumentsList, send, fx, cancelled, maxProcesses=0):
    """
    Module function to calculate the maintainability index for a batch of files.

    @param argumentsList list of arguments tuples as given for maintainabilityIndex
    @type list
    @param send reference to send function
    @type function
    @param fx registered service name
    @type str
    @param cancelled reference to function checking for a cancellation
    @type function
    @param maxProcesses number of processes to be used
    @type int
    """
    if maxProcesses == 0:
        # determine the number of processes based on the CPU count
        try:
            NumberOfProcesses = multiprocessing.cpu_count()
            if NumberOfProcesses >= 1:
                NumberOfProcesses -= 1
        except NotImplementedError:
            NumberOfProcesses = 1
    else:
        NumberOfProcesses = maxProcesses

    # Create queues
    taskQueue = multiprocessing.Queue()
    doneQueue = multiprocessing.Queue()

    # Submit tasks (initially two times the number of processes)
    initialTasks = 2 * NumberOfProcesses
    for task in argumentsList[:initialTasks]:
        taskQueue.put(task)

    # Start worker processes
    workers = [
        multiprocessing.Process(target=workerTask, args=(taskQueue, doneQueue))
        for _ in range(NumberOfProcesses)
    ]
    for worker in workers:
        worker.start()

    # Get and send results
    endIndex = len(argumentsList) - initialTasks
    for i in range(len(argumentsList)):
        resultSent = False
        wasCancelled = False

        while not resultSent:
            try:
                # get the result (waiting max. 3 seconds) and send it to the frontend
                filename, result = doneQueue.get(timeout=3)
                send(fx, filename, result)
                resultSent = True
            except queue.Empty:
                # ignore empty queue, just carry on
                if cancelled():
                    wasCancelled = True
                    break

        if wasCancelled or cancelled():
            # just exit the loop ignoring the results of queued tasks
            break

        if i < endIndex:
            taskQueue.put(argumentsList[i + initialTasks])

    # Tell child processes to stop
    for _ in range(NumberOfProcesses):
        taskQueue.put("STOP")

    for worker in workers:
        worker.join()
        worker.close()


def workerTask(inputQueue, outputQueue):
    """
    Module function acting as the parallel worker for the maintainability
    index calculation.

    @param inputQueue input queue
    @type multiprocessing.Queue
    @param outputQueue output queue
    @type multiprocessing.Queue
    """
    for filename, source in iter(inputQueue.get, "STOP"):
        result = __maintainabilityIndex(filename, source)
        outputQueue.put((filename, result))


def __maintainabilityIndex(file, text=""):
    """
    Private function to calculate the maintainability index for one
    Python file.

    @param file source filename
    @type str
    @param text source text
    @type str
    @return tuple containing the result dictionary
    @rtype (tuple of dict)
    """
    from radon.metrics import mi_rank, mi_visit

    try:
        mi = mi_visit(text, True)
        rank = mi_rank(mi)
        res = {"mi": mi, "rank": rank}
    except Exception as err:
        res = {"error": str(err)}
    return (res,)
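For reference, a minimal standalone sketch of the computation this service wraps, calling radon's mi_visit() and mi_rank() directly. It assumes only that the radon package is installed; SAMPLE_SOURCE is a made-up example string, not taken from the plugin.

# Minimal sketch of the radon calls used by __maintainabilityIndex() above.
# Assumption: the 'radon' package is installed; SAMPLE_SOURCE is illustrative only.
from radon.metrics import mi_rank, mi_visit

SAMPLE_SOURCE = (
    "def greet(name):\n"
    '    """Return a greeting."""\n'
    '    return "Hello, " + name\n'
)

mi = mi_visit(SAMPLE_SOURCE, True)  # maintainability index; True counts multi-line strings as comments
rank = mi_rank(mi)                  # letter rank ('A', 'B' or 'C') for the index value
print({"mi": mi, "rank": rank})     # same dictionary shape as the service result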