RadonMetrics/CyclomaticComplexityCalculator.py

changeset 13
22bc345844e7
child 37
7fd806094f0f
diff -r 32a3c9d62e90 -r 22bc345844e7 RadonMetrics/CyclomaticComplexityCalculator.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/RadonMetrics/CyclomaticComplexityCalculator.py	Sat Sep 19 18:24:07 2015 +0200
@@ -0,0 +1,211 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing the cyclomatic complexity service.
+"""
+
+from __future__ import unicode_literals
+
+try:
+    str = unicode       # __IGNORE_EXCEPTION__ __IGNORE_WARNING__
+except NameError:
+    pass
+
+import multiprocessing
+import sys
+
+
+def initService():
+    """
+    Initialize the service and return the entry point.
+    
+    @return the entry point for the background client (function)
+    """
+    return cyclomaticComplexity
+
+
+def initBatchService():
+    """
+    Initialize the batch service and return the entry point.
+    
+    @return the entry point for the background client (function)
+    """
+    return batchCyclomaticComplexity
+
+
+def cyclomaticComplexity(file, text=""):
+    """
+    Private function to calculate the cyclomatic complexity of one file.
+    
+    @param file source filename
+    @type str
+    @param text source text
+    @type str
+    @return tuple containing the result dictionary
+    @rtype (tuple of dict)
+    """
+    return __cyclomaticComplexity(file, text)
+
+
+def batchCyclomaticComplexity(argumentsList, send, fx, cancelled):
+    """
+    Module function to calculate the cyclomatic complexity for a batch of
+    files.
+    
+    @param argumentsList list of arguments tuples as given for
+        cyclomaticComplexity
+    @type list
+    @param send reference to send function
+    @type function
+    @param fx registered service name
+    @type str
+    @param cancelled reference to function checking for a cancellation
+    @type function
+    """
+    try:
+        NumberOfProcesses = multiprocessing.cpu_count()
+        if NumberOfProcesses >= 1:
+            NumberOfProcesses -= 1
+    except NotImplementedError:
+        NumberOfProcesses = 1
+
+    # Create queues
+    taskQueue = multiprocessing.Queue()
+    doneQueue = multiprocessing.Queue()
+
+    # Submit tasks (initially two time number of processes
+    initialTasks = 2 * NumberOfProcesses
+    for task in argumentsList[:initialTasks]:
+        taskQueue.put(task)
+
+    # Start worker processes
+    for i in range(NumberOfProcesses):
+        multiprocessing.Process(target=worker, args=(taskQueue, doneQueue))\
+            .start()
+
+    # Get and send results
+    endIndex = len(argumentsList) - initialTasks
+    for i in range(len(argumentsList)):
+        filename, result = doneQueue.get()
+        send(fx, filename, result)
+        if cancelled():
+            # just exit the loop ignoring the results of queued tasks
+            break
+        if i < endIndex:
+            taskQueue.put(argumentsList[i + initialTasks])
+
+    # Tell child processes to stop
+    for i in range(NumberOfProcesses):
+        taskQueue.put('STOP')
+
+
+def worker(input, output):
+    """
+    Module function acting as the parallel worker for the cyclomatic
+    complexity calculation.
+    
+    @param input input queue
+    @type multiprocessing.Queue
+    @param output output queue
+    @type multiprocessing.Queue
+    """
+    for filename, source in iter(input.get, 'STOP'):
+        result = __cyclomaticComplexity(filename, source)
+        output.put((filename, result))
+
+
+def __cyclomaticComplexity(file, text=""):
+    """
+    Private function to calculate the cyclomatic complexity for one Python
+    file.
+    
+    @param file source filename
+    @type str
+    @param text source text
+    @type str
+    @return tuple containing the result dictionary
+    @rtype (tuple of dict)
+    """
+    from radon.complexity import cc_visit, cc_rank
+    
+    # Check type for py2: if not str it's unicode
+    if sys.version_info[0] == 2:
+        try:
+            text = text.encode('utf-8')
+        except UnicodeError:
+            pass
+    
+    try:
+        cc = cc_visit(text)
+        res = {"result": [v for v in map(__cc2Dict, cc)
+                          if v["type"] != "method"]}
+        totalCC = 0
+        rankSummary = {
+            "A": 0,
+            "B": 0,
+            "C": 0,
+            "D": 0,
+            "E": 0,
+            "F": 0,
+        }
+        for block in cc:
+            totalCC += block.complexity
+            rankSummary[cc_rank(block.complexity)] += 1
+        res["total_cc"] = totalCC
+        res["count"] = len(cc)
+        res["summary"] = rankSummary
+    except Exception as err:
+        res = {"error": str(err)}
+    return (res, )
+
+
+def __cc2Dict(obj):
+    """
+    Private function to convert an object holding cyclomatic complexity results
+    into a dictionary.
+    
+    @param obj object as returned from analyze()
+    @type radon.raw.Module
+    @return conversion result
+    @rtype dict
+    """
+    from radon.complexity import cc_rank
+    from radon.visitors import Function
+    
+    result = {
+        'type': __getType(obj),
+        'rank': cc_rank(obj.complexity),
+    }
+    attrs = set(Function._fields) - set(('is_method', 'closures'))
+    attrs.add("fullname")
+    for attr in attrs:
+        v = getattr(obj, attr, None)
+        if v is not None:
+            result[attr] = v
+    for key in ('methods', 'closures'):
+        if hasattr(obj, key):
+            result[key] = list(map(__cc2Dict, getattr(obj, key)))
+    return result
+
+
+def __getType(obj):
+    """
+    Private function to get the type of an object as a string.
+    
+    @param obj object to be analyzed
+    @type radon.visitors.Function or radon.visitors.Class
+    @return type string for the object
+    @rtype str, one of ["method", "function", "class"]
+    """
+    from radon.visitors import Function
+    
+    if isinstance(obj, Function):
+        if obj.is_method:
+            return 'method'
+        else:
+            return 'function'
+    else:
+        return 'class'

eric ide

mercurial