|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing a checker for "async" related issues. |
|
8 """ |
|
9 |
|
10 |
|
11 import copy |
|
12 |
|
13 |
|
class AsyncChecker:
    """
    Class implementing a checker for "async" related issues.

    The detected issues are collected in the 'errors' attribute (list of
    dict) and the number of occurrences per message code in the 'counters'
    attribute (dict).
    """

    Codes = [
        "ASY100",
        "ASY101",
        "ASY102",
        "ASY103",
        "ASY104",
        "ASY105",
    ]

    def __init__(self, source, filename, tree, select, ignore, expected, repeat, args):
        """
        Constructor

        @param source source code to be checked
        @type list of str
        @param filename name of the source file
        @type str
        @param tree AST tree of the source code
        @type ast.Module
        @param select list of selected codes
        @type list of str
        @param ignore list of codes to be ignored
        @type list of str
        @param expected list of expected codes
        @type list of str
        @param repeat flag indicating to report each occurrence of a code
        @type bool
        @param args dictionary of arguments for the various checks
        @type dict
        """
        self.__select = tuple(select)
        # An explicit selection turns the ignore list into "ignore everything"
        # (the empty prefix matches every code); otherwise only the ignore
        # entries belonging to this checker ("ASY...") are kept.
        self.__ignore = (
            ("",) if select else tuple(x for x in ignore if x.startswith("ASY"))
        )
        self.__expected = expected[:]
        self.__repeat = repeat
        self.__filename = filename
        self.__source = source[:]
        self.__tree = copy.deepcopy(tree)
        self.__args = args

        # statistics counters
        self.counters = {}

        # collection of detected errors
        self.errors = []

        checkersWithCodes = [
            (
                self.__checkSyncUses,
                ("ASY100", "ASY101", "ASY102", "ASY103", "ASY104", "ASY105"),
            ),
        ]

        # activate a check method, if at least one of its codes is not ignored
        self.__checkers = [
            checker
            for checker, codes in checkersWithCodes
            if any(not (code and self.__ignoreCode(code)) for code in codes)
        ]

    def __ignoreCode(self, code):
        """
        Private method to check if the message code should be ignored.

        A code is ignored, if it starts with one of the ignore prefixes and
        does not start with one of the select prefixes.

        @param code message code to check for
        @type str
        @return flag indicating to ignore the given code
        @rtype bool
        """
        return code.startswith(self.__ignore) and not code.startswith(self.__select)

    def __error(self, lineNumber, offset, code, *args):
        """
        Private method to record an issue.

        @param lineNumber line number of the issue (zero based)
        @type int
        @param offset position within line of the issue
        @type int
        @param code message code
        @type str
        @param args arguments for the message
        @type list
        """
        if self.__ignoreCode(code):
            return

        # count each occurrence, even if it is not reported below
        self.counters[code] = self.counters.get(code, 0) + 1

        # Don't care about expected codes
        if code in self.__expected:
            return

        if code and (self.counters[code] == 1 or self.__repeat):
            # record the issue with one based line number
            self.errors.append(
                {
                    "file": self.__filename,
                    "line": lineNumber + 1,
                    "offset": offset,
                    "code": code,
                    "args": args,
                }
            )

    def run(self):
        """
        Public method to execute all activated checks for "async" related
        issues.

        Nothing is done, if no file name was given or no check is active.
        """
        if not self.__filename:
            # don't do anything, if essential data is missing
            return

        if not self.__checkers:
            # don't do anything, if no codes were selected
            return

        for check in self.__checkers:
            check()

    def __checkSyncUses(self):
        """
        Private method to check for use of synchronous functions in async methods.
        """
        from .AsyncVisitor import AsyncVisitor

        visitor = AsyncVisitor(self.__args, self)
        # the visitor works on its own copy of the stored tree
        visitor.visit(copy.deepcopy(self.__tree))
        for violation in visitor.violations:
            node = violation[0]
            reason = violation[1]
            # ignored codes are filtered by __error() itself
            self.__error(node.lineno - 1, node.col_offset, reason)