src/eric7/Plugins/CheckerPlugins/CodeStyleChecker/Async/AsyncChecker.py

branch
eric7
changeset 10116
4a619fb7bd09
child 10439
21c28b0f9e41
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/eric7/Plugins/CheckerPlugins/CodeStyleChecker/Async/AsyncChecker.py	Mon Jul 17 17:37:38 2023 +0200
@@ -0,0 +1,154 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing a checker for "async" related issues.
+"""
+
+
+import copy
+
+
+class AsyncChecker:
+    """
+    Class implementing a checker for "async" related issues.
+    """
+
+    Codes = [
+        "ASY100",
+        "ASY101",
+        "ASY102",
+        "ASY103",
+        "ASY104",
+        "ASY105",
+    ]
+
+    def __init__(self, source, filename, tree, select, ignore, expected, repeat, args):
+        """
+        Constructor
+
+        @param source source code to be checked
+        @type list of str
+        @param filename name of the source file
+        @type str
+        @param tree AST tree of the source code
+        @type ast.Module
+        @param select list of selected codes
+        @type list of str
+        @param ignore list of codes to be ignored
+        @type list of str
+        @param expected list of expected codes
+        @type list of str
+        @param repeat flag indicating to report each occurrence of a code
+        @type bool
+        @param args dictionary of arguments for the various checks
+        @type dict
+        """
+        self.__select = tuple(select)
+        self.__ignore = (
+            ("",) if select else tuple(x for x in ignore if x.startswith("ASY"))
+        )
+        self.__expected = expected[:]
+        self.__repeat = repeat
+        self.__filename = filename
+        self.__source = source[:]
+        self.__tree = copy.deepcopy(tree)
+        self.__args = args
+
+        # statistics counters
+        self.counters = {}
+
+        # collection of detected errors
+        self.errors = []
+
+        checkersWithCodes = [
+            (
+                self.__checkSyncUses,
+                ("ASY100", "ASY101", "ASY102", "ASY103", "ASY104", "ASY105"),
+            ),
+        ]
+
+        self.__checkers = []
+        for checker, codes in checkersWithCodes:
+            if any(not (code and self.__ignoreCode(code)) for code in codes):
+                self.__checkers.append(checker)
+
+    def __ignoreCode(self, code):
+        """
+        Private method to check if the message code should be ignored.
+
+        @param code message code to check for
+        @type str
+        @return flag indicating to ignore the given code
+        @rtype bool
+        """
+        return code.startswith(self.__ignore) and not code.startswith(self.__select)
+
+    def __error(self, lineNumber, offset, code, *args):
+        """
+        Private method to record an issue.
+
+        @param lineNumber line number of the issue
+        @type int
+        @param offset position within line of the issue
+        @type int
+        @param code message code
+        @type str
+        @param args arguments for the message
+        @type list
+        """
+        if self.__ignoreCode(code):
+            return
+
+        if code in self.counters:
+            self.counters[code] += 1
+        else:
+            self.counters[code] = 1
+
+        # Don't care about expected codes
+        if code in self.__expected:
+            return
+
+        if code and (self.counters[code] == 1 or self.__repeat):
+            # record the issue with one based line number
+            self.errors.append(
+                {
+                    "file": self.__filename,
+                    "line": lineNumber + 1,
+                    "offset": offset,
+                    "code": code,
+                    "args": args,
+                }
+            )
+
+    def run(self):
+        """
+        Public method to check the given source against miscellaneous
+        conditions.
+        """
+        if not self.__filename:
+            # don't do anything, if essential data is missing
+            return
+
+        if not self.__checkers:
+            # don't do anything, if no codes were selected
+            return
+
+        for check in self.__checkers:
+            check()
+
+    def __checkSyncUses(self):
+        """
+        Private method to check for use of synchroneous functions in async methods.
+        """
+        from .AsyncVisitor import AsyncVisitor
+
+        visitor = AsyncVisitor(self.__args, self)
+        visitor.visit(copy.deepcopy(self.__tree))
+        for violation in visitor.violations:
+            if not self.__ignoreCode(violation[1]):
+                node = violation[0]
+                reason = violation[1]
+                self.__error(node.lineno - 1, node.col_offset, reason)

eric ide

mercurial