diff -r ea6aed49cd69 -r b517a1c5d5de VultureChecker/VultureCheckerService.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/VultureChecker/VultureCheckerService.py Sun Oct 04 18:28:36 2015 +0200 @@ -0,0 +1,218 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing the cyclomatic complexity service. +""" + +from __future__ import unicode_literals + +try: + str = unicode # __IGNORE_EXCEPTION__ __IGNORE_WARNING__ +except NameError: + pass + +import ast + +import multiprocessing +import sys + +from vulture import Vulture + + +def initService(): + """ + Initialize the service and return the entry point. + + @return the entry point for the background client (function) + """ + return vultureCheck + + +def initBatchService(): + """ + Initialize the batch service and return the entry point. + + @return the entry point for the background client (function) + """ + return batchVultureCheck + + +def vultureCheck(file, text=""): + """ + Private function to analyze one file. + + @param file source filename + @type str + @param text source text + @type str + @return tuple containing the result dictionary + @rtype (tuple of dict) + """ + return __analyze(file, text) + + +def batchVultureCheck(argumentsList, send, fx, cancelled): + """ + Module function to analyze a batch of files. + + @param argumentsList list of arguments tuples as given for vultureCheck + @type list + @param send reference to send function + @type function + @param fx registered service name + @type str + @param cancelled reference to function checking for a cancellation + @type function + """ + try: + NumberOfProcesses = multiprocessing.cpu_count() + if NumberOfProcesses >= 1: + NumberOfProcesses -= 1 + except NotImplementedError: + NumberOfProcesses = 1 + + # Create queues + taskQueue = multiprocessing.Queue() + doneQueue = multiprocessing.Queue() + + # Submit tasks (initially two time number of processes + initialTasks = 2 * NumberOfProcesses + for task in argumentsList[:initialTasks]: + taskQueue.put(task) + + # Start worker processes + for i in range(NumberOfProcesses): + multiprocessing.Process(target=worker, args=(taskQueue, doneQueue))\ + .start() + + # Get and send results + endIndex = len(argumentsList) - initialTasks + for i in range(len(argumentsList)): + filename, result = doneQueue.get() + send(fx, filename, result) + if cancelled(): + # just exit the loop ignoring the results of queued tasks + break + if i < endIndex: + taskQueue.put(argumentsList[i + initialTasks]) + + # Tell child processes to stop + for i in range(NumberOfProcesses): + taskQueue.put('STOP') + + +def worker(input, output): + """ + Module function acting as the parallel worker for the cyclomatic + complexity calculation. + + @param input input queue + @type multiprocessing.Queue + @param output output queue + @type multiprocessing.Queue + """ + for filename, source in iter(input.get, 'STOP'): + result = __analyze(filename, source) + output.put((filename, result)) + + +def __analyze(file, text=""): + """ + Private function to analyze one Python file. + + @param file source file name + @type str + @param text source text + @type str + @return tuple containing the result dictionary + @rtype (tuple of dict) + """ + + # Check type for py2: if not str it's unicode + if sys.version_info[0] == 2: + try: + text = text.encode('utf-8') + except UnicodeError: + pass + + try: + v = EricVulture(file) + v.scan(text) + res = v.getResults() + except Exception as err: + res = {"error": str(err)} + return (res, ) + + +class EricVulture(Vulture): + """ + Class to adopt the Vulture class to the eric plug-in functionality. + """ + def __init__(self, filename): + """ + Constructor + + @param filename source file name + @type str + """ + super(EricVulture, self).__init__() + + self.file = filename + self.code = None + + def scan(self, source): + """ + Public method to scan the source text. + + @param source source text + @type str + """ + self.code = source.splitlines() + node = ast.parse(source, filename=self.file) + self.visit(node) + + def __item2Dict(self, item): + """ + Private method to convert a vulture item to a dictionary. + + @param item vulture item + @type vulture.Item + @return item dictionary + @rtype dict + """ + d = { + "name": str(item), + "type": getattr(item, "typ", ""), + "file": getattr(item, "file", ""), + "line": getattr(item, "lineno", ""), + } + return d + + def getResults(self): + """ + Public method to get the scan results. + + @return scan results + @rtype dict + """ + return { + "DefinedAttributes": + [self.__item2Dict(i) for i in self.defined_attrs], + "DefinedFunctions": + [self.__item2Dict(i) for i in self.defined_funcs], + "DefinedProperties": + [self.__item2Dict(i) for i in self.defined_props], + "DefinedVariables": + [self.__item2Dict(i) for i in self.defined_vars], + "UsedAttributes": + [self.__item2Dict(i) for i in self.used_attrs], + "UsedVariables": + [self.__item2Dict(i) for i in self.used_vars], + "TupleVariables": + [self.__item2Dict(i) for i in self.tuple_assign_vars], + "Aliases": + [self.__item2Dict(i) for i in self.names_imported_as_aliases], + }