Sun, 16 Feb 2020 16:14:25 +0100
Continued with the multiprocess debugger.
# -*- coding: utf-8 -*- # Copyright (c) 2015 - 2020 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing utilities functions for the debug client. """ import json import os import traceback import sys # # Taken from inspect.py of Python 3.4 # from collections import namedtuple from inspect import iscode, isframe # Create constants for the compiler flags in Include/code.h # We try to get them from dis to avoid duplication, but fall # back to hardcoding so the dependency is optional try: from dis import COMPILER_FLAG_NAMES except ImportError: CO_OPTIMIZED, CO_NEWLOCALS = 0x1, 0x2 CO_VARARGS, CO_VARKEYWORDS = 0x4, 0x8 CO_NESTED, CO_GENERATOR, CO_NOFREE = 0x10, 0x20, 0x40 else: mod_dict = globals() for k, v in COMPILER_FLAG_NAMES.items(): mod_dict["CO_" + v] = k ArgInfo = namedtuple('ArgInfo', 'args varargs keywords locals') def getargvalues(frame): """ Function to get information about arguments passed into a particular frame. @param frame reference to a frame object to be processed @type frame @return tuple of four things, where 'args' is a list of the argument names, 'varargs' and 'varkw' are the names of the * and ** arguments or None and 'locals' is the locals dictionary of the given frame. @exception TypeError raised if the input parameter is not a frame object """ if not isframe(frame): raise TypeError('{0!r} is not a frame object'.format(frame)) args, varargs, kwonlyargs, varkw = _getfullargs(frame.f_code) return ArgInfo(args + kwonlyargs, varargs, varkw, frame.f_locals) def _getfullargs(co): """ Protected function to get information about the arguments accepted by a code object. @param co reference to a code object to be processed @type code @return tuple of four things, where 'args' and 'kwonlyargs' are lists of argument names, and 'varargs' and 'varkw' are the names of the * and ** arguments or None. @exception TypeError raised if the input parameter is not a code object """ if not iscode(co): raise TypeError('{0!r} is not a code object'.format(co)) nargs = co.co_argcount names = co.co_varnames nkwargs = co.co_kwonlyargcount args = list(names[:nargs]) kwonlyargs = list(names[nargs:nargs + nkwargs]) nargs += nkwargs varargs = None if co.co_flags & CO_VARARGS: varargs = co.co_varnames[nargs] nargs = nargs + 1 varkw = None if co.co_flags & CO_VARKEYWORDS: varkw = co.co_varnames[nargs] return args, varargs, kwonlyargs, varkw def formatargvalues(args, varargs, varkw, localsDict, formatarg=str, formatvarargs=lambda name: '*' + name, formatvarkw=lambda name: '**' + name, formatvalue=lambda value: '=' + repr(value)): """ Function to format an argument spec from the 4 values returned by getargvalues. @param args list of argument names @type list of str @param varargs name of the variable arguments @type str @param varkw name of the keyword arguments @type str @param localsDict reference to the local variables dictionary @type dict @keyparam formatarg argument formatting function @type func @keyparam formatvarargs variable arguments formatting function @type func @keyparam formatvarkw keyword arguments formatting function @type func @keyparam formatvalue value formating functtion @type func @return formatted call signature @rtype str """ specs = [] for i in range(len(args)): name = args[i] specs.append(formatarg(name) + formatvalue(localsDict[name])) if varargs: specs.append(formatvarargs(varargs) + formatvalue(localsDict[varargs])) if varkw: specs.append(formatvarkw(varkw) + formatvalue(localsDict[varkw])) argvalues = '(' + ', '.join(specs) + ')' if '__return__' in localsDict: argvalues += " -> " + formatvalue(localsDict['__return__']) return argvalues def prepareJsonCommand(method, params): """ Function to prepare a single command or response for transmission to the IDE. @param method command or response name to be sent @type str @param params dictionary of named parameters for the command or response @type dict @return prepared JSON command or response string @rtype str """ commandDict = { "jsonrpc": "2.0", "method": method, "params": params, } return json.dumps(commandDict) + '\n' ########################################################################### ## Things related to monkey patching below ########################################################################### PYTHON_NAMES = ["python", "pypy"] def isWindowsPlatform(): """ Function to check, if this is a Windows platform. @return flag indicating Windows platform @rtype bool """ return sys.platform.startswith(("win", "cygwin")) def isExecutable(program): """ Function to check, if the given program is executable. @param program program path to be checked @type str @return flag indicating an executable program @rtype bool """ return os.access(os.path.abspath(program), os.X_OK) def startsWithShebang(program): """ Function to check, if the given program start with a Shebang line. @param program program path to be checked @type str @return flag indicating an existing and valid shebang line @rtype bool """ try: with open(program) as f: for line in f: line = line.strip() if line: for name in PYTHON_NAMES: if line.startswith('#!/usr/bin/env {0}'.format(name)): return True return False except UnicodeDecodeError: return False except Exception: traceback.print_exc() return False def isPythonProgram(program): """ Function to check, if the given program is a Python interpreter or program. @param program program to be checked @type str @return flag indicating a Python interpreter or program @rtype bool """ if not program: return False prog = os.path.basename(program).lower() for pyname in PYTHON_NAMES: if pyname in prog: return True return ( not isWindowsPlatform() and isExecutable(program) and startsWithShebang(program) ) def patchArguments(debugClient, arguments, noRedirect=False): """ Function to patch the arguments given to start a program in order to execute it in our debugger. @param debugClient reference to the debug client object @type DebugClient @param arguments list of program arguments @type list of str @param noRedirect flag indicating to not redirect stdin and stdout @type bool @return modified argument list @rtype list of str """ args = arguments[:] # create a copy of the arguments list # support for shebang line program = os.path.basename(args[0]).lower() for pyname in PYTHON_NAMES: if pyname in program: break else: if not isWindowsPlatform() and startsWithShebang(args[0]): # insert our interpreter as first argument args.insert(0, sys.executable) # check for -c or -m invocation => debugging not supported yet if "-c" in args: cm_position = args.index("-c") elif "-m" in args: cm_position = args.index("-m") else: cm_position = 0 if cm_position > 0: # check if it belongs to the interpreter or program for pos in range(1, len(args)): if not args[pos].startswith("-"): # first argument not starting with '-' is the program found = True break else: found = False if found and cm_position < pos: # it belongs to the interpreter return arguments # extract list of interpreter arguments, i.e. all arguments before the # first one not starting with '-'. interpreter = args.pop(0) interpreterArgs = [] while args: if args[0].startswith("-"): if args[0] in ("-W", "-X"): # take two elements off the list interpreterArgs.append(args.pop(0)) interpreterArgs.append(args.pop(0)) else: interpreterArgs.append(args.pop(0)) else: break (wd, host, port, exceptions, tracePython, redirect, noencoding ) = debugClient.startOptions[:7] modifiedArguments = [interpreter] modifiedArguments.extend(interpreterArgs) modifiedArguments.extend([ os.path.join(os.path.dirname(__file__), "DebugClient.py"), "-h", host, "-p", str(port), "--no-passive", ]) if wd: modifiedArguments.extend(["-w", wd]) if not exceptions: modifiedArguments.append("-e") if tracePython: modifiedArguments.append("-t") if noRedirect or not redirect: modifiedArguments.append("-n") if noencoding: modifiedArguments.append("--no-encoding") if debugClient.multiprocessSupport: modifiedArguments.append("--multiprocess") modifiedArguments.append("--") # end the arguments for DebugClient # append the arguments for the program to be debugged modifiedArguments.extend(args) return modifiedArguments # # eflag: noqa = M702