src/eric7/DebugClients/Python/ThreadExtension.py

Sat, 26 Apr 2025 12:34:32 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 26 Apr 2025 12:34:32 +0200
branch
eric7
changeset 11240
c48c615c04a3
parent 11090
f5f5f5803935
permissions
-rw-r--r--

MicroPython
- Added a configuration option to disable the support for the no longer produced Pimoroni Pico Wireless Pack.

# -*- coding: utf-8 -*-

# Copyright (c) 2014 - 2025 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing an import hook patching thread modules to get debugged too.
"""

import _thread
import contextlib
import os
import sys
import threading

from DebugBase import DebugBase

_qtThreadNumber = 1


class ThreadExtension:
    """
    Class implementing the thread support for the debugger.

    Provides methods for intercepting thread creation, retrieving the running
    threads and their name and state.
    """

    def __init__(self):
        """
        Constructor
        """
        self.threadNumber = 1
        self._original_start_new_thread = None

        self.clientLock = threading.RLock()

        # dictionary of all threads running {id: DebugBase}
        self.threads = {_thread.get_ident(): self}

        # the "current" thread, basically for variables view
        self.currentThread = self
        # the thread we are at a breakpoint continuing at next command
        self.currentThreadExec = self

        # special objects representing the main scripts thread and frame
        self.mainThread = self

    def attachThread(self, target=None, args=None, kwargs=None, mainThread=False):
        """
        Public method to setup a standard thread for DebugClient to debug.

        If mainThread is True, then we are attaching to the already
        started mainthread of the app and the rest of the args are ignored.

        @param target start function of the target thread (i.e. the user
            code)
        @type function
        @param args arguments to pass to target
        @type list of Any
        @param kwargs keyword arguments to pass to target
        @type dict of Any
        @param mainThread True, if we are attaching to the already started
            mainthread of the app
        @type bool
        @return identifier of the created thread
        @rtype int
        """
        if kwargs is None:
            kwargs = {}

        if mainThread:
            ident = _thread.get_ident()
            name = "MainThread"
            newThread = self.mainThread
            newThread.isMainThread = True
            if self.debugging:
                sys.setprofile(newThread.profile)

        else:
            newThread = DebugBase(self)
            ident = self._original_start_new_thread(
                newThread.bootstrap, (target, args, kwargs)
            )
            name = "Thread-{0}".format(self.threadNumber)
            self.threadNumber += 1

        newThread.id = ident
        newThread.name = name

        self.threads[ident] = newThread

        return ident

    def threadTerminated(self, threadId):
        """
        Public method called when a DebugThread has exited.

        @param threadId id of the DebugThread that has exited
        @type int
        """
        self.lockClient()
        try:
            with contextlib.suppress(KeyError):
                del self.threads[threadId]
        finally:
            self.unlockClient()

    def lockClient(self, blocking=True):
        """
        Public method to acquire the lock for this client.

        @param blocking flag to indicating a blocking lock
        @type bool
        @return flag indicating successful locking
        @rtype bool
        """
        return self.clientLock.acquire(blocking)

    def unlockClient(self):
        """
        Public method to release the lock for this client.
        """
        with contextlib.suppress(RuntimeError):
            self.clientLock.release()

    def setCurrentThread(self, threadId):
        """
        Public method to set the current thread.

        @param threadId the id the current thread should be set to.
        @type int
        """
        try:
            self.lockClient()
            if threadId is None:
                self.currentThread = None
            else:
                self.currentThread = self.threads.get(threadId)
        finally:
            self.unlockClient()

    def dumpThreadList(self):
        """
        Public method to send the list of threads.
        """
        self.updateThreadList()

        threadList = []
        currentId = _thread.get_ident()
        # update thread names set by user (threading.setName)
        threadNames = {t.ident: t.name for t in threading.enumerate()}

        for threadId, thd in self.threads.items():
            d = {"id": threadId}
            try:
                d["name"] = threadNames.get(threadId, thd.name)
                d["broken"] = thd.isBroken
                d["except"] = thd.isException
            except Exception:
                d["name"] = "UnknownThread"
                d["broken"] = False
                d["except"] = False

            threadList.append(d)

        self.sendJsonCommand(
            "ResponseThreadList",
            {
                "currentID": currentId,
                "threadList": threadList,
            },
        )

    def getExecutedFrame(self, frame):
        """
        Public method to return the currently executed frame.

        @param frame the current frame
        @type frame object
        @return the frame which is excecuted (without debugger frames)
        @rtype frame object
        """
        # to get the currently executed frame, skip all frames belonging to the
        # debugger
        while frame is not None:
            baseName = os.path.basename(frame.f_code.co_filename)
            if not baseName.startswith(
                (
                    "DebugClientBase.py",
                    "DebugBase.py",
                    "AsyncFile.py",
                    "ThreadExtension.py",
                )
            ):
                break
            frame = frame.f_back

        return frame

    def updateThreadList(self):
        """
        Public method to update the list of running threads.
        """
        frames = sys._current_frames()
        for threadId, frame in frames.items():
            # skip our own timer thread
            if frame.f_code.co_name == "__eventPollTimer":
                continue

            # Unknown thread
            if threadId not in self.threads:
                newThread = DebugBase(self)
                name = "Thread-{0}".format(self.threadNumber)
                self.threadNumber += 1

                newThread.id = threadId
                newThread.name = name
                self.threads[threadId] = newThread

            # adjust current frame
            if "__pypy__" not in sys.builtin_module_names:
                # Don't update with None
                currentFrame = self.getExecutedFrame(frame)
                if (
                    currentFrame is not None
                    and self.threads[threadId].isBroken is False
                ):
                    self.threads[threadId].currentFrame = currentFrame

        # Clean up obsolet because terminated threads
        self.threads = {
            id_: thrd for id_, thrd in self.threads.items() if id_ in frames
        }

    #######################################################################
    ## Methods below deal with patching various modules to support
    ## debugging of threads.
    #######################################################################

    def patchPyThread(self, module):
        """
        Public method to patch Python _thread module.

        @param module reference to the imported module to be patched
        @type module
        """
        # make thread hooks available to system
        self._original_start_new_thread = module.start_new_thread
        module.start_new_thread = self.attachThread

    def patchGreenlet(self, module):
        """
        Public method to patch the 'greenlet' module.

        @param module reference to the imported module to be patched
        @type module
        @return flag indicating that the module was processed
        @rtype bool
        """
        # Check for greenlet.settrace
        if hasattr(module, "settrace"):
            DebugBase.pollTimerEnabled = False
            return True
        return False

    def patchPyThreading(self, module):
        """
        Public method to patch the Python threading module.

        @param module reference to the imported module to be patched
        @type module
        """
        # _debugClient as a class attribute can't be accessed in following
        # class. Therefore we need a global variable.
        _debugClient = self

        def _bootstrap(self, run):
            """
            Bootstrap for threading, which reports exceptions correctly.

            @param run the run method of threading.Thread
            @type method pointer
            """
            newThread = DebugBase(_debugClient)
            newThread.name = self.name

            _debugClient.threads[self.ident] = newThread
            _debugClient.dumpThreadList()

            # see DebugBase.bootstrap
            sys.settrace(newThread.trace_dispatch)
            try:
                run()
            except Exception:
                excinfo = sys.exc_info()
                newThread.user_exception(excinfo, True)
            finally:
                sys.settrace(None)
                _debugClient.dumpThreadList()

        class ThreadWrapper(module.Thread):
            """
            Wrapper class for threading.Thread.
            """

            def __init__(self, *args, **kwargs):
                """
                Constructor
                """
                # Overwrite the provided run method with our own, to
                # intercept the thread creation by threading.Thread
                self.run = lambda s=self, run=self.run: _bootstrap(s, run)

                super().__init__(*args, **kwargs)

        module.Thread = ThreadWrapper

        # Special handling of threading.(_)Timer
        timer = module.Timer

        class TimerWrapper(timer, ThreadWrapper):
            """
            Wrapper class for threading.(_)Timer.
            """

            def __init__(self, interval, function, *args, **kwargs):
                """
                Constructor
                """
                super().__init__(interval, function, *args, **kwargs)

        module.Timer = TimerWrapper

        # Special handling of threading._DummyThread
        class DummyThreadWrapper(module._DummyThread, ThreadWrapper):
            """
            Wrapper class for threading._DummyThread.
            """

            def __init__(self, *args, **kwargs):
                """
                Constructor
                """
                super().__init__(*args, **kwargs)

        module._DummyThread = DummyThreadWrapper

    def patchQThread(self, module):
        """
        Public method to patch the QtCore module's QThread.

        @param module reference to the imported module to be patched
        @type module
        """
        # _debugClient as a class attribute can't be accessed in following
        # class. Therefore we need a global variable.
        _debugClient = self

        def _bootstrapQThread(self, run):
            """
            Bootstrap for QThread, which reports exceptions correctly.

            @param run the run method of *.QThread
            @type method pointer
            """
            global _qtThreadNumber

            newThread = DebugBase(_debugClient)
            ident = _thread.get_ident()
            name = "QtThread-{0}".format(_qtThreadNumber)

            _qtThreadNumber += 1

            newThread.id = ident
            newThread.name = name

            _debugClient.threads[ident] = newThread
            _debugClient.dumpThreadList()

            # see DebugBase.bootstrap
            sys.settrace(newThread.trace_dispatch)
            try:
                run()
            except SystemExit:
                # *.QThreads doesn't like SystemExit
                pass
            except Exception:
                excinfo = sys.exc_info()
                newThread.user_exception(excinfo, True)
            finally:
                sys.settrace(None)
                _debugClient.dumpThreadList()

        class QThreadWrapper(module.QThread):
            """
            Wrapper class for *.QThread.
            """

            def __init__(self, *args, **kwargs):
                """
                Constructor
                """
                # Overwrite the provided run method with our own, to
                # intercept the thread creation by Qt
                self.run = lambda s=self, run=self.run: (_bootstrapQThread(s, run))

                super().__init__(*args, **kwargs)

        class QRunnableWrapper(module.QRunnable):
            """
            Wrapper class for *.QRunnable.
            """

            def __init__(self, *args, **kwargs):
                """
                Constructor
                """
                # Overwrite the provided run method with our own, to
                # intercept the thread creation by Qt
                self.run = lambda s=self, run=self.run: (_bootstrapQThread(s, run))

                super().__init__(*args, **kwargs)

        module.QThread = QThreadWrapper
        module.QRunnable = QRunnableWrapper

eric ide

mercurial