src/eric7/DebugClients/Python/coverage/parser.py

branch
eric7
changeset 9209
b99e7fd55fd3
parent 9099
0e511e0e94a3
child 9252
32dd11232e06
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/eric7/DebugClients/Python/coverage/parser.py	Thu Jul 07 11:23:56 2022 +0200
@@ -0,0 +1,1361 @@
+# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
+# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
+
+"""Code parsing for coverage.py."""
+
+import ast
+import collections
+import os
+import re
+import token
+import tokenize
+
+from coverage import env
+from coverage.bytecode import code_objects
+from coverage.debug import short_stack
+from coverage.exceptions import NoSource, NotPython, _StopEverything
+from coverage.misc import contract, join_regex, new_contract, nice_pair, one_of
+from coverage.phystokens import compile_unicode, generate_tokens, neuter_encoding_declaration
+
+
+class PythonParser:
+    """Parse code to find executable lines, excluded lines, etc.
+
+    This information is all based on static analysis: no code execution is
+    involved.
+
+    """
+    @contract(text='unicode|None')
+    def __init__(self, text=None, filename=None, exclude=None):
+        """
+        Source can be provided as `text`, the text itself, or `filename`, from
+        which the text will be read.  Excluded lines are those that match
+        `exclude`, a regex.
+
+        """
+        assert text or filename, "PythonParser needs either text or filename"
+        self.filename = filename or "<code>"
+        self.text = text
+        if not self.text:
+            from coverage.python import get_python_source
+            try:
+                self.text = get_python_source(self.filename)
+            except OSError as err:
+                raise NoSource(f"No source for code: '{self.filename}': {err}") from err
+
+        self.exclude = exclude
+
+        # The text lines of the parsed code.
+        self.lines = self.text.split('\n')
+
+        # The normalized line numbers of the statements in the code. Exclusions
+        # are taken into account, and statements are adjusted to their first
+        # lines.
+        self.statements = set()
+
+        # The normalized line numbers of the excluded lines in the code,
+        # adjusted to their first lines.
+        self.excluded = set()
+
+        # The raw_* attributes are only used in this class, and in
+        # lab/parser.py to show how this class is working.
+
+        # The line numbers that start statements, as reported by the line
+        # number table in the bytecode.
+        self.raw_statements = set()
+
+        # The raw line numbers of excluded lines of code, as marked by pragmas.
+        self.raw_excluded = set()
+
+        # The line numbers of docstring lines.
+        self.raw_docstrings = set()
+
+        # Internal detail, used by lab/parser.py.
+        self.show_tokens = False
+
+        # A dict mapping line numbers to lexical statement starts for
+        # multi-line statements.
+        self._multiline = {}
+
+        # Lazily-created arc data, and missing arc descriptions.
+        self._all_arcs = None
+        self._missing_arc_fragments = None
+
+    def lines_matching(self, *regexes):
+        """Find the lines matching one of a list of regexes.
+
+        Returns a set of line numbers, the lines that contain a match for one
+        of the regexes in `regexes`.  The entire line needn't match, just a
+        part of it.
+
+        """
+        combined = join_regex(regexes)
+        regex_c = re.compile(combined)
+        matches = set()
+        for i, ltext in enumerate(self.lines, start=1):
+            if regex_c.search(ltext):
+                matches.add(i)
+        return matches
+
+    def _raw_parse(self):
+        """Parse the source to find the interesting facts about its lines.
+
+        A handful of attributes are updated.
+
+        """
+        # Find lines which match an exclusion pattern.
+        if self.exclude:
+            self.raw_excluded = self.lines_matching(self.exclude)
+
+        # Tokenize, to find excluded suites, to find docstrings, and to find
+        # multi-line statements.
+        indent = 0
+        exclude_indent = 0
+        excluding = False
+        excluding_decorators = False
+        prev_toktype = token.INDENT
+        first_line = None
+        empty = True
+        first_on_line = True
+        nesting = 0
+
+        tokgen = generate_tokens(self.text)
+        for toktype, ttext, (slineno, _), (elineno, _), ltext in tokgen:
+            if self.show_tokens:                # pragma: debugging
+                print("%10s %5s %-20r %r" % (
+                    tokenize.tok_name.get(toktype, toktype),
+                    nice_pair((slineno, elineno)), ttext, ltext
+                ))
+            if toktype == token.INDENT:
+                indent += 1
+            elif toktype == token.DEDENT:
+                indent -= 1
+            elif toktype == token.OP:
+                if ttext == ':' and nesting == 0:
+                    should_exclude = (elineno in self.raw_excluded) or excluding_decorators
+                    if not excluding and should_exclude:
+                        # Start excluding a suite.  We trigger off of the colon
+                        # token so that the #pragma comment will be recognized on
+                        # the same line as the colon.
+                        self.raw_excluded.add(elineno)
+                        exclude_indent = indent
+                        excluding = True
+                        excluding_decorators = False
+                elif ttext == '@' and first_on_line:
+                    # A decorator.
+                    if elineno in self.raw_excluded:
+                        excluding_decorators = True
+                    if excluding_decorators:
+                        self.raw_excluded.add(elineno)
+                elif ttext in "([{":
+                    nesting += 1
+                elif ttext in ")]}":
+                    nesting -= 1
+            elif toktype == token.STRING and prev_toktype == token.INDENT:
+                # Strings that are first on an indented line are docstrings.
+                # (a trick from trace.py in the stdlib.) This works for
+                # 99.9999% of cases.  For the rest (!) see:
+                # http://stackoverflow.com/questions/1769332/x/1769794#1769794
+                self.raw_docstrings.update(range(slineno, elineno+1))
+            elif toktype == token.NEWLINE:
+                if first_line is not None and elineno != first_line:
+                    # We're at the end of a line, and we've ended on a
+                    # different line than the first line of the statement,
+                    # so record a multi-line range.
+                    for l in range(first_line, elineno+1):
+                        self._multiline[l] = first_line
+                first_line = None
+                first_on_line = True
+
+            if ttext.strip() and toktype != tokenize.COMMENT:
+                # A non-whitespace token.
+                empty = False
+                if first_line is None:
+                    # The token is not whitespace, and is the first in a
+                    # statement.
+                    first_line = slineno
+                    # Check whether to end an excluded suite.
+                    if excluding and indent <= exclude_indent:
+                        excluding = False
+                    if excluding:
+                        self.raw_excluded.add(elineno)
+                    first_on_line = False
+
+            prev_toktype = toktype
+
+        # Find the starts of the executable statements.
+        if not empty:
+            byte_parser = ByteParser(self.text, filename=self.filename)
+            self.raw_statements.update(byte_parser._find_statements())
+
+        # The first line of modules can lie and say 1 always, even if the first
+        # line of code is later. If so, map 1 to the actual first line of the
+        # module.
+        if env.PYBEHAVIOR.module_firstline_1 and self._multiline:
+            self._multiline[1] = min(self.raw_statements)
+
+    def first_line(self, line):
+        """Return the first line number of the statement including `line`."""
+        if line < 0:
+            line = -self._multiline.get(-line, -line)
+        else:
+            line = self._multiline.get(line, line)
+        return line
+
+    def first_lines(self, lines):
+        """Map the line numbers in `lines` to the correct first line of the
+        statement.
+
+        Returns a set of the first lines.
+
+        """
+        return {self.first_line(l) for l in lines}
+
+    def translate_lines(self, lines):
+        """Implement `FileReporter.translate_lines`."""
+        return self.first_lines(lines)
+
+    def translate_arcs(self, arcs):
+        """Implement `FileReporter.translate_arcs`."""
+        return [(self.first_line(a), self.first_line(b)) for (a, b) in arcs]
+
+    def parse_source(self):
+        """Parse source text to find executable lines, excluded lines, etc.
+
+        Sets the .excluded and .statements attributes, normalized to the first
+        line of multi-line statements.
+
+        """
+        try:
+            self._raw_parse()
+        except (tokenize.TokenError, IndentationError) as err:
+            if hasattr(err, "lineno"):
+                lineno = err.lineno         # IndentationError
+            else:
+                lineno = err.args[1][0]     # TokenError
+            raise NotPython(
+                f"Couldn't parse '{self.filename}' as Python source: " +
+                f"{err.args[0]!r} at line {lineno}"
+            ) from err
+
+        self.excluded = self.first_lines(self.raw_excluded)
+
+        ignore = self.excluded | self.raw_docstrings
+        starts = self.raw_statements - ignore
+        self.statements = self.first_lines(starts) - ignore
+
+    def arcs(self):
+        """Get information about the arcs available in the code.
+
+        Returns a set of line number pairs.  Line numbers have been normalized
+        to the first line of multi-line statements.
+
+        """
+        if self._all_arcs is None:
+            self._analyze_ast()
+        return self._all_arcs
+
+    def _analyze_ast(self):
+        """Run the AstArcAnalyzer and save its results.
+
+        `_all_arcs` is the set of arcs in the code.
+
+        """
+        aaa = AstArcAnalyzer(self.text, self.raw_statements, self._multiline)
+        aaa.analyze()
+
+        self._all_arcs = set()
+        for l1, l2 in aaa.arcs:
+            fl1 = self.first_line(l1)
+            fl2 = self.first_line(l2)
+            if fl1 != fl2:
+                self._all_arcs.add((fl1, fl2))
+
+        self._missing_arc_fragments = aaa.missing_arc_fragments
+
+    def exit_counts(self):
+        """Get a count of exits from that each line.
+
+        Excluded lines are excluded.
+
+        """
+        exit_counts = collections.defaultdict(int)
+        for l1, l2 in self.arcs():
+            if l1 < 0:
+                # Don't ever report -1 as a line number
+                continue
+            if l1 in self.excluded:
+                # Don't report excluded lines as line numbers.
+                continue
+            if l2 in self.excluded:
+                # Arcs to excluded lines shouldn't count.
+                continue
+            exit_counts[l1] += 1
+
+        return exit_counts
+
+    def missing_arc_description(self, start, end, executed_arcs=None):
+        """Provide an English sentence describing a missing arc."""
+        if self._missing_arc_fragments is None:
+            self._analyze_ast()
+
+        actual_start = start
+
+        if (
+            executed_arcs and
+            end < 0 and end == -start and
+            (end, start) not in executed_arcs and
+            (end, start) in self._missing_arc_fragments
+        ):
+            # It's a one-line callable, and we never even started it,
+            # and we have a message about not starting it.
+            start, end = end, start
+
+        fragment_pairs = self._missing_arc_fragments.get((start, end), [(None, None)])
+
+        msgs = []
+        for smsg, emsg in fragment_pairs:
+            if emsg is None:
+                if end < 0:
+                    # Hmm, maybe we have a one-line callable, let's check.
+                    if (-end, end) in self._missing_arc_fragments:
+                        return self.missing_arc_description(-end, end)
+                    emsg = "didn't jump to the function exit"
+                else:
+                    emsg = "didn't jump to line {lineno}"
+            emsg = emsg.format(lineno=end)
+
+            msg = f"line {actual_start} {emsg}"
+            if smsg is not None:
+                msg += f", because {smsg.format(lineno=actual_start)}"
+
+            msgs.append(msg)
+
+        return " or ".join(msgs)
+
+
+class ByteParser:
+    """Parse bytecode to understand the structure of code."""
+
+    @contract(text='unicode')
+    def __init__(self, text, code=None, filename=None):
+        self.text = text
+        if code:
+            self.code = code
+        else:
+            try:
+                self.code = compile_unicode(text, filename, "exec")
+            except SyntaxError as synerr:
+                raise NotPython(
+                    "Couldn't parse '%s' as Python source: '%s' at line %d" % (
+                        filename, synerr.msg, synerr.lineno
+                    )
+                ) from synerr
+
+        # Alternative Python implementations don't always provide all the
+        # attributes on code objects that we need to do the analysis.
+        for attr in ['co_lnotab', 'co_firstlineno']:
+            if not hasattr(self.code, attr):
+                raise _StopEverything(                  # pragma: only jython
+                    "This implementation of Python doesn't support code analysis.\n" +
+                    "Run coverage.py under another Python for this command."
+                )
+
+    def child_parsers(self):
+        """Iterate over all the code objects nested within this one.
+
+        The iteration includes `self` as its first value.
+
+        """
+        return (ByteParser(self.text, code=c) for c in code_objects(self.code))
+
+    def _line_numbers(self):
+        """Yield the line numbers possible in this code object.
+
+        Uses co_lnotab described in Python/compile.c to find the
+        line numbers.  Produces a sequence: l0, l1, ...
+        """
+        if hasattr(self.code, "co_lines"):
+            for _, _, line in self.code.co_lines():
+                if line is not None:
+                    yield line
+        else:
+            # Adapted from dis.py in the standard library.
+            byte_increments = self.code.co_lnotab[0::2]
+            line_increments = self.code.co_lnotab[1::2]
+
+            last_line_num = None
+            line_num = self.code.co_firstlineno
+            byte_num = 0
+            for byte_incr, line_incr in zip(byte_increments, line_increments):
+                if byte_incr:
+                    if line_num != last_line_num:
+                        yield line_num
+                        last_line_num = line_num
+                    byte_num += byte_incr
+                if env.PYBEHAVIOR.negative_lnotab and line_incr >= 0x80:
+                    line_incr -= 0x100
+                line_num += line_incr
+            if line_num != last_line_num:
+                yield line_num
+
+    def _find_statements(self):
+        """Find the statements in `self.code`.
+
+        Produce a sequence of line numbers that start statements.  Recurses
+        into all code objects reachable from `self.code`.
+
+        """
+        for bp in self.child_parsers():
+            # Get all of the lineno information from this code.
+            yield from bp._line_numbers()
+
+
+#
+# AST analysis
+#
+
+class BlockBase:
+    """
+    Blocks need to handle various exiting statements in their own ways.
+
+    All of these methods take a list of exits, and a callable `add_arc`
+    function that they can use to add arcs if needed.  They return True if the
+    exits are handled, or False if the search should continue up the block
+    stack.
+    """
+    # pylint: disable=unused-argument
+    def process_break_exits(self, exits, add_arc):
+        """Process break exits."""
+        # Because break can only appear in loops, and most subclasses
+        # implement process_break_exits, this function is never reached.
+        raise AssertionError
+
+    def process_continue_exits(self, exits, add_arc):
+        """Process continue exits."""
+        # Because continue can only appear in loops, and most subclasses
+        # implement process_continue_exits, this function is never reached.
+        raise AssertionError
+
+    def process_raise_exits(self, exits, add_arc):
+        """Process raise exits."""
+        return False
+
+    def process_return_exits(self, exits, add_arc):
+        """Process return exits."""
+        return False
+
+
+class LoopBlock(BlockBase):
+    """A block on the block stack representing a `for` or `while` loop."""
+    @contract(start=int)
+    def __init__(self, start):
+        # The line number where the loop starts.
+        self.start = start
+        # A set of ArcStarts, the arcs from break statements exiting this loop.
+        self.break_exits = set()
+
+    def process_break_exits(self, exits, add_arc):
+        self.break_exits.update(exits)
+        return True
+
+    def process_continue_exits(self, exits, add_arc):
+        for xit in exits:
+            add_arc(xit.lineno, self.start, xit.cause)
+        return True
+
+
+class FunctionBlock(BlockBase):
+    """A block on the block stack representing a function definition."""
+    @contract(start=int, name=str)
+    def __init__(self, start, name):
+        # The line number where the function starts.
+        self.start = start
+        # The name of the function.
+        self.name = name
+
+    def process_raise_exits(self, exits, add_arc):
+        for xit in exits:
+            add_arc(
+                xit.lineno, -self.start, xit.cause,
+                f"didn't except from function {self.name!r}",
+            )
+        return True
+
+    def process_return_exits(self, exits, add_arc):
+        for xit in exits:
+            add_arc(
+                xit.lineno, -self.start, xit.cause,
+                f"didn't return from function {self.name!r}",
+            )
+        return True
+
+
+class TryBlock(BlockBase):
+    """A block on the block stack representing a `try` block."""
+    @contract(handler_start='int|None', final_start='int|None')
+    def __init__(self, handler_start, final_start):
+        # The line number of the first "except" handler, if any.
+        self.handler_start = handler_start
+        # The line number of the "finally:" clause, if any.
+        self.final_start = final_start
+
+        # The ArcStarts for breaks/continues/returns/raises inside the "try:"
+        # that need to route through the "finally:" clause.
+        self.break_from = set()
+        self.continue_from = set()
+        self.raise_from = set()
+        self.return_from = set()
+
+    def process_break_exits(self, exits, add_arc):
+        if self.final_start is not None:
+            self.break_from.update(exits)
+            return True
+        return False
+
+    def process_continue_exits(self, exits, add_arc):
+        if self.final_start is not None:
+            self.continue_from.update(exits)
+            return True
+        return False
+
+    def process_raise_exits(self, exits, add_arc):
+        if self.handler_start is not None:
+            for xit in exits:
+                add_arc(xit.lineno, self.handler_start, xit.cause)
+        else:
+            assert self.final_start is not None
+            self.raise_from.update(exits)
+        return True
+
+    def process_return_exits(self, exits, add_arc):
+        if self.final_start is not None:
+            self.return_from.update(exits)
+            return True
+        return False
+
+
+class WithBlock(BlockBase):
+    """A block on the block stack representing a `with` block."""
+    @contract(start=int)
+    def __init__(self, start):
+        # We only ever use this block if it is needed, so that we don't have to
+        # check this setting in all the methods.
+        assert env.PYBEHAVIOR.exit_through_with
+
+        # The line number of the with statement.
+        self.start = start
+
+        # The ArcStarts for breaks/continues/returns/raises inside the "with:"
+        # that need to go through the with-statement while exiting.
+        self.break_from = set()
+        self.continue_from = set()
+        self.return_from = set()
+
+    def _process_exits(self, exits, add_arc, from_set=None):
+        """Helper to process the four kinds of exits."""
+        for xit in exits:
+            add_arc(xit.lineno, self.start, xit.cause)
+        if from_set is not None:
+            from_set.update(exits)
+        return True
+
+    def process_break_exits(self, exits, add_arc):
+        return self._process_exits(exits, add_arc, self.break_from)
+
+    def process_continue_exits(self, exits, add_arc):
+        return self._process_exits(exits, add_arc, self.continue_from)
+
+    def process_raise_exits(self, exits, add_arc):
+        return self._process_exits(exits, add_arc)
+
+    def process_return_exits(self, exits, add_arc):
+        return self._process_exits(exits, add_arc, self.return_from)
+
+
+class ArcStart(collections.namedtuple("Arc", "lineno, cause")):
+    """The information needed to start an arc.
+
+    `lineno` is the line number the arc starts from.
+
+    `cause` is an English text fragment used as the `startmsg` for
+    AstArcAnalyzer.missing_arc_fragments.  It will be used to describe why an
+    arc wasn't executed, so should fit well into a sentence of the form,
+    "Line 17 didn't run because {cause}."  The fragment can include "{lineno}"
+    to have `lineno` interpolated into it.
+
+    """
+    def __new__(cls, lineno, cause=None):
+        return super().__new__(cls, lineno, cause)
+
+
+# Define contract words that PyContract doesn't have.
+# ArcStarts is for a list or set of ArcStart's.
+new_contract('ArcStarts', lambda seq: all(isinstance(x, ArcStart) for x in seq))
+
+
+class NodeList:
+    """A synthetic fictitious node, containing a sequence of nodes.
+
+    This is used when collapsing optimized if-statements, to represent the
+    unconditional execution of one of the clauses.
+
+    """
+    def __init__(self, body):
+        self.body = body
+        self.lineno = body[0].lineno
+
+# TODO: some add_arcs methods here don't add arcs, they return them. Rename them.
+# TODO: the cause messages have too many commas.
+# TODO: Shouldn't the cause messages join with "and" instead of "or"?
+
+def ast_parse(text):
+    """How we create an AST parse."""
+    return ast.parse(neuter_encoding_declaration(text))
+
+
+class AstArcAnalyzer:
+    """Analyze source text with an AST to find executable code paths."""
+
+    @contract(text='unicode', statements=set)
+    def __init__(self, text, statements, multiline):
+        self.root_node = ast_parse(text)
+        # TODO: I think this is happening in too many places.
+        self.statements = {multiline.get(l, l) for l in statements}
+        self.multiline = multiline
+
+        # Turn on AST dumps with an environment variable.
+        # $set_env.py: COVERAGE_AST_DUMP - Dump the AST nodes when parsing code.
+        dump_ast = bool(int(os.environ.get("COVERAGE_AST_DUMP", 0)))
+
+        if dump_ast:                                # pragma: debugging
+            # Dump the AST so that failing tests have helpful output.
+            print(f"Statements: {self.statements}")
+            print(f"Multiline map: {self.multiline}")
+            ast_dump(self.root_node)
+
+        self.arcs = set()
+
+        # A map from arc pairs to a list of pairs of sentence fragments:
+        #   { (start, end): [(startmsg, endmsg), ...], }
+        #
+        # For an arc from line 17, they should be usable like:
+        #    "Line 17 {endmsg}, because {startmsg}"
+        self.missing_arc_fragments = collections.defaultdict(list)
+        self.block_stack = []
+
+        # $set_env.py: COVERAGE_TRACK_ARCS - Trace possible arcs added while parsing code.
+        self.debug = bool(int(os.environ.get("COVERAGE_TRACK_ARCS", 0)))
+
+    def analyze(self):
+        """Examine the AST tree from `root_node` to determine possible arcs.
+
+        This sets the `arcs` attribute to be a set of (from, to) line number
+        pairs.
+
+        """
+        for node in ast.walk(self.root_node):
+            node_name = node.__class__.__name__
+            code_object_handler = getattr(self, "_code_object__" + node_name, None)
+            if code_object_handler is not None:
+                code_object_handler(node)
+
+    @contract(start=int, end=int)
+    def add_arc(self, start, end, smsg=None, emsg=None):
+        """Add an arc, including message fragments to use if it is missing."""
+        if self.debug:                      # pragma: debugging
+            print(f"\nAdding possible arc: ({start}, {end}): {smsg!r}, {emsg!r}")
+            print(short_stack(limit=10))
+        self.arcs.add((start, end))
+
+        if smsg is not None or emsg is not None:
+            self.missing_arc_fragments[(start, end)].append((smsg, emsg))
+
+    def nearest_blocks(self):
+        """Yield the blocks in nearest-to-farthest order."""
+        return reversed(self.block_stack)
+
+    @contract(returns=int)
+    def line_for_node(self, node):
+        """What is the right line number to use for this node?
+
+        This dispatches to _line__Node functions where needed.
+
+        """
+        node_name = node.__class__.__name__
+        handler = getattr(self, "_line__" + node_name, None)
+        if handler is not None:
+            return handler(node)
+        else:
+            return node.lineno
+
+    def _line_decorated(self, node):
+        """Compute first line number for things that can be decorated (classes and functions)."""
+        lineno = node.lineno
+        if env.PYBEHAVIOR.trace_decorated_def or env.PYBEHAVIOR.def_ast_no_decorator:
+            if node.decorator_list:
+                lineno = node.decorator_list[0].lineno
+        return lineno
+
+    def _line__Assign(self, node):
+        return self.line_for_node(node.value)
+
+    _line__ClassDef = _line_decorated
+
+    def _line__Dict(self, node):
+        if node.keys:
+            if node.keys[0] is not None:
+                return node.keys[0].lineno
+            else:
+                # Unpacked dict literals `{**{'a':1}}` have None as the key,
+                # use the value in that case.
+                return node.values[0].lineno
+        else:
+            return node.lineno
+
+    _line__FunctionDef = _line_decorated
+    _line__AsyncFunctionDef = _line_decorated
+
+    def _line__List(self, node):
+        if node.elts:
+            return self.line_for_node(node.elts[0])
+        else:
+            return node.lineno
+
+    def _line__Module(self, node):
+        if env.PYBEHAVIOR.module_firstline_1:
+            return 1
+        elif node.body:
+            return self.line_for_node(node.body[0])
+        else:
+            # Empty modules have no line number, they always start at 1.
+            return 1
+
+    # The node types that just flow to the next node with no complications.
+    OK_TO_DEFAULT = {
+        "AnnAssign", "Assign", "Assert", "AugAssign", "Delete", "Expr", "Global",
+        "Import", "ImportFrom", "Nonlocal", "Pass",
+    }
+
+    @contract(returns='ArcStarts')
+    def add_arcs(self, node):
+        """Add the arcs for `node`.
+
+        Return a set of ArcStarts, exits from this node to the next. Because a
+        node represents an entire sub-tree (including its children), the exits
+        from a node can be arbitrarily complex::
+
+            if something(1):
+                if other(2):
+                    doit(3)
+                else:
+                    doit(5)
+
+        There are two exits from line 1: they start at line 3 and line 5.
+
+        """
+        node_name = node.__class__.__name__
+        handler = getattr(self, "_handle__" + node_name, None)
+        if handler is not None:
+            return handler(node)
+        else:
+            # No handler: either it's something that's ok to default (a simple
+            # statement), or it's something we overlooked.
+            if env.TESTING:
+                if node_name not in self.OK_TO_DEFAULT:
+                    raise Exception(f"*** Unhandled: {node}")       # pragma: only failure
+
+            # Default for simple statements: one exit from this node.
+            return {ArcStart(self.line_for_node(node))}
+
+    @one_of("from_start, prev_starts")
+    @contract(returns='ArcStarts')
+    def add_body_arcs(self, body, from_start=None, prev_starts=None):
+        """Add arcs for the body of a compound statement.
+
+        `body` is the body node.  `from_start` is a single `ArcStart` that can
+        be the previous line in flow before this body.  `prev_starts` is a set
+        of ArcStarts that can be the previous line.  Only one of them should be
+        given.
+
+        Returns a set of ArcStarts, the exits from this body.
+
+        """
+        if prev_starts is None:
+            prev_starts = {from_start}
+        for body_node in body:
+            lineno = self.line_for_node(body_node)
+            first_line = self.multiline.get(lineno, lineno)
+            if first_line not in self.statements:
+                body_node = self.find_non_missing_node(body_node)
+                if body_node is None:
+                    continue
+                lineno = self.line_for_node(body_node)
+            for prev_start in prev_starts:
+                self.add_arc(prev_start.lineno, lineno, prev_start.cause)
+            prev_starts = self.add_arcs(body_node)
+        return prev_starts
+
+    def find_non_missing_node(self, node):
+        """Search `node` looking for a child that has not been optimized away.
+
+        This might return the node you started with, or it will work recursively
+        to find a child node in self.statements.
+
+        Returns a node, or None if none of the node remains.
+
+        """
+        # This repeats work just done in add_body_arcs, but this duplication
+        # means we can avoid a function call in the 99.9999% case of not
+        # optimizing away statements.
+        lineno = self.line_for_node(node)
+        first_line = self.multiline.get(lineno, lineno)
+        if first_line in self.statements:
+            return node
+
+        missing_fn = getattr(self, "_missing__" + node.__class__.__name__, None)
+        if missing_fn:
+            node = missing_fn(node)
+        else:
+            node = None
+        return node
+
+    # Missing nodes: _missing__*
+    #
+    # Entire statements can be optimized away by Python. They will appear in
+    # the AST, but not the bytecode.  These functions are called (by
+    # find_non_missing_node) to find a node to use instead of the missing
+    # node.  They can return None if the node should truly be gone.
+
+    def _missing__If(self, node):
+        # If the if-node is missing, then one of its children might still be
+        # here, but not both. So return the first of the two that isn't missing.
+        # Use a NodeList to hold the clauses as a single node.
+        non_missing = self.find_non_missing_node(NodeList(node.body))
+        if non_missing:
+            return non_missing
+        if node.orelse:
+            return self.find_non_missing_node(NodeList(node.orelse))
+        return None
+
+    def _missing__NodeList(self, node):
+        # A NodeList might be a mixture of missing and present nodes. Find the
+        # ones that are present.
+        non_missing_children = []
+        for child in node.body:
+            child = self.find_non_missing_node(child)
+            if child is not None:
+                non_missing_children.append(child)
+
+        # Return the simplest representation of the present children.
+        if not non_missing_children:
+            return None
+        if len(non_missing_children) == 1:
+            return non_missing_children[0]
+        return NodeList(non_missing_children)
+
+    def _missing__While(self, node):
+        body_nodes = self.find_non_missing_node(NodeList(node.body))
+        if not body_nodes:
+            return None
+        # Make a synthetic While-true node.
+        new_while = ast.While()
+        new_while.lineno = body_nodes.lineno
+        new_while.test = ast.Name()
+        new_while.test.lineno = body_nodes.lineno
+        new_while.test.id = "True"
+        new_while.body = body_nodes.body
+        new_while.orelse = None
+        return new_while
+
+    def is_constant_expr(self, node):
+        """Is this a compile-time constant?"""
+        node_name = node.__class__.__name__
+        if node_name in ["Constant", "NameConstant", "Num"]:
+            return "Num"
+        elif node_name == "Name":
+            if node.id in ["True", "False", "None", "__debug__"]:
+                return "Name"
+        return None
+
+    # In the fullness of time, these might be good tests to write:
+    #   while EXPR:
+    #   while False:
+    #   listcomps hidden deep in other expressions
+    #   listcomps hidden in lists: x = [[i for i in range(10)]]
+    #   nested function definitions
+
+
+    # Exit processing: process_*_exits
+    #
+    # These functions process the four kinds of jump exits: break, continue,
+    # raise, and return.  To figure out where an exit goes, we have to look at
+    # the block stack context.  For example, a break will jump to the nearest
+    # enclosing loop block, or the nearest enclosing finally block, whichever
+    # is nearer.
+
+    @contract(exits='ArcStarts')
+    def process_break_exits(self, exits):
+        """Add arcs due to jumps from `exits` being breaks."""
+        for block in self.nearest_blocks():                         # pragma: always breaks
+            if block.process_break_exits(exits, self.add_arc):
+                break
+
+    @contract(exits='ArcStarts')
+    def process_continue_exits(self, exits):
+        """Add arcs due to jumps from `exits` being continues."""
+        for block in self.nearest_blocks():                         # pragma: always breaks
+            if block.process_continue_exits(exits, self.add_arc):
+                break
+
+    @contract(exits='ArcStarts')
+    def process_raise_exits(self, exits):
+        """Add arcs due to jumps from `exits` being raises."""
+        for block in self.nearest_blocks():
+            if block.process_raise_exits(exits, self.add_arc):
+                break
+
+    @contract(exits='ArcStarts')
+    def process_return_exits(self, exits):
+        """Add arcs due to jumps from `exits` being returns."""
+        for block in self.nearest_blocks():                         # pragma: always breaks
+            if block.process_return_exits(exits, self.add_arc):
+                break
+
+    # Handlers: _handle__*
+    #
+    # Each handler deals with a specific AST node type, dispatched from
+    # add_arcs.  Handlers return the set of exits from that node, and can
+    # also call self.add_arc to record arcs they find.  These functions mirror
+    # the Python semantics of each syntactic construct.  See the docstring
+    # for add_arcs to understand the concept of exits from a node.
+    #
+    # Every node type that represents a statement should have a handler, or it
+    # should be listed in OK_TO_DEFAULT.
+
+    @contract(returns='ArcStarts')
+    def _handle__Break(self, node):
+        here = self.line_for_node(node)
+        break_start = ArcStart(here, cause="the break on line {lineno} wasn't executed")
+        self.process_break_exits([break_start])
+        return set()
+
+    @contract(returns='ArcStarts')
+    def _handle_decorated(self, node):
+        """Add arcs for things that can be decorated (classes and functions)."""
+        main_line = last = node.lineno
+        decs = node.decorator_list
+        if decs:
+            if env.PYBEHAVIOR.trace_decorated_def or env.PYBEHAVIOR.def_ast_no_decorator:
+                last = None
+            for dec_node in decs:
+                dec_start = self.line_for_node(dec_node)
+                if last is not None and dec_start != last:
+                    self.add_arc(last, dec_start)
+                last = dec_start
+            if env.PYBEHAVIOR.trace_decorated_def:
+                self.add_arc(last, main_line)
+                last = main_line
+            if env.PYBEHAVIOR.trace_decorator_line_again:
+                for top, bot in zip(decs, decs[1:]):
+                    self.add_arc(self.line_for_node(bot), self.line_for_node(top))
+                self.add_arc(self.line_for_node(decs[0]), main_line)
+                self.add_arc(main_line, self.line_for_node(decs[-1]))
+            # The definition line may have been missed, but we should have it
+            # in `self.statements`.  For some constructs, `line_for_node` is
+            # not what we'd think of as the first line in the statement, so map
+            # it to the first one.
+            if node.body:
+                body_start = self.line_for_node(node.body[0])
+                body_start = self.multiline.get(body_start, body_start)
+                for lineno in range(last+1, body_start):
+                    if lineno in self.statements:
+                        self.add_arc(last, lineno)
+                        last = lineno
+        # The body is handled in collect_arcs.
+        return {ArcStart(last)}
+
+    _handle__ClassDef = _handle_decorated
+
+    @contract(returns='ArcStarts')
+    def _handle__Continue(self, node):
+        here = self.line_for_node(node)
+        continue_start = ArcStart(here, cause="the continue on line {lineno} wasn't executed")
+        self.process_continue_exits([continue_start])
+        return set()
+
+    @contract(returns='ArcStarts')
+    def _handle__For(self, node):
+        start = self.line_for_node(node.iter)
+        self.block_stack.append(LoopBlock(start=start))
+        from_start = ArcStart(start, cause="the loop on line {lineno} never started")
+        exits = self.add_body_arcs(node.body, from_start=from_start)
+        # Any exit from the body will go back to the top of the loop.
+        for xit in exits:
+            self.add_arc(xit.lineno, start, xit.cause)
+        my_block = self.block_stack.pop()
+        exits = my_block.break_exits
+        from_start = ArcStart(start, cause="the loop on line {lineno} didn't complete")
+        if node.orelse:
+            else_exits = self.add_body_arcs(node.orelse, from_start=from_start)
+            exits |= else_exits
+        else:
+            # No else clause: exit from the for line.
+            exits.add(from_start)
+        return exits
+
+    _handle__AsyncFor = _handle__For
+
+    _handle__FunctionDef = _handle_decorated
+    _handle__AsyncFunctionDef = _handle_decorated
+
+    @contract(returns='ArcStarts')
+    def _handle__If(self, node):
+        start = self.line_for_node(node.test)
+        from_start = ArcStart(start, cause="the condition on line {lineno} was never true")
+        exits = self.add_body_arcs(node.body, from_start=from_start)
+        from_start = ArcStart(start, cause="the condition on line {lineno} was never false")
+        exits |= self.add_body_arcs(node.orelse, from_start=from_start)
+        return exits
+
+    @contract(returns='ArcStarts')
+    def _handle__Match(self, node):
+        start = self.line_for_node(node)
+        last_start = start
+        exits = set()
+        had_wildcard = False
+        for case in node.cases:
+            case_start = self.line_for_node(case.pattern)
+            if isinstance(case.pattern, ast.MatchAs):
+                had_wildcard = True
+            self.add_arc(last_start, case_start, "the pattern on line {lineno} always matched")
+            from_start = ArcStart(case_start, cause="the pattern on line {lineno} never matched")
+            exits |= self.add_body_arcs(case.body, from_start=from_start)
+            last_start = case_start
+        if not had_wildcard:
+            exits.add(from_start)
+        return exits
+
+    @contract(returns='ArcStarts')
+    def _handle__NodeList(self, node):
+        start = self.line_for_node(node)
+        exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
+        return exits
+
+    @contract(returns='ArcStarts')
+    def _handle__Raise(self, node):
+        here = self.line_for_node(node)
+        raise_start = ArcStart(here, cause="the raise on line {lineno} wasn't executed")
+        self.process_raise_exits([raise_start])
+        # `raise` statement jumps away, no exits from here.
+        return set()
+
+    @contract(returns='ArcStarts')
+    def _handle__Return(self, node):
+        here = self.line_for_node(node)
+        return_start = ArcStart(here, cause="the return on line {lineno} wasn't executed")
+        self.process_return_exits([return_start])
+        # `return` statement jumps away, no exits from here.
+        return set()
+
+    @contract(returns='ArcStarts')
+    def _handle__Try(self, node):
+        if node.handlers:
+            handler_start = self.line_for_node(node.handlers[0])
+        else:
+            handler_start = None
+
+        if node.finalbody:
+            final_start = self.line_for_node(node.finalbody[0])
+        else:
+            final_start = None
+
+        # This is true by virtue of Python syntax: have to have either except
+        # or finally, or both.
+        assert handler_start is not None or final_start is not None
+        try_block = TryBlock(handler_start, final_start)
+        self.block_stack.append(try_block)
+
+        start = self.line_for_node(node)
+        exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
+
+        # We're done with the `try` body, so this block no longer handles
+        # exceptions. We keep the block so the `finally` clause can pick up
+        # flows from the handlers and `else` clause.
+        if node.finalbody:
+            try_block.handler_start = None
+            if node.handlers:
+                # If there are `except` clauses, then raises in the try body
+                # will already jump to them.  Start this set over for raises in
+                # `except` and `else`.
+                try_block.raise_from = set()
+        else:
+            self.block_stack.pop()
+
+        handler_exits = set()
+
+        if node.handlers:
+            last_handler_start = None
+            for handler_node in node.handlers:
+                handler_start = self.line_for_node(handler_node)
+                if last_handler_start is not None:
+                    self.add_arc(last_handler_start, handler_start)
+                last_handler_start = handler_start
+                from_cause = "the exception caught by line {lineno} didn't happen"
+                from_start = ArcStart(handler_start, cause=from_cause)
+                handler_exits |= self.add_body_arcs(handler_node.body, from_start=from_start)
+
+        if node.orelse:
+            exits = self.add_body_arcs(node.orelse, prev_starts=exits)
+
+        exits |= handler_exits
+
+        if node.finalbody:
+            self.block_stack.pop()
+            final_from = (                  # You can get to the `finally` clause from:
+                exits |                         # the exits of the body or `else` clause,
+                try_block.break_from |          # or a `break`,
+                try_block.continue_from |       # or a `continue`,
+                try_block.raise_from |          # or a `raise`,
+                try_block.return_from           # or a `return`.
+            )
+
+            final_exits = self.add_body_arcs(node.finalbody, prev_starts=final_from)
+
+            if try_block.break_from:
+                if env.PYBEHAVIOR.finally_jumps_back:
+                    for break_line in try_block.break_from:
+                        lineno = break_line.lineno
+                        cause = break_line.cause.format(lineno=lineno)
+                        for final_exit in final_exits:
+                            self.add_arc(final_exit.lineno, lineno, cause)
+                    breaks = try_block.break_from
+                else:
+                    breaks = self._combine_finally_starts(try_block.break_from, final_exits)
+                self.process_break_exits(breaks)
+
+            if try_block.continue_from:
+                if env.PYBEHAVIOR.finally_jumps_back:
+                    for continue_line in try_block.continue_from:
+                        lineno = continue_line.lineno
+                        cause = continue_line.cause.format(lineno=lineno)
+                        for final_exit in final_exits:
+                            self.add_arc(final_exit.lineno, lineno, cause)
+                    continues = try_block.continue_from
+                else:
+                    continues = self._combine_finally_starts(try_block.continue_from, final_exits)
+                self.process_continue_exits(continues)
+
+            if try_block.raise_from:
+                self.process_raise_exits(
+                    self._combine_finally_starts(try_block.raise_from, final_exits)
+                )
+
+            if try_block.return_from:
+                if env.PYBEHAVIOR.finally_jumps_back:
+                    for return_line in try_block.return_from:
+                        lineno = return_line.lineno
+                        cause = return_line.cause.format(lineno=lineno)
+                        for final_exit in final_exits:
+                            self.add_arc(final_exit.lineno, lineno, cause)
+                    returns = try_block.return_from
+                else:
+                    returns = self._combine_finally_starts(try_block.return_from, final_exits)
+                self.process_return_exits(returns)
+
+            if exits:
+                # The finally clause's exits are only exits for the try block
+                # as a whole if the try block had some exits to begin with.
+                exits = final_exits
+
+        return exits
+
+    @contract(starts='ArcStarts', exits='ArcStarts', returns='ArcStarts')
+    def _combine_finally_starts(self, starts, exits):
+        """Helper for building the cause of `finally` branches.
+
+        "finally" clauses might not execute their exits, and the causes could
+        be due to a failure to execute any of the exits in the try block. So
+        we use the causes from `starts` as the causes for `exits`.
+        """
+        causes = []
+        for start in sorted(starts):
+            if start.cause is not None:
+                causes.append(start.cause.format(lineno=start.lineno))
+        cause = " or ".join(causes)
+        exits = {ArcStart(xit.lineno, cause) for xit in exits}
+        return exits
+
+    @contract(returns='ArcStarts')
+    def _handle__While(self, node):
+        start = to_top = self.line_for_node(node.test)
+        constant_test = self.is_constant_expr(node.test)
+        top_is_body0 = False
+        if constant_test:
+            top_is_body0 = True
+        if env.PYBEHAVIOR.keep_constant_test:
+            top_is_body0 = False
+        if top_is_body0:
+            to_top = self.line_for_node(node.body[0])
+        self.block_stack.append(LoopBlock(start=to_top))
+        from_start = ArcStart(start, cause="the condition on line {lineno} was never true")
+        exits = self.add_body_arcs(node.body, from_start=from_start)
+        for xit in exits:
+            self.add_arc(xit.lineno, to_top, xit.cause)
+        exits = set()
+        my_block = self.block_stack.pop()
+        exits.update(my_block.break_exits)
+        from_start = ArcStart(start, cause="the condition on line {lineno} was never false")
+        if node.orelse:
+            else_exits = self.add_body_arcs(node.orelse, from_start=from_start)
+            exits |= else_exits
+        else:
+            # No `else` clause: you can exit from the start.
+            if not constant_test:
+                exits.add(from_start)
+        return exits
+
+    @contract(returns='ArcStarts')
+    def _handle__With(self, node):
+        start = self.line_for_node(node)
+        if env.PYBEHAVIOR.exit_through_with:
+            self.block_stack.append(WithBlock(start=start))
+        exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
+        if env.PYBEHAVIOR.exit_through_with:
+            with_block = self.block_stack.pop()
+            with_exit = {ArcStart(start)}
+            if exits:
+                for xit in exits:
+                    self.add_arc(xit.lineno, start)
+                exits = with_exit
+            if with_block.break_from:
+                self.process_break_exits(
+                    self._combine_finally_starts(with_block.break_from, with_exit)
+                )
+            if with_block.continue_from:
+                self.process_continue_exits(
+                    self._combine_finally_starts(with_block.continue_from, with_exit)
+                )
+            if with_block.return_from:
+                self.process_return_exits(
+                    self._combine_finally_starts(with_block.return_from, with_exit)
+                )
+        return exits
+
+    _handle__AsyncWith = _handle__With
+
+    # Code object dispatchers: _code_object__*
+    #
+    # These methods are used by analyze() as the start of the analysis.
+    # There is one for each construct with a code object.
+
+    def _code_object__Module(self, node):
+        start = self.line_for_node(node)
+        if node.body:
+            exits = self.add_body_arcs(node.body, from_start=ArcStart(-start))
+            for xit in exits:
+                self.add_arc(xit.lineno, -start, xit.cause, "didn't exit the module")
+        else:
+            # Empty module.
+            self.add_arc(-start, start)
+            self.add_arc(start, -start)
+
+    def _code_object__FunctionDef(self, node):
+        start = self.line_for_node(node)
+        self.block_stack.append(FunctionBlock(start=start, name=node.name))
+        exits = self.add_body_arcs(node.body, from_start=ArcStart(-start))
+        self.process_return_exits(exits)
+        self.block_stack.pop()
+
+    _code_object__AsyncFunctionDef = _code_object__FunctionDef
+
+    def _code_object__ClassDef(self, node):
+        start = self.line_for_node(node)
+        self.add_arc(-start, start)
+        exits = self.add_body_arcs(node.body, from_start=ArcStart(start))
+        for xit in exits:
+            self.add_arc(
+                xit.lineno, -start, xit.cause,
+                f"didn't exit the body of class {node.name!r}",
+            )
+
+    def _make_expression_code_method(noun):                     # pylint: disable=no-self-argument
+        """A function to make methods for expression-based callable _code_object__ methods."""
+        def _code_object__expression_callable(self, node):
+            start = self.line_for_node(node)
+            self.add_arc(-start, start, None, f"didn't run the {noun} on line {start}")
+            self.add_arc(start, -start, None, f"didn't finish the {noun} on line {start}")
+        return _code_object__expression_callable
+
+    # pylint: disable=too-many-function-args
+    _code_object__Lambda = _make_expression_code_method("lambda")
+    _code_object__GeneratorExp = _make_expression_code_method("generator expression")
+    _code_object__DictComp = _make_expression_code_method("dictionary comprehension")
+    _code_object__SetComp = _make_expression_code_method("set comprehension")
+    _code_object__ListComp = _make_expression_code_method("list comprehension")
+
+
+# Code only used when dumping the AST for debugging.
+
+SKIP_DUMP_FIELDS = ["ctx"]
+
+def _is_simple_value(value):
+    """Is `value` simple enough to be displayed on a single line?"""
+    return (
+        value in [None, [], (), {}, set()] or
+        isinstance(value, (bytes, int, float, str))
+    )
+
+def ast_dump(node, depth=0, print=print):   # pylint: disable=redefined-builtin
+    """Dump the AST for `node`.
+
+    This recursively walks the AST, printing a readable version.
+
+    """
+    indent = " " * depth
+    lineno = getattr(node, "lineno", None)
+    if lineno is not None:
+        linemark = f" @ {node.lineno},{node.col_offset}"
+        if hasattr(node, "end_lineno"):
+            linemark += ":"
+            if node.end_lineno != node.lineno:
+                linemark += f"{node.end_lineno},"
+            linemark += f"{node.end_col_offset}"
+    else:
+        linemark = ""
+    head = f"{indent}<{node.__class__.__name__}{linemark}"
+
+    named_fields = [
+        (name, value)
+        for name, value in ast.iter_fields(node)
+        if name not in SKIP_DUMP_FIELDS
+    ]
+    if not named_fields:
+        print(f"{head}>")
+    elif len(named_fields) == 1 and _is_simple_value(named_fields[0][1]):
+        field_name, value = named_fields[0]
+        print(f"{head} {field_name}: {value!r}>")
+    else:
+        print(head)
+        if 0:
+            print("{}# mro: {}".format(
+                indent, ", ".join(c.__name__ for c in node.__class__.__mro__[1:]),
+            ))
+        next_indent = indent + "    "
+        for field_name, value in named_fields:
+            prefix = f"{next_indent}{field_name}:"
+            if _is_simple_value(value):
+                print(f"{prefix} {value!r}")
+            elif isinstance(value, list):
+                print(f"{prefix} [")
+                for n in value:
+                    if _is_simple_value(n):
+                        print(f"{next_indent}    {n!r}")
+                    else:
+                        ast_dump(n, depth + 8, print=print)
+                print(f"{next_indent}]")
+            else:
+                print(prefix)
+                ast_dump(value, depth + 8, print=print)
+
+        print(f"{indent}>")

eric ide

mercurial