eric6/DebugClients/Python/coverage/debug.py

changeset 7427
362cd1b6f81a
parent 6942
2602857055c5
diff -r dc171b1d8261 -r 362cd1b6f81a eric6/DebugClients/Python/coverage/debug.py
--- a/eric6/DebugClients/Python/coverage/debug.py	Wed Feb 19 19:38:36 2020 +0100
+++ b/eric6/DebugClients/Python/coverage/debug.py	Sat Feb 22 14:27:42 2020 +0100
@@ -1,19 +1,21 @@
 # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
-# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
+# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
 
 """Control of and utilities for debugging."""
 
 import contextlib
+import functools
 import inspect
+import itertools
 import os
-import re
+import pprint
 import sys
 try:
     import _thread
 except ImportError:
     import thread as _thread
 
-from coverage.backward import StringIO
+from coverage.backward import reprlib, StringIO
 from coverage.misc import isolate_module
 
 os = isolate_module(os)
@@ -23,28 +25,28 @@
 # debugging the configuration mechanisms you usually use to control debugging!
 # This is a list of forced debugging options.
 FORCED_DEBUG = []
-
-# A hack for debugging testing in sub-processes.
-_TEST_NAME_FILE = ""    # "/tmp/covtest.txt"
+FORCED_DEBUG_FILE = None
 
 
 class DebugControl(object):
     """Control and output for debugging."""
 
+    show_repr_attr = False      # For SimpleReprMixin
+
     def __init__(self, options, output):
         """Configure the options and output file for debugging."""
         self.options = list(options) + FORCED_DEBUG
-        self.raw_output = output
         self.suppress_callers = False
 
         filters = []
         if self.should('pid'):
             filters.append(add_pid_and_tid)
