VultureChecker/VultureCheckerService.py

Sun, 31 Dec 2017 16:59:08 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sun, 31 Dec 2017 16:59:08 +0100
changeset 53
4eb2ec8fff7c
parent 51
cf5c3ddc1de3
child 54
2194921f5e22
permissions
-rw-r--r--

Updated copyright for 2018.

# -*- coding: utf-8 -*-

# Copyright (c) 2015 - 2018 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the cyclomatic complexity service.
"""

from __future__ import unicode_literals

try:
    str = unicode       # __IGNORE_EXCEPTION__ __IGNORE_WARNING__
except NameError:
    pass

try:
    import Queue as queue   # Py2
except ImportError:
    import queue

import sys
import multiprocessing

from vulture import Vulture


def initService():
    """
    Initialize the service and return the entry point.
    
    @return the entry point for the background client (function)
    """
    return vultureCheck


def initBatchService():
    """
    Initialize the batch service and return the entry point.
    
    @return the entry point for the background client (function)
    """
    return batchVultureCheck


def vultureCheck(file, text=""):
    """
    Private function to analyze one file.
    
    @param file source filename
    @type str
    @param text source text
    @type str
    @return tuple containing the result dictionary
    @rtype (tuple of dict)
    """
    return __analyze(file, text)


def batchVultureCheck(argumentsList, send, fx, cancelled, maxProcesses=0):
    """
    Module function to analyze a batch of files.
    
    @param argumentsList list of arguments tuples as given for vultureCheck
    @type list
    @param send reference to send function
    @type function
    @param fx registered service name
    @type str
    @param cancelled reference to function checking for a cancellation
    @type function
    @param maxProcesses number of processes to be used
    @type int
    """
    if maxProcesses == 0:
        # determine based on CPU count
        try:
            NumberOfProcesses = multiprocessing.cpu_count()
            if NumberOfProcesses >= 1:
                NumberOfProcesses -= 1
        except NotImplementedError:
            NumberOfProcesses = 1
    else:
        NumberOfProcesses = maxProcesses

    # Create queues
    taskQueue = multiprocessing.Queue()
    doneQueue = multiprocessing.Queue()

    # Submit tasks (initially two time number of processes
    initialTasks = 2 * NumberOfProcesses
    for task in argumentsList[:initialTasks]:
        taskQueue.put(task)

    # Start worker processes
    for i in range(NumberOfProcesses):
        multiprocessing.Process(target=worker, args=(taskQueue, doneQueue))\
            .start()

    # Get and send results
    endIndex = len(argumentsList) - initialTasks
    for i in range(len(argumentsList)):
        resultSent = False
        wasCancelled = False
        
        while not resultSent:
            try:
                # get result (waiting max. 3 seconds and send it to frontend
                filename, result = doneQueue.get()
                send(fx, filename, result)
                resultSent = True
            except queue.Empty:
                # ignore empty queue, just carry on
                if cancelled():
                    wasCancelled = True
                    break
        
        if wasCancelled or cancelled():
            # just exit the loop ignoring the results of queued tasks
            break
        
        if i < endIndex:
            taskQueue.put(argumentsList[i + initialTasks])

    # Tell child processes to stop
    for i in range(NumberOfProcesses):
        taskQueue.put('STOP')


def worker(inputQueue, outputQueue):
    """
    Module function acting as the parallel worker for the cyclomatic
    complexity calculation.
    
    @param inputQueue input queue
    @type multiprocessing.Queue
    @param outputQueue output queue
    @type multiprocessing.Queue
    """
    for filename, source in iter(inputQueue.get, 'STOP'):
        result = __analyze(filename, source)
        outputQueue.put((filename, result))


def __analyze(filename, text=""):
    """
    Private function to analyze one Python file.
    
    @param filename source file name
    @type str
    @param text source text
    @type str
    @return tuple containing the result dictionary
    @rtype (tuple of dict)
    """
    # Check type for py2: if not str it's unicode
    if sys.version_info[0] == 2:
        try:
            text = text.encode('utf-8')
        except UnicodeError:
            pass
    
    try:
        v = EricVulture()
        v.scan(text, filename=filename)
        res = v.getResults()
    except Exception as err:
        res = {"error": str(err)}
    return (res, )


class EricVulture(Vulture):
    """
    Class to adopt the Vulture class to the eric plug-in functionality.
    """
    def __item2Dict(self, item):
        """
        Private method to convert a vulture item to a dictionary.
        
        @param item vulture item
        @type vulture.Item
        @return item dictionary
        @rtype dict
        """
        d = {
            "name": str(item),
            "type": getattr(item, "typ", ""),
            "file": getattr(item, "filename", ""),
            "line": getattr(item, "lineno", ""),
        }
        return d
    
    def getResults(self):
        """
        Public method to get the scan results.
        
        @return scan results
        @rtype dict
        """
        return {
            "DefinedAttributes":
                [self.__item2Dict(i) for i in self.defined_attrs],
            "DefinedFunctions":
                [self.__item2Dict(i) for i in self.defined_funcs],
            "DefinedSlots":
                [self.__item2Dict(i) for i in self.defined_slots],
            "DefinedProperties":
                [self.__item2Dict(i) for i in self.defined_props],
            "DefinedVariables":
                [self.__item2Dict(i) for i in self.defined_vars],
            "UsedAttributes":
                [self.__item2Dict(i) for i in self.used_attrs],
            "UsedVariables":
                [self.__item2Dict(i) for i in self.used_vars],
            "TupleVariables":
                [self.__item2Dict(i) for i in self.tuple_assign_vars],
            "Aliases":
                [self.__item2Dict(i) for i in self.names_imported_as_aliases],
        }

eric ide

mercurial