RadonMetrics/CyclomaticComplexityCalculator.py

changeset 13
22bc345844e7
child 37
7fd806094f0f
equal deleted inserted replaced
12:32a3c9d62e90 13:22bc345844e7
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2015 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the cyclomatic complexity service.
8 """
9
10 from __future__ import unicode_literals
11
12 try:
13 str = unicode # __IGNORE_EXCEPTION__ __IGNORE_WARNING__
14 except NameError:
15 pass
16
17 import multiprocessing
18 import sys
19
20
def initService():
    """
    Provide the entry point of the single file service.

    @return function to be called by the background client for one file
        (function)
    """
    entryPoint = cyclomaticComplexity
    return entryPoint
28
29
def initBatchService():
    """
    Provide the entry point of the batch service.

    @return function to be called by the background client for a batch of
        files (function)
    """
    entryPoint = batchCyclomaticComplexity
    return entryPoint
37
38
def cyclomaticComplexity(file, text=""):
    """
    Module function to calculate the cyclomatic complexity of one file.

    This is the entry point returned by initService(); the calculation
    itself is delegated to __cyclomaticComplexity().

    @param file source filename
    @type str
    @param text source text
    @type str
    @return tuple containing the result dictionary
    @rtype (tuple of dict)
    """
    resultTuple = __cyclomaticComplexity(file, text)
    return resultTuple
51
52
def batchCyclomaticComplexity(argumentsList, send, fx, cancelled):
    """
    Module function to calculate the cyclomatic complexity for a batch of
    files.

    The tasks are distributed to worker processes via a task queue. Each
    result taken from the done queue is passed on via the send function and
    triggers the submission of the next pending task (if any).

    @param argumentsList list of arguments tuples as given for
        cyclomaticComplexity
    @type list
    @param send reference to send function
    @type function
    @param fx registered service name
    @type str
    @param cancelled reference to function checking for a cancellation
    @type function
    """
    taskCount = len(argumentsList)
    if taskCount == 0:
        # nothing to calculate, so don't start any worker process
        return

    try:
        # Use one process less than the number of CPUs but never less than
        # one. Without any worker the result loop below would wait forever
        # for a result (e.g. on a single CPU machine).
        NumberOfProcesses = max(1, multiprocessing.cpu_count() - 1)
    except NotImplementedError:
        NumberOfProcesses = 1
    # don't start more workers than there are tasks to process
    NumberOfProcesses = min(NumberOfProcesses, taskCount)

    # Create queues
    taskQueue = multiprocessing.Queue()
    doneQueue = multiprocessing.Queue()

    # Submit tasks (initially two times the number of processes)
    initialTasks = 2 * NumberOfProcesses
    for task in argumentsList[:initialTasks]:
        taskQueue.put(task)

    # Start worker processes
    for _ in range(NumberOfProcesses):
        multiprocessing.Process(
            target=worker, args=(taskQueue, doneQueue)).start()

    # Get and send results; every received result frees a slot for one of
    # the tasks not submitted yet
    endIndex = taskCount - initialTasks
    for i in range(taskCount):
        filename, result = doneQueue.get()
        send(fx, filename, result)
        if cancelled():
            # just exit the loop ignoring the results of queued tasks
            break
        if i < endIndex:
            taskQueue.put(argumentsList[i + initialTasks])

    # Tell child processes to stop (one sentinel per worker)
    for _ in range(NumberOfProcesses):
        taskQueue.put('STOP')
103
104
def worker(input, output):
    """
    Module function run by each worker process of the batch calculation.

    Tasks are read from the input queue until the 'STOP' marker is received.
    For each task the cyclomatic complexity is calculated and put on the
    output queue together with the file name.

    @param input input queue
    @type multiprocessing.Queue
    @param output output queue
    @type multiprocessing.Queue
    """
    while True:
        task = input.get()
        if task == 'STOP':
            # end marker received; leave the processing loop
            break
        filename, source = task
        output.put((filename, __cyclomaticComplexity(filename, source)))
118
119
def __cyclomaticComplexity(file, text=""):
    """
    Private function performing the cyclomatic complexity calculation for
    the source text of one Python file.

    On success the result dictionary contains the converted blocks
    ("result", without the blocks of type "method"), the sum of the
    complexities of all blocks ("total_cc"), the number of blocks ("count")
    and the number of blocks per rank ("summary"). In case of an exception
    during the calculation it contains just the message ("error").

    @param file source filename
    @type str
    @param text source text
    @type str
    @return tuple containing the result dictionary
    @rtype (tuple of dict)
    """
    from radon.complexity import cc_visit, cc_rank

    # Python 2 only: hand over the text encoded as UTF-8; if encoding fails
    # the text is used unchanged
    isPython2 = sys.version_info[0] == 2
    if isPython2:
        try:
            text = text.encode('utf-8')
        except UnicodeError:
            pass

    try:
        blocks = cc_visit(text)
        converted = map(__cc2Dict, blocks)
        res = {
            "result": [entry for entry in converted
                       if entry["type"] != "method"],
        }
        # one counter per rank letter, all starting at zero
        rankSummary = dict.fromkeys("ABCDEF", 0)
        complexities = [block.complexity for block in blocks]
        for value in complexities:
            rankSummary[cc_rank(value)] += 1
        res["total_cc"] = sum(complexities)
        res["count"] = len(blocks)
        res["summary"] = rankSummary
    except Exception as err:
        res = {"error": str(err)}
    return (res, )
163
164
def __cc2Dict(obj):
    """
    Private function to convert a block of the cyclomatic complexity result
    into a dictionary.

    The dictionary contains the type and rank of the block, all attributes
    named in Function._fields (except 'is_method' and 'closures') plus
    'fullname' as far as they are available and not None, and the recursively
    converted 'methods' and 'closures' lists, if the block has them.

    @param obj block object to be converted
    @type radon.visitors.Function or radon.visitors.Class
    @return conversion result
    @rtype dict
    """
    from radon.complexity import cc_rank
    from radon.visitors import Function

    result = {}
    result['type'] = __getType(obj)
    result['rank'] = cc_rank(obj.complexity)

    # plain attributes: copied only if the object has a value for them
    names = set(Function._fields).difference(('is_method', 'closures'))
    names.add("fullname")
    for name in names:
        value = getattr(obj, name, None)
        if value is None:
            continue
        result[name] = value

    # nested blocks are converted recursively
    for key in ('methods', 'closures'):
        if not hasattr(obj, key):
            continue
        result[key] = [__cc2Dict(child) for child in getattr(obj, key)]
    return result
192
193
def __getType(obj):
    """
    Private function to determine the type string of a block object.

    Everything not being a Function is reported as 'class'; a Function is
    reported as 'method' or 'function' depending on its is_method attribute.

    @param obj object to be analyzed
    @type radon.visitors.Function or radon.visitors.Class
    @return type string for the object
    @rtype str, one of ["method", "function", "class"]
    """
    from radon.visitors import Function

    if not isinstance(obj, Function):
        return 'class'
    return 'method' if obj.is_method else 'function'

eric ide

mercurial