-        self.output = DebugOutputFile(
-            self.raw_output,
+        self.output = DebugOutputFile.get_one(
+            output,
             show_process=self.should('process'),
             filters=filters,
         )
+        self.raw_output = self.output.outfile
 
     def __repr__(self):
         return "<DebugControl options=%r raw_output=%r>" % (self.options, self.raw_output)
@@ -72,6 +74,10 @@
 
         """
         self.output.write(msg+"\n")
+        if self.should('self'):
+            caller_self = inspect.stack()[1][0].f_locals.get('self')
+            if caller_self is not None:
+                self.output.write("self: {!r}\n".format(caller_self))
         if self.should('callers'):
             dump_stack_frames(out=self.output, skip=1)
         self.output.flush()
@@ -87,9 +93,16 @@
         return self.raw_output.getvalue()
 
 
+class NoDebugging(object):
+    """A replacement for DebugControl that will never try to do anything."""
+    def should(self, option):               # pylint: disable=unused-argument
+        """Should we write debug messages?  Never."""
+        return False
+
+
 def info_header(label):
     """Make a nice header string."""
-    return "--{0:-<60s}".format(" "+label+" ")
+    return "--{:-<60s}".format(" "+label+" ")
 
 
 def info_formatter(info):
@@ -102,7 +115,8 @@
     info = list(info)
     if not info:
         return
-    label_len = max(len(l) for l, _d in info)
+    label_len = 30
+    assert all(len(l) < label_len for l, _ in info)
     for label, data in info:
         if data == []:
             data = "-none-"
@@ -141,7 +155,7 @@
 
     """
     stack = inspect.stack()[limit:skip:-1]
-    return "\n".join("%30s : %s @%d" % (t[3], t[1], t[2]) for t in stack)
+    return "\n".join("%30s : %s:%d" % (t[3], t[1], t[2]) for t in stack)
 
 
 def dump_stack_frames(limit=None, out=None, skip=0):
@@ -151,6 +165,13 @@
     out.write("\n")
 
 
+def clipped_repr(text, numchars=50):
+    """`repr(text)`, but limited to `numchars`."""
+    r = reprlib.Repr()
+    r.maxstring = numchars
+    return r.repr(text)
+
+
 def short_id(id64):
     """Given a 64-bit id, make a shorter 16-bit one."""
     id16 = 0
@@ -162,11 +183,47 @@
 def add_pid_and_tid(text):
     """A filter to add pid and tid to debug messages."""
     # Thread ids are useful, but too long. Make a shorter one.
-    tid = "{0:04x}".format(short_id(_thread.get_ident()))
-    text = "{0:5d}.{1}: {2}".format(os.getpid(), tid, text)
+    tid = "{:04x}".format(short_id(_thread.get_ident()))
+    text = "{:5d}.{}: {}".format(os.getpid(), tid, text)
     return text
 
 
+class SimpleReprMixin(object):
+    """A mixin implementing a simple __repr__."""
+    simple_repr_ignore = ['simple_repr_ignore', '$coverage.object_id']
+
+    def __repr__(self):
+        show_attrs = (
+            (k, v) for k, v in self.__dict__.items()
+            if getattr(v, "show_repr_attr", True)
+            and not callable(v)
+            and k not in self.simple_repr_ignore
+        )
+        return "<{klass} @0x{id:x} {attrs}>".format(
+            klass=self.__class__.__name__,
+            id=id(self),
+            attrs=" ".join("{}={!r}".format(k, v) for k, v in show_attrs),
+            )
+
+
+def simplify(v):                                            # pragma: debugging
+    """Turn things which are nearly dict/list/etc into dict/list/etc."""
+    if isinstance(v, dict):
+        return {k:simplify(vv) for k, vv in v.items()}
+    elif isinstance(v, (list, tuple)):
+        return type(v)(simplify(vv) for vv in v)
+    elif hasattr(v, "__dict__"):
+        return simplify({'.'+k: v for k, v in v.__dict__.items()})
+    else:
+        return v
+
+
+def pp(v):                                                  # pragma: debugging
+    """Debug helper to pretty-print data, including SimpleNamespace objects."""
+    # Might not be needed in 3.9+
+    pprint.pprint(simplify(v))
+
+
 def filter_text(text, filters):
     """Run `text` through a series of filters.
 
@@ -197,7 +254,7 @@
         """Add a cwd message for each new cwd."""
         cwd = os.getcwd()
         if cwd != self.cwd:
-            text = "cwd is now {0!r}\n".format(cwd) + text
+            text = "cwd is now {!r}\n".format(cwd) + text
             self.cwd = cwd
         return text
 
@@ -210,32 +267,51 @@
         self.filters = list(filters)
 
         if self.show_process:
-            self.filters.append(CwdTracker().filter)
-            cmd = " ".join(getattr(sys, 'argv', ['???']))
-            self.write("New process: executable: %s\n" % (sys.executable,))
-            self.write("New process: cmd: %s\n" % (cmd,))
+            self.filters.insert(0, CwdTracker().filter)
+            self.write("New process: executable: %r\n" % (sys.executable,))
+            self.write("New process: cmd: %r\n" % (getattr(sys, 'argv', None),))
             if hasattr(os, 'getppid'):
-                self.write("New process: parent pid: %s\n" % (os.getppid(),))
+                self.write("New process: pid: %r, parent pid: %r\n" % (os.getpid(), os.getppid()))
 
     SYS_MOD_NAME = '$coverage.debug.DebugOutputFile.the_one'
 
     @classmethod
-    def the_one(cls, fileobj=None, show_process=True, filters=()):
-        """Get the process-wide singleton DebugOutputFile.
+    def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False):
+        """Get a DebugOutputFile.
+
+        If `fileobj` is provided, then a new DebugOutputFile is made with it.
 
-        If it doesn't exist yet, then create it as a wrapper around the file
-        object `fileobj`. `show_process` controls whether the debug file adds
-        process-level information.
+        If `fileobj` isn't provided, then a file is chosen
+        (COVERAGE_DEBUG_FILE, or stderr), and a process-wide singleton
+        DebugOutputFile is made.
+
+        `show_process` controls whether the debug file adds process-level
+        information, and filters is a list of other message filters to apply.
+
+        `filters` are the text filters to apply to the stream to annotate with
+        pids, etc.
+
+        If `interim` is true, then a future `get_one` can replace this one.
 
         """
+        if fileobj is not None:
+            # Make DebugOutputFile around the fileobj passed.
+            return cls(fileobj, show_process, filters)
+
         # Because of the way igor.py deletes and re-imports modules,
         # this class can be defined more than once. But we really want
         # a process-wide singleton. So stash it in sys.modules instead of
         # on a class attribute. Yes, this is aggressively gross.
-        the_one = sys.modules.get(cls.SYS_MOD_NAME)
-        if the_one is None:
-            assert fileobj is not None
-            sys.modules[cls.SYS_MOD_NAME] = the_one = cls(fileobj, show_process, filters)
+        the_one, is_interim = sys.modules.get(cls.SYS_MOD_NAME, (None, True))
+        if the_one is None or is_interim:
+            if fileobj is None:
+                debug_file_name = os.environ.get("COVERAGE_DEBUG_FILE", FORCED_DEBUG_FILE)
+                if debug_file_name:
+                    fileobj = open(debug_file_name, "a")
+                else:
+                    fileobj = sys.stderr
+            the_one = cls(fileobj, show_process, filters)
+            sys.modules[cls.SYS_MOD_NAME] = (the_one, interim)
         return the_one
 
     def write(self, text):
@@ -250,46 +326,81 @@
 
 def log(msg, stack=False):                                  # pragma: debugging
     """Write a log message as forcefully as possible."""
