|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing a node visitor to check for logging issues. |
|
8 """ |
|
9 |
|
10 ####################################################################### |
|
11 ## LoggingVisitor |
|
12 ## |
|
13 ## adapted from: flake8-logging v1.4.0 |
|
14 ## |
|
15 ## Original: Copyright (c) 2023 Adam Johnson |
|
16 ####################################################################### |
|
17 |
|
18 import ast |
|
19 import re |
|
20 import sys |
|
21 |
|
22 from functools import lru_cache |
|
23 from typing import cast |
|
24 |
|
25 _LoggerMethods = frozenset( |
|
26 ( |
|
27 "debug", |
|
28 "info", |
|
29 "warn", |
|
30 "warning", |
|
31 "error", |
|
32 "critical", |
|
33 "log", |
|
34 "exception", |
|
35 ) |
|
36 ) |
|
37 |
|
38 _LogrecordAttributes = frozenset( |
|
39 ( |
|
40 "asctime", |
|
41 "args", |
|
42 "created", |
|
43 "exc_info", |
|
44 "exc_text", |
|
45 "filename", |
|
46 "funcName", |
|
47 "levelname", |
|
48 "levelno", |
|
49 "lineno", |
|
50 "module", |
|
51 "msecs", |
|
52 "msg", |
|
53 "name", |
|
54 "pathname", |
|
55 "process", |
|
56 "processName", |
|
57 "relativeCreated", |
|
58 "stack_info", |
|
59 "taskName", |
|
60 "thread", |
|
61 "threadName", |
|
62 ) |
|
63 ) |
|
64 |
|
65 |
|
66 @lru_cache(maxsize=None) |
|
67 def _modposPlaceholderRe(): |
|
68 """ |
|
69 Function to generate a regular expression object for '%' formatting codes. |
|
70 |
|
71 @return regular expression object |
|
72 @rtype re.Pattern |
|
73 """ |
|
74 # https://docs.python.org/3/library/stdtypes.html#printf-style-string-formatting |
|
75 return re.compile( |
|
76 r""" |
|
77 % # noqa: M601 |
|
78 (?P<spec> |
|
79 % | # raw % character # noqa: M601 |
|
80 (?: |
|
81 ([-#0 +]+)? # conversion flags |
|
82 (?P<minwidth>\d+|\*)? # minimum field width |
|
83 (?P<precision>\.\d+|\.\*)? # precision |
|
84 [hlL]? # length modifier |
|
85 [acdeEfFgGiorsuxX] # conversion type |
|
86 ) |
|
87 ) |
|
88 """, |
|
89 re.VERBOSE, |
|
90 ) |
|
91 |
|
92 |
|
93 @lru_cache(maxsize=None) |
|
94 def _modnamedPlaceholderRe(): |
|
95 """ |
|
96 Function to generate a regular expression object for '%' formatting codes using |
|
97 names. |
|
98 |
|
99 @return regular expression object |
|
100 @rtype re.Pattern |
|
101 """ |
|
102 # https://docs.python.org/3/library/stdtypes.html#printf-style-string-formatting |
|
103 return re.compile( |
|
104 r""" |
|
105 % # noqa: M601 |
|
106 \( |
|
107 (?P<name>.*?) |
|
108 \) |
|
109 ([-#0 +]+)? # conversion flags |
|
110 (\d+)? # minimum field width |
|
111 (\.\d+)? # precision |
|
112 [hlL]? # length modifier |
|
113 [acdeEfFgGiorsuxX] # conversion type |
|
114 """, |
|
115 re.VERBOSE, |
|
116 ) |
|
117 |
|
118 |
|
119 class LoggingVisitor(ast.NodeVisitor): |
|
120 """ |
|
121 Class implementing a node visitor to check for logging issues. |
|
122 """ |
|
123 |
|
124 GetLoggerNames = frozenset(("__cached__", "__file__")) |
|
125 |
|
126 def __init__(self, errorCallback): |
|
127 """ |
|
128 Constructor |
|
129 |
|
130 @param errorCallback callback function to register an error |
|
131 @type func |
|
132 """ |
|
133 super().__init__() |
|
134 |
|
135 self.__error = errorCallback |
|
136 |
|
137 self.__loggingName = None |
|
138 self.__loggerName = None |
|
139 self.__fromImports = {} |
|
140 self.__stack = [] |
|
141 |
|
142 def visit(self, node): |
|
143 """ |
|
144 Public method to handle ast nodes. |
|
145 |
|
146 @param node reference to the node to be processed |
|
147 @type ast.AST |
|
148 """ |
|
149 self.__stack.append(node) |
|
150 super().visit(node) |
|
151 self.__stack.pop() |
|
152 |
|
153 def visit_Import(self, node): |
|
154 """ |
|
155 Public method to handle Import nodes. |
|
156 |
|
157 @param node reference to the node to be processed |
|
158 @type ast.Import |
|
159 """ |
|
160 for alias in node.names: |
|
161 if alias.name == "logging": |
|
162 self.__loggingName = alias.asname or alias.name |
|
163 self.generic_visit(node) |
|
164 |
|
165 def visit_ImportFrom(self, node): |
|
166 """ |
|
167 Public method to handle ImportFrom nodes. |
|
168 |
|
169 @param node reference to the node to be processed |
|
170 @type ast.ImportFrom |
|
171 """ |
|
172 if node.module == "logging": |
|
173 for alias in node.names: |
|
174 if alias.name == "WARN": |
|
175 if sys.version_info >= (3, 10): |
|
176 lineno = alias.lineno |
|
177 colOffset = alias.col_offset |
|
178 else: |
|
179 lineno = node.lineno |
|
180 colOffset = node.col_offset |
|
181 self.__error(lineno - 1, colOffset, "L109") |
|
182 if not alias.asname: |
|
183 self.__fromImports[alias.name] = node.module |
|
184 |
|
185 self.generic_visit(node) |
|
186 |
|
187 def visit_Attribute(self, node): |
|
188 """ |
|
189 Public method to handle Attribute nodes. |
|
190 |
|
191 @param node reference to the node to be processed |
|
192 @type ast.Attribute |
|
193 """ |
|
194 if ( |
|
195 self.__loggingName |
|
196 and isinstance(node.value, ast.Name) |
|
197 and node.value.id == self.__loggingName |
|
198 and node.attr == "WARN" |
|
199 ): |
|
200 self.__error(node.lineno - 1, node.col_offset, "L109") |
|
201 |
|
202 self.generic_visit(node) |
|
203 |
|
204 def visit_Call(self, node): |
|
205 """ |
|
206 Public method to handle Call nodes. |
|
207 |
|
208 @param node reference to the node to be processed |
|
209 @type ast.Call |
|
210 """ |
|
211 if ( |
|
212 ( |
|
213 self.__loggingName |
|
214 and isinstance(node.func, ast.Attribute) |
|
215 and node.func.attr == "Logger" |
|
216 and isinstance(node.func.value, ast.Name) |
|
217 and node.func.value.id == self.__loggingName |
|
218 ) |
|
219 or ( |
|
220 isinstance(node.func, ast.Name) |
|
221 and node.func.id == "Logger" |
|
222 and self.__fromImports.get("Logger") == "logging" |
|
223 ) |
|
224 ) and not self.__atModuleLevel(): |
|
225 self.__error(node.lineno - 1, node.col_offset, "L101") |
|
226 |
|
227 if ( |
|
228 self.__loggingName |
|
229 and isinstance(node.func, ast.Attribute) |
|
230 and node.func.attr == "getLogger" |
|
231 and isinstance(node.func.value, ast.Name) |
|
232 and node.func.value.id == self.__loggingName |
|
233 ) or ( |
|
234 isinstance(node.func, ast.Name) |
|
235 and node.func.id == "getLogger" |
|
236 and self.__fromImports.get("getLogger") == "logging" |
|
237 ): |
|
238 if ( |
|
239 len(self.__stack) >= 2 |
|
240 and isinstance(assign := self.__stack[-2], ast.Assign) |
|
241 and len(assign.targets) == 1 |
|
242 and isinstance(assign.targets[0], ast.Name) |
|
243 and not self.__atModuleLevel() |
|
244 ): |
|
245 self.__loggerName = assign.targets[0].id |
|
246 |
|
247 if ( |
|
248 node.args |
|
249 and isinstance(node.args[0], ast.Name) |
|
250 and node.args[0].id in self.GetLoggerNames |
|
251 ): |
|
252 self.__error(node.args[0].lineno - 1, node.args[0].col_offset, "L102") |
|
253 |
|
254 if ( |
|
255 isinstance(node.func, ast.Attribute) |
|
256 and node.func.attr in _LoggerMethods |
|
257 and isinstance(node.func.value, ast.Name) |
|
258 ) and ( |
|
259 (self.__loggingName and node.func.value.id == self.__loggingName) |
|
260 or (self.__loggerName and node.func.value.id == self.__loggerName) |
|
261 ): |
|
262 excHandler = self.__currentExceptHandler() |
|
263 |
|
264 # L108 |
|
265 if node.func.attr == "warn": |
|
266 self.__error(node.lineno - 1, node.col_offset, "L108") |
|
267 |
|
268 # L103 |
|
269 extraKeys = [] |
|
270 if any((extraNode := kw).arg == "extra" for kw in node.keywords): |
|
271 if isinstance(extraNode.value, ast.Dict): |
|
272 extraKeys = [ |
|
273 (k.value, k) |
|
274 for k in extraNode.value.keys |
|
275 if isinstance(k, ast.Constant) |
|
276 ] |
|
277 elif ( |
|
278 isinstance(extraNode.value, ast.Call) |
|
279 and isinstance(extraNode.value.func, ast.Name) |
|
280 and extraNode.value.func.id == "dict" |
|
281 ): |
|
282 extraKeys = [ |
|
283 (k.arg, k) |
|
284 for k in extraNode.value.keywords |
|
285 if k.arg is not None |
|
286 ] |
|
287 |
|
288 for key, keyNode in extraKeys: |
|
289 if key in _LogrecordAttributes: |
|
290 if isinstance(keyNode, ast.keyword): |
|
291 lineno, colOffset = self.__keywordPos(keyNode) |
|
292 else: |
|
293 lineno = keyNode.lineno |
|
294 colOffset = keyNode.col_offset |
|
295 self.__error(lineno - 1, colOffset, "L103", repr(key)) |
|
296 |
|
297 if node.func.attr == "exception": |
|
298 # L104 |
|
299 if not excHandler: |
|
300 self.__error(node.lineno - 1, node.col_offset, "L104") |
|
301 |
|
302 if any((excInfo := kw).arg == "exc_info" for kw in node.keywords): |
|
303 # L106 |
|
304 if ( |
|
305 isinstance(excInfo.value, ast.Constant) and excInfo.value.value |
|
306 ) or ( |
|
307 excHandler |
|
308 and isinstance(excInfo.value, ast.Name) |
|
309 and excInfo.value.id == excHandler.name |
|
310 ): |
|
311 lineno, colOffset = self.__keywordPos(excInfo) |
|
312 self.__error(lineno - 1, colOffset, "L106") |
|
313 |
|
314 # L107 |
|
315 elif ( |
|
316 isinstance(excInfo.value, ast.Constant) |
|
317 and not excInfo.value.value |
|
318 ): |
|
319 lineno, colOffset = self.__keywordPos(excInfo) |
|
320 self.__error(lineno - 1, colOffset, "L107") |
|
321 |
|
322 # L105 |
|
323 elif node.func.attr == "error" and excHandler is not None: |
|
324 rewritable = False |
|
325 if any((excInfo := kw).arg == "exc_info" for kw in node.keywords): |
|
326 if isinstance(excInfo.value, ast.Constant) and excInfo.value.value: |
|
327 rewritable = True |
|
328 elif ( |
|
329 isinstance(excInfo.value, ast.Name) |
|
330 and excInfo.value.id == excHandler.name |
|
331 ): |
|
332 rewritable = True |
|
333 else: |
|
334 rewritable = True |
|
335 |
|
336 if rewritable: |
|
337 self.__error(node.lineno - 1, node.col_offset, "L105") |
|
338 |
|
339 # L114 |
|
340 elif ( |
|
341 excHandler is None |
|
342 and any((excInfo := kw).arg == "exc_info" for kw in node.keywords) |
|
343 and isinstance(excInfo.value, ast.Constant) |
|
344 and excInfo.value.value |
|
345 ): |
|
346 lineno, colOffset = self.__keywordPos(excInfo) |
|
347 self.__error(lineno - 1, colOffset, "L114") |
|
348 |
|
349 # L110 |
|
350 if ( |
|
351 node.func.attr == "exception" |
|
352 and len(node.args) >= 1 |
|
353 and isinstance(node.args[0], ast.Name) |
|
354 and excHandler is not None |
|
355 and node.args[0].id == excHandler.name |
|
356 ): |
|
357 self.__error(node.args[0].lineno - 1, node.args[0].col_offset, "L110") |
|
358 |
|
359 msgArgKwarg = False |
|
360 if node.func.attr == "log" and len(node.args) >= 2: |
|
361 msgArg = node.args[1] |
|
362 elif node.func.attr != "log" and len(node.args) >= 1: |
|
363 msgArg = node.args[0] |
|
364 else: |
|
365 try: |
|
366 msgArg = [k for k in node.keywords if k.arg == "msg"][0].value |
|
367 msgArgKwarg = True |
|
368 except IndexError: |
|
369 msgArg = None |
|
370 |
|
371 # L111 |
|
372 if isinstance(msgArg, ast.JoinedStr): |
|
373 self.__error(msgArg.lineno - 1, msgArg.col_offset, "L111a") |
|
374 elif ( |
|
375 isinstance(msgArg, ast.Call) |
|
376 and isinstance(msgArg.func, ast.Attribute) |
|
377 and isinstance(msgArg.func.value, ast.Constant) |
|
378 and isinstance(msgArg.func.value.value, str) |
|
379 and msgArg.func.attr == "format" |
|
380 ): |
|
381 self.__error(msgArg.lineno - 1, msgArg.col_offset, "L111b") |
|
382 elif ( |
|
383 isinstance(msgArg, ast.BinOp) |
|
384 and isinstance(msgArg.op, ast.Mod) |
|
385 and isinstance(msgArg.left, ast.Constant) |
|
386 and isinstance(msgArg.left.value, str) |
|
387 ): |
|
388 self.__error(msgArg.lineno - 1, msgArg.col_offset, "L111c") |
|
389 elif isinstance(msgArg, ast.BinOp) and self.__isAddChainWithNonStr(msgArg): |
|
390 self.__error(msgArg.lineno - 1, msgArg.col_offset, "L111d") |
|
391 |
|
392 # L112 |
|
393 if ( |
|
394 msgArg is not None |
|
395 and not msgArgKwarg |
|
396 and (msg := self.__flattenStrChain(msgArg)) |
|
397 and not any(isinstance(arg, ast.Starred) for arg in node.args) |
|
398 ): |
|
399 self.__checkMsgAndArgs(node, msgArg, msg) |
|
400 |
|
401 self.generic_visit(node) |
|
402 |
|
403 def __checkMsgAndArgs(self, node, msgArg, msg): |
|
404 """ |
|
405 Private method to check the message and arguments a given Call node. |
|
406 |
|
407 @param node reference to the Call node |
|
408 @type ast.Call |
|
409 @param msgArg message argument nodes |
|
410 @type ast.AST |
|
411 @param msg message |
|
412 @type str |
|
413 """ |
|
414 if not isinstance(node.func, ast.Attribute): |
|
415 return |
|
416 |
|
417 if ( |
|
418 ( |
|
419 (node.func.attr != "log" and (dictIdx := 1)) |
|
420 or (node.func.attr == "log" and (dictIdx := 2)) |
|
421 ) |
|
422 and len(node.args) == dictIdx + 1 |
|
423 and (dictNode := node.args[dictIdx]) |
|
424 and isinstance(dictNode, ast.Dict) |
|
425 and all( |
|
426 isinstance(k, ast.Constant) and isinstance(k.value, str) |
|
427 for k in dictNode.keys |
|
428 ) |
|
429 and ( |
|
430 modnames := {m["name"] for m in _modnamedPlaceholderRe().finditer(msg)} |
|
431 ) |
|
432 ): |
|
433 # L113 |
|
434 given = {cast(ast.Constant, k).value for k in dictNode.keys} |
|
435 if missing := modnames - given: |
|
436 self.__error( |
|
437 msgArg.lineno - 1, |
|
438 msgArg.col_offset, |
|
439 "L113a", # missing keys |
|
440 ", ".join([repr(k) for k in missing]), |
|
441 ) |
|
442 |
|
443 if missing := given - modnames: |
|
444 self.__error( |
|
445 msgArg.lineno - 1, |
|
446 msgArg.col_offset, |
|
447 "L113b", # unreferenced keys |
|
448 ", ".join([repr(k) for k in missing]), |
|
449 ) |
|
450 |
|
451 return |
|
452 |
|
453 # L112 |
|
454 modposCount = sum( |
|
455 1 + (m["minwidth"] == "*") + (m["precision"] == ".*") |
|
456 for m in _modposPlaceholderRe().finditer(msg) |
|
457 if m["spec"] != "%" |
|
458 ) |
|
459 argCount = len(node.args) - 1 - (node.func.attr == "log") |
|
460 |
|
461 if modposCount > 0 and modposCount != argCount: |
|
462 self.__error( |
|
463 msgArg.lineno - 1, |
|
464 msgArg.col_offset, |
|
465 "L112", |
|
466 modposCount, |
|
467 "'%'", # noqa: M601 |
|
468 argCount, |
|
469 ) |
|
470 return |
|
471 |
|
472 def __atModuleLevel(self): |
|
473 """ |
|
474 Private method to check, if we are on the module level. |
|
475 |
|
476 @return flag indicating the module level |
|
477 @rtype bool |
|
478 """ |
|
479 return any( |
|
480 isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef)) |
|
481 for parent in self.__stack |
|
482 ) |
|
483 |
|
484 def __currentExceptHandler(self): |
|
485 """ |
|
486 Private method to determine the current exception handler node. |
|
487 |
|
488 @return reference to the current exception handler node or None |
|
489 @rtype ast.ExceptHandler |
|
490 """ |
|
491 for node in reversed(self.__stack): |
|
492 if isinstance(node, ast.ExceptHandler): |
|
493 return node |
|
494 elif isinstance(node, (ast.AsyncFunctionDef, ast.FunctionDef)): |
|
495 break |
|
496 |
|
497 return None |
|
498 |
|
499 def __keywordPos(self, node): |
|
500 """ |
|
501 Private method determine line number and column offset of a given keyword node. |
|
502 |
|
503 @param node reference to the keyword node |
|
504 @type ast.keyword |
|
505 @return tuple containing the line number and the column offset |
|
506 @rtype tuple of (int, int) |
|
507 """ |
|
508 if sys.version_info >= (3, 9): |
|
509 return (node.lineno, node.col_offset) |
|
510 else: |
|
511 # Educated guess |
|
512 return ( |
|
513 node.value.lineno, |
|
514 max(0, node.value.col_offset - 1 - len(node.arg)), |
|
515 ) |
|
516 |
|
517 def __isAddChainWithNonStr(self, node): |
|
518 """ |
|
519 Private method to check, if the node is an Add with a non string argument. |
|
520 |
|
521 @param node reference to the binary operator node |
|
522 @type ast.BinOp |
|
523 @return flag indicating an Add with a non string argument |
|
524 @rtype bool |
|
525 """ |
|
526 if not isinstance(node.op, ast.Add): |
|
527 return False |
|
528 |
|
529 for side in (node.left, node.right): |
|
530 if isinstance(side, ast.BinOp): |
|
531 if self.__isAddChainWithNonStr(side): |
|
532 return True |
|
533 elif not (isinstance(side, ast.Constant) and isinstance(side.value, str)): |
|
534 return True |
|
535 |
|
536 return False |
|
537 |
|
538 def __flattenStrChain(self, node): |
|
539 """ |
|
540 Private method to flatten the given string chain. |
|
541 |
|
542 @param node reference to the AST node |
|
543 @type ast.AST |
|
544 @return flattened string |
|
545 @rtype str |
|
546 """ |
|
547 parts = [] |
|
548 |
|
549 def visit(node): |
|
550 if isinstance(node, ast.Constant) and isinstance(node.value, str): |
|
551 parts.append(node.value) |
|
552 return True |
|
553 elif isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add): |
|
554 return visit(node.left) and visit(node.right) |
|
555 return False |
|
556 |
|
557 result = visit(node) |
|
558 if result: |
|
559 return "".join(parts) |
|
560 else: |
|
561 return None |