src/eric7/DebugClients/Python/coverage/debug.py

branch
eric7-maintenance
changeset 9264
18a7312cfdb3
parent 9252
32dd11232e06
equal deleted inserted replaced
9241:d23e9854aea4 9264:18a7312cfdb3
1 # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
2 # For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
3
4 """Control of and utilities for debugging."""
5
6 import contextlib
7 import functools
8 import inspect
9 import io
10 import itertools
11 import os
12 import pprint
13 import reprlib
14 import sys
15 import types
16 import _thread
17
18 from coverage.misc import isolate_module
19
20 os = isolate_module(os)
21
22
23 # When debugging, it can be helpful to force some options, especially when
24 # debugging the configuration mechanisms you usually use to control debugging!
25 # This is a list of forced debugging options.
26 FORCED_DEBUG = []
27 FORCED_DEBUG_FILE = None
28
29
30 class DebugControl:
31 """Control and output for debugging."""
32
33 show_repr_attr = False # For SimpleReprMixin
34
35 def __init__(self, options, output):
36 """Configure the options and output file for debugging."""
37 self.options = list(options) + FORCED_DEBUG
38 self.suppress_callers = False
39
40 filters = []
41 if self.should('pid'):
42 filters.append(add_pid_and_tid)
43 self.output = DebugOutputFile.get_one(
44 output,
45 show_process=self.should('process'),
46 filters=filters,
47 )
48 self.raw_output = self.output.outfile
49
50 def __repr__(self):
51 return f"<DebugControl options={self.options!r} raw_output={self.raw_output!r}>"
52
53 def should(self, option):
54 """Decide whether to output debug information in category `option`."""
55 if option == "callers" and self.suppress_callers:
56 return False
57 return (option in self.options)
58
59 @contextlib.contextmanager
60 def without_callers(self):
61 """A context manager to prevent call stacks from being logged."""
62 old = self.suppress_callers
63 self.suppress_callers = True
64 try:
65 yield
66 finally:
67 self.suppress_callers = old
68
69 def write(self, msg):
70 """Write a line of debug output.
71
72 `msg` is the line to write. A newline will be appended.
73
74 """
75 self.output.write(msg+"\n")
76 if self.should('self'):
77 caller_self = inspect.stack()[1][0].f_locals.get('self')
78 if caller_self is not None:
79 self.output.write(f"self: {caller_self!r}\n")
80 if self.should('callers'):
81 dump_stack_frames(out=self.output, skip=1)
82 self.output.flush()
83
84
85 class DebugControlString(DebugControl):
86 """A `DebugControl` that writes to a StringIO, for testing."""
87 def __init__(self, options):
88 super().__init__(options, io.StringIO())
89
90 def get_output(self):
91 """Get the output text from the `DebugControl`."""
92 return self.raw_output.getvalue()
93
94
95 class NoDebugging:
96 """A replacement for DebugControl that will never try to do anything."""
97 def should(self, option): # pylint: disable=unused-argument
98 """Should we write debug messages? Never."""
99 return False
100
101
102 def info_header(label):
103 """Make a nice header string."""
104 return "--{:-<60s}".format(" "+label+" ")
105
106
107 def info_formatter(info):
108 """Produce a sequence of formatted lines from info.
109
110 `info` is a sequence of pairs (label, data). The produced lines are
111 nicely formatted, ready to print.
112
113 """
114 info = list(info)
115 if not info:
116 return
117 label_len = 30
118 assert all(len(l) < label_len for l, _ in info)
119 for label, data in info:
120 if data == []:
121 data = "-none-"
122 if isinstance(data, tuple) and len(repr(tuple(data))) < 30:
123 # Convert to tuple to scrub namedtuples.
124 yield "%*s: %r" % (label_len, label, tuple(data))
125 elif isinstance(data, (list, set, tuple)):
126 prefix = "%*s:" % (label_len, label)
127 for e in data:
128 yield "%*s %s" % (label_len+1, prefix, e)
129 prefix = ""
130 else:
131 yield "%*s: %s" % (label_len, label, data)
132
133
134 def write_formatted_info(write, header, info):
135 """Write a sequence of (label,data) pairs nicely.
136
137 `write` is a function write(str) that accepts each line of output.
138 `header` is a string to start the section. `info` is a sequence of
139 (label, data) pairs, where label is a str, and data can be a single
140 value, or a list/set/tuple.
141
142 """
143 write(info_header(header))
144 for line in info_formatter(info):
145 write(f" {line}")
146
147
148 def short_stack(limit=None, skip=0):
149 """Return a string summarizing the call stack.
150
151 The string is multi-line, with one line per stack frame. Each line shows
152 the function name, the file name, and the line number:
153
154 ...
155 start_import_stop : /Users/ned/coverage/trunk/tests/coveragetest.py @95
156 import_local_file : /Users/ned/coverage/trunk/tests/coveragetest.py @81
157 import_local_file : /Users/ned/coverage/trunk/coverage/backward.py @159
158 ...
159
160 `limit` is the number of frames to include, defaulting to all of them.
161
162 `skip` is the number of frames to skip, so that debugging functions can
163 call this and not be included in the result.
164
165 """
166 stack = inspect.stack()[limit:skip:-1]
167 return "\n".join("%30s : %s:%d" % (t[3], t[1], t[2]) for t in stack)
168
169
170 def dump_stack_frames(limit=None, out=None, skip=0):
171 """Print a summary of the stack to stdout, or someplace else."""
172 out = out or sys.stdout
173 out.write(short_stack(limit=limit, skip=skip+1))
174 out.write("\n")
175
176
177 def clipped_repr(text, numchars=50):
178 """`repr(text)`, but limited to `numchars`."""
179 r = reprlib.Repr()
180 r.maxstring = numchars
181 return r.repr(text)
182
183
184 def short_id(id64):
185 """Given a 64-bit id, make a shorter 16-bit one."""
186 id16 = 0
187 for offset in range(0, 64, 16):
188 id16 ^= id64 >> offset
189 return id16 & 0xFFFF
190
191
192 def add_pid_and_tid(text):
193 """A filter to add pid and tid to debug messages."""
194 # Thread ids are useful, but too long. Make a shorter one.
195 tid = f"{short_id(_thread.get_ident()):04x}"
196 text = f"{os.getpid():5d}.{tid}: {text}"
197 return text
198
199
200 class SimpleReprMixin:
201 """A mixin implementing a simple __repr__."""
202 simple_repr_ignore = ['simple_repr_ignore', '$coverage.object_id']
203
204 def __repr__(self):
205 show_attrs = (
206 (k, v) for k, v in self.__dict__.items()
207 if getattr(v, "show_repr_attr", True)
208 and not callable(v)
209 and k not in self.simple_repr_ignore
210 )
211 return "<{klass} @0x{id:x} {attrs}>".format(
212 klass=self.__class__.__name__,
213 id=id(self),
214 attrs=" ".join(f"{k}={v!r}" for k, v in show_attrs),
215 )
216
217
218 def simplify(v): # pragma: debugging
219 """Turn things which are nearly dict/list/etc into dict/list/etc."""
220 if isinstance(v, dict):
221 return {k:simplify(vv) for k, vv in v.items()}
222 elif isinstance(v, (list, tuple)):
223 return type(v)(simplify(vv) for vv in v)
224 elif hasattr(v, "__dict__"):
225 return simplify({'.'+k: v for k, v in v.__dict__.items()})
226 else:
227 return v
228
229
230 def pp(v): # pragma: debugging
231 """Debug helper to pretty-print data, including SimpleNamespace objects."""
232 # Might not be needed in 3.9+
233 pprint.pprint(simplify(v))
234
235
236 def filter_text(text, filters):
237 """Run `text` through a series of filters.
238
239 `filters` is a list of functions. Each takes a string and returns a
240 string. Each is run in turn.
241
242 Returns: the final string that results after all of the filters have
243 run.
244
245 """
246 clean_text = text.rstrip()
247 ending = text[len(clean_text):]
248 text = clean_text
249 for fn in filters:
250 lines = []
251 for line in text.splitlines():
252 lines.extend(fn(line).splitlines())
253 text = "\n".join(lines)
254 return text + ending
255
256
257 class CwdTracker: # pragma: debugging
258 """A class to add cwd info to debug messages."""
259 def __init__(self):
260 self.cwd = None
261
262 def filter(self, text):
263 """Add a cwd message for each new cwd."""
264 cwd = os.getcwd()
265 if cwd != self.cwd:
266 text = f"cwd is now {cwd!r}\n" + text
267 self.cwd = cwd
268 return text
269
270
271 class DebugOutputFile: # pragma: debugging
272 """A file-like object that includes pid and cwd information."""
273 def __init__(self, outfile, show_process, filters):
274 self.outfile = outfile
275 self.show_process = show_process
276 self.filters = list(filters)
277
278 if self.show_process:
279 self.filters.insert(0, CwdTracker().filter)
280 self.write(f"New process: executable: {sys.executable!r}\n")
281 self.write("New process: cmd: {!r}\n".format(getattr(sys, 'argv', None)))
282 if hasattr(os, 'getppid'):
283 self.write(f"New process: pid: {os.getpid()!r}, parent pid: {os.getppid()!r}\n")
284
285 SYS_MOD_NAME = '$coverage.debug.DebugOutputFile.the_one'
286 SINGLETON_ATTR = 'the_one_and_is_interim'
287
288 @classmethod
289 def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False):
290 """Get a DebugOutputFile.
291
292 If `fileobj` is provided, then a new DebugOutputFile is made with it.
293
294 If `fileobj` isn't provided, then a file is chosen
295 (COVERAGE_DEBUG_FILE, or stderr), and a process-wide singleton
296 DebugOutputFile is made.
297
298 `show_process` controls whether the debug file adds process-level
299 information, and filters is a list of other message filters to apply.
300
301 `filters` are the text filters to apply to the stream to annotate with
302 pids, etc.
303
304 If `interim` is true, then a future `get_one` can replace this one.
305
306 """
307 if fileobj is not None:
308 # Make DebugOutputFile around the fileobj passed.
309 return cls(fileobj, show_process, filters)
310
311 # Because of the way igor.py deletes and re-imports modules,
312 # this class can be defined more than once. But we really want
313 # a process-wide singleton. So stash it in sys.modules instead of
314 # on a class attribute. Yes, this is aggressively gross.
315 singleton_module = sys.modules.get(cls.SYS_MOD_NAME)
316 the_one, is_interim = getattr(singleton_module, cls.SINGLETON_ATTR, (None, True))
317 if the_one is None or is_interim:
318 if fileobj is None:
319 debug_file_name = os.environ.get("COVERAGE_DEBUG_FILE", FORCED_DEBUG_FILE)
320 if debug_file_name in ("stdout", "stderr"):
321 fileobj = getattr(sys, debug_file_name)
322 elif debug_file_name:
323 fileobj = open(debug_file_name, "a")
324 else:
325 fileobj = sys.stderr
326 the_one = cls(fileobj, show_process, filters)
327 singleton_module = types.ModuleType(cls.SYS_MOD_NAME)
328 setattr(singleton_module, cls.SINGLETON_ATTR, (the_one, interim))
329 sys.modules[cls.SYS_MOD_NAME] = singleton_module
330 return the_one
331
332 def write(self, text):
333 """Just like file.write, but filter through all our filters."""
334 self.outfile.write(filter_text(text, self.filters))
335 self.outfile.flush()
336
337 def flush(self):
338 """Flush our file."""
339 self.outfile.flush()
340
341
342 def log(msg, stack=False): # pragma: debugging
343 """Write a log message as forcefully as possible."""
344 out = DebugOutputFile.get_one(interim=True)
345 out.write(msg+"\n")
346 if stack:
347 dump_stack_frames(out=out, skip=1)
348
349
350 def decorate_methods(decorator, butnot=(), private=False): # pragma: debugging
351 """A class decorator to apply a decorator to methods."""
352 def _decorator(cls):
353 for name, meth in inspect.getmembers(cls, inspect.isroutine):
354 if name not in cls.__dict__:
355 continue
356 if name != "__init__":
357 if not private and name.startswith("_"):
358 continue
359 if name in butnot:
360 continue
361 setattr(cls, name, decorator(meth))
362 return cls
363 return _decorator
364
365
366 def break_in_pudb(func): # pragma: debugging
367 """A function decorator to stop in the debugger for each call."""
368 @functools.wraps(func)
369 def _wrapper(*args, **kwargs):
370 import pudb
371 sys.stdout = sys.__stdout__
372 pudb.set_trace()
373 return func(*args, **kwargs)
374 return _wrapper
375
376
377 OBJ_IDS = itertools.count()
378 CALLS = itertools.count()
379 OBJ_ID_ATTR = "$coverage.object_id"
380
381 def show_calls(show_args=True, show_stack=False, show_return=False): # pragma: debugging
382 """A method decorator to debug-log each call to the function."""
383 def _decorator(func):
384 @functools.wraps(func)
385 def _wrapper(self, *args, **kwargs):
386 oid = getattr(self, OBJ_ID_ATTR, None)
387 if oid is None:
388 oid = f"{os.getpid():08d} {next(OBJ_IDS):04d}"
389 setattr(self, OBJ_ID_ATTR, oid)
390 extra = ""
391 if show_args:
392 eargs = ", ".join(map(repr, args))
393 ekwargs = ", ".join("{}={!r}".format(*item) for item in kwargs.items())
394 extra += "("
395 extra += eargs
396 if eargs and ekwargs:
397 extra += ", "
398 extra += ekwargs
399 extra += ")"
400 if show_stack:
401 extra += " @ "
402 extra += "; ".join(_clean_stack_line(l) for l in short_stack().splitlines())
403 callid = next(CALLS)
404 msg = f"{oid} {callid:04d} {func.__name__}{extra}\n"
405 DebugOutputFile.get_one(interim=True).write(msg)
406 ret = func(self, *args, **kwargs)
407 if show_return:
408 msg = f"{oid} {callid:04d} {func.__name__} return {ret!r}\n"
409 DebugOutputFile.get_one(interim=True).write(msg)
410 return ret
411 return _wrapper
412 return _decorator
413
414
415 def _clean_stack_line(s): # pragma: debugging
416 """Simplify some paths in a stack trace, for compactness."""
417 s = s.strip()
418 s = s.replace(os.path.dirname(__file__) + '/', '')
419 s = s.replace(os.path.dirname(os.__file__) + '/', '')
420 s = s.replace(sys.prefix + '/', '')
421 return s

eric ide

mercurial