Sun, 18 Sep 2016 21:35:53 +0200
Get changes from current branch.
# -*- coding: utf-8 -*- # Copyright (c) 2009 - 2016 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the debug base class which based originally on bdb. """ import sys import os import atexit import inspect import ctypes import _thread import time from inspect import CO_GENERATOR from DebugUtilities import getargvalues, formatargvalues from BreakpointWatch import Breakpoint, Watch gRecursionLimit = 64 def printerr(s): """ Module function used for debugging the debug client. @param s data to be printed """ sys.__stderr__.write('{0!s}\n'.format(s)) sys.__stderr__.flush() def setRecursionLimit(limit): """ Module function to set the recursion limit. @param limit recursion limit (integer) """ global gRecursionLimit gRecursionLimit = limit class DebugBase(object): """ Class implementing base class of the debugger. Provides methods for the 'owning' client to call to step etc. """ # Don't thrust distutils.sysconfig.get_python_lib: possible case mismatch # on Windows lib = os.path.dirname(inspect.__file__) # tuple required because it's accessed a lot of times by startswith method pathsToSkip = ('<', os.path.dirname(__file__), inspect.__file__[:-1]) filesToSkip = {} # cache for fixed file names _fnCache = {} def __init__(self, dbgClient): """ Constructor @param dbgClient the owning client """ self._dbgClient = dbgClient self._mainThread = True self.quitting = 0 self.tracePythonLibs(0) # Special handling of a recursion error self.skipFrames = 0 self.__isBroken = False self.cFrame = None # current frame we are at self.currentFrame = None # frame that we are stepping in, can be different than currentFrame self.stepFrame = None self.botframe = None self.stopframe = None self.returnframe = None self.stop_everywhere = False # provide a hook to perform a hard breakpoint # Use it like this: # if hasattr(sys, 'breakpoint): sys.breakpoint() sys.breakpoint = self.set_trace self.__recursionDepth = -1 self.setRecursionDepth(inspect.currentframe()) # background task to periodicaly check for client interactions self.eventPollFlag = False self.timer = _thread.start_new_thread(self.__eventPollTimer, ()) def __eventPollTimer(self): """ Private method to set a flag every 0.5 s to check for new messages. """ while True: time.sleep(0.5) self.eventPollFlag = True def getCurrentFrame(self): """ Public method to return the current frame. @return the current frame """ return self.currentFrame def getFrameLocals(self, frmnr=0): """ Public method to return the locals dictionary of the current frame or a frame below. @keyparam frmnr distance of frame to get locals dictionary of. 0 is the current frame (int) @return locals dictionary of the frame """ f = self.currentFrame while f is not None and frmnr > 0: f = f.f_back frmnr -= 1 return f.f_locals def storeFrameLocals(self, frmnr=0): """ Public method to store the locals into the frame, so an access to frame.f_locals returns the last data. @keyparam frmnr distance of frame to store locals dictionary to. 0 is the current frame (int) """ cf = self.currentFrame while cf is not None and frmnr > 0: cf = cf.f_back frmnr -= 1 try: if "__pypy__" in sys.builtin_module_names: import __pypy__ __pypy__.locals_to_fast(cf) return except Exception: pass ctypes.pythonapi.PyFrame_LocalsToFast( ctypes.py_object(cf), ctypes.c_int(0)) def step(self, traceMode): """ Public method to perform a step operation in this thread. @param traceMode If it is True, then the step is a step into, otherwise it is a step over. """ self.stepFrame = self.currentFrame if traceMode: self.currentFrame = None self.set_step() else: self.set_next(self.currentFrame) def stepOut(self): """ Public method to perform a step out of the current call. """ self.stepFrame = self.currentFrame self.set_return(self.currentFrame) def go(self, special): """ Public method to resume the thread. It resumes the thread stopping only at breakpoints or exceptions. @param special flag indicating a special continue operation """ self.currentFrame = None self.set_continue(special) def setRecursionDepth(self, frame): """ Public method to determine the current recursion depth. @param frame The current stack frame. """ self.__recursionDepth = 0 while frame is not None: self.__recursionDepth += 1 frame = frame.f_back def profileWithRecursion(self, frame, event, arg): """ Public method used to trace some stuff independent of the debugger trace function. @param frame current stack frame @type frame object @param event trace event @type str @param arg arguments @type depends on the previous event parameter @exception RuntimeError raised to indicate too many recursions """ if event == 'return': self.cFrame = frame.f_back self.__recursionDepth -= 1 if self._dbgClient.callTraceEnabled: self.__sendCallTrace(event, frame, self.cFrame) elif event == 'call': if self._dbgClient.callTraceEnabled: self.__sendCallTrace(event, self.cFrame, frame) self.cFrame = frame self.__recursionDepth += 1 if self.__recursionDepth > gRecursionLimit: raise RuntimeError( 'maximum recursion depth exceeded\n' '(offending frame is two down the stack)') def profile(self, frame, event, arg): """ Public method used to trace some stuff independent of the debugger trace function. @param frame current stack frame @type frame object @param event trace event @type str @param arg arguments @type depends on the previous event parameter """ if event == 'return': self.__sendCallTrace(event, frame, frame.f_back) elif event == 'call': self.__sendCallTrace(event, frame.f_back, frame) def __sendCallTrace(self, event, fromFrame, toFrame): """ Private method to send a call/return trace. @param event trace event @type str @param fromFrame originating frame @type frame object @param toFrame destination frame @type frame object """ if not self.__skipFrame(fromFrame) and not self.__skipFrame(toFrame): fromInfo = { "filename": self._dbgClient.absPath( self.fix_frame_filename(fromFrame)), "linenumber": fromFrame.f_lineno, "codename": fromFrame.f_code.co_name, } toInfo = { "filename": self._dbgClient.absPath( self.fix_frame_filename(toFrame)), "linenumber": toFrame.f_lineno, "codename": toFrame.f_code.co_name, } self._dbgClient.sendCallTrace(event, fromInfo, toInfo) def trace_dispatch(self, frame, event, arg): """ Public method reimplemented from bdb.py to do some special things. This specialty is to check the connection to the debug server for new events (i.e. new breakpoints) while we are going through the code. @param frame The current stack frame @type frame object @param event The trace event @type str @param arg The arguments @type depends on the previous event parameter @return local trace function @rtype trace function or None @exception SystemExit """ # give the client a chance to push through new break points. if self.eventPollFlag: self._dbgClient.eventPoll() self.eventPollFlag = False if self.quitting: raise SystemExit if event == 'line': if self.stop_here(frame) or self.break_here(frame): self.user_line(frame) return if event == 'call': if self.botframe is None and frame.f_lineno > 1: self.botframe = frame.f_back frame.f_trace = self.trace_dispatch self._dbgClient.mainFrame = frame self.user_line(frame) return self.trace_dispatch if not (self.stop_here(frame) or self.__checkBreakInFrame(frame) or Watch.watches != []): # No need to trace this function return return self.trace_dispatch if event == 'return': if self.stop_here(frame): # Ignore return events in generator except when stepping. if self.stopframe and frame.f_code.co_flags & CO_GENERATOR: return # The program has finished if we have just left the first frame if (frame == self._dbgClient.mainFrame and self._mainThread): atexit._run_exitfuncs() self._dbgClient.progTerminated(arg) if self.quitting and not self._dbgClient.passive: raise SystemExit return if event == 'exception': if not self.__skipFrame(frame): # When stepping with next/until/return in a generator frame, # skip the internal StopIteration exception (with no traceback) # triggered by a subiterator run with the 'yield from' # statement. if not (frame.f_code.co_flags & CO_GENERATOR and arg[0] is StopIteration and arg[2] is None): self.user_exception(frame, arg) # Stop at the StopIteration or GeneratorExit exception when the # user has set stopframe in a generator by issuing a return # command, or a next/until command at the last statement in the # generator before the exception. elif (self.stopframe and frame is not self.stopframe and self.stopframe.f_code.co_flags & CO_GENERATOR and arg[0] in (StopIteration, GeneratorExit)): self.user_exception(frame, arg) return if event == 'c_call': return if event == 'c_exception': return if event == 'c_return': return print('DebugBase.trace_dispatch:' # __IGNORE_WARNING__ ' unknown debugging event: ', repr(event)) return self.trace_dispatch def set_trace(self, frame=None): """ Public method to start debugging from 'frame'. If frame is not specified, debugging starts from caller's frame. Because of jump optimizations it's not possible to use sys.breakpoint() as last instruction in a function or method. @keyparam frame frame to start debugging from @type frame object """ if frame is None: frame = sys._getframe().f_back # Skip set_trace method frame.f_trace = self.trace_dispatch while frame is not None: # stop at erics debugger frame if frame.f_back.f_code == self._dbgClient.handleLine.__code__: frame.f_trace = self.trace_dispatch self.botframe = frame self._dbgClient.mainFrame = frame break frame = frame.f_back self.stop_everywhere = True sys.settrace(self.trace_dispatch) sys.setprofile(self._dbgClient.callTraceEnabled) def run(self, cmd, globals=None, locals=None): """ Public method to start a given command under debugger control. @param cmd command / code to execute under debugger control @type str or CodeType @keyparam globals dictionary of global variables for cmd @type dict @keyparam locals dictionary of local variables for cmd @type dict """ if globals is None: import __main__ globals = __main__.__dict__ if locals is None: locals = globals sys.settrace(self.trace_dispatch) if isinstance(cmd, str): cmd = compile(cmd, "<string>", "exec") try: exec(cmd, globals, locals) except SystemExit: pass finally: self.quitting = 1 sys.settrace(None) def _set_stopinfo(self, stopframe, returnframe): """ Protected method to update the frame pointers. @param stopframe the frame object where to stop @type frame object @param returnframe the frame object where to stop on a function return @type frame object """ self.stopframe = stopframe self.returnframe = returnframe self.stop_everywhere = False def set_continue(self, special): """ Public method to stop only on next breakpoint. @param special flag indicating a special continue operation @type bool """ # Here we only set a new stop frame if it is a normal continue. if not special: self._set_stopinfo(self.botframe, None) # Disable tracing if not started in debug mode if not self._dbgClient.debugging: sys.settrace(None) sys.setprofile(None) def set_step(self): """ Public method to stop after one line of code. """ self._set_stopinfo(None, None) self.stop_everywhere = True def set_next(self, frame): """ Public method to stop on the next line in or below the given frame. @param frame the frame object @type frame object """ self._set_stopinfo(frame, frame.f_back) frame.f_back.f_trace = self.trace_dispatch frame.f_trace = self.trace_dispatch def set_return(self, frame): """ Public method to stop when returning from the given frame. @param frame the frame object @type frame object """ self._set_stopinfo(None, frame.f_back) def set_quit(self): """ Public method to quit. Disables the trace functions and resets all frame pointer. """ self.currentFrame = None sys.setprofile(None) sys.settrace(None) self.stopframe = None self.returnframe = None self.quitting = 1 def fix_frame_filename(self, frame): """ Public method used to fixup the filename for a given frame. The logic employed here is that if a module was loaded from a .pyc file, then the correct .py to operate with should be in the same path as the .pyc. The reason this logic is needed is that when a .pyc file is generated, the filename embedded and thus what is readable in the code object of the frame object is the fully qualified filepath when the pyc is generated. If files are moved from machine to machine this can break debugging as the .pyc will refer to the .py on the original machine. Another case might be sharing code over a network... This logic deals with that. @param frame the frame object @type frame object @return fixed up file name @rtype str """ # get module name from __file__ fn = frame.f_globals.get('__file__') try: return self._fnCache[fn] except KeyError: if fn and fn != frame.f_code.co_filename: absFilename = os.path.abspath(fn) if absFilename.endswith(('.pyc', '.pyo')): fixedName = absFilename[:-1] if not os.path.exists(fixedName): fixedName = absFilename else: fixedName = absFilename else: fixedName = frame.f_code.co_filename # update cache self._fnCache[fn] = fixedName return fixedName def __checkBreakInFrame(self, frame): """ Private method to check if the function / method has a line number which is a breakpoint. @param frame the frame object @type frame object @return Flag indicating a function / method with breakpoint @rtype bool """ try: return Breakpoint.breakInFrameCache[ frame.f_globals.get('__file__'), frame.f_code.co_firstlineno] except KeyError: filename = self.fix_frame_filename(frame) if filename not in Breakpoint.breakInFile: Breakpoint.breakInFrameCache[ frame.f_globals.get('__file__'), frame.f_code.co_firstlineno] = False return False lineNo = frame.f_code.co_firstlineno lineNumbers = [lineNo] # No need to handle special case if a lot of lines between # (e.g. closure), because the additional lines won't cause a bp for co_lno in frame.f_code.co_lnotab[1::2]: lineNo += co_lno lineNumbers.append(lineNo) for bp in Breakpoint.breakInFile[filename]: if bp in lineNumbers: Breakpoint.breakInFrameCache[ frame.f_globals.get('__file__'), frame.f_code.co_firstlineno] = True return True Breakpoint.breakInFrameCache[ frame.f_globals.get('__file__'), frame.f_code.co_firstlineno] = False return False def break_here(self, frame): """ Public method reimplemented from bdb.py to fix the filename from the frame. See fix_frame_filename for more info. @param frame the frame object @type frame object @return flag indicating the break status @rtype bool """ filename = self.fix_frame_filename(frame) if (filename, frame.f_lineno) in Breakpoint.breaks: bp, flag = Breakpoint.effectiveBreak( filename, frame.f_lineno, frame) if bp: # flag says ok to delete temp. bp if flag and bp.temporary: self.__do_clearBreak(filename, frame.f_lineno) return True if Watch.watches != []: bp, flag = Watch.effectiveWatch(frame) if bp: # flag says ok to delete temp. watch if flag and bp.temporary: self.__do_clearWatch(bp.cond) return True return False def __do_clearBreak(self, filename, lineno): """ Private method called to clear a temporary breakpoint. @param filename name of the file the bp belongs to @type str @param lineno linenumber of the bp @type int """ Breakpoint.clear_break(filename, lineno) self._dbgClient.sendClearTemporaryBreakpoint(filename, lineno) def __do_clearWatch(self, cond): """ Private method called to clear a temporary watch expression. @param cond expression of the watch expression to be cleared @type str """ Watch.clear_watch(cond) self._dbgClient.sendClearTemporaryWatch(cond) def getStack(self): """ Public method to get the stack. @return list of lists with file name (string), line number (integer) and function name (string) """ fr = self.cFrame stack = [] while fr is not None: fname = self._dbgClient.absPath(self.fix_frame_filename(fr)) if not fname.startswith("<"): fline = fr.f_lineno ffunc = fr.f_code.co_name if ffunc == '?': ffunc = '' if ffunc and not ffunc.startswith("<"): argInfo = getargvalues(fr) try: fargs = formatargvalues( argInfo.args, argInfo.varargs, argInfo.keywords, argInfo.locals) except Exception: fargs = "" else: fargs = "" stack.append([fname, fline, ffunc, fargs]) if fr == self._dbgClient.mainFrame: fr = None else: fr = fr.f_back return stack def user_line(self, frame): """ Public method reimplemented to handle the program about to execute a particular line. @param frame the frame object """ # We never stop on line 0. if frame.f_lineno == 0: return self.currentFrame = frame fr = frame stack = [] while fr is not None: # Reset the trace function so we can be sure # to trace all functions up the stack... This gets around # problems where an exception/breakpoint has occurred # but we had disabled tracing along the way via a None # return from dispatch_call fr.f_trace = self.trace_dispatch fname = self._dbgClient.absPath(self.fix_frame_filename(fr)) if not fname.startswith("<"): fline = fr.f_lineno ffunc = fr.f_code.co_name if ffunc == '?': ffunc = '' if ffunc and not ffunc.startswith("<"): argInfo = getargvalues(fr) try: fargs = formatargvalues( argInfo.args, argInfo.varargs, argInfo.keywords, argInfo.locals) except Exception: fargs = "" else: fargs = "" stack.append([fname, fline, ffunc, fargs]) if fr == self._dbgClient.mainFrame: fr = None else: fr = fr.f_back self.__isBroken = True self._dbgClient.sendResponseLine(stack) self._dbgClient.eventLoop() self.__isBroken = False def user_exception(self, frame, excinfo, unhandled=False): """ Public method reimplemented to report an exception to the debug server. @param frame the frame object @type frame object @param excinfo details about the exception @type tuple(Exception, excval object, traceback frame object) @keyparam unhandled flag indicating an uncaught exception @type bool """ exctype, excval, exctb = excinfo if exctype in [GeneratorExit, StopIteration]: # ignore these return if exctype == SystemExit: atexit._run_exitfuncs() if excval is None: exitcode = 0 message = "" elif isinstance(excval, str): exitcode = 1 message = excval elif isinstance(excval, bytes): exitcode = 1 message = excval.decode() elif isinstance(excval, int): exitcode = excval message = "" elif isinstance(excval, SystemExit): code = excval.code if isinstance(code, str): exitcode = 1 message = code elif isinstance(code, bytes): exitcode = 1 message = code.decode() elif isinstance(code, int): exitcode = code message = "" else: exitcode = 1 message = str(code) else: exitcode = 1 message = str(excval) self._dbgClient.progTerminated(exitcode, message) return if exctype in [SyntaxError, IndentationError]: try: message = str(excval) filename = excval.filename lineno = excval.lineno charno = excval.offset realSyntaxError = os.path.exists(filename) except (AttributeError, ValueError): message = "" filename = "" lineno = 0 charno = 0 realSyntaxError = True if realSyntaxError: self._dbgClient.sendSyntaxError( message, filename, lineno, charno) self._dbgClient.eventLoop() return self.skipFrames = 0 if (exctype == RuntimeError and str(excval).startswith('maximum recursion depth exceeded') or sys.version_info >= (3, 5) and exctype == RecursionError): excval = 'maximum recursion depth exceeded' depth = 0 tb = exctb while tb: tb = tb.tb_next if (tb and tb.tb_frame.f_code.co_name == 'trace_dispatch' and __file__.startswith(tb.tb_frame.f_code.co_filename)): depth = 1 self.skipFrames += depth # always 1 if running without debugger self.skipFrames = max(1, self.skipFrames) exctype = self.__extractExceptionName(exctype) if excval is None: excval = '' if unhandled: exctypetxt = "unhandled {0!s}".format(str(exctype)) else: exctypetxt = str(exctype) stack = [] if exctb: frlist = self.__extract_stack(exctb) frlist.reverse() self.currentFrame = frlist[0] for fr in frlist[self.skipFrames:]: filename = self._dbgClient.absPath(self.fix_frame_filename(fr)) if os.path.basename(filename).startswith("DebugClientBase"): break linenr = fr.f_lineno ffunc = fr.f_code.co_name if ffunc == '?': ffunc = '' if ffunc and not ffunc.startswith("<"): argInfo = getargvalues(fr) try: fargs = formatargvalues( argInfo.args, argInfo.varargs, argInfo.keywords, argInfo.locals) except Exception: fargs = "" else: fargs = "" stack.append([filename, linenr, ffunc, fargs]) self._dbgClient.sendException(exctypetxt, str(excval), stack) if exctb is None: return self._dbgClient.eventLoop() self.skipFrames = 0 def __extractExceptionName(self, exctype): """ Private method to extract the exception name given the exception type object. @param exctype type of the exception @return exception name (string) """ return str(exctype).replace("<class '", "").replace("'>", "") def __extract_stack(self, exctb): """ Private member to return a list of stack frames. @param exctb exception traceback @return list of stack frames """ tb = exctb stack = [] while tb is not None: stack.append(tb.tb_frame) tb = tb.tb_next tb = None return stack def stop_here(self, frame): """ Public method reimplemented to filter out debugger files. Tracing is turned off for files that are part of the debugger that are called from the application being debugged. @param frame the frame object @type frame object @return flag indicating whether the debugger should stop here @rtype bool """ if self.__skipFrame(frame): return False return (self.stop_everywhere or frame is self.stopframe or frame is self.returnframe or frame is self.botframe) def tracePythonLibs(self, enable): """ Public method to update the settings to trace into Python libraries. @param enable flag to debug into Python libraries @type bool """ pathsToSkip = list(self.pathsToSkip) # don't trace into Python library? if enable: pathsToSkip = [x for x in pathsToSkip if not x.endswith( ("site-packages", "dist-packages", self.lib))] else: pathsToSkip.append(self.lib) localLib = [x for x in sys.path if x.endswith(("site-packages", "dist-packages")) and not x.startswith(self.lib)] pathsToSkip.extend(localLib) self.pathsToSkip = tuple(pathsToSkip) def __skipFrame(self, frame): """ Private method to filter out debugger files. Tracing is turned off for files that are part of the debugger that are called from the application being debugged. @param frame the frame object @type frame object @return flag indicating whether the debugger should skip this frame @rtype bool """ try: return self.filesToSkip[frame.f_code.co_filename] except KeyError: ret = frame.f_code.co_filename.startswith(self.pathsToSkip) self.filesToSkip[frame.f_code.co_filename] = ret return ret except AttributeError: # if frame is None return True def isBroken(self): """ Public method to return the broken state of the debugger. @return flag indicating the broken state @rtype bool """ return self.__isBroken # # eflag: noqa = M702