Mon, 27 Mar 2017 18:34:33 +0200
Fixed some code style issues.
# -*- coding: utf-8 -*- # Copyright (c) 2015 - 2017 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the cyclomatic complexity service. """ from __future__ import unicode_literals try: str = unicode # __IGNORE_EXCEPTION__ __IGNORE_WARNING__ except NameError: pass import multiprocessing import sys def initService(): """ Initialize the service and return the entry point. @return the entry point for the background client (function) """ return cyclomaticComplexity def initBatchService(): """ Initialize the batch service and return the entry point. @return the entry point for the background client (function) """ return batchCyclomaticComplexity def cyclomaticComplexity(file, text=""): """ Private function to calculate the cyclomatic complexity of one file. @param file source filename @type str @param text source text @type str @return tuple containing the result dictionary @rtype (tuple of dict) """ return __cyclomaticComplexity(file, text) def batchCyclomaticComplexity(argumentsList, send, fx, cancelled): """ Module function to calculate the cyclomatic complexity for a batch of files. @param argumentsList list of arguments tuples as given for cyclomaticComplexity @type list @param send reference to send function @type function @param fx registered service name @type str @param cancelled reference to function checking for a cancellation @type function """ try: NumberOfProcesses = multiprocessing.cpu_count() if NumberOfProcesses >= 1: NumberOfProcesses -= 1 except NotImplementedError: NumberOfProcesses = 1 # Create queues taskQueue = multiprocessing.Queue() doneQueue = multiprocessing.Queue() # Submit tasks (initially two time number of processes initialTasks = 2 * NumberOfProcesses for task in argumentsList[:initialTasks]: taskQueue.put(task) # Start worker processes for i in range(NumberOfProcesses): multiprocessing.Process(target=worker, args=(taskQueue, doneQueue))\ .start() # Get and send results endIndex = len(argumentsList) - initialTasks for i in range(len(argumentsList)): filename, result = doneQueue.get() send(fx, filename, result) if cancelled(): # just exit the loop ignoring the results of queued tasks break if i < endIndex: taskQueue.put(argumentsList[i + initialTasks]) # Tell child processes to stop for i in range(NumberOfProcesses): taskQueue.put('STOP') def worker(inputQueue, outputQueue): """ Module function acting as the parallel worker for the cyclomatic complexity calculation. @param inputQueue input queue @type multiprocessing.Queue @param outputQueue output queue @type multiprocessing.Queue """ for filename, source in iter(inputQueue.get, 'STOP'): result = __cyclomaticComplexity(filename, source) outputQueue.put((filename, result)) def __cyclomaticComplexity(file, text=""): """ Private function to calculate the cyclomatic complexity for one Python file. @param file source filename @type str @param text source text @type str @return tuple containing the result dictionary @rtype (tuple of dict) """ from radon.complexity import cc_visit, cc_rank # Check type for py2: if not str it's unicode if sys.version_info[0] == 2: try: text = text.encode('utf-8') except UnicodeError: pass try: cc = cc_visit(text) res = {"result": [v for v in map(__cc2Dict, cc) if v["type"] != "method"]} totalCC = 0 rankSummary = { "A": 0, "B": 0, "C": 0, "D": 0, "E": 0, "F": 0, } for block in cc: totalCC += block.complexity rankSummary[cc_rank(block.complexity)] += 1 res["total_cc"] = totalCC res["count"] = len(cc) res["summary"] = rankSummary except Exception as err: res = {"error": str(err)} return (res, ) def __cc2Dict(obj): """ Private function to convert an object holding cyclomatic complexity results into a dictionary. @param obj object as returned from analyze() @type radon.raw.Module @return conversion result @rtype dict """ from radon.complexity import cc_rank from radon.visitors import Function result = { 'type': __getType(obj), 'rank': cc_rank(obj.complexity), } attrs = set(Function._fields) - set(('is_method', 'closures')) attrs.add("fullname") for attr in attrs: v = getattr(obj, attr, None) if v is not None: result[attr] = v for key in ('methods', 'closures'): if hasattr(obj, key): result[key] = list(map(__cc2Dict, getattr(obj, key))) return result def __getType(obj): """ Private function to get the type of an object as a string. @param obj object to be analyzed @type radon.visitors.Function or radon.visitors.Class @return type string for the object @rtype str, one of ["method", "function", "class"] """ from radon.visitors import Function if isinstance(obj, Function): if obj.is_method: return 'method' else: return 'function' else: return 'class'