RadonMetrics/CyclomaticComplexityCalculator.py

Wed, 21 Dec 2022 08:52:36 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Wed, 21 Dec 2022 08:52:36 +0100
branch
eric7
changeset 104
6eac83394939
parent 94
725eaca7bc4b
child 106
6422943b388f
permissions
-rw-r--r--

Adapted some import statements to eric 23.1 and newer.

# -*- coding: utf-8 -*-

# Copyright (c) 2015 - 2022 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the cyclomatic complexity service.
"""

import multiprocessing
import queue


def initService():
    """
    Provide the entry point of the single file service.

    @return function to be called by the background client in order to
        calculate the cyclomatic complexity of one file
    @rtype function
    """
    entryPoint = cyclomaticComplexity
    return entryPoint


def initBatchService():
    """
    Provide the entry point of the batch service.

    @return function to be called by the background client in order to
        calculate the cyclomatic complexity of a batch of files
    @rtype function
    """
    entryPoint = batchCyclomaticComplexity
    return entryPoint


def cyclomaticComplexity(file, text=""):
    """
    Module function to calculate the cyclomatic complexity of one file.

    This is the entry point returned by initService(); the calculation itself
    is delegated to the module private implementation.

    @param file source filename
    @type str
    @param text source text
    @type str
    @return tuple containing the result dictionary
    @rtype (tuple of dict)
    """
    resultTuple = __cyclomaticComplexity(file, text=text)
    return resultTuple


def batchCyclomaticComplexity(argumentsList, send, fx, cancelled, maxProcesses=0):
    """
    Module function to calculate the cyclomatic complexity for a batch of
    files.

    The tasks are distributed to a number of worker processes via a task
    queue. Each result taken from the done queue is forwarded through the
    given send function and, as long as unsubmitted tasks remain, replaced by
    a new task.

    @param argumentsList list of arguments tuples as given for
        cyclomaticComplexity
    @type list
    @param send reference to send function
    @type function
    @param fx registered service name
    @type str
    @param cancelled reference to function checking for a cancellation
    @type function
    @param maxProcesses number of processes to be used (0 = determine the
        number based on the CPU count)
    @type int
    """
    if maxProcesses == 0:
        # determine based on CPU count; leave one CPU unused but always keep
        # at least one worker, otherwise no task would ever be processed and
        # the result loop below would wait forever
        try:
            NumberOfProcesses = max(multiprocessing.cpu_count() - 1, 1)
        except NotImplementedError:
            NumberOfProcesses = 1
    else:
        NumberOfProcesses = maxProcesses

    # Create queues
    taskQueue = multiprocessing.Queue()
    doneQueue = multiprocessing.Queue()

    # Submit tasks (initially two times the number of processes)
    initialTasks = 2 * NumberOfProcesses
    for task in argumentsList[:initialTasks]:
        taskQueue.put(task)

    # Start worker processes
    workers = [
        multiprocessing.Process(target=workerTask, args=(taskQueue, doneQueue))
        for _ in range(NumberOfProcesses)
    ]
    for worker in workers:
        worker.start()

    # Get and send results
    endIndex = len(argumentsList) - initialTasks
    for i in range(len(argumentsList)):
        resultSent = False
        wasCancelled = False

        while not resultSent:
            try:
                # get result (waiting max. 3 seconds) and send it to frontend;
                # the timeout is required for queue.Empty to be raised at all,
                # which in turn allows to react on a cancellation while
                # waiting for a long running task
                filename, result = doneQueue.get(timeout=3)
                send(fx, filename, result)
                resultSent = True
            except queue.Empty:
                # ignore empty queue, just carry on
                if cancelled():
                    wasCancelled = True
                    break

        if wasCancelled or cancelled():
            # just exit the loop ignoring the results of queued tasks
            break

        if i < endIndex:
            # refill the task queue with the next not yet submitted task
            taskQueue.put(argumentsList[i + initialTasks])

    # Tell child processes to stop
    for _ in range(NumberOfProcesses):
        taskQueue.put("STOP")

    # NOTE(review): after a cancellation the done queue may still contain
    # results that were not fetched; verify that joining the workers cannot
    # block in that situation.
    for worker in workers:
        worker.join()
        worker.close()


def workerTask(inputQueue, outputQueue):
    """
    Module function acting as the parallel worker for the cyclomatic
    complexity calculation.

    Tasks are taken from the input queue until the "STOP" marker is
    received. Each task is a (filename, source) pair; the calculated result is
    put into the output queue together with the filename.

    @param inputQueue input queue
    @type multiprocessing.Queue
    @param outputQueue output queue
    @type multiprocessing.Queue
    """
    while True:
        task = inputQueue.get()
        if task == "STOP":
            # end marker received, terminate the worker
            break

        filename, source = task
        outputQueue.put((filename, __cyclomaticComplexity(filename, source)))


def __cyclomaticComplexity(file, text=""):
    """
    Private function to calculate the cyclomatic complexity for one Python
    file.

    On success the result dictionary contains the converted blocks (without
    those of type "method") under "result", the summed up complexity under
    "total_cc", the number of blocks under "count" and the number of blocks
    per rank under "summary". If the calculation raises an exception, the
    dictionary just contains the exception text under "error".

    @param file source filename
    @type str
    @param text source text
    @type str
    @return tuple containing the result dictionary
    @rtype (tuple of dict)
    """
    from radon.complexity import cc_rank, cc_visit

    try:
        blocks = cc_visit(text)
        entries = [
            entry for entry in map(__cc2Dict, blocks) if entry["type"] != "method"
        ]

        # accumulate the overall complexity and the number of blocks per rank
        complexitySum = 0
        ranks = dict.fromkeys("ABCDEF", 0)
        for block in blocks:
            complexitySum += block.complexity
            ranks[cc_rank(block.complexity)] += 1

        res = {
            "result": entries,
            "total_cc": complexitySum,
            "count": len(blocks),
            "summary": ranks,
        }
    except Exception as err:
        res = {"error": str(err)}
    return (res,)


def __cc2Dict(obj):
    """
    Private function to convert an object holding cyclomatic complexity results
    into a dictionary.

    The dictionary contains the type and rank of the object, all attributes
    named by the fields of radon.visitors.Function (except "is_method" and
    "closures") plus "fullname" that are not None, and the recursively
    converted "methods" and "closures" if the object has such attributes.

    @param obj object as returned from cc_visit()
    @type radon.visitors.Function
    @return conversion result
    @rtype dict
    """
    from radon.complexity import cc_rank
    from radon.visitors import Function

    result = {"type": __getType(obj), "rank": cc_rank(obj.complexity)}

    # plain attributes are copied verbatim, if they are set
    attributeNames = {
        name for name in Function._fields if name not in ("is_method", "closures")
    }
    attributeNames.add("fullname")
    for name in attributeNames:
        value = getattr(obj, name, None)
        if value is None:
            continue
        result[name] = value

    # nested blocks are converted recursively
    for key in ("methods", "closures"):
        if hasattr(obj, key):
            result[key] = [__cc2Dict(child) for child in getattr(obj, key)]

    return result


def __getType(obj):
    """
    Private function to get the type of an object as a string.

    Everything that is not a radon.visitors.Function instance is reported as
    a class.

    @param obj object to be analyzed
    @type radon.visitors.Function or radon.visitors.Class
    @return type string for the object
    @rtype str, one of ["method", "function", "class"]
    """
    from radon.visitors import Function

    if not isinstance(obj, Function):
        return "class"

    return "method" if obj.is_method else "function"

eric ide

mercurial