eric7/DebugClients/Python/coverage/pytracer.py

branch
eric7
changeset 8312
800c432b34c8
parent 7975
7d493839a8fc
child 8527
2bd1325d727e
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/eric7/DebugClients/Python/coverage/pytracer.py	Sat May 15 18:45:04 2021 +0200
@@ -0,0 +1,245 @@
+# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
+# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
+
+"""Raw data collector for coverage.py."""
+
+import atexit
+import dis
+import sys
+
+from coverage import env
+
+# We need the YIELD_VALUE opcode below, in a comparison-friendly form.
+YIELD_VALUE = dis.opmap['YIELD_VALUE']
+if env.PY2:
+    YIELD_VALUE = chr(YIELD_VALUE)
+
+
+class PyTracer(object):
+    """Python implementation of the raw data tracer."""
+
+    # Because of poor implementations of trace-function-manipulating tools,
+    # the Python trace function must be kept very simple.  In particular, there
+    # must be only one function ever set as the trace function, both through
+    # sys.settrace, and as the return value from the trace function.  Put
+    # another way, the trace function must always return itself.  It cannot
+    # swap in other functions, or return None to avoid tracing a particular
+    # frame.
+    #
+    # The trace manipulator that introduced this restriction is DecoratorTools,
+    # which sets a trace function, and then later restores the pre-existing one
+    # by calling sys.settrace with a function it found in the current frame.
+    #
+    # Systems that use DecoratorTools (or similar trace manipulations) must use
+    # PyTracer to get accurate results.  The command-line --timid argument is
+    # used to force the use of this tracer.
+
+    def __init__(self):
+        # Attributes set from the collector:
+        self.data = None
+        self.trace_arcs = False
+        self.should_trace = None
+        self.should_trace_cache = None
+        self.should_start_context = None
+        self.warn = None
+        # The threading module to use, if any.
+        self.threading = None
+
+        self.cur_file_dict = None
+        self.last_line = 0          # int, but uninitialized.
+        self.cur_file_name = None
+        self.context = None
+        self.started_context = False
+
+        self.data_stack = []
+        self.last_exc_back = None
+        self.last_exc_firstlineno = 0
+        self.thread = None
+        self.stopped = False
+        self._activity = False
+
+        self.in_atexit = False
+        # On exit, self.in_atexit = True
+        atexit.register(setattr, self, 'in_atexit', True)
+
+    def __repr__(self):
+        return "<PyTracer at {}: {} lines in {} files>".format(
+            id(self),
+            sum(len(v) for v in self.data.values()),
+            len(self.data),
+        )
+
+    def log(self, marker, *args):
+        """For hard-core logging of what this tracer is doing."""
+        with open("/tmp/debug_trace.txt", "a") as f:
+            f.write("{} {:x}.{:x}[{}] {:x} {}\n".format(
+                marker,
+                id(self),
+                self.thread.ident,
+                len(self.data_stack),
+                self.threading.currentThread().ident,
+                " ".join(map(str, args))
+            ))
+
+    def _trace(self, frame, event, arg_unused):
+        """The trace function passed to sys.settrace."""
+
+        #self.log(":", frame.f_code.co_filename, frame.f_lineno, event)
+
+        if (self.stopped and sys.gettrace() == self._trace):    # pylint: disable=comparison-with-callable
+            # The PyTrace.stop() method has been called, possibly by another
+            # thread, let's deactivate ourselves now.
+            #self.log("X", frame.f_code.co_filename, frame.f_lineno)
+            sys.settrace(None)
+            return None
+
+        if self.last_exc_back:
+            if frame == self.last_exc_back:
+                # Someone forgot a return event.
+                if self.trace_arcs and self.cur_file_dict:
+                    pair = (self.last_line, -self.last_exc_firstlineno)
+                    self.cur_file_dict[pair] = None
+                self.cur_file_dict, self.cur_file_name, self.last_line, self.started_context = (
+                    self.data_stack.pop()
+                )
+            self.last_exc_back = None
+
+        if event == 'call':
+            # Should we start a new context?
+            if self.should_start_context and self.context is None:
+                context_maybe = self.should_start_context(frame)    # pylint: disable=not-callable
+                if context_maybe is not None:
+                    self.context = context_maybe
+                    self.started_context = True
+                    self.switch_context(self.context)
+                else:
+                    self.started_context = False
+            else:
+                self.started_context = False
+
+            # Entering a new frame.  Decide if we should trace
+            # in this file.
+            self._activity = True
+            self.data_stack.append(
+                (
+                    self.cur_file_dict,
+                    self.cur_file_name,
+                    self.last_line,
+                    self.started_context,
+                )
+            )
+            filename = frame.f_code.co_filename
+            self.cur_file_name = filename
+            disp = self.should_trace_cache.get(filename)
+            if disp is None:
+                disp = self.should_trace(filename, frame)   # pylint: disable=not-callable
+                self.should_trace_cache[filename] = disp    # pylint: disable=unsupported-assignment-operation
+
+            self.cur_file_dict = None
+            if disp.trace:
+                tracename = disp.source_filename
+                if tracename not in self.data:              # pylint: disable=unsupported-membership-test
+                    self.data[tracename] = {}               # pylint: disable=unsupported-assignment-operation
+                self.cur_file_dict = self.data[tracename]   # pylint: disable=unsubscriptable-object
+            # The call event is really a "start frame" event, and happens for
+            # function calls and re-entering generators.  The f_lasti field is
+            # -1 for calls, and a real offset for generators.  Use <0 as the
+            # line number for calls, and the real line number for generators.
+            if getattr(frame, 'f_lasti', -1) < 0:
+                self.last_line = -frame.f_code.co_firstlineno
+            else:
+                self.last_line = frame.f_lineno
+        elif event == 'line':
+            # Record an executed line.
+            if self.cur_file_dict is not None:
+                lineno = frame.f_lineno
+                #if frame.f_code.co_filename != self.cur_file_name:
+                #    self.log("*", frame.f_code.co_filename, self.cur_file_name, lineno)
+                if self.trace_arcs:
+                    self.cur_file_dict[(self.last_line, lineno)] = None
+                else:
+                    self.cur_file_dict[lineno] = None
+                self.last_line = lineno
+        elif event == 'return':
+            if self.trace_arcs and self.cur_file_dict:
+                # Record an arc leaving the function, but beware that a
+                # "return" event might just mean yielding from a generator.
+                # Jython seems to have an empty co_code, so just assume return.
+                code = frame.f_code.co_code
+                if (not code) or code[frame.f_lasti] != YIELD_VALUE:
+                    first = frame.f_code.co_firstlineno
+                    self.cur_file_dict[(self.last_line, -first)] = None
+            # Leaving this function, pop the filename stack.
+            self.cur_file_dict, self.cur_file_name, self.last_line, self.started_context = (
+                self.data_stack.pop()
+            )
+            # Leaving a context?
+            if self.started_context:
+                self.context = None
+                self.switch_context(None)
+        elif event == 'exception':
+            self.last_exc_back = frame.f_back
+            self.last_exc_firstlineno = frame.f_code.co_firstlineno
+        return self._trace
+
+    def start(self):
+        """Start this Tracer.
+
+        Return a Python function suitable for use with sys.settrace().
+
+        """
+        self.stopped = False
+        if self.threading:
+            if self.thread is None:
+                self.thread = self.threading.currentThread()
+            else:
+                if self.thread.ident != self.threading.currentThread().ident:
+                    # Re-starting from a different thread!? Don't set the trace
+                    # function, but we are marked as running again, so maybe it
+                    # will be ok?
+                    #self.log("~", "starting on different threads")
+                    return self._trace
+
+        sys.settrace(self._trace)
+        return self._trace
+
+    def stop(self):
+        """Stop this Tracer."""
+        # Get the active tracer callback before setting the stop flag to be
+        # able to detect if the tracer was changed prior to stopping it.
+        tf = sys.gettrace()
+
+        # Set the stop flag. The actual call to sys.settrace(None) will happen
+        # in the self._trace callback itself to make sure to call it from the
+        # right thread.
+        self.stopped = True
+
+        if self.threading and self.thread.ident != self.threading.currentThread().ident:
+            # Called on a different thread than started us: we can't unhook
+            # ourselves, but we've set the flag that we should stop, so we
+            # won't do any more tracing.
+            #self.log("~", "stopping on different threads")
+            return
+
+        if self.warn:
+            # PyPy clears the trace function before running atexit functions,
+            # so don't warn if we are in atexit on PyPy and the trace function
+            # has changed to None.
+            dont_warn = (env.PYPY and env.PYPYVERSION >= (5, 4) and self.in_atexit and tf is None)
+            if (not dont_warn) and tf != self._trace:   # pylint: disable=comparison-with-callable
+                self.warn(                              # pylint: disable=not-callable
+                    "Trace function changed, measurement is likely wrong: %r" % (tf,),
+                    slug="trace-changed",
+                )
+
+    def activity(self):
+        """Has there been any activity?"""
+        return self._activity
+
+    def reset_activity(self):
+        """Reset the activity() flag."""
+        self._activity = False
+
+    def get_stats(self):
+        """Return a dictionary of statistics, or None."""
+        return None

eric ide

mercurial