-    out = DebugOutputFile.the_one()
+    out = DebugOutputFile.get_one(interim=True)
     out.write(msg+"\n")
     if stack:
         dump_stack_frames(out=out, skip=1)
 
 
-def filter_aspectlib_frames(text):                          # pragma: debugging
-    """Aspectlib prints stack traces, but includes its own frames.  Scrub those out."""
-    # <<< aspectlib/__init__.py:257:function_wrapper < igor.py:143:run_tests < ...
-    text = re.sub(r"(?<= )aspectlib/[^.]+\.py:\d+:\w+ < ", "", text)
-    return text
+def decorate_methods(decorator, butnot=(), private=False):  # pragma: debugging
+    """A class decorator to apply a decorator to methods."""
+    def _decorator(cls):
+        for name, meth in inspect.getmembers(cls, inspect.isroutine):
+            if name not in cls.__dict__:
+                continue
+            if name != "__init__":
+                if not private and name.startswith("_"):
+                    continue
+            if name in butnot:
+                continue
+            setattr(cls, name, decorator(meth))
+        return cls
+    return _decorator
+
+
+def break_in_pudb(func):                                    # pragma: debugging
+    """A function decorator to stop in the debugger for each call."""
+    @functools.wraps(func)
+    def _wrapper(*args, **kwargs):
+        import pudb
+        sys.stdout = sys.__stdout__
+        pudb.set_trace()
+        return func(*args, **kwargs)
+    return _wrapper
 
 
-def enable_aspectlib_maybe():                               # pragma: debugging
-    """For debugging, we can use aspectlib to trace execution.
-
-    Define COVERAGE_ASPECTLIB to enable and configure aspectlib to trace
-    execution::
-
-        $ export COVERAGE_LOG=covaspect.txt
-        $ export COVERAGE_ASPECTLIB=coverage.Coverage:coverage.data.CoverageData
-        $ coverage run blah.py ...
-
-    This will trace all the public methods on Coverage and CoverageData,
-    writing the information to covaspect.txt.
+OBJ_IDS = itertools.count()
+CALLS = itertools.count()
+OBJ_ID_ATTR = "$coverage.object_id"
 
-    """
-    aspects = os.environ.get("COVERAGE_ASPECTLIB", "")
-    if not aspects:
-        return
-
-    import aspectlib                            # pylint: disable=import-error
-    import aspectlib.debug                      # pylint: disable=import-error
+def show_calls(show_args=True, show_stack=False, show_return=False):    # pragma: debugging
+    """A method decorator to debug-log each call to the function."""
+    def _decorator(func):
+        @functools.wraps(func)
+        def _wrapper(self, *args, **kwargs):
+            oid = getattr(self, OBJ_ID_ATTR, None)
+            if oid is None:
+                oid = "{:08d} {:04d}".format(os.getpid(), next(OBJ_IDS))
+                setattr(self, OBJ_ID_ATTR, oid)
+            extra = ""
+            if show_args:
+                eargs = ", ".join(map(repr, args))
+                ekwargs = ", ".join("{}={!r}".format(*item) for item in kwargs.items())
+                extra += "("
+                extra += eargs
+                if eargs and ekwargs:
+                    extra += ", "
+                extra += ekwargs
+                extra += ")"
+            if show_stack:
+                extra += " @ "
+                extra += "; ".join(_clean_stack_line(l) for l in short_stack().splitlines())
+            callid = next(CALLS)
+            msg = "{} {:04d} {}{}\n".format(oid, callid, func.__name__, extra)
+            DebugOutputFile.get_one(interim=True).write(msg)
+            ret = func(self, *args, **kwargs)
+            if show_return:
+                msg = "{} {:04d} {} return {!r}\n".format(oid, callid, func.__name__, ret)
+                DebugOutputFile.get_one(interim=True).write(msg)
+            return ret
+        return _wrapper
+    return _decorator
 
-    filename = os.environ.get("COVERAGE_LOG", "/tmp/covlog.txt")
-    filters = [add_pid_and_tid, filter_aspectlib_frames]
-    aspects_file = DebugOutputFile.the_one(open(filename, "a"), show_process=True, filters=filters)
-    aspect_log = aspectlib.debug.log(
-        print_to=aspects_file, attributes=['id'], stacktrace=30, use_logging=False
-    )
-    public_methods = re.compile(r'^(__init__|[a-zA-Z].*)$')
-    for aspect in aspects.split(':'):
-        aspectlib.weave(aspect, aspect_log, methods=public_methods)
+
+def _clean_stack_line(s):                                   # pragma: debugging
+    """Simplify some paths in a stack trace, for compactness."""
+    s = s.strip()
+    s = s.replace(os.path.dirname(__file__) + '/', '')
+    s = s.replace(os.path.dirname(os.__file__) + '/', '')
+    s = s.replace(sys.prefix + '/', '')
+    return s

eric ide

mercurial