Sat, 08 Oct 2016 21:01:32 +0200
Give the next debugger command to the thread where we are stopped at the moment.
# -*- coding: utf-8 -*-

# Copyright (c) 2014 - 2016 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing an import hook patching thread modules to get debugged too.
"""

import os.path
import sys

if sys.version_info[0] == 2:
    import thread as _thread
else:
    import _thread

import threading

from DebugBase import DebugBase


class ThreadExtension(object):
    """
    Class implementing the thread support for the debugger.

    Provides methods for intercepting thread creation, retrieving the running
    threads and their name and state.

    The class reads several attributes it does not define itself
    (debugging, isBroken, profile, set_trace, sendJsonCommand); presumably
    they are supplied by the classes it is combined with - verify against
    the concrete debug client class.
    """
    def __init__(self):
        """
        Constructor
        """
        self.threadNumber = 1
        self.enableImportHooks = True
        self._original_start_new_thread = None
        # re-entrant, so the owning thread may lock the client repeatedly
        self.clientLock = threading.RLock()

        # dictionary of all threads running {id: DebugBase}
        self.threads = {_thread.get_ident(): self}

        # the "current" thread, basically for variables view
        self.currentThread = self
        # the thread we are at a breakpoint continuing at next command
        self.currentThreadExec = self

        # special objects representing the main scripts thread and frame
        self.mainThread = self

        if sys.version_info[0] == 2:
            self.threadModName = 'thread'
        else:
            self.threadModName = '_thread'

        # reset already imported thread module to apply hooks at next import
        # (pop instead of del: the modules may already have been removed,
        # e.g. by an earlier instance, and that must not raise KeyError)
        sys.modules.pop(self.threadModName, None)
        sys.modules.pop('threading', None)

        # register this object as the first meta path finder so that
        # find_module() is consulted before the regular import machinery
        sys.meta_path.insert(0, self)

        # provide a hook to perform a hard breakpoint
        # Use it like this:
        # if hasattr(sys, 'breakpoint'): sys.breakpoint()
        sys.breakpoint = self.set_trace

    def attachThread(self, target=None, args=None, kwargs=None,
                     mainThread=False):
        """
        Public method to setup a standard thread for DebugClient to debug.

        If mainThread is True, then we are attaching to the already
        started mainthread of the app and the rest of the args are ignored.

        This method is also installed as the replacement for the thread
        module's start_new_thread function (see load_module).

        @param target the start function of the target thread (i.e. the user
            code)
        @param args arguments to pass to target
        @param kwargs keyword arguments to pass to target (None is treated
            as an empty dictionary)
        @param mainThread True, if we are attaching to the already
            started mainthread of the app
        @return identifier of the created thread
        """
        if kwargs is None:
            # fresh dictionary per call instead of a shared mutable default
            kwargs = {}

        if mainThread:
            ident = _thread.get_ident()
            name = 'MainThread'
            newThread = self.mainThread
            newThread.isMainThread = True
            if self.debugging:
                sys.setprofile(newThread.profile)
        else:
            newThread = DebugBase(self)
            # start the real thread through the saved original function;
            # the DebugBase bootstrap receives the user's target to run
            ident = self._original_start_new_thread(
                newThread.bootstrap, (target, args, kwargs))
            name = 'Thread-{0}'.format(self.threadNumber)
            self.threadNumber += 1

        newThread.id = ident
        newThread.name = name

        self.threads[ident] = newThread

        return ident

    def threadTerminated(self, threadId):
        """
        Public method called when a DebugThread has exited.

        Unknown thread ids are ignored silently.

        @param threadId id of the DebugThread that has exited
        @type int
        """
        self.lockClient()
        try:
            del self.threads[threadId]
        except KeyError:
            pass
        finally:
            self.unlockClient()

    def lockClient(self, blocking=True):
        """
        Public method to acquire the lock for this client.

        @param blocking flag to indicating a blocking lock
        @type bool
        @return flag indicating successful locking
        @rtype bool
        """
        if blocking:
            # report the result here as well, as promised by the @return
            return self.clientLock.acquire()
        else:
            return self.clientLock.acquire(blocking)

    def unlockClient(self):
        """
        Public method to release the lock for this client.

        Releasing a lock that is not held by the calling thread is ignored.
        """
        try:
            self.clientLock.release()
        except (AssertionError, RuntimeError):
            # Python 3 raises RuntimeError for a lock that is not owned;
            # AssertionError is kept for the interpreters that raised it
            pass

    def setCurrentThread(self, id):
        """
        Public method to set the current thread.

        An id that is not registered sets the current thread to None.

        @param id the id the current thread should be set to.
        """
        # acquire before entering the try block, so the finally clause only
        # releases a lock that was actually taken
        self.lockClient()
        try:
            if id is None:
                self.currentThread = None
            else:
                self.currentThread = self.threads.get(id)
        finally:
            self.unlockClient()

    def dumpThreadList(self):
        """
        Public method to send the list of threads.

        Sends a "ResponseThreadList" command containing the id of the
        calling thread and one entry (id, name, broken) per known thread.
        If only one thread is registered, a single "MainThread" entry with
        the id -1 is reported instead.
        """
        threadList = []
        if len(self.threads) > 1:
            currentId = _thread.get_ident()
            # update thread names set by user (threading.setName)
            threadNames = {t.ident: t.name for t in threading.enumerate()}

            for threadId, thd in self.threads.items():
                d = {"id": threadId}
                try:
                    d["name"] = threadNames.get(threadId, thd.name)
                    d["broken"] = thd.isBroken
                except Exception:
                    # best effort: still report an entry for this thread
                    d["name"] = 'UnknownThread'
                    d["broken"] = False

                threadList.append(d)
        else:
            currentId = -1
            d = {"id": -1}
            d["name"] = "MainThread"
            d["broken"] = self.isBroken
            threadList.append(d)

        self.sendJsonCommand("ResponseThreadList", {
            "currentID": currentId,
            "threadList": threadList,
        })

    def find_module(self, fullname, path=None):
        """
        Public method returning the module loader.

        Only the low level thread module is claimed, and only while
        debugging and while the import hook is enabled.

        NOTE(review): find_module()/load_module() form the legacy meta path
        protocol; confirm it is still honoured by the targeted Python
        versions.

        @param fullname name of the module to be loaded
        @type str
        @param path path to resolve the module name
        @type str
        @return module loader object
        @rtype object
        """
        if fullname in sys.modules or not self.debugging:
            return None

        if fullname == self.threadModName and self.enableImportHooks:
            # Disable hook to be able to import original module
            self.enableImportHooks = False
            return self

        return None

    def load_module(self, fullname):
        """
        Public method to load a module.

        The first time the thread module is loaded, its start_new_thread
        function is saved and replaced by attachThread.

        @param fullname name of the module to be loaded
        @type str
        @return reference to the loaded module
        @rtype module
        """
        # the hook was disabled by find_module, so this is a regular import
        module = __import__(fullname)
        sys.modules[fullname] = module
        if (fullname == self.threadModName and
                self._original_start_new_thread is None):
            # make thread hooks available to system
            self._original_start_new_thread = module.start_new_thread
            module.start_new_thread = self.attachThread

        self.enableImportHooks = True
        return module

#
# eflag: noqa = M702