Sun, 24 Jul 2016 21:34:54 +0200
Improvements on eric's set_trace and set_continue if not in debugging mode.
# -*- coding: utf-8 -*- # Copyright (c) 2009 - 2016 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the debug base class. """ import sys import bdb import os import atexit import inspect import ctypes import _thread import time from inspect import CO_GENERATOR from DebugProtocol import ResponseClearWatch, ResponseClearBreak, \ ResponseLine, ResponseSyntax, ResponseException, CallTrace from DebugUtilities import getargvalues, formatargvalues from BreakpointWatch import Breakpoint, Watch gRecursionLimit = 64 def printerr(s): """ Module function used for debugging the debug client. @param s data to be printed """ sys.__stderr__.write('{0!s}\n'.format(s)) sys.__stderr__.flush() def setRecursionLimit(limit): """ Module function to set the recursion limit. @param limit recursion limit (integer) """ global gRecursionLimit gRecursionLimit = limit class DebugBase(bdb.Bdb): """ Class implementing base class of the debugger. Provides simple wrapper methods around bdb for the 'owning' client to call to step etc. """ # Don't thrust distutils.sysconfig.get_python_lib: possible case mismatch # on Windows lib = os.path.dirname(bdb.__file__) # tuple required because it's accessed a lot of times by startswith method pathsToSkip = ('<', os.path.dirname(__file__), bdb.__file__[:-1]) filesToSkip = {} # cache for fixed file names _fnCache = {} def __init__(self, dbgClient): """ Constructor @param dbgClient the owning client """ bdb.Bdb.__init__(self) self._dbgClient = dbgClient self._mainThread = True self.tracePythonLibs(0) # Special handling of a recursion error self.skipFrames = 0 self.__isBroken = False self.cFrame = None # current frame we are at self.currentFrame = None # frame that we are stepping in, can be different than currentFrame self.stepFrame = None # provide a hook to perform a hard breakpoint # Use it like this: # if hasattr(sys, 'breakpoint): sys.breakpoint() sys.breakpoint = self.set_trace # initialize parent bdb.Bdb.reset(self) self.__recursionDepth = -1 self.setRecursionDepth(inspect.currentframe()) # background task to periodicaly check for client interactions self.eventPollFlag = False self.timer = _thread.start_new_thread(self.__eventPollTimer, ()) def __eventPollTimer(self): """ Private method to set a flag every 0.5 s to check for new messages. """ while True: time.sleep(0.5) self.eventPollFlag = True def getCurrentFrame(self): """ Public method to return the current frame. @return the current frame """ return self.currentFrame def getFrameLocals(self, frmnr=0): """ Public method to return the locals dictionary of the current frame or a frame below. @keyparam frmnr distance of frame to get locals dictionary of. 0 is the current frame (int) @return locals dictionary of the frame """ f = self.currentFrame while f is not None and frmnr > 0: f = f.f_back frmnr -= 1 return f.f_locals def storeFrameLocals(self, frmnr=0): """ Public method to store the locals into the frame, so an access to frame.f_locals returns the last data. @keyparam frmnr distance of frame to store locals dictionary to. 0 is the current frame (int) """ cf = self.currentFrame while cf is not None and frmnr > 0: cf = cf.f_back frmnr -= 1 ctypes.pythonapi.PyFrame_LocalsToFast( ctypes.py_object(cf), ctypes.c_int(0)) def step(self, traceMode): """ Public method to perform a step operation in this thread. @param traceMode If it is True, then the step is a step into, otherwise it is a step over. """ self.stepFrame = self.currentFrame if traceMode: self.currentFrame = None self.set_step() else: self.set_next(self.currentFrame) def stepOut(self): """ Public method to perform a step out of the current call. """ self.stepFrame = self.currentFrame self.set_return(self.currentFrame) def go(self, special): """ Public method to resume the thread. It resumes the thread stopping only at breakpoints or exceptions. @param special flag indicating a special continue operation """ self.currentFrame = None self.set_continue(special) def setRecursionDepth(self, frame): """ Public method to determine the current recursion depth. @param frame The current stack frame. """ self.__recursionDepth = 0 while frame is not None: self.__recursionDepth += 1 frame = frame.f_back def profileWithRecursion(self, frame, event, arg): """ Public method used to trace some stuff independent of the debugger trace function. @param frame current stack frame @type frame object @param event trace event @type str @param arg arguments @type depends on the previous event parameter @exception RuntimeError raised to indicate too many recursions """ if event == 'return': self.cFrame = frame.f_back self.__recursionDepth -= 1 if self._dbgClient.callTraceEnabled: self.__sendCallTrace(event, frame, self.cFrame) elif event == 'call': if self._dbgClient.callTraceEnabled: self.__sendCallTrace(event, self.cFrame, frame) self.cFrame = frame self.__recursionDepth += 1 if self.__recursionDepth > gRecursionLimit: raise RuntimeError( 'maximum recursion depth exceeded\n' '(offending frame is two down the stack)') def profile(self, frame, event, arg): """ Public method used to trace some stuff independent of the debugger trace function. @param frame current stack frame @type frame object @param event trace event @type str @param arg arguments @type depends on the previous event parameter """ if event == 'return': self.__sendCallTrace(event, frame, frame.f_back) elif event == 'call': self.__sendCallTrace(event, frame.f_back, frame) def __sendCallTrace(self, event, fromFrame, toFrame): """ Private method to send a call/return trace. @param event trace event @type str @param fromFrame originating frame @type frame object @param toFrame destination frame @type frame object """ if not self.__skipFrame(fromFrame) and not self.__skipFrame(toFrame): fromStr = "{0}:{1}:{2}".format( self._dbgClient.absPath(self.fix_frame_filename(fromFrame)), fromFrame.f_lineno, fromFrame.f_code.co_name) toStr = "{0}:{1}:{2}".format( self._dbgClient.absPath(self.fix_frame_filename(toFrame)), toFrame.f_lineno, toFrame.f_code.co_name) self._dbgClient.write("{0}{1}@@{2}@@{3}\n".format( CallTrace, event[0], fromStr, toStr)) def trace_dispatch(self, frame, event, arg): """ Public method reimplemented from bdb.py to do some special things. This specialty is to check the connection to the debug server for new events (i.e. new breakpoints) while we are going through the code. @param frame The current stack frame @type frame object @param event The trace event @type str @param arg The arguments @type depends on the previous event parameter @return local trace function @rtype trace function or None @exception bdb.BdbQuit """ if self.quitting: return # None # give the client a chance to push through new break points. if self.eventPollFlag: self._dbgClient.eventPoll() self.eventPollFlag = False if event == 'line': if self.stop_here(frame) or self.break_here(frame): self.user_line(frame) if self.quitting: raise bdb.BdbQuit return if event == 'call': if self.botframe is None: # First call of dispatch since reset() # (CT) Note that this may also be None! self.botframe = frame.f_back return self.trace_dispatch if not (self.stop_here(frame) or self.__checkBreakInFrame(frame) or Watch.watches != []): # No need to trace this function return if self.quitting: raise bdb.BdbQuit return self.trace_dispatch if event == 'return': if self.stop_here(frame) or frame == self.returnframe: # Ignore return events in generator except when stepping. if self.stopframe and frame.f_code.co_flags & CO_GENERATOR: return # The program has finished if we have just left the first frame if frame == self._dbgClient.mainFrame and \ self._mainThread: atexit._run_exitfuncs() self._dbgClient.progTerminated(arg) elif frame is not self.stepFrame: self.stepFrame = None self.user_line(frame) if self.quitting and not self._dbgClient.passive: raise bdb.BdbQuit return if event == 'exception': if not self.__skipFrame(frame): # When stepping with next/until/return in a generator frame, # skip the internal StopIteration exception (with no traceback) # triggered by a subiterator run with the 'yield from' # statement. if not (frame.f_code.co_flags & CO_GENERATOR and arg[0] is StopIteration and arg[2] is None): self.user_exception(frame, arg) if self.quitting: raise bdb.BdbQuit # Stop at the StopIteration or GeneratorExit exception when the # user has set stopframe in a generator by issuing a return # command, or a next/until command at the last statement in the # generator before the exception. elif (self.stopframe and frame is not self.stopframe and self.stopframe.f_code.co_flags & CO_GENERATOR and arg[0] in (StopIteration, GeneratorExit)): self.user_exception(frame, arg) if self.quitting: raise bdb.BdbQuit return if event == 'c_call': return if event == 'c_exception': return if event == 'c_return': return print('bdb.Bdb.dispatch: unknown debugging event: ', repr(event)) # __IGNORE_WARNING__ return self.trace_dispatch def set_trace(self, frame=None): """ Public method to start debugging from 'frame'. If frame is not specified, debugging starts from caller's frame. @param frame frame to start debugging from @type frame object """ if frame is None: frame = sys._getframe().f_back # stop at erics debugger frame while frame: if not self.__skipFrame(frame): frame.f_trace = self.trace_dispatch frame = frame.f_back self.botframe = frame if self.__skipFrame(frame): break self.set_step() sys.settrace(self.trace_dispatch) sys.setprofile(self._dbgClient.callTraceEnabled) def set_continue(self, special): """ Public method reimplemented from bdb.py to always get informed of exceptions. @param special flag indicating a special continue operation """ # Modified version of the one found in bdb.py # Here we only set a new stop frame if it is a normal continue. if not special: self._set_stopinfo(self.botframe, None) # Disable tracing if not started in debug mode if not self._dbgClient.debugging: sys.settrace(None) sys.setprofile(None) def set_quit(self): """ Public method to quit. It wraps call to bdb to clear the current frame properly. """ self.currentFrame = None sys.setprofile(None) bdb.Bdb.set_quit(self) def fix_frame_filename(self, frame): """ Public method used to fixup the filename for a given frame. The logic employed here is that if a module was loaded from a .pyc file, then the correct .py to operate with should be in the same path as the .pyc. The reason this logic is needed is that when a .pyc file is generated, the filename embedded and thus what is readable in the code object of the frame object is the fully qualified filepath when the pyc is generated. If files are moved from machine to machine this can break debugging as the .pyc will refer to the .py on the original machine. Another case might be sharing code over a network... This logic deals with that. @param frame the frame object @type frame object @return fixed up file name @rtype str """ # get module name from __file__ fn = frame.f_globals.get('__file__') try: return self._fnCache[fn] except KeyError: if fn and fn != frame.f_code.co_filename: absFilename = os.path.abspath(fn) if absFilename.endswith(('.pyc', '.pyo')): fixedName = absFilename[:-1] if not os.path.exists(fixedName): fixedName = absFilename else: fixedName = absFilename else: fixedName = frame.f_code.co_filename # update cache self._fnCache[fn] = fixedName return fixedName def __checkBreakInFrame(self, frame): """ Private method to check if the function / method has a line number which is a breakpoint. @param frame the frame object @type frame object @return Flag indicating a function / method with breakpoint @rtype bool """ try: return Breakpoint.breakInFrameCache[ frame.f_globals.get('__file__'), frame.f_code.co_firstlineno] except KeyError: filename = self.fix_frame_filename(frame) if filename not in Breakpoint.breakInFile: Breakpoint.breakInFrameCache[ frame.f_globals.get('__file__'), frame.f_code.co_firstlineno] = False return False lineNo = frame.f_code.co_firstlineno lineNumbers = [lineNo] # No need to handle special case if a lot of lines between # (e.g. closure), because the additional lines won't cause a bp for co_lno in frame.f_code.co_lnotab[1::2]: lineNo += co_lno lineNumbers.append(lineNo) for bp in Breakpoint.breakInFile[filename]: if bp in lineNumbers: Breakpoint.breakInFrameCache[ frame.f_globals.get('__file__'), frame.f_code.co_firstlineno] = True return True Breakpoint.breakInFrameCache[ frame.f_globals.get('__file__'), frame.f_code.co_firstlineno] = False return False def break_here(self, frame): """ Public method reimplemented from bdb.py to fix the filename from the frame. See fix_frame_filename for more info. @param frame the frame object @type frame object @return flag indicating the break status @rtype bool """ filename = self.fix_frame_filename(frame) if (filename, frame.f_lineno) in Breakpoint.breaks: bp, flag = Breakpoint.effectiveBreak( filename, frame.f_lineno, frame) if bp: # flag says ok to delete temp. bp if flag and bp.temporary: self.__do_clearBreak(filename, frame.f_lineno) return True if Watch.watches != []: bp, flag = Watch.effectiveWatch(frame) if bp: # flag says ok to delete temp. watch if flag and bp.temporary: self.__do_clearWatch(bp.cond) return True return False def __do_clearBreak(self, filename, lineno): """ Private method called to clear a temporary breakpoint. @param filename name of the file the bp belongs to @type str @param lineno linenumber of the bp @type int """ Breakpoint.clear_break(filename, lineno) self._dbgClient.write('{0}{1},{2:d}\n'.format( ResponseClearBreak, filename, lineno)) def __do_clearWatch(self, cond): """ Private method called to clear a temporary watch expression. @param cond expression of the watch expression to be cleared @type str """ Watch.clear_watch(cond) self._dbgClient.write('{0}{1}\n'.format(ResponseClearWatch, cond)) def getStack(self): """ Public method to get the stack. @return list of lists with file name (string), line number (integer) and function name (string) """ fr = self.cFrame stack = [] while fr is not None: fname = self._dbgClient.absPath(self.fix_frame_filename(fr)) if not fname.startswith("<"): fline = fr.f_lineno ffunc = fr.f_code.co_name if ffunc == '?': ffunc = '' if ffunc and not ffunc.startswith("<"): argInfo = getargvalues(fr) try: fargs = formatargvalues( argInfo.args, argInfo.varargs, argInfo.keywords, argInfo.locals) except Exception: fargs = "" else: fargs = "" stack.append([fname, fline, ffunc, fargs]) if fr == self._dbgClient.mainFrame: fr = None else: fr = fr.f_back return stack def user_line(self, frame): """ Public method reimplemented to handle the program about to execute a particular line. @param frame the frame object """ # We never stop on line 0. if frame.f_lineno == 0: return # See if we are skipping at the start of a newly loaded program. # Then get the complete frame stack without eric or Python supplied # libraries if self._dbgClient.mainFrame is None: fr = frame while (fr is not None and fr.f_code != self._dbgClient.handleLine.__code__): if (fr.f_code.co_filename == '<string>' or os.path.dirname(fr.f_code.co_filename) == self.lib): break self._dbgClient.mainFrame = fr fr = fr.f_back self.currentFrame = frame fr = frame stack = [] while fr is not None: # Reset the trace function so we can be sure # to trace all functions up the stack... This gets around # problems where an exception/breakpoint has occurred # but we had disabled tracing along the way via a None # return from dispatch_call fr.f_trace = self.trace_dispatch fname = self._dbgClient.absPath(self.fix_frame_filename(fr)) if not fname.startswith("<"): fline = fr.f_lineno ffunc = fr.f_code.co_name if ffunc == '?': ffunc = '' if ffunc and not ffunc.startswith("<"): argInfo = getargvalues(fr) try: fargs = formatargvalues( argInfo.args, argInfo.varargs, argInfo.keywords, argInfo.locals) except Exception: fargs = "" else: fargs = "" stack.append([fname, fline, ffunc, fargs]) if fr == self._dbgClient.mainFrame: fr = None else: fr = fr.f_back self.__isBroken = True self._dbgClient.write('{0}{1}\n'.format(ResponseLine, str(stack))) self._dbgClient.eventLoop() self.__isBroken = False def user_exception(self, frame, excinfo, unhandled=False): """ Public method reimplemented to report an exception to the debug server. @param frame the frame object @param excinfo information about the exception @param unhandled flag indicating an uncaught exception """ exctype, excval, exctb = excinfo if exctype in [GeneratorExit, StopIteration]: # ignore these return if exctype in [SystemExit, bdb.BdbQuit]: atexit._run_exitfuncs() if excval is None: excval = 0 elif isinstance(excval, str): self._dbgClient.write(excval) excval = 1 elif isinstance(excval, bytes): self._dbgClient.write(excval.decode()) excval = 1 if isinstance(excval, int): self._dbgClient.progTerminated(excval) else: self._dbgClient.progTerminated(excval.code) return if exctype in [SyntaxError, IndentationError]: try: message = str(excval) filename = excval.filename linenr = excval.lineno charnr = excval.offset except (AttributeError, ValueError): exclist = [] realSyntaxError = True else: exclist = [message, [filename, linenr, charnr]] realSyntaxError = os.path.exists(filename) if realSyntaxError: self._dbgClient.write("{0}{1}\n".format( ResponseSyntax, str(exclist))) self._dbgClient.eventLoop() return self.skipFrames = 0 if (exctype == RuntimeError and str(excval).startswith('maximum recursion depth exceeded') or sys.version_info >= (3, 5) and exctype == RecursionError): excval = 'maximum recursion depth exceeded' depth = 0 tb = exctb while tb: tb = tb.tb_next if (tb and tb.tb_frame.f_code.co_name == 'trace_dispatch' and __file__.startswith(tb.tb_frame.f_code.co_filename)): depth = 1 self.skipFrames += depth # always 1 if running without debugger self.skipFrames = max(1, self.skipFrames) exctype = self.__extractExceptionName(exctype) if excval is None: excval = '' if unhandled: exctypetxt = "unhandled {0!s}".format(str(exctype)) else: exctypetxt = str(exctype) try: exclist = [exctypetxt, str(excval)] except TypeError: exclist = [exctypetxt, str(excval)] if exctb: frlist = self.__extract_stack(exctb) frlist.reverse() self.currentFrame = frlist[0] for fr in frlist[self.skipFrames:]: filename = self._dbgClient.absPath(self.fix_frame_filename(fr)) if os.path.basename(filename).startswith("DebugClient") or \ os.path.basename(filename) == "bdb.py": break linenr = fr.f_lineno ffunc = fr.f_code.co_name if ffunc == '?': ffunc = '' if ffunc and not ffunc.startswith("<"): argInfo = getargvalues(fr) try: fargs = formatargvalues( argInfo.args, argInfo.varargs, argInfo.keywords, argInfo.locals) except Exception: fargs = "" else: fargs = "" exclist.append([filename, linenr, ffunc, fargs]) self._dbgClient.write("{0}{1}\n".format( ResponseException, str(exclist))) if exctb is None: return self._dbgClient.eventLoop() self.skipFrames = 0 def __extractExceptionName(self, exctype): """ Private method to extract the exception name given the exception type object. @param exctype type of the exception @return exception name (string) """ return str(exctype).replace("<class '", "").replace("'>", "") def __extract_stack(self, exctb): """ Private member to return a list of stack frames. @param exctb exception traceback @return list of stack frames """ tb = exctb stack = [] while tb is not None: stack.append(tb.tb_frame) tb = tb.tb_next tb = None return stack def stop_here(self, frame): """ Public method reimplemented to filter out debugger files. Tracing is turned off for files that are part of the debugger that are called from the application being debugged. @param frame the frame object @return flag indicating whether the debugger should stop here """ if self.__skipFrame(frame): return False return bdb.Bdb.stop_here(self, frame) def tracePythonLibs(self, enable): """ Public method to update the settings to trace into Python libraries. @param enable flag to debug into Python libraries @type int """ pathsToSkip = list(self.pathsToSkip) # don't trace into Python library? if enable: pathsToSkip = [x for x in pathsToSkip if not x.endswith( ("site-packages", "dist-packages", self.lib))] else: pathsToSkip.append(self.lib) localLib = [x for x in sys.path if x.endswith(("site-packages", "dist-packages")) and not x.startswith(self.lib)] pathsToSkip.extend(localLib) self.pathsToSkip = tuple(pathsToSkip) def __skipFrame(self, frame): """ Private method to filter out debugger files. Tracing is turned off for files that are part of the debugger that are called from the application being debugged. @param frame the frame object @type frame object @return flag indicating whether the debugger should skip this frame @rtype bool """ try: return self.filesToSkip[frame.f_code.co_filename] except KeyError: ret = frame.f_code.co_filename.startswith(self.pathsToSkip) self.filesToSkip[frame.f_code.co_filename] = ret return ret except AttributeError: # if frame is None return True def isBroken(self): """ Public method to return the broken state of the debugger. @return flag indicating the broken state @rtype bool """ return self.__isBroken # # eflag: noqa = M702