RadonMetrics/MaintainabilityIndexCalculator.py

changeset 10
8b1920a22df3
child 13
22bc345844e7
diff -r 7f6e04213998 -r 8b1920a22df3 RadonMetrics/MaintainabilityIndexCalculator.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/RadonMetrics/MaintainabilityIndexCalculator.py	Fri Sep 18 19:46:57 2015 +0200
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+from __future__ import unicode_literals
+
+try:
+    str = unicode       # __IGNORE_EXCEPTION __IGNORE_WARNING__
+except NameError:
+    pass
+
+import multiprocessing
+import sys
+
+
+def initService():
+    """
+    Initialize the service and return the entry point.
+    
+    @return the entry point for the background client (function)
+    """
+    return maintainabilityIndex
+
+
+def initBatchService():
+    """
+    Initialize the batch service and return the entry point.
+    
+    @return the entry point for the background client (function)
+    """
+    return batchMaintainabilityIndex
+
+
+def maintainabilityIndex(file, text=""):
+    """
+    Private function to calculate the maintainability index of one file.
+    
+    @param file source filename
+    @type str
+    @param text source text
+    @param str
+    @return tuple containing the result dictionary
+    @rtype (tuple of dict)
+    """
+    return __maintainabilityIndex(file, text)
+
+
+def batchMaintainabilityIndex(argumentsList, send, fx, cancelled):
+    """
+    Module function to calculate the maintainability index for a batch of
+    files.
+    
+    @param argumentsList list of arguments tuples as given for check
+    @type list
+    @param send reference to send function
+    @type function
+    @param fx registered service name
+    @type str
+    @param cancelled reference to function checking for a cancellation
+    @type function
+    """
+    try:
+        NumberOfProcesses = multiprocessing.cpu_count()
+        if NumberOfProcesses >= 1:
+            NumberOfProcesses -= 1
+    except NotImplementedError:
+        NumberOfProcesses = 1
+
+    # Create queues
+    taskQueue = multiprocessing.Queue()
+    doneQueue = multiprocessing.Queue()
+
+    # Submit tasks (initially two time number of processes
+    initialTasks = 2 * NumberOfProcesses
+    for task in argumentsList[:initialTasks]:
+        taskQueue.put(task)
+
+    # Start worker processes
+    for i in range(NumberOfProcesses):
+        multiprocessing.Process(target=worker, args=(taskQueue, doneQueue))\
+            .start()
+
+    # Get and send results
+    endIndex = len(argumentsList) - initialTasks
+    for i in range(len(argumentsList)):
+        filename, result = doneQueue.get()
+        send(fx, filename, result)
+        if cancelled():
+            # just exit the loop ignoring the results of queued tasks
+            break
+        if i < endIndex:
+            taskQueue.put(argumentsList[i + initialTasks])
+
+    # Tell child processes to stop
+    for i in range(NumberOfProcesses):
+        taskQueue.put('STOP')
+
+
+def worker(input, output):
+    """
+    Module function acting as the parallel worker for the style check.
+    
+    @param input input queue
+    @type multiprocessing.Queue
+    @param output output queue
+    @type multiprocessing.Queue
+    """
+    for filename, source in iter(input.get, 'STOP'):
+        result = __maintainabilityIndex(filename, source)
+        output.put((filename, result))
+
+
+def __maintainabilityIndex(file, text=""):
+    """
+    Private function to calculate the maintainability index for one Python
+    file.
+    
+    @param file source filename
+    @type str
+    @param text source text
+    @type str
+    @return tuple containing the result dictionary
+    @rtype (tuple of dict)
+    """
+    from radon.metrics import mi_visit, mi_rank
+    
+    # Check type for py2: if not str it's unicode
+    if sys.version_info[0] == 2:
+        try:
+            text = text.encode('utf-8')
+        except UnicodeError:
+            pass
+    
+    try:
+        mi = mi_visit(text, True)
+        rank = mi_rank(mi)
+        res = {"mi": mi, "rank": rank}
+    except Exception as err:
+        res = {"error": str(err)}
+    return (res, )

eric ide

mercurial