diff -r dc171b1d8261 -r 362cd1b6f81a eric6/DebugClients/Python/coverage/debug.py --- a/eric6/DebugClients/Python/coverage/debug.py Wed Feb 19 19:38:36 2020 +0100 +++ b/eric6/DebugClients/Python/coverage/debug.py Sat Feb 22 14:27:42 2020 +0100 @@ -1,19 +1,21 @@ # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 -# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt +# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt """Control of and utilities for debugging.""" import contextlib +import functools import inspect +import itertools import os -import re +import pprint import sys try: import _thread except ImportError: import thread as _thread -from coverage.backward import StringIO +from coverage.backward import reprlib, StringIO from coverage.misc import isolate_module os = isolate_module(os) @@ -23,28 +25,28 @@ # debugging the configuration mechanisms you usually use to control debugging! # This is a list of forced debugging options. FORCED_DEBUG = [] - -# A hack for debugging testing in sub-processes. -_TEST_NAME_FILE = "" # "/tmp/covtest.txt" +FORCED_DEBUG_FILE = None class DebugControl(object): """Control and output for debugging.""" + show_repr_attr = False # For SimpleReprMixin + def __init__(self, options, output): """Configure the options and output file for debugging.""" self.options = list(options) + FORCED_DEBUG - self.raw_output = output self.suppress_callers = False filters = [] if self.should('pid'): filters.append(add_pid_and_tid) - self.output = DebugOutputFile( - self.raw_output, + self.output = DebugOutputFile.get_one( + output, show_process=self.should('process'), filters=filters, ) + self.raw_output = self.output.outfile def __repr__(self): return "<DebugControl options=%r raw_output=%r>" % (self.options, self.raw_output) @@ -72,6 +74,10 @@ """ self.output.write(msg+"\n") + if self.should('self'): + caller_self = inspect.stack()[1][0].f_locals.get('self') + if caller_self is not None: + self.output.write("self: {!r}\n".format(caller_self)) if self.should('callers'): dump_stack_frames(out=self.output, skip=1) self.output.flush() @@ -87,9 +93,16 @@ return self.raw_output.getvalue() +class NoDebugging(object): + """A replacement for DebugControl that will never try to do anything.""" + def should(self, option): # pylint: disable=unused-argument + """Should we write debug messages? Never.""" + return False + + def info_header(label): """Make a nice header string.""" - return "--{0:-<60s}".format(" "+label+" ") + return "--{:-<60s}".format(" "+label+" ") def info_formatter(info): @@ -102,7 +115,8 @@ info = list(info) if not info: return - label_len = max(len(l) for l, _d in info) + label_len = 30 + assert all(len(l) < label_len for l, _ in info) for label, data in info: if data == []: data = "-none-" @@ -141,7 +155,7 @@ """ stack = inspect.stack()[limit:skip:-1] - return "\n".join("%30s : %s @%d" % (t[3], t[1], t[2]) for t in stack) + return "\n".join("%30s : %s:%d" % (t[3], t[1], t[2]) for t in stack) def dump_stack_frames(limit=None, out=None, skip=0): @@ -151,6 +165,13 @@ out.write("\n") +def clipped_repr(text, numchars=50): + """`repr(text)`, but limited to `numchars`.""" + r = reprlib.Repr() + r.maxstring = numchars + return r.repr(text) + + def short_id(id64): """Given a 64-bit id, make a shorter 16-bit one.""" id16 = 0 @@ -162,11 +183,47 @@ def add_pid_and_tid(text): """A filter to add pid and tid to debug messages.""" # Thread ids are useful, but too long. Make a shorter one. - tid = "{0:04x}".format(short_id(_thread.get_ident())) - text = "{0:5d}.{1}: {2}".format(os.getpid(), tid, text) + tid = "{:04x}".format(short_id(_thread.get_ident())) + text = "{:5d}.{}: {}".format(os.getpid(), tid, text) return text +class SimpleReprMixin(object): + """A mixin implementing a simple __repr__.""" + simple_repr_ignore = ['simple_repr_ignore', '$coverage.object_id'] + + def __repr__(self): + show_attrs = ( + (k, v) for k, v in self.__dict__.items() + if getattr(v, "show_repr_attr", True) + and not callable(v) + and k not in self.simple_repr_ignore + ) + return "<{klass} @0x{id:x} {attrs}>".format( + klass=self.__class__.__name__, + id=id(self), + attrs=" ".join("{}={!r}".format(k, v) for k, v in show_attrs), + ) + + +def simplify(v): # pragma: debugging + """Turn things which are nearly dict/list/etc into dict/list/etc.""" + if isinstance(v, dict): + return {k:simplify(vv) for k, vv in v.items()} + elif isinstance(v, (list, tuple)): + return type(v)(simplify(vv) for vv in v) + elif hasattr(v, "__dict__"): + return simplify({'.'+k: v for k, v in v.__dict__.items()}) + else: + return v + + +def pp(v): # pragma: debugging + """Debug helper to pretty-print data, including SimpleNamespace objects.""" + # Might not be needed in 3.9+ + pprint.pprint(simplify(v)) + + def filter_text(text, filters): """Run `text` through a series of filters. @@ -197,7 +254,7 @@ """Add a cwd message for each new cwd.""" cwd = os.getcwd() if cwd != self.cwd: - text = "cwd is now {0!r}\n".format(cwd) + text + text = "cwd is now {!r}\n".format(cwd) + text self.cwd = cwd return text @@ -210,32 +267,51 @@ self.filters = list(filters) if self.show_process: - self.filters.append(CwdTracker().filter) - cmd = " ".join(getattr(sys, 'argv', ['???'])) - self.write("New process: executable: %s\n" % (sys.executable,)) - self.write("New process: cmd: %s\n" % (cmd,)) + self.filters.insert(0, CwdTracker().filter) + self.write("New process: executable: %r\n" % (sys.executable,)) + self.write("New process: cmd: %r\n" % (getattr(sys, 'argv', None),)) if hasattr(os, 'getppid'): - self.write("New process: parent pid: %s\n" % (os.getppid(),)) + self.write("New process: pid: %r, parent pid: %r\n" % (os.getpid(), os.getppid())) SYS_MOD_NAME = '$coverage.debug.DebugOutputFile.the_one' @classmethod - def the_one(cls, fileobj=None, show_process=True, filters=()): - """Get the process-wide singleton DebugOutputFile. + def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False): + """Get a DebugOutputFile. + + If `fileobj` is provided, then a new DebugOutputFile is made with it. - If it doesn't exist yet, then create it as a wrapper around the file - object `fileobj`. `show_process` controls whether the debug file adds - process-level information. + If `fileobj` isn't provided, then a file is chosen + (COVERAGE_DEBUG_FILE, or stderr), and a process-wide singleton + DebugOutputFile is made. + + `show_process` controls whether the debug file adds process-level + information, and filters is a list of other message filters to apply. + + `filters` are the text filters to apply to the stream to annotate with + pids, etc. + + If `interim` is true, then a future `get_one` can replace this one. """ + if fileobj is not None: + # Make DebugOutputFile around the fileobj passed. + return cls(fileobj, show_process, filters) + # Because of the way igor.py deletes and re-imports modules, # this class can be defined more than once. But we really want # a process-wide singleton. So stash it in sys.modules instead of # on a class attribute. Yes, this is aggressively gross. - the_one = sys.modules.get(cls.SYS_MOD_NAME) - if the_one is None: - assert fileobj is not None - sys.modules[cls.SYS_MOD_NAME] = the_one = cls(fileobj, show_process, filters) + the_one, is_interim = sys.modules.get(cls.SYS_MOD_NAME, (None, True)) + if the_one is None or is_interim: + if fileobj is None: + debug_file_name = os.environ.get("COVERAGE_DEBUG_FILE", FORCED_DEBUG_FILE) + if debug_file_name: + fileobj = open(debug_file_name, "a") + else: + fileobj = sys.stderr + the_one = cls(fileobj, show_process, filters) + sys.modules[cls.SYS_MOD_NAME] = (the_one, interim) return the_one def write(self, text): @@ -250,46 +326,81 @@ def log(msg, stack=False): # pragma: debugging """Write a log message as forcefully as possible.""" - out = DebugOutputFile.the_one() + out = DebugOutputFile.get_one(interim=True) out.write(msg+"\n") if stack: dump_stack_frames(out=out, skip=1) -def filter_aspectlib_frames(text): # pragma: debugging - """Aspectlib prints stack traces, but includes its own frames. Scrub those out.""" - # <<< aspectlib/__init__.py:257:function_wrapper < igor.py:143:run_tests < ... - text = re.sub(r"(?<= )aspectlib/[^.]+\.py:\d+:\w+ < ", "", text) - return text +def decorate_methods(decorator, butnot=(), private=False): # pragma: debugging + """A class decorator to apply a decorator to methods.""" + def _decorator(cls): + for name, meth in inspect.getmembers(cls, inspect.isroutine): + if name not in cls.__dict__: + continue + if name != "__init__": + if not private and name.startswith("_"): + continue + if name in butnot: + continue + setattr(cls, name, decorator(meth)) + return cls + return _decorator + + +def break_in_pudb(func): # pragma: debugging + """A function decorator to stop in the debugger for each call.""" + @functools.wraps(func) + def _wrapper(*args, **kwargs): + import pudb + sys.stdout = sys.__stdout__ + pudb.set_trace() + return func(*args, **kwargs) + return _wrapper -def enable_aspectlib_maybe(): # pragma: debugging - """For debugging, we can use aspectlib to trace execution. - - Define COVERAGE_ASPECTLIB to enable and configure aspectlib to trace - execution:: - - $ export COVERAGE_LOG=covaspect.txt - $ export COVERAGE_ASPECTLIB=coverage.Coverage:coverage.data.CoverageData - $ coverage run blah.py ... - - This will trace all the public methods on Coverage and CoverageData, - writing the information to covaspect.txt. +OBJ_IDS = itertools.count() +CALLS = itertools.count() +OBJ_ID_ATTR = "$coverage.object_id" - """ - aspects = os.environ.get("COVERAGE_ASPECTLIB", "") - if not aspects: - return - - import aspectlib # pylint: disable=import-error - import aspectlib.debug # pylint: disable=import-error +def show_calls(show_args=True, show_stack=False, show_return=False): # pragma: debugging + """A method decorator to debug-log each call to the function.""" + def _decorator(func): + @functools.wraps(func) + def _wrapper(self, *args, **kwargs): + oid = getattr(self, OBJ_ID_ATTR, None) + if oid is None: + oid = "{:08d} {:04d}".format(os.getpid(), next(OBJ_IDS)) + setattr(self, OBJ_ID_ATTR, oid) + extra = "" + if show_args: + eargs = ", ".join(map(repr, args)) + ekwargs = ", ".join("{}={!r}".format(*item) for item in kwargs.items()) + extra += "(" + extra += eargs + if eargs and ekwargs: + extra += ", " + extra += ekwargs + extra += ")" + if show_stack: + extra += " @ " + extra += "; ".join(_clean_stack_line(l) for l in short_stack().splitlines()) + callid = next(CALLS) + msg = "{} {:04d} {}{}\n".format(oid, callid, func.__name__, extra) + DebugOutputFile.get_one(interim=True).write(msg) + ret = func(self, *args, **kwargs) + if show_return: + msg = "{} {:04d} {} return {!r}\n".format(oid, callid, func.__name__, ret) + DebugOutputFile.get_one(interim=True).write(msg) + return ret + return _wrapper + return _decorator - filename = os.environ.get("COVERAGE_LOG", "/tmp/covlog.txt") - filters = [add_pid_and_tid, filter_aspectlib_frames] - aspects_file = DebugOutputFile.the_one(open(filename, "a"), show_process=True, filters=filters) - aspect_log = aspectlib.debug.log( - print_to=aspects_file, attributes=['id'], stacktrace=30, use_logging=False - ) - public_methods = re.compile(r'^(__init__|[a-zA-Z].*)$') - for aspect in aspects.split(':'): - aspectlib.weave(aspect, aspect_log, methods=public_methods) + +def _clean_stack_line(s): # pragma: debugging + """Simplify some paths in a stack trace, for compactness.""" + s = s.strip() + s = s.replace(os.path.dirname(__file__) + '/', '') + s = s.replace(os.path.dirname(os.__file__) + '/', '') + s = s.replace(sys.prefix + '/', '') + return s