diff -r e9e7eca7efee -r bf71ee032bb4 src/eric7/DebugClients/Python/DebugClientBase.py --- a/src/eric7/DebugClients/Python/DebugClientBase.py Wed Jul 13 11:16:20 2022 +0200 +++ b/src/eric7/DebugClients/Python/DebugClientBase.py Wed Jul 13 14:55:47 2022 +0200 @@ -24,7 +24,7 @@ import DebugClientCapabilities import DebugVariables -from DebugBase import setRecursionLimit, printerr # __IGNORE_WARNING__ +from DebugBase import setRecursionLimit, printerr # __IGNORE_WARNING__ from AsyncFile import AsyncFile, AsyncPendingWrite from DebugConfig import SpecialAttributes, NonExpandableTypes from FlexCompleter import Completer @@ -42,9 +42,9 @@ def DebugClientInput(prompt="", echo=True): """ Replacement for the standard input() builtin. - + This function works with the split debugger. - + @param prompt prompt to be shown @type str @param echo flag indicating echoing of the input @@ -57,14 +57,16 @@ else: return DebugClientInstance.input(prompt, echo=echo) + # Use our own input(). try: - DebugClientOrigInput = __builtins__.__dict__['input'] - __builtins__.__dict__['input'] = DebugClientInput + DebugClientOrigInput = __builtins__.__dict__["input"] + __builtins__.__dict__["input"] = DebugClientInput except (AttributeError, KeyError): import __main__ - DebugClientOrigInput = __main__.__builtins__.__dict__['input'] - __main__.__builtins__.__dict__['input'] = DebugClientInput + + DebugClientOrigInput = __main__.__builtins__.__dict__["input"] + __main__.__builtins__.__dict__["input"] = DebugClientInput ############################################################################### @@ -72,7 +74,7 @@ def DebugClientClose(fd): """ Replacement for the standard os.close(fd). - + @param fd open file descriptor to be closed (integer) """ if DebugClientInstance is None: @@ -80,8 +82,9 @@ else: DebugClientInstance.close(fd) + # use our own close(). -if 'close' in dir(os): +if "close" in dir(os): DebugClientOrigClose = os.close os.close = DebugClientClose @@ -91,15 +94,16 @@ def DebugClientSetRecursionLimit(limit): """ Replacement for the standard sys.setrecursionlimit(limit). - + @param limit recursion limit (integer) """ rl = max(limit, 64) setRecursionLimit(rl) DebugClientOrigSetRecursionLimit(rl + 64) + # use our own setrecursionlimit(). -if 'setrecursionlimit' in dir(sys): +if "setrecursionlimit" in dir(sys): DebugClientOrigSetRecursionLimit = sys.setrecursionlimit sys.setrecursionlimit = DebugClientSetRecursionLimit DebugClientSetRecursionLimit(sys.getrecursionlimit()) @@ -113,7 +117,7 @@ It provides access to the Python interpeter from a debugger running in another process. - + The protocol between the debugger and the client is based on JSONRPC 2.0 PDUs. Each one is sent on a single line, i.e. commands or responses are separated by a linefeed character. @@ -121,50 +125,51 @@ If the debugger closes the session there is no response from the client. The client may close the session at any time as a result of the script being debugged closing or crashing. - + <b>Note</b>: This class is meant to be subclassed by individual DebugClient classes. Do not instantiate it directly. """ + clientCapabilities = DebugClientCapabilities.HasAll - + Type2Indicators = { # Python types - 'list': '[]', - 'tuple': '()', - 'dict': '{:}', # __IGNORE_WARNING_M613__ - 'set': '{}', # __IGNORE_WARNING_M613__ - 'frozenset': '{}', # __IGNORE_WARNING_M613__ - 'numpy.ndarray': '[ndarray]', # __IGNORE_WARNING_M613__ - 'collections.abc.ItemsView': '[]', - 'collections.abc.KeysView': '[]', - 'collections.abc.ValuesView': '[]', + "list": "[]", + "tuple": "()", + "dict": "{:}", # __IGNORE_WARNING_M613__ + "set": "{}", # __IGNORE_WARNING_M613__ + "frozenset": "{}", # __IGNORE_WARNING_M613__ + "numpy.ndarray": "[ndarray]", # __IGNORE_WARNING_M613__ + "collections.abc.ItemsView": "[]", + "collections.abc.KeysView": "[]", + "collections.abc.ValuesView": "[]", } - + def __init__(self): """ Constructor """ self.breakpoints = {} self.redirect = True - + # special objects representing the main scripts thread and frame self.mainThread = self self.framenr = 0 - + # The context to run the debugged program in. - self.debugMod = types.ModuleType('__main__') - self.debugMod.__dict__['__builtins__'] = __builtins__ + self.debugMod = types.ModuleType("__main__") + self.debugMod.__dict__["__builtins__"] = __builtins__ # The list of complete lines to execute. - self.buffer = '' - + self.buffer = "" + # The precompiled regexp to filter variables against self.globalsFilterObjects = None self.localsFilterObjects = None self._fncache = {} self.dircache = [] - self.passive = False # used to indicate the passive mode + self.passive = False # used to indicate the passive mode self.running = None self.test = None self.debugging = False @@ -175,58 +180,58 @@ self.writestream = None self.errorstream = None self.pollingDisabled = False - + self.__debuggerId = "" - + self.callTraceEnabled = None - + self.compile_command = codeop.CommandCompiler() - + self.coding_re = re.compile(r"coding[:=]\s*([-\w_.]+)") - self.defaultCoding = 'utf-8' + self.defaultCoding = "utf-8" self.__coding = self.defaultCoding self.noencoding = False - + self.startOptions = None - + def getCoding(self): """ Public method to return the current coding. - + @return codec name (string) """ return self.__coding - + def __setCoding(self, filename): """ Private method to set the coding used by a python file. - + @param filename name of the file to inspect (string) """ if self.noencoding: self.__coding = sys.getdefaultencoding() else: - default = 'utf-8' + default = "utf-8" try: - with open(filename, 'rb') as f: + with open(filename, "rb") as f: # read the first and second line text = f.readline() text = "{0}{1}".format(text, f.readline()) except OSError: self.__coding = default return - + for line in text.splitlines(): m = self.coding_re.search(line) if m: self.__coding = m.group(1) return self.__coding = default - + def input(self, prompt, echo=True): """ Public method to implement input() using the event loop. - + @param prompt prompt to be shown @type str @param echo flag indicating echoing of the input @@ -234,10 +239,13 @@ @return the entered string @rtype str """ - self.sendJsonCommand("RequestRaw", { - "prompt": prompt, - "echo": echo, - }) + self.sendJsonCommand( + "RequestRaw", + { + "prompt": prompt, + "echo": echo, + }, + ) self.eventLoop(True) return self.rawLine @@ -245,7 +253,7 @@ """ Public method to close the session with the debugger and optionally terminate. - + @param terminate flag indicating to terminate (boolean) """ with contextlib.suppress(Exception): @@ -254,7 +262,7 @@ self.debugging = False self.multiprocessSupport = False self.noDebugList = [] - + # make sure we close down our end of the socket # might be overkill as normally stdin, stdout and stderr # SHOULD be closed on exit, but it does not hurt to do it here @@ -266,10 +274,10 @@ # Ok, go away. sys.exit() - def __compileFileSource(self, filename, mode='exec'): + def __compileFileSource(self, filename, mode="exec"): """ Private method to compile source code read from a file. - + @param filename name of the source file @type str @param mode kind of code to be generated (exec or eval) @@ -278,13 +286,13 @@ """ with codecs.open(filename, encoding=self.__coding) as fp: statement = fp.read() - + return self.__compileCommand(statement, filename=filename, mode=mode) - + def __compileCommand(self, statement, filename="<string>", mode="exec"): """ Private method to compile source code. - + @param statement source code string to be compiled @type str @param filename name of the source file @@ -294,7 +302,7 @@ @return compiled code object (None in case of errors) """ try: - code = compile(statement + '\n', filename, mode) + code = compile(statement + "\n", filename, mode) except SyntaxError: exctype, excval, exctb = sys.exc_info() try: @@ -304,53 +312,57 @@ charno = excval.offset if charno is None: charno = 0 - + except (AttributeError, ValueError): message = "" filename = "" lineno = 0 charno = 0 - + self.sendSyntaxError(message, filename, lineno, charno, self.name) return None - + return code - + def handleJsonCommand(self, jsonStr): """ Public method to handle a command serialized as a JSON string. - + @param jsonStr string containing the command received from the IDE @type str """ -## printerr(jsonStr) ## debug # __IGNORE_WARNING_M891__ - + ## printerr(jsonStr) ## debug # __IGNORE_WARNING_M891__ + try: commandDict = json.loads(jsonStr.strip()) except (TypeError, ValueError) as err: printerr("Error handling command: " + jsonStr) printerr(str(err)) return - + method = commandDict["method"] params = commandDict["params"] - + if method == "RequestVariables": self.__dumpVariables( - params["frameNumber"], params["scope"], params["filters"]) - + params["frameNumber"], params["scope"], params["filters"] + ) + elif method == "RequestVariable": self.__dumpVariable( - params["variable"], params["frameNumber"], - params["scope"], params["filters"]) - + params["variable"], + params["frameNumber"], + params["scope"], + params["filters"], + ) + elif method == "RequestStack": stack = self.mainThread.getStack() self.sendResponseLine(stack, self.mainThread.name) - + elif method == "RequestThreadList": self.dumpThreadList() - + elif method == "RequestThreadSet": if params["threadID"] == -1: # -1 is indication for the main thread @@ -364,49 +376,53 @@ self.setCurrentThread(threadId) self.sendJsonCommand("ResponseThreadSet", {}) stack = self.currentThread.getStack() - self.sendJsonCommand("ResponseStack", { - "stack": stack, - "threadName": self.currentThread.name, - }) - + self.sendJsonCommand( + "ResponseStack", + { + "stack": stack, + "threadName": self.currentThread.name, + }, + ) + elif method == "RequestDisassembly": if self.disassembly is not None: - self.sendJsonCommand("ResponseDisassembly", { - "disassembly": self.disassembly - }) + self.sendJsonCommand( + "ResponseDisassembly", {"disassembly": self.disassembly} + ) else: - self.sendJsonCommand("ResponseDisassembly", { - "disassembly": {} - }) - + self.sendJsonCommand("ResponseDisassembly", {"disassembly": {}}) + elif method == "RequestCapabilities": clientType = "Python3" - self.sendJsonCommand("ResponseCapabilities", { - "capabilities": self.__clientCapabilities(), - "clientType": clientType - }) - + self.sendJsonCommand( + "ResponseCapabilities", + {"capabilities": self.__clientCapabilities(), "clientType": clientType}, + ) + elif method == "RequestBanner": - self.sendJsonCommand("ResponseBanner", { - "version": "Python {0}".format(sys.version), - "platform": socket.gethostname(), - }) - + self.sendJsonCommand( + "ResponseBanner", + { + "version": "Python {0}".format(sys.version), + "platform": socket.gethostname(), + }, + ) + elif method == "RequestSetFilter": self.__generateFilterObjects(params["scope"], params["filter"]) - + elif method == "RequestCallTrace": if params["enable"]: callTraceEnabled = self.profile else: callTraceEnabled = None - + if self.debugging: sys.setprofile(callTraceEnabled) else: # remember for later self.callTraceEnabled = callTraceEnabled - + elif method == "RequestEnvironment": for key, value in params["environment"].items(): if key.endswith("+"): @@ -423,7 +439,7 @@ del os.environ[key] else: os.environ[key] = value - + elif method == "RequestLoad": self._fncache = {} self.dircache = [] @@ -433,38 +449,39 @@ sys.argv.append(params["filename"]) sys.argv.extend(params["argv"]) sys.path = self.__getSysPath(os.path.dirname(sys.argv[0])) - if params["workdir"] == '': + if params["workdir"] == "": os.chdir(sys.path[1]) else: os.chdir(params["workdir"]) - + self.running = sys.argv[0] self.debugging = True self.multiprocessSupport = params["multiprocess"] - + self.threads.clear() self.attachThread(mainThread=True) - + # set the system exception handling function to ensure, that # we report on all unhandled exceptions sys.excepthook = self.__unhandled_exception self.__interceptSignals() - + # clear all old breakpoints, they'll get set after we have # started Breakpoint.clear_all_breaks() Watch.clear_all_watches() - + self.mainThread.tracePythonLibs(params["traceInterpreter"]) - + # This will eventually enter a local event loop. - self.debugMod.__dict__['__file__'] = self.running - sys.modules['__main__'] = self.debugMod + self.debugMod.__dict__["__file__"] = self.running + sys.modules["__main__"] = self.debugMod code = self.__compileFileSource(self.running) if code: sys.setprofile(self.callTraceEnabled) - self.mainThread.run(code, self.debugMod.__dict__, debug=True, - closeSession=False) + self.mainThread.run( + code, self.debugMod.__dict__, debug=True, closeSession=False + ) elif method == "RequestRun": self.disassembly = None @@ -473,79 +490,43 @@ sys.argv.append(params["filename"]) sys.argv.extend(params["argv"]) sys.path = self.__getSysPath(os.path.dirname(sys.argv[0])) - if params["workdir"] == '': + if params["workdir"] == "": os.chdir(sys.path[1]) else: os.chdir(params["workdir"]) self.running = sys.argv[0] self.botframe = None - + self.threads.clear() self.attachThread(mainThread=True) - + # set the system exception handling function to ensure, that # we report on all unhandled exceptions sys.excepthook = self.__unhandled_exception self.__interceptSignals() - + self.mainThread.tracePythonLibs(False) - - self.debugMod.__dict__['__file__'] = sys.argv[0] - sys.modules['__main__'] = self.debugMod + + self.debugMod.__dict__["__file__"] = sys.argv[0] + sys.modules["__main__"] = self.debugMod res = 0 code = self.__compileFileSource(self.running) if code: - self.mainThread.run(code, self.debugMod.__dict__, debug=False, - closeSession=False) + self.mainThread.run( + code, self.debugMod.__dict__, debug=False, closeSession=False + ) elif method == "RequestCoverage": from coverage import Coverage + self.disassembly = None sys.argv = [] self.__setCoding(params["filename"]) sys.argv.append(params["filename"]) sys.argv.extend(params["argv"]) sys.path = self.__getSysPath(os.path.dirname(sys.argv[0])) - if params["workdir"] == '': - os.chdir(sys.path[1]) - else: - os.chdir(params["workdir"]) - - # set the system exception handling function to ensure, that - # we report on all unhandled exceptions - sys.excepthook = self.__unhandled_exception - self.__interceptSignals() - - # generate a coverage object - self.cover = Coverage( - auto_data=True, - data_file="{0}.coverage".format( - os.path.splitext(sys.argv[0])[0])) - - if params["erase"]: - self.cover.erase() - sys.modules['__main__'] = self.debugMod - self.debugMod.__dict__['__file__'] = sys.argv[0] - code = self.__compileFileSource(sys.argv[0]) - if code: - self.running = sys.argv[0] - self.cover.start() - self.mainThread.run(code, self.debugMod.__dict__, debug=False, - closeSession=False) - self.cover.stop() - self.cover.save() - - elif method == "RequestProfile": - sys.setprofile(None) - import PyProfile - self.disassembly = None - sys.argv = [] - self.__setCoding(params["filename"]) - sys.argv.append(params["filename"]) - sys.argv.extend(params["argv"]) - sys.path = self.__getSysPath(os.path.dirname(sys.argv[0])) - if params["workdir"] == '': + if params["workdir"] == "": os.chdir(sys.path[1]) else: os.chdir(params["workdir"]) @@ -554,20 +535,60 @@ # we report on all unhandled exceptions sys.excepthook = self.__unhandled_exception self.__interceptSignals() - + + # generate a coverage object + self.cover = Coverage( + auto_data=True, + data_file="{0}.coverage".format(os.path.splitext(sys.argv[0])[0]), + ) + + if params["erase"]: + self.cover.erase() + sys.modules["__main__"] = self.debugMod + self.debugMod.__dict__["__file__"] = sys.argv[0] + code = self.__compileFileSource(sys.argv[0]) + if code: + self.running = sys.argv[0] + self.cover.start() + self.mainThread.run( + code, self.debugMod.__dict__, debug=False, closeSession=False + ) + self.cover.stop() + self.cover.save() + + elif method == "RequestProfile": + sys.setprofile(None) + import PyProfile + + self.disassembly = None + sys.argv = [] + self.__setCoding(params["filename"]) + sys.argv.append(params["filename"]) + sys.argv.extend(params["argv"]) + sys.path = self.__getSysPath(os.path.dirname(sys.argv[0])) + if params["workdir"] == "": + os.chdir(sys.path[1]) + else: + os.chdir(params["workdir"]) + + # set the system exception handling function to ensure, that + # we report on all unhandled exceptions + sys.excepthook = self.__unhandled_exception + self.__interceptSignals() + # generate a profile object self.prof = PyProfile.PyProfile(sys.argv[0]) - + if params["erase"]: self.prof.erase() - self.debugMod.__dict__['__file__'] = sys.argv[0] - sys.modules['__main__'] = self.debugMod - script = '' + self.debugMod.__dict__["__file__"] = sys.argv[0] + sys.modules["__main__"] = self.debugMod + script = "" with codecs.open(sys.argv[0], encoding=self.__coding) as fp: script = fp.read() - if script and not script.endswith('\n'): - script += '\n' - + if script and not script.endswith("\n"): + script += "\n" + if script: self.running = sys.argv[0] res = 0 @@ -580,13 +601,13 @@ except Exception: excinfo = sys.exc_info() self.__unhandled_exception(*excinfo) - + self.prof.save() self.progTerminated(res, closeSession=False) - + elif method == "ExecuteStatement": if self.buffer: - self.buffer = self.buffer + '\n' + params["statement"] + self.buffer = self.buffer + "\n" + params["statement"] else: self.buffer = params["statement"] @@ -594,23 +615,28 @@ code = self.compile_command(self.buffer, self.readstream.name) except (OverflowError, SyntaxError, ValueError): # Report the exception - sys.last_type, sys.last_value, sys.last_traceback = ( - sys.exc_info()) - self.sendJsonCommand("ClientOutput", { - "text": "".join(traceback.format_exception_only( - sys.last_type, sys.last_value)) - }) - self.buffer = '' + sys.last_type, sys.last_value, sys.last_traceback = sys.exc_info() + self.sendJsonCommand( + "ClientOutput", + { + "text": "".join( + traceback.format_exception_only( + sys.last_type, sys.last_value + ) + ) + }, + ) + self.buffer = "" else: if code is None: self.sendJsonCommand("ResponseContinue", {}) return else: - self.buffer = '' + self.buffer = "" try: if self.running is None: - exec(code, self.debugMod.__dict__) # secok + exec(code, self.debugMod.__dict__) # secok else: if self.currentThread is None: # program has terminated @@ -630,31 +656,29 @@ cf = cf.f_back frmnr -= 1 _globals = cf.f_globals - _locals = ( - self.currentThread.getFrameLocals( - self.framenr)) + _locals = self.currentThread.getFrameLocals( + self.framenr + ) # transfer all locals into a new globals # to emulate Python scoping rules _updatedGlobals = {} _updatedGlobals.update(_globals) _updatedGlobals.update(_locals) - #- reset sys.stdout to our redirector - #- (unconditionally) + # - reset sys.stdout to our redirector + # - (unconditionally) if "sys" in _globals: __stdout = _updatedGlobals["sys"].stdout - _updatedGlobals["sys"].stdout = ( - self.writestream - ) - exec(code, _updatedGlobals, _locals) # secok + _updatedGlobals["sys"].stdout = self.writestream + exec(code, _updatedGlobals, _locals) # secok _updatedGlobals["sys"].stdout = __stdout elif "sys" in _locals: __stdout = _locals["sys"].stdout _locals["sys"].stdout = self.writestream - exec(code, _updatedGlobals, _locals) # secok + exec(code, _updatedGlobals, _locals) # secok _locals["sys"].stdout = __stdout else: - exec(code, _updatedGlobals, _locals) # secok - + exec(code, _updatedGlobals, _locals) # secok + self.currentThread.storeFrameLocals(self.framenr) except SystemExit as exc: self.progTerminated(exc.code) @@ -670,19 +694,17 @@ del tblist[:1] tlist = traceback.format_list(tblist) if tlist: - tlist.insert( - 0, "Traceback (innermost last):\n") - tlist.extend(traceback.format_exception_only( - exc_type, exc_value)) + tlist.insert(0, "Traceback (innermost last):\n") + tlist.extend( + traceback.format_exception_only(exc_type, exc_value) + ) finally: tblist = exc_tb = None - self.sendJsonCommand("ClientOutput", { - "text": "".join(tlist) - }) - + self.sendJsonCommand("ClientOutput", {"text": "".join(tlist)}) + self.sendJsonCommand("ResponseOK", {}) - + elif method == "RequestStep": self.currentThreadExec.step(True) self.eventExit = True @@ -690,59 +712,62 @@ elif method == "RequestStepOver": self.currentThreadExec.step(False) self.eventExit = True - + elif method == "RequestStepOut": self.currentThreadExec.stepOut() self.eventExit = True - + elif method == "RequestStepQuit": if self.passive: self.progTerminated(42) else: self.set_quit() self.eventExit = True - + elif method == "RequestMoveIP": newLine = params["newLine"] self.currentThreadExec.move_instruction_pointer(newLine) - + elif method == "RequestContinue": self.currentThreadExec.go(params["special"]) self.eventExit = True - + elif method == "RequestContinueUntil": newLine = params["newLine"] self.currentThreadExec.set_until(lineno=newLine) self.eventExit = True - + elif method == "RawInput": # If we are handling raw mode input then break out of the current # event loop. self.rawLine = params["input"] self.eventExit = True - + elif method == "RequestBreakpoint": if params["setBreakpoint"]: - if params["condition"] in ['None', '']: + if params["condition"] in ["None", ""]: cond = None elif params["condition"] is not None: try: - cond = compile(params["condition"], '<string>', 'eval') + cond = compile(params["condition"], "<string>", "eval") except SyntaxError: - self.sendJsonCommand("ResponseBPConditionError", { - "filename": params["filename"], - "line": params["line"], - }) + self.sendJsonCommand( + "ResponseBPConditionError", + { + "filename": params["filename"], + "line": params["line"], + }, + ) return else: cond = None - + Breakpoint( - params["filename"], params["line"], params["temporary"], - cond) + params["filename"], params["line"], params["temporary"], cond + ) else: Breakpoint.clear_break(params["filename"], params["line"]) - + elif method == "RequestBreakpointEnable": bp = Breakpoint.get_break(params["filename"], params["line"]) if bp is not None: @@ -750,34 +775,34 @@ bp.enable() else: bp.disable() - + elif method == "RequestBreakpointIgnore": bp = Breakpoint.get_break(params["filename"], params["line"]) if bp is not None: bp.ignore = params["count"] - + elif method == "RequestWatch": if params["setWatch"]: - if params["condition"].endswith( - ('??created??', '??changed??')): + if params["condition"].endswith(("??created??", "??changed??")): compiledCond, flag = params["condition"].split() else: compiledCond = params["condition"] - flag = '' - + flag = "" + try: - compiledCond = compile(compiledCond, '<string>', 'eval') + compiledCond = compile(compiledCond, "<string>", "eval") except SyntaxError: - self.sendJsonCommand("ResponseWatchConditionError", { - "condition": params["condition"], - }) + self.sendJsonCommand( + "ResponseWatchConditionError", + { + "condition": params["condition"], + }, + ) return - Watch( - params["condition"], compiledCond, flag, - params["temporary"]) + Watch(params["condition"], compiledCond, flag, params["temporary"]) else: Watch.clear_watch(params["condition"]) - + elif method == "RequestWatchEnable": wp = Watch.get_watch(params["condition"]) if wp is not None: @@ -785,35 +810,35 @@ wp.enable() else: wp.disable() - + elif method == "RequestWatchIgnore": wp = Watch.get_watch(params["condition"]) if wp is not None: wp.ignore = params["count"] - + elif method == "RequestShutdown": self.sessionClose() - + elif method == "RequestSetNoDebugList": self.noDebugList = params["noDebug"][:] - + elif method == "RequestCompletion": self.__completionList(params["text"]) - + def setDisassembly(self, disassembly): """ Public method to store a disassembly of the code object raising an exception. - + @param disassembly dictionary containing the disassembly information @type dict """ self.disassembly = disassembly - + def sendJsonCommand(self, method, params): """ Public method to send a single command or response to the IDE. - + @param method command or response command name to be sent @type str @param params dictionary of named parameters for the command or @@ -823,55 +848,60 @@ # send debugger ID with all responses if "debuggerId" not in params: params["debuggerId"] = self.__debuggerId - + cmd = prepareJsonCommand(method, params) - + self.writestream.write_p(cmd) self.writestream.flush() - + def sendClearTemporaryBreakpoint(self, filename, lineno): """ Public method to signal the deletion of a temporary breakpoint. - + @param filename name of the file the bp belongs to @type str @param lineno linenumber of the bp @type int """ - self.sendJsonCommand("ResponseClearBreakpoint", { - "filename": filename, - "line": lineno - }) - + self.sendJsonCommand( + "ResponseClearBreakpoint", {"filename": filename, "line": lineno} + ) + def sendClearTemporaryWatch(self, condition): """ Public method to signal the deletion of a temporary watch expression. - + @param condition condition of the watch expression to be cleared @type str """ - self.sendJsonCommand("ResponseClearWatch", { - "condition": condition, - }) - + self.sendJsonCommand( + "ResponseClearWatch", + { + "condition": condition, + }, + ) + def sendResponseLine(self, stack, threadName): """ Public method to send the current call stack. - + @param stack call stack @type list @param threadName name of the thread sending the event @type str """ - self.sendJsonCommand("ResponseLine", { - "stack": stack, - "threadName": threadName, - }) - + self.sendJsonCommand( + "ResponseLine", + { + "stack": stack, + "threadName": threadName, + }, + ) + def sendCallTrace(self, event, fromInfo, toInfo): """ Public method to send a call trace entry. - + @param event trace event (call or return) @type str @param fromInfo dictionary containing the origin info @@ -881,17 +911,19 @@ @type dict with 'filename', 'linenumber' and 'codename' as keys """ - self.sendJsonCommand("CallTrace", { - "event": event[0], - "from": fromInfo, - "to": toInfo, - }) - - def sendException(self, exceptionType, exceptionMessage, stack, - threadName): + self.sendJsonCommand( + "CallTrace", + { + "event": event[0], + "from": fromInfo, + "to": toInfo, + }, + ) + + def sendException(self, exceptionType, exceptionMessage, stack, threadName): """ Public method to send information for an exception. - + @param exceptionType type of exception raised @type str @param exceptionMessage message of the exception @@ -901,17 +933,20 @@ @param threadName name of the thread sending the event @type str """ - self.sendJsonCommand("ResponseException", { - "type": exceptionType, - "message": exceptionMessage, - "stack": stack, - "threadName": threadName, - }) - + self.sendJsonCommand( + "ResponseException", + { + "type": exceptionType, + "message": exceptionMessage, + "stack": stack, + "threadName": threadName, + }, + ) + def sendSyntaxError(self, message, filename, lineno, charno, threadName): """ Public method to send information for a syntax error. - + @param message syntax error message @type str @param filename name of the faulty file @@ -923,64 +958,70 @@ @param threadName name of the thread sending the event @type str """ - self.sendJsonCommand("ResponseSyntax", { - "message": message, - "filename": filename, - "linenumber": lineno, - "characternumber": charno, - "threadName": threadName, - }) - + self.sendJsonCommand( + "ResponseSyntax", + { + "message": message, + "filename": filename, + "linenumber": lineno, + "characternumber": charno, + "threadName": threadName, + }, + ) + def sendPassiveStartup(self, filename, exceptions): """ Public method to send the passive start information. - + @param filename name of the script @type str @param exceptions flag to enable exception reporting of the IDE @type bool """ - self.sendJsonCommand("PassiveStartup", { - "filename": filename, - "exceptions": exceptions, - }) - + self.sendJsonCommand( + "PassiveStartup", + { + "filename": filename, + "exceptions": exceptions, + }, + ) + def sendDebuggerId(self, debuggerId): """ Public method to send the debug client id. - + @param debuggerId id of this debug client instance (made up of hostname and process ID) @type str """ # debugger ID is added automatically by sendJsonCommand self.sendJsonCommand("DebuggerId", {}) - + def __clientCapabilities(self): """ Private method to determine the clients capabilities. - + @return client capabilities (integer) """ try: - import PyProfile # __IGNORE_WARNING__ + import PyProfile # __IGNORE_WARNING__ + with contextlib.suppress(KeyError): - del sys.modules['PyProfile'] + del sys.modules["PyProfile"] return self.clientCapabilities except ImportError: - return ( - self.clientCapabilities & ~DebugClientCapabilities.HasProfiler) - + return self.clientCapabilities & ~DebugClientCapabilities.HasProfiler + def readReady(self, stream): """ Public method called when there is data ready to be read. - + @param stream file like object that has data to be read @return flag indicating an error condition @rtype bool """ error = False - + self.lockClient() try: command = stream.readCommand() @@ -993,18 +1034,18 @@ self.sessionClose() else: self.handleJsonCommand(command) - + return error def writeReady(self, stream): """ Public method called when we are ready to write data. - + @param stream file like object that has data to be written """ stream.write_p("") stream.flush() - + def __interact(self): """ Private method to interact with the debugger. @@ -1021,7 +1062,7 @@ def eventLoop(self, disablePolling=False): """ Public method implementing our event loop. - + @param disablePolling flag indicating to enter an event loop with polling disabled (boolean) """ @@ -1034,18 +1075,18 @@ if self.writestream.nWriteErrors > self.writestream.MAX_TRIES: break - + if AsyncPendingWrite(self.writestream): wrdy.append(self.writestream) if AsyncPendingWrite(self.errorstream): wrdy.append(self.errorstream) - + try: rrdy, wrdy, xrdy = select.select([self.readstream], wrdy, []) except (KeyboardInterrupt, OSError): selectErrors += 1 - if selectErrors <= 10: # arbitrarily selected + if selectErrors <= 10: # arbitrarily selected # just carry on continue else: @@ -1054,10 +1095,10 @@ except ValueError: # the client socket might already be closed, i.e. its fd is -1 break - + # reset the select error counter selectErrors = 0 - + if self.readstream in rrdy: error = self.readReady(self.readstream) if error: @@ -1078,14 +1119,14 @@ """ if self.pollingDisabled: return - + wrdy = [] if AsyncPendingWrite(self.writestream): wrdy.append(self.writestream) if AsyncPendingWrite(self.errorstream): wrdy.append(self.errorstream) - + # immediate return if nothing is ready. try: rrdy, wrdy, xrdy = select.select([self.readstream], wrdy, [], 0) @@ -1100,16 +1141,15 @@ if self.errorstream in wrdy: self.writeReady(self.errorstream) - - def connectDebugger(self, port, remoteAddress=None, redirect=True, - name=""): + + def connectDebugger(self, port, remoteAddress=None, redirect=True, name=""): """ Public method to establish a session with the debugger. - + It opens a network connection to the debugger, connects it to stdin, stdout and stderr and saves these file objects in case the application being debugged redirects them itself. - + @param port the port number to connect to @type int @param remoteAddress the network address of the debug server host @@ -1125,59 +1165,59 @@ elif "@@i" in remoteAddress: remoteAddress = remoteAddress.split("@@i")[0] sock = socket.create_connection((remoteAddress, port)) - + stdinName = sys.stdin.name # Special case if in a multiprocessing.Process if isinstance(stdinName, int): - stdinName = '<stdin>' - + stdinName = "<stdin>" + self.readstream = AsyncFile(sock, sys.stdin.mode, stdinName) self.writestream = AsyncFile(sock, sys.stdout.mode, sys.stdout.name) self.errorstream = AsyncFile(sock, sys.stderr.mode, sys.stderr.name) - + if redirect: sys.stdin = self.readstream sys.stdout = self.writestream sys.stderr = self.errorstream self.redirect = redirect - + # attach to the main thread here self.attachThread(mainThread=True) - + if not name: name = "main" self.__debuggerId = "{0}/{1}/{2}".format( socket.gethostname(), os.getpid(), name ) - + self.sendDebuggerId(self.__debuggerId) def __unhandled_exception(self, exctype, excval, exctb): """ Private method called to report an uncaught exception. - + @param exctype the type of the exception @param excval data about the exception @param exctb traceback for the exception """ self.mainThread.user_exception((exctype, excval, exctb), True) - + def __interceptSignals(self): """ Private method to intercept common signals. """ for signum in [ - signal.SIGABRT, # abnormal termination - signal.SIGFPE, # floating point exception - signal.SIGILL, # illegal instruction - signal.SIGSEGV, # segmentation violation + signal.SIGABRT, # abnormal termination + signal.SIGFPE, # floating point exception + signal.SIGILL, # illegal instruction + signal.SIGSEGV, # segmentation violation ]: signal.signal(signum, self.__signalHandler) - + def __signalHandler(self, signalNumber, stackFrame): """ Private method to handle signals. - + @param signalNumber number of the signal to be handled @type int @param stackFrame current stack frame @@ -1193,41 +1233,44 @@ message = "Segmentation Violation" else: message = "Unknown Signal '{0}'".format(signalNumber) - + filename = self.absPath(stackFrame) - + linenr = stackFrame.f_lineno ffunc = stackFrame.f_code.co_name - - if ffunc == '?': - ffunc = '' - + + if ffunc == "?": + ffunc = "" + if ffunc and not ffunc.startswith("<"): argInfo = getargvalues(stackFrame) try: fargs = formatargvalues( - argInfo.args, argInfo.varargs, - argInfo.keywords, argInfo.locals) + argInfo.args, argInfo.varargs, argInfo.keywords, argInfo.locals + ) except Exception: fargs = "" else: fargs = "" - - self.sendJsonCommand("ResponseSignal", { - "message": message, - "filename": filename, - "linenumber": linenr, - "function": ffunc, - "arguments": fargs, - }) - + + self.sendJsonCommand( + "ResponseSignal", + { + "message": message, + "filename": filename, + "linenumber": linenr, + "function": ffunc, + "arguments": fargs, + }, + ) + def absPath(self, fn): """ Public method to convert a filename to an absolute name. sys.path is used as a set of possible prefixes. The name stays relative if a file could not be found. - + @param fn filename (string) @return the converted filename (string) """ @@ -1254,18 +1297,18 @@ for p in self.dircache: afn = os.path.abspath(os.path.join(p, fn)) nafn = os.path.normcase(afn) - + if os.path.exists(nafn): self._fncache[fn] = afn return afn - + # Nothing found. return fn def getRunning(self): """ Public method to return the main script we are currently running. - + @return flag indicating a running debug session (boolean) """ return self.running @@ -1273,7 +1316,7 @@ def progTerminated(self, status, message="", closeSession=True): """ Public method to tell the debugger that the program has terminated. - + @param status return status @type int @param message status message @@ -1288,27 +1331,30 @@ status = 1 if message is None: message = "" - + if self.running: self.set_quit() program = self.running self.running = None - self.sendJsonCommand("ResponseExit", { - "status": status, - "message": message, - "program": program, - }) - + self.sendJsonCommand( + "ResponseExit", + { + "status": status, + "message": message, + "program": program, + }, + ) + # reset coding self.__coding = self.defaultCoding - + if closeSession: self.sessionClose(False) def __dumpVariables(self, frmnr, scope, filterList): """ Private method to return the variables of a frame to the debug server. - + @param frmnr distance of frame reported on. 0 is the current frame @type int @param scope 1 to report global variables, 0 for local variables @@ -1318,18 +1364,18 @@ """ if self.currentThread is None: return - + self.resolverCache = [{}, {}] frmnr += self.currentThread.skipFrames if scope == 0: self.framenr = frmnr - + f = self.currentThread.getCurrentFrame() - + while f is not None and frmnr > 0: f = f.f_back frmnr -= 1 - + if f is None: if scope: varDict = self.debugMod.__dict__ @@ -1341,22 +1387,28 @@ scope = -1 else: varDict = f.f_locals - + # Update known types list DebugVariables.updateTypeMap() - - varlist = [] if scope < 0 else self.__formatVariablesList( - varDict.items(), scope, filterList) - - self.sendJsonCommand("ResponseVariables", { - "scope": scope, - "variables": varlist, - }) - + + varlist = ( + [] + if scope < 0 + else self.__formatVariablesList(varDict.items(), scope, filterList) + ) + + self.sendJsonCommand( + "ResponseVariables", + { + "scope": scope, + "variables": varlist, + }, + ) + def __dumpVariable(self, var, frmnr, scope, filterList): """ Private method to return the variables of a frame to the debug server. - + @param var list encoded name of the requested variable @type list of str @param frmnr distance of frame reported on. 0 is the current frame @@ -1368,14 +1420,14 @@ """ if self.currentThread is None: return - + frmnr += self.currentThread.skipFrames f = self.currentThread.getCurrentFrame() - + while f is not None and frmnr > 0: f = f.f_back frmnr -= 1 - + if f is None: if scope: varDict = self.debugMod.__dict__ @@ -1387,17 +1439,16 @@ scope = -1 else: varDict = f.f_locals - + varlist = [] - + # fast path if variable was looked up before (see elif) if scope != -1 and str(var) in self.resolverCache[scope]: varGen = self.resolverCache[scope][str(var)] idx, varDict = next(varGen) if idx != -2: # more elements available var.insert(0, idx) - varlist = self.__formatVariablesList( - varDict, scope, filterList) + varlist = self.__formatVariablesList(varDict, scope, filterList) elif scope != -1: variable = varDict # Lookup the wanted attribute @@ -1407,10 +1458,10 @@ variable = resolver.resolve(variable, attribute) if variable is None: break - + else: break - + idx = -3 # Requested variable doesn't exist anymore # If found, get the details of attribute if variable is not None: @@ -1419,30 +1470,32 @@ varGen = resolver.getVariableList(variable) # cache for next lookup self.resolverCache[scope][str(var)] = varGen - + idx, varDict = next(varGen) if idx != -2: # more elements available - varlist = self.__formatVariablesList( - varDict, scope, filterList) - + varlist = self.__formatVariablesList(varDict, scope, filterList) + var.insert(0, idx) - - self.sendJsonCommand("ResponseVariable", { - "scope": scope, - "variable": var, - "variables": varlist, - }) - + + self.sendJsonCommand( + "ResponseVariable", + { + "scope": scope, + "variable": var, + "variables": varlist, + }, + ) + def __formatVariablesList(self, variables, scope, filterList=None): """ Private method to produce a formated variables list. - + The dictionary passed in to it is scanned. Variables are only added to the list, if their type is not contained in the filter list and their name doesn't match any of the filter expressions. The formated variables list (a list of tuples of 3 values) is returned. - + @param variables variables list to be processed @type list of tuple of (str, Any) or (str, str, Any) @param scope 1 to filter using the globals filter, 0 using the locals @@ -1460,14 +1513,12 @@ @rtype list of tuple of (str, str, str) """ filterList = set(filterList or []) - + varlist = [] patternFilterObjects = ( - self.globalsFilterObjects - if scope else - self.localsFilterObjects + self.globalsFilterObjects if scope else self.localsFilterObjects ) - + for variabel in variables: valtype = None rvalue = None @@ -1477,33 +1528,30 @@ # Special case for some Qt variables, where the real type is # overwritten key, valtype, rvalue = variabel - + # filter based on the filter pattern if patternFilterObjects and patternFilterObjects.match(str(key)): continue - + # filter hidden attributes (filter #0) - if '__' in filterList and str(key)[:2] == '__': + if "__" in filterList and str(key)[:2] == "__": continue - + hasChildren = False # special handling for '__builtins__' (it's way too big) - if key == '__builtins__': - rvalue = '<module builtins (built-in)>' - valtype = 'module' + if key == "__builtins__": + rvalue = "<module builtins (built-in)>" + valtype = "module" if valtype in filterList: continue - elif ( - (key in SpecialAttributes and - "special_attributes" in filterList) or - (key == "__hash__" and - "builtin_function_or_method" in filterList) + elif (key in SpecialAttributes and "special_attributes" in filterList) or ( + key == "__hash__" and "builtin_function_or_method" in filterList ): continue elif valtype is None: # valtypestr, e.g. class 'PyQt6.QtCore.QPoint' valtypestr = str(type(value)) - _, valtype = valtypestr.split(' ', 1) + _, valtype = valtypestr.split(" ", 1) # valtype is the real type, e.g. PyQt6.QtCore.QPoint # valtype_filter is used for filtering, where the base class is # also important @@ -1517,38 +1565,43 @@ valtype = "class" elif valtype == "method-wrapper": valtype = valtype_filter = "builtin_function_or_method" - + # Don't process variables which types are on filter list if ( - valtype_filter in filterList or - (valtype_filter in ("sip.enumtype", "sip.wrappertype") and - 'class' in filterList) or - (valtype_filter in ( - "sip.methoddescriptor", "method_descriptor") and - 'method' in filterList) or - (valtype_filter in ("numpy.ndarray", "array.array") and - 'list' in filterList) or - (valtype_filter == "django.MultiValueDict" and - 'dict' in filterList) or - 'instance' in filterList + valtype_filter in filterList + or ( + valtype_filter in ("sip.enumtype", "sip.wrappertype") + and "class" in filterList + ) + or ( + valtype_filter in ("sip.methoddescriptor", "method_descriptor") + and "method" in filterList + ) + or ( + valtype_filter in ("numpy.ndarray", "array.array") + and "list" in filterList + ) + or ( + valtype_filter == "django.MultiValueDict" + and "dict" in filterList + ) + or "instance" in filterList ): continue - + length = -2 - indicator = '' - - if valtype == 'str': + indicator = "" + + if valtype == "str": rvalue = repr(value) length = len(rvalue) elif valtype in NonExpandableTypes: rvalue = repr(value) - + if rvalue is not None: - varlist.append( - (key, indicator, valtype, hasChildren, length, rvalue) - ) + varlist.append((key, indicator, valtype, hasChildren, length, rvalue)) continue - + try: for dtype in DebugVariables._ArrayTypes: if isinstance(value, dtype): @@ -1556,104 +1609,105 @@ length = len(value) except TypeError: length = -1 # Uninitialized array - + dtype = str(dtype)[8:-2] # Standard array type indicators - indicator = self.Type2Indicators.get(dtype, '') - + indicator = self.Type2Indicators.get(dtype, "") + # Special handling of some array types - if valtype == 'array.array': - indicator = '[<{0}>]'.format(value.typecode) - elif valtype == 'collections.defaultdict': + if valtype == "array.array": + indicator = "[<{0}>]".format(value.typecode) + elif valtype == "collections.defaultdict": if value.default_factory is None: def_factory = "None" else: def_factory = value.default_factory.__name__ - indicator = '{{:<{0}>}}'.format(def_factory) + indicator = "{{:<{0}>}}".format(def_factory) elif valtype == "numpy.ndarray" and length > -1: length = "x".join(str(x) for x in value.shape) elif valtype.endswith(".MultiValueDict"): - indicator = "{:}" # __IGNORE_WARNING__ + indicator = "{:}" # __IGNORE_WARNING__ valtype = "django.MultiValueDict" # shortened type break else: rvalue = repr(value) - + hasChildren = True except Exception: - rvalue = '' - - varlist.append( - (key, indicator, valtype, hasChildren, length, rvalue) - ) - + rvalue = "" + + varlist.append((key, indicator, valtype, hasChildren, length, rvalue)) + return varlist - + def __generateFilterObjects(self, scope, filterString): """ Private slot to convert a filter string to a list of filter objects. - + @param scope 1 to generate filter for global variables, 0 for local variables (int) @param filterString string of filter patterns separated by ';' """ patternFilterObjects = None if filterString.strip(): - pattern = filterString.replace(';', '|') + pattern = filterString.replace(";", "|") with contextlib.suppress(re.error): patternFilterObjects = re.compile(pattern) - + if scope: self.globalsFilterObjects = patternFilterObjects else: self.localsFilterObjects = patternFilterObjects - + def __completionList(self, text): """ Private slot to handle the request for a commandline completion list. - + @param text the text to be completed (string) """ - completerDelims = ' \t\n`~!@#$%^&*()-=+[{]}\\|;:\'",<>/?' - + completerDelims = " \t\n`~!@#$%^&*()-=+[{]}\\|;:'\",<>/?" + completions = set() # find position of last delim character pos = -1 while pos >= -len(text): if text[pos] in completerDelims: if pos == -1: - text = '' + text = "" else: - text = text[pos + 1:] + text = text[pos + 1 :] break pos -= 1 - + # Get local and global completions with contextlib.suppress(AttributeError): localdict = self.currentThread.getFrameLocals(self.framenr) localCompleter = Completer(localdict).complete self.__getCompletionList(text, localCompleter, completions) - + cf = self.currentThread.getCurrentFrame() frmnr = self.framenr while cf is not None and frmnr > 0: cf = cf.f_back frmnr -= 1 - + globaldict = self.debugMod.__dict__ if cf is None else cf.f_globals - + globalCompleter = Completer(globaldict).complete self.__getCompletionList(text, globalCompleter, completions) - - self.sendJsonCommand("ResponseCompletion", { - "completions": list(completions), - "text": text, - }) + + self.sendJsonCommand( + "ResponseCompletion", + { + "completions": list(completions), + "text": text, + }, + ) def __getCompletionList(self, text, completer, completions): """ Private method to create a completions list. - + @param text text to complete (string) @param completer completer method @param completions set where to add new completions strings (set) @@ -1670,13 +1724,22 @@ comp = completer(text, state) except Exception: comp = None - - def startDebugger(self, filename=None, host=None, port=None, - enableTrace=True, exceptions=True, tracePython=False, - redirect=True, passive=True, multiprocessSupport=False): + + def startDebugger( + self, + filename=None, + host=None, + port=None, + enableTrace=True, + exceptions=True, + tracePython=False, + redirect=True, + passive=True, + multiprocessSupport=False, + ): """ Public method used to start the remote debugger. - + @param filename the program to be debugged @type str @param host hostname of the debug server @@ -1699,10 +1762,10 @@ @type bool """ if host is None: - host = os.getenv('ERICHOST', 'localhost') + host = os.getenv("ERICHOST", "localhost") if port is None: - port = os.getenv('ERICPORT', 42424) - + port = os.getenv("ERICPORT", 42424) + remoteAddress = self.__resolveHost(host) name = os.path.basename(filename) if filename is not None else "" self.connectDebugger(port, remoteAddress, redirect, name=name) @@ -1717,32 +1780,41 @@ self.__setCoding(self.running) self.passive = passive self.__interact() - + # setup the debugger variables self._fncache = {} self.dircache = [] self.debugging = True - + self.attachThread(mainThread=True) self.mainThread.tracePythonLibs(tracePython) - + # set the system exception handling function to ensure, that # we report on all unhandled exceptions sys.excepthook = self.__unhandled_exception self.__interceptSignals() - + # now start debugging if enableTrace: self.mainThread.set_trace() - - def startProgInDebugger(self, progargs, wd='', host=None, - port=None, exceptions=True, tracePython=False, - redirect=True, passive=True, - multiprocessSupport=False, codeStr="", - scriptModule=""): + + def startProgInDebugger( + self, + progargs, + wd="", + host=None, + port=None, + exceptions=True, + tracePython=False, + redirect=True, + passive=True, + multiprocessSupport=False, + codeStr="", + scriptModule="", + ): """ Public method used to start the remote debugger. - + @param progargs commandline for the program to be debugged (list of strings) @param wd working directory for the program execution (string) @@ -1767,10 +1839,10 @@ @rtype int """ if host is None: - host = os.getenv('ERICHOST', 'localhost') + host = os.getenv("ERICHOST", "localhost") if port is None: - port = os.getenv('ERICPORT', 42424) - + port = os.getenv("ERICPORT", 42424) + remoteAddress = self.__resolveHost(host) if progargs: if not progargs[0].startswith("-"): @@ -1782,7 +1854,7 @@ else: name = "debug_client_code" self.connectDebugger(port, remoteAddress, redirect, name=name) - + self._fncache = {} self.dircache = [] if codeStr: @@ -1792,7 +1864,7 @@ sys.argv = progargs[:] sys.argv[0] = os.path.abspath(sys.argv[0]) sys.path = self.__getSysPath(os.path.dirname(sys.argv[0])) - if wd == '': + if wd == "": os.chdir(sys.path[1]) else: os.chdir(wd) @@ -1800,50 +1872,53 @@ self.__setCoding(self.running) self.debugging = True self.multiprocessSupport = multiprocessSupport - + self.passive = passive if passive: self.sendPassiveStartup(self.running, exceptions) - + self.attachThread(mainThread=True) self.mainThread.tracePythonLibs(tracePython) - + # set the system exception handling function to ensure, that # we report on all unhandled exceptions sys.excepthook = self.__unhandled_exception self.__interceptSignals() - + # This will eventually enter a local event loop. - self.debugMod.__dict__['__file__'] = self.running - sys.modules['__main__'] = self.debugMod + self.debugMod.__dict__["__file__"] = self.running + sys.modules["__main__"] = self.debugMod if codeStr: code = self.__compileCommand(codeStr) elif scriptModule: import runpy + modName, modSpec, code = runpy._get_module_details(scriptModule) self.running = code.co_filename self.debugMod.__dict__.clear() - self.debugMod.__dict__.update({ - "__name__": "__main__", - "__file__": self.running, - "__package__": modSpec.parent, - "__loader__": modSpec.loader, - "__spec__": modSpec, - "__builtins__": __builtins__, - }) + self.debugMod.__dict__.update( + { + "__name__": "__main__", + "__file__": self.running, + "__package__": modSpec.parent, + "__loader__": modSpec.loader, + "__spec__": modSpec, + "__builtins__": __builtins__, + } + ) else: code = self.__compileFileSource(self.running) res = ( self.mainThread.run(code, self.debugMod.__dict__, debug=True) - if code else - 42 # should not happen + if code + else 42 # should not happen ) return res def run_call(self, scriptname, func, *args): """ Public method used to start the remote debugger and call a function. - + @param scriptname name of the script to be debugged (string) @param func function to be called @param *args arguments being passed to func @@ -1853,41 +1928,40 @@ res = self.mainThread.runcall(func, *args) self.progTerminated(res, closeSession=False) return res - + def __resolveHost(self, host): """ Private method to resolve a hostname to an IP address. - + @param host hostname of the debug server (string) @return IP address (string) """ try: host, version = host.split("@@") except ValueError: - version = 'v4' - + version = "v4" + family = ( 0 - if version.startswith("i") else - (socket.AF_INET if version == 'v4' else socket.AF_INET6) + if version.startswith("i") + else (socket.AF_INET if version == "v4" else socket.AF_INET6) ) - + with contextlib.suppress(OSError): - addrinfo = socket.getaddrinfo( - host, None, family, socket.SOCK_STREAM) + addrinfo = socket.getaddrinfo(host, None, family, socket.SOCK_STREAM) return addrinfo[0][4][0] - + return None - + def main(self): """ Public method implementing the main method. """ - if '--' in sys.argv: + if "--" in sys.argv: args = sys.argv[1:] host = None port = None - wd = '' + wd = "" tracePython = False exceptions = True redirect = True @@ -1896,48 +1970,48 @@ codeStr = "" scriptModule = "" while args[0]: - if args[0] == '-h': + if args[0] == "-h": host = args[1] del args[0] del args[0] - elif args[0] == '-p': + elif args[0] == "-p": port = int(args[1]) del args[0] del args[0] - elif args[0] == '-w': + elif args[0] == "-w": wd = args[1] del args[0] del args[0] - elif args[0] == '-t': + elif args[0] == "-t": tracePython = True del args[0] - elif args[0] == '-e': + elif args[0] == "-e": exceptions = False del args[0] - elif args[0] == '-n': + elif args[0] == "-n": redirect = False del args[0] - elif args[0] == '--no-encoding': + elif args[0] == "--no-encoding": self.noencoding = True del args[0] - elif args[0] == '--no-passive': + elif args[0] == "--no-passive": passive = False del args[0] - elif args[0] == '--multiprocess': + elif args[0] == "--multiprocess": multiprocess = True del args[0] - elif args[0] in ('-c', '--code'): + elif args[0] in ("-c", "--code"): codeStr = args[1] del args[0] del args[0] - elif args[0] in ('-m', '--module'): + elif args[0] in ("-m", "--module"): scriptModule = args[1] del args[0] del args[0] - elif args[0] == '--': + elif args[0] == "--": del args[0] break - else: # unknown option + else: # unknown option del args[0] if not args: print("No program given. Aborting!") @@ -1948,36 +2022,48 @@ else: # Store options in case a new Python process is created self.startOptions = ( - wd, host, port, exceptions, tracePython, redirect, + wd, + host, + port, + exceptions, + tracePython, + redirect, self.noencoding, ) if not self.noencoding: self.__coding = self.defaultCoding patchNewProcessFunctions(multiprocess, self) res = self.startProgInDebugger( - args, wd, host, port, exceptions=exceptions, - tracePython=tracePython, redirect=redirect, - passive=passive, multiprocessSupport=multiprocess, - codeStr=codeStr, scriptModule=scriptModule, + args, + wd, + host, + port, + exceptions=exceptions, + tracePython=tracePython, + redirect=redirect, + passive=passive, + multiprocessSupport=multiprocess, + codeStr=codeStr, + scriptModule=scriptModule, ) sys.exit(res) else: - if sys.argv[1] == '--no-encoding': + if sys.argv[1] == "--no-encoding": self.noencoding = True del sys.argv[1] - - if sys.argv[1] == '--multiprocess': + + if sys.argv[1] == "--multiprocess": self.multiprocessSupport = True del sys.argv[1] - - if sys.argv[1] == '': + + if sys.argv[1] == "": del sys.argv[1] - + try: port = int(sys.argv[1]) except (ValueError, IndexError): port = -1 - + if sys.argv[2] == "True": redirect = True elif sys.argv[2] == "False": @@ -1987,22 +2073,27 @@ redirect = int(sys.argv[2]) except (ValueError, IndexError): redirect = True - + ipOrHost = sys.argv[3] - if ':' in ipOrHost or ipOrHost[0] in '0123456789': + if ":" in ipOrHost or ipOrHost[0] in "0123456789": # IPv6 address or IPv4 address remoteAddress = ipOrHost else: remoteAddress = self.__resolveHost(ipOrHost) - - sys.argv = [''] - if '' not in sys.path: - sys.path.insert(0, '') - + + sys.argv = [""] + if "" not in sys.path: + sys.path.insert(0, "") + if port >= 0: # Store options in case a new Python process is created self.startOptions = ( - '', remoteAddress, port, True, False, redirect, + "", + remoteAddress, + port, + True, + False, + redirect, self.noencoding, ) if not self.noencoding: @@ -2013,47 +2104,51 @@ else: print("No network port given. Aborting...") # __IGNORE_WARNING_M801__ - + def close(self, fd): """ Public method implementing a close method as a replacement for os.close(). - + It prevents the debugger connections from being closed. - + @param fd file descriptor to be closed (integer) """ - if fd in [self.readstream.fileno(), self.writestream.fileno(), - self.errorstream.fileno()]: + if fd in [ + self.readstream.fileno(), + self.writestream.fileno(), + self.errorstream.fileno(), + ]: return - + DebugClientOrigClose(fd) - + def __getSysPath(self, firstEntry): """ Private slot to calculate a path list including the PYTHONPATH environment variable. - + @param firstEntry entry to be put first in sys.path (string) @return path list for use as sys.path (list of strings) """ - sysPath = [path for path in os.environ.get("PYTHONPATH", "") - .split(os.pathsep) - if path not in sys.path] + sys.path[:] + sysPath = [ + path + for path in os.environ.get("PYTHONPATH", "").split(os.pathsep) + if path not in sys.path + ] + sys.path[:] if "" in sysPath: sysPath.remove("") sysPath.insert(0, firstEntry) - sysPath.insert(0, '') + sysPath.insert(0, "") return sysPath - + def skipMultiProcessDebugging(self, scriptName): """ Public method to check, if the given script is eligible for debugging. - + @param scriptName name of the script to check @type str @return flag indicating eligibility @rtype bool """ - return any(fnmatch.fnmatch(scriptName, pattern) - for pattern in self.noDebugList) + return any(fnmatch.fnmatch(scriptName, pattern) for pattern in self.noDebugList)