VultureChecker/VultureCheckerService.py

changeset 2
b517a1c5d5de
child 7
a1a6ff3e5486
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/VultureChecker/VultureCheckerService.py	Sun Oct 04 18:28:36 2015 +0200
@@ -0,0 +1,218 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing the cyclomatic complexity service.
+"""
+
+from __future__ import unicode_literals
+
+try:
+    str = unicode       # __IGNORE_EXCEPTION__ __IGNORE_WARNING__
+except NameError:
+    pass
+
+import ast
+
+import multiprocessing
+import sys
+
+from vulture import Vulture
+
+
+def initService():
+    """
+    Initialize the service and return the entry point.
+    
+    @return the entry point for the background client (function)
+    """
+    return vultureCheck
+
+
+def initBatchService():
+    """
+    Initialize the batch service and return the entry point.
+    
+    @return the entry point for the background client (function)
+    """
+    return batchVultureCheck
+
+
+def vultureCheck(file, text=""):
+    """
+    Private function to analyze one file.
+    
+    @param file source filename
+    @type str
+    @param text source text
+    @type str
+    @return tuple containing the result dictionary
+    @rtype (tuple of dict)
+    """
+    return __analyze(file, text)
+
+
+def batchVultureCheck(argumentsList, send, fx, cancelled):
+    """
+    Module function to analyze a batch of files.
+    
+    @param argumentsList list of arguments tuples as given for vultureCheck
+    @type list
+    @param send reference to send function
+    @type function
+    @param fx registered service name
+    @type str
+    @param cancelled reference to function checking for a cancellation
+    @type function
+    """
+    try:
+        NumberOfProcesses = multiprocessing.cpu_count()
+        if NumberOfProcesses >= 1:
+            NumberOfProcesses -= 1
+    except NotImplementedError:
+        NumberOfProcesses = 1
+
+    # Create queues
+    taskQueue = multiprocessing.Queue()
+    doneQueue = multiprocessing.Queue()
+
+    # Submit tasks (initially two time number of processes
+    initialTasks = 2 * NumberOfProcesses
+    for task in argumentsList[:initialTasks]:
+        taskQueue.put(task)
+
+    # Start worker processes
+    for i in range(NumberOfProcesses):
+        multiprocessing.Process(target=worker, args=(taskQueue, doneQueue))\
+            .start()
+
+    # Get and send results
+    endIndex = len(argumentsList) - initialTasks
+    for i in range(len(argumentsList)):
+        filename, result = doneQueue.get()
+        send(fx, filename, result)
+        if cancelled():
+            # just exit the loop ignoring the results of queued tasks
+            break
+        if i < endIndex:
+            taskQueue.put(argumentsList[i + initialTasks])
+
+    # Tell child processes to stop
+    for i in range(NumberOfProcesses):
+        taskQueue.put('STOP')
+
+
+def worker(input, output):
+    """
+    Module function acting as the parallel worker for the cyclomatic
+    complexity calculation.
+    
+    @param input input queue
+    @type multiprocessing.Queue
+    @param output output queue
+    @type multiprocessing.Queue
+    """
+    for filename, source in iter(input.get, 'STOP'):
+        result = __analyze(filename, source)
+        output.put((filename, result))
+
+
+def __analyze(file, text=""):
+    """
+    Private function to analyze one Python file.
+    
+    @param file source file name
+    @type str
+    @param text source text
+    @type str
+    @return tuple containing the result dictionary
+    @rtype (tuple of dict)
+    """
+    
+    # Check type for py2: if not str it's unicode
+    if sys.version_info[0] == 2:
+        try:
+            text = text.encode('utf-8')
+        except UnicodeError:
+            pass
+    
+    try:
+        v = EricVulture(file)
+        v.scan(text)
+        res = v.getResults()
+    except Exception as err:
+        res = {"error": str(err)}
+    return (res, )
+
+
+class EricVulture(Vulture):
+    """
+    Class to adopt the Vulture class to the eric plug-in functionality.
+    """
+    def __init__(self, filename):
+        """
+        Constructor
+        
+        @param filename source file name
+        @type str
+        """
+        super(EricVulture, self).__init__()
+        
+        self.file = filename
+        self.code = None
+
+    def scan(self, source):
+        """
+        Public method to scan the source text.
+        
+        @param source source text
+        @type str
+        """
+        self.code = source.splitlines()
+        node = ast.parse(source, filename=self.file)
+        self.visit(node)
+    
+    def __item2Dict(self, item):
+        """
+        Private method to convert a vulture item to a dictionary.
+        
+        @param item vulture item
+        @type vulture.Item
+        @return item dictionary
+        @rtype dict
+        """
+        d = {
+            "name": str(item),
+            "type": getattr(item, "typ", ""),
+            "file": getattr(item, "file", ""),
+            "line": getattr(item, "lineno", ""), 
+        }
+        return d
+    
+    def getResults(self):
+        """
+        Public method to get the scan results.
+        
+        @return scan results
+        @rtype dict
+        """
+        return {
+            "DefinedAttributes": 
+                [self.__item2Dict(i) for i in self.defined_attrs],
+            "DefinedFunctions": 
+                [self.__item2Dict(i) for i in self.defined_funcs],
+            "DefinedProperties": 
+                [self.__item2Dict(i) for i in self.defined_props],
+            "DefinedVariables": 
+                [self.__item2Dict(i) for i in self.defined_vars],
+            "UsedAttributes": 
+                [self.__item2Dict(i) for i in self.used_attrs],
+            "UsedVariables": 
+                [self.__item2Dict(i) for i in self.used_vars],
+            "TupleVariables": 
+                [self.__item2Dict(i) for i in self.tuple_assign_vars],
+            "Aliases": 
+                [self.__item2Dict(i) for i in self.names_imported_as_aliases],
+        }

eric ide

mercurial