--- a/src/eric7/MicroPython/MicroPythonDeviceInterface.py Fri Feb 24 18:36:43 2023 +0100 +++ b/src/eric7/MicroPython/MicroPythonDeviceInterface.py Sat Feb 25 19:18:07 2023 +0100 @@ -35,6 +35,9 @@ executeAsyncFinished = pyqtSignal() dataReceived = pyqtSignal(bytes) + PasteModePrompt = b"=== " + TracebackMarker = b"Traceback (most recent call last):" + def __init__(self, parent=None): """ Constructor @@ -107,6 +110,56 @@ """ self.__serial.isConnected() and self.__serial.write(data) + def __pasteOn(self): + """ + Private method to switch the connected device to 'paste' mode. + + Note: switching to paste mode is done with synchronous writes. + + @return flag indicating success + @rtype bool + """ + if not self.__serial: + return False + + pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== " + self.__serial.write(b"\x02") # end raw mode if required + written = self.__serial.waitForBytesWritten(500) + # time out after 500ms if device is not responding + if not written: + return False + for _i in range(3): + # CTRL-C three times to break out of loops + self.__serial.write(b"\r\x03") + written = self.__serial.waitForBytesWritten(500) + # time out after 500ms if device is not responding + if not written: + return False + QThread.msleep(10) + self.__serial.readAll() # read all data and discard it + self.__serial.write(b"\r\x05") # send CTRL-E to enter paste mode + self.__serial.readUntil(pasteMessage) + + if self.__serial.hasTimedOut(): + # it timed out; try it again and than fail + self.__serial.write(b"\r\x05") # send CTRL-E again + self.__serial.readUntil(pasteMessage) + if self.__serial.hasTimedOut(): + return False + + QCoreApplication.processEvents( + QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents + ) + self.__serial.readAll() # read all data and discard it + return True + + def __pasteOff(self): + """ + Private method to switch 'paste' mode off. + """ + if self.__serial: + self.__serial.write(b"\x04") # send CTRL-D to cancel paste mode + def __rawOn(self): """ Private method to switch the connected device to 'raw' mode. @@ -114,7 +167,7 @@ Note: switching to raw mode is done with synchronous writes. @return flag indicating success - @@rtype bool + @rtype bool """ if not self.__serial: return False @@ -189,7 +242,7 @@ return True - def execute(self, commands, timeout=0): + def execute(self, commands, *, mode="raw", timeout=0): """ Public method to send commands to the connected device and return the result. @@ -198,6 +251,36 @@ @param commands list of commands to be executed @type str or list of str + @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to + 'raw') + @type str + @keyparam timeout per command timeout in milliseconds (0 for configured default) + (defaults to 0) + @type int (optional) + @return tuple containing stdout and stderr output of the device + @rtype tuple of (bytes, bytes) + @exception ValueError raised in case of an unsupported submit mode + """ + if mode not in ("paste", "raw"): + raise ValueError("Unsupported submit mode given ('{0}').".format(mode)) + + if mode == "raw": + return self.__execute_raw(commands, timeout=timeout) + elif mode == "paste": + return self.__execute_paste(commands, timeout=timeout) + else: + # just in case + return b"", b"" + + def __execute_raw(self, commands, timeout=0): + """ + Private method to send commands to the connected device using 'raw REPL' mode + and return the result. + + If no serial connection is available, empty results will be returned. + + @param commands list of commands to be executed + @type str or list of str @param timeout per command timeout in milliseconds (0 for configured default) (defaults to 0) @type int (optional) @@ -234,6 +317,7 @@ ) ok = self.__serial.readUntil(b"OK") if ok != b"OK": + self.__blockReadyRead = False return ( b"", "Expected 'OK', got '{0}', followed by '{1}'".format( @@ -263,6 +347,76 @@ return bytes(result), err + def __execute_paste(self, commands, timeout=0): + """ + Private method to send commands to the connected device using 'paste' mode + and return the result. + + If no serial connection is available, empty results will be returned. + + @param commands list of commands to be executed + @type str or list of str + @param timeout per command timeout in milliseconds (0 for configured default) + (defaults to 0) + @type int (optional) + @return tuple containing stdout and stderr output of the device + @rtype tuple of (bytes, bytes) + """ + if not self.__serial: + return b"", b"" + + if not self.__serial.isConnected(): + return b"", b"Device not connected or not switched on." + + if isinstance(commands, list): + commands = "\n".join(commands) + + # switch on raw mode + self.__blockReadyRead = True + ok = self.__pasteOn() + if not ok: + self.__blockReadyRead = False + return (b"", b"Could not switch to raw mode. Is the device switched on?") + + # send commands + QThread.msleep(10) + for command in commands.splitlines(keepends=True): + # send the data as single lines + commandBytes = command.encode("utf-8") + self.__serial.write(commandBytes) + QCoreApplication.processEvents( + QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents + ) + QThread.msleep(10) + ok = self.__serial.readUntil(commandBytes) + if ok != commandBytes: + self.__blockReadyRead = False + return ( + b"", + "Expected '{0}', got '{1}', followed by '{2}'".format( + commandBytes, ok, self.__serial.readAll() + ).encode("utf-8"), + ) + + # switch off paste mode causing the commands to be executed + self.__pasteOff() + QThread.msleep(10) + result = self.__serial.readUntil(b">>> ", timeout=timeout).replace(b">>> ", b"") + # read until Python prompt + if self.__serial.hasTimedOut(): + self.__blockReadyRead = False + return b"", b"Timeout while processing commands." + + if self.TracebackMarker in result: + errorIndex = result.find(self.TracebackMarker) + out, err = result[:errorIndex], result[errorIndex:] + else: + out = result.strip() + err = b"" + + self.__blockReadyRead = False + return out, err + def executeAsync(self, commandsList): """ Public method to execute a series of commands over a period of time @@ -277,3 +431,25 @@ QTimer.singleShot(2, lambda: self.executeAsync(commandsList)) else: self.executeAsyncFinished.emit() + + def executeAsyncPaste(self, commandsList): + """ + Public method to execute a series of commands over a period of time + without returning any result (asynchronous execution). + + @param commandsList list of commands to be execute on the device + @type list of bytes + """ + if commandsList: + self.__blockReadyRead = True + command = commandsList.pop(0) + if command == "@PasteOn@": + self.__pasteOn() + else: + self.__serial.write(command) + self.__serial.readUntil(command) + QTimer.singleShot(2, lambda: self.executeAsyncPaste(commandsList)) + else: + self.__blockReadyRead = False + self.__pasteOff() + self.executeAsyncFinished.emit()