diff -r 32a3c9d62e90 -r 22bc345844e7 RadonMetrics/CyclomaticComplexityCalculator.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/RadonMetrics/CyclomaticComplexityCalculator.py Sat Sep 19 18:24:07 2015 +0200 @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing the cyclomatic complexity service. +""" + +from __future__ import unicode_literals + +try: + str = unicode # __IGNORE_EXCEPTION__ __IGNORE_WARNING__ +except NameError: + pass + +import multiprocessing +import sys + + +def initService(): + """ + Initialize the service and return the entry point. + + @return the entry point for the background client (function) + """ + return cyclomaticComplexity + + +def initBatchService(): + """ + Initialize the batch service and return the entry point. + + @return the entry point for the background client (function) + """ + return batchCyclomaticComplexity + + +def cyclomaticComplexity(file, text=""): + """ + Private function to calculate the cyclomatic complexity of one file. + + @param file source filename + @type str + @param text source text + @type str + @return tuple containing the result dictionary + @rtype (tuple of dict) + """ + return __cyclomaticComplexity(file, text) + + +def batchCyclomaticComplexity(argumentsList, send, fx, cancelled): + """ + Module function to calculate the cyclomatic complexity for a batch of + files. + + @param argumentsList list of arguments tuples as given for + cyclomaticComplexity + @type list + @param send reference to send function + @type function + @param fx registered service name + @type str + @param cancelled reference to function checking for a cancellation + @type function + """ + try: + NumberOfProcesses = multiprocessing.cpu_count() + if NumberOfProcesses >= 1: + NumberOfProcesses -= 1 + except NotImplementedError: + NumberOfProcesses = 1 + + # Create queues + taskQueue = multiprocessing.Queue() + doneQueue = multiprocessing.Queue() + + # Submit tasks (initially two time number of processes + initialTasks = 2 * NumberOfProcesses + for task in argumentsList[:initialTasks]: + taskQueue.put(task) + + # Start worker processes + for i in range(NumberOfProcesses): + multiprocessing.Process(target=worker, args=(taskQueue, doneQueue))\ + .start() + + # Get and send results + endIndex = len(argumentsList) - initialTasks + for i in range(len(argumentsList)): + filename, result = doneQueue.get() + send(fx, filename, result) + if cancelled(): + # just exit the loop ignoring the results of queued tasks + break + if i < endIndex: + taskQueue.put(argumentsList[i + initialTasks]) + + # Tell child processes to stop + for i in range(NumberOfProcesses): + taskQueue.put('STOP') + + +def worker(input, output): + """ + Module function acting as the parallel worker for the cyclomatic + complexity calculation. + + @param input input queue + @type multiprocessing.Queue + @param output output queue + @type multiprocessing.Queue + """ + for filename, source in iter(input.get, 'STOP'): + result = __cyclomaticComplexity(filename, source) + output.put((filename, result)) + + +def __cyclomaticComplexity(file, text=""): + """ + Private function to calculate the cyclomatic complexity for one Python + file. + + @param file source filename + @type str + @param text source text + @type str + @return tuple containing the result dictionary + @rtype (tuple of dict) + """ + from radon.complexity import cc_visit, cc_rank + + # Check type for py2: if not str it's unicode + if sys.version_info[0] == 2: + try: + text = text.encode('utf-8') + except UnicodeError: + pass + + try: + cc = cc_visit(text) + res = {"result": [v for v in map(__cc2Dict, cc) + if v["type"] != "method"]} + totalCC = 0 + rankSummary = { + "A": 0, + "B": 0, + "C": 0, + "D": 0, + "E": 0, + "F": 0, + } + for block in cc: + totalCC += block.complexity + rankSummary[cc_rank(block.complexity)] += 1 + res["total_cc"] = totalCC + res["count"] = len(cc) + res["summary"] = rankSummary + except Exception as err: + res = {"error": str(err)} + return (res, ) + + +def __cc2Dict(obj): + """ + Private function to convert an object holding cyclomatic complexity results + into a dictionary. + + @param obj object as returned from analyze() + @type radon.raw.Module + @return conversion result + @rtype dict + """ + from radon.complexity import cc_rank + from radon.visitors import Function + + result = { + 'type': __getType(obj), + 'rank': cc_rank(obj.complexity), + } + attrs = set(Function._fields) - set(('is_method', 'closures')) + attrs.add("fullname") + for attr in attrs: + v = getattr(obj, attr, None) + if v is not None: + result[attr] = v + for key in ('methods', 'closures'): + if hasattr(obj, key): + result[key] = list(map(__cc2Dict, getattr(obj, key))) + return result + + +def __getType(obj): + """ + Private function to get the type of an object as a string. + + @param obj object to be analyzed + @type radon.visitors.Function or radon.visitors.Class + @return type string for the object + @rtype str, one of ["method", "function", "class"] + """ + from radon.visitors import Function + + if isinstance(obj, Function): + if obj.is_method: + return 'method' + else: + return 'function' + else: + return 'class'