diff -r 286c2a21f36f -r 54c614d91eff src/eric7/MicroPython/MicroPythonDeviceInterface.py --- a/src/eric7/MicroPython/MicroPythonDeviceInterface.py Thu Apr 27 17:59:09 2023 +0200 +++ b/src/eric7/MicroPython/MicroPythonDeviceInterface.py Fri Apr 28 12:07:41 2023 +0200 @@ -1,25 +1,13 @@ # -*- coding: utf-8 -*- -# Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de> +# Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de> # """ -Module implementing some file system commands for MicroPython. +Module implementing an interface base class to talk to a connected MicroPython device. """ -from PyQt6.QtCore import ( - QCoreApplication, - QEventLoop, - QObject, - QThread, - QTimer, - pyqtSignal, - pyqtSlot, -) - -from eric7 import Preferences - -from .MicroPythonSerialPort import MicroPythonSerialPort +from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot class MicroPythonDeviceInterface(QObject): @@ -28,16 +16,13 @@ @signal executeAsyncFinished() emitted to indicate the end of an asynchronously executed list of commands (e.g. a script) - @signal dataReceived(data) emitted to send data received via the serial - connection for further processing + @signal dataReceived(data) emitted to send data received via the connection + for further processing """ executeAsyncFinished = pyqtSignal() dataReceived = pyqtSignal(bytes) - PasteModePrompt = b"=== " - TracebackMarker = b"Traceback (most recent call last):" - def __init__(self, parent=None): """ Constructor @@ -47,43 +32,35 @@ """ super().__init__(parent) - self.__repl = parent - - self.__blockReadyRead = False - - self.__serial = MicroPythonSerialPort( - timeout=Preferences.getMicroPython("SerialTimeout"), parent=self - ) - self.__serial.readyRead.connect(self.__readSerial) - @pyqtSlot() - def __readSerial(self): + def connectToDevice(self, connection): """ - Private slot to read all available serial data and emit it with the - "dataReceived" signal for further processing. - """ - if not self.__blockReadyRead: - data = bytes(self.__serial.readAll()) - self.dataReceived.emit(data) + Public slot to connect to the device. - @pyqtSlot() - def connectToDevice(self, port): - """ - Public slot to start the manager. - - @param port name of the port to be used + @param connection name of the connection to be used @type str @return flag indicating success @rtype bool + @exception NotImplementedError raised to indicate that this method needs to + be implemented in a derived class """ - return self.__serial.openSerialLink(port) + raise NotImplementedError( + "This method needs to be implemented in a derived class." + ) + + return False @pyqtSlot() def disconnectFromDevice(self): """ - Public slot to stop the thread. + Public slot to disconnect from the device. + + @exception NotImplementedError raised to indicate that this method needs to + be implemented in a derived class """ - self.__serial.closeSerialLink() + raise NotImplementedError( + "This method needs to be implemented in a derived class." + ) def isConnected(self): """ @@ -91,15 +68,21 @@ @return flag indicating the connection status @rtype bool + @exception NotImplementedError raised to indicate that this method needs to + be implemented in a derived class """ - return self.__serial.isConnected() + raise NotImplementedError( + "This method needs to be implemented in a derived class." + ) + + return False @pyqtSlot() def handlePreferencesChanged(self): """ Public slot to handle a change of the preferences. """ - self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout")) + pass def write(self, data): """ @@ -107,149 +90,37 @@ @param data data to be written @type bytes or bytearray - """ - self.__serial.isConnected() and self.__serial.write(data) - - def __pasteOn(self): - """ - Private method to switch the connected device to 'paste' mode. - - Note: switching to paste mode is done with synchronous writes. - - @return flag indicating success - @rtype bool - """ - if not self.__serial: - return False - - pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== " - - self.__serial.clear() # clear any buffered output before entering paste mode - self.__serial.write(b"\x02") # end raw mode if required - written = self.__serial.waitForBytesWritten(500) - # time out after 500ms if device is not responding - if not written: - return False - for _i in range(3): - # CTRL-C three times to break out of loops - self.__serial.write(b"\r\x03") - written = self.__serial.waitForBytesWritten(500) - # time out after 500ms if device is not responding - if not written: - return False - QThread.msleep(10) - self.__serial.readAll() # read all data and discard it - self.__serial.write(b"\r\x05") # send CTRL-E to enter paste mode - self.__serial.readUntil(pasteMessage) - - if self.__serial.hasTimedOut(): - # it timed out; try it again and than fail - self.__serial.write(b"\r\x05") # send CTRL-E again - self.__serial.readUntil(pasteMessage) - if self.__serial.hasTimedOut(): - return False - - QCoreApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents - ) - self.__serial.readAll() # read all data and discard it - return True - - def __pasteOff(self): - """ - Private method to switch 'paste' mode off. + @exception NotImplementedError raised to indicate that this method needs to + be implemented in a derived class """ - if self.__serial: - self.__serial.write(b"\x04") # send CTRL-D to cancel paste mode - - def __rawOn(self): - """ - Private method to switch the connected device to 'raw' mode. - - Note: switching to raw mode is done with synchronous writes. - - @return flag indicating success - @rtype bool - """ - if not self.__serial: - return False - - rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>" - - self.__serial.write(b"\x02") # end raw mode if required - written = self.__serial.waitForBytesWritten(500) - # time out after 500ms if device is not responding - if not written: - return False - for _i in range(3): - # CTRL-C three times to break out of loops - self.__serial.write(b"\r\x03") - written = self.__serial.waitForBytesWritten(500) - # time out after 500ms if device is not responding - if not written: - return False - QThread.msleep(10) - self.__serial.readAll() # read all data and discard it - self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode - self.__serial.readUntil(rawReplMessage) - if self.__serial.hasTimedOut(): - # it timed out; try it again and than fail - self.__serial.write(b"\r\x01") # send CTRL-A again - self.__serial.readUntil(rawReplMessage) - if self.__serial.hasTimedOut(): - return False - - QCoreApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents + raise NotImplementedError( + "This method needs to be implemented in a derived class." ) - self.__serial.readAll() # read all data and discard it - return True - - def __rawOff(self): - """ - Private method to switch 'raw' mode off. - """ - if self.__serial: - self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode - self.__serial.readUntil(b">>> ") # read until Python prompt - self.__serial.readAll() # read all data and discard it def probeDevice(self): """ Public method to check the device is responding. - If the device has not been flashed with a MicroPython formware, the + If the device has not been flashed with a MicroPython firmware, the probe will fail. @return flag indicating a communicating MicroPython device @rtype bool + @exception NotImplementedError raised to indicate that this method needs to + be implemented in a derived class """ - if not self.__serial: - return False - - if not self.__serial.isConnected(): - return False + raise NotImplementedError( + "This method needs to be implemented in a derived class." + ) - # switch on raw mode - self.__blockReadyRead = True - ok = self.__pasteOn() - if not ok: - self.__blockReadyRead = False - return False - - # switch off raw mode - QThread.msleep(10) - self.__pasteOff() - self.__blockReadyRead = False - - return True + return False def execute(self, commands, *, mode="raw", timeout=0): """ Public method to send commands to the connected device and return the result. - If no serial connection is available, empty results will be returned. + If no connection is available, empty results will be returned. @param commands list of commands to be executed @type str or list of str @@ -261,171 +132,18 @@ @type int (optional) @return tuple containing stdout and stderr output of the device @rtype tuple of (bytes, bytes) + @exception NotImplementedError raised to indicate that this method needs to + be implemented in a derived class @exception ValueError raised in case of an unsupported submit mode """ + raise NotImplementedError( + "This method needs to be implemented in a derived class." + ) + if mode not in ("paste", "raw"): raise ValueError("Unsupported submit mode given ('{0}').".format(mode)) - if mode == "raw": - return self.__execute_raw(commands, timeout=timeout) - elif mode == "paste": - return self.__execute_paste(commands, timeout=timeout) - else: - # just in case - return b"", b"" - - def __execute_raw(self, commands, timeout=0): - """ - Private method to send commands to the connected device using 'raw REPL' mode - and return the result. - - If no serial connection is available, empty results will be returned. - - @param commands list of commands to be executed - @type str or list of str - @param timeout per command timeout in milliseconds (0 for configured default) - (defaults to 0) - @type int (optional) - @return tuple containing stdout and stderr output of the device - @rtype tuple of (bytes, bytes) - """ - if not self.__serial: - return b"", b"" - - if not self.__serial.isConnected(): - return b"", b"Device not connected or not switched on." - - result = bytearray() - err = b"" - - if isinstance(commands, str): - commands = [commands] - - # switch on raw mode - self.__blockReadyRead = True - ok = self.__rawOn() - if not ok: - self.__blockReadyRead = False - return (b"", b"Could not switch to raw mode. Is the device switched on?") - - # send commands - QThread.msleep(10) - for command in commands: - if command: - commandBytes = command.encode("utf-8") - self.__serial.write(commandBytes + b"\x04") - QCoreApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents - ) - ok = self.__serial.readUntil(b"OK") - if ok != b"OK": - self.__blockReadyRead = False - return ( - b"", - "Expected 'OK', got '{0}', followed by '{1}'".format( - ok, self.__serial.readAll() - ).encode("utf-8"), - ) - - # read until prompt - response = self.__serial.readUntil(b"\x04>", timeout=timeout) - if self.__serial.hasTimedOut(): - self.__blockReadyRead = False - return b"", b"Timeout while processing commands." - if b"\x04" in response[:-2]: - # split stdout, stderr - out, err = response[:-2].split(b"\x04") - result += out - else: - err = b"invalid response received: " + response - if err: - result = b"" - break - - # switch off raw mode - QThread.msleep(10) - self.__rawOff() - self.__blockReadyRead = False - - return bytes(result), err - - def __execute_paste(self, commands, timeout=0): - """ - Private method to send commands to the connected device using 'paste' mode - and return the result. - - If no serial connection is available, empty results will be returned. - - @param commands list of commands to be executed - @type str or list of str - @param timeout per command timeout in milliseconds (0 for configured default) - (defaults to 0) - @type int (optional) - @return tuple containing stdout and stderr output of the device - @rtype tuple of (bytes, bytes) - """ - if not self.__serial: - return b"", b"" - - if not self.__serial.isConnected(): - return b"", b"Device not connected or not switched on." - - if isinstance(commands, list): - commands = "\n".join(commands) - - # switch on paste mode - self.__blockReadyRead = True - ok = self.__pasteOn() - if not ok: - self.__blockReadyRead = False - return (b"", b"Could not switch to paste mode. Is the device switched on?") - - # send commands - QThread.msleep(10) - for command in commands.splitlines(keepends=True): - # send the data as single lines - commandBytes = command.encode("utf-8") - self.__serial.write(commandBytes) - QCoreApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents - ) - QThread.msleep(10) - ok = self.__serial.readUntil(commandBytes) - if ok != commandBytes: - self.__blockReadyRead = False - return ( - b"", - "Expected '{0}', got '{1}', followed by '{2}'".format( - commandBytes, ok, self.__serial.readAll() - ).encode("utf-8"), - ) - - # switch off paste mode causing the commands to be executed - self.__pasteOff() - QThread.msleep(10) - # read until Python prompt - result = ( - self.__serial.readUntil(b">>> ", timeout=timeout) - .replace(b">>> ", b"") - .strip() - ) - if self.__serial.hasTimedOut(): - self.__blockReadyRead = False - return b"", b"Timeout while processing commands." - - # get rid of any OSD string - if result.startswith(b"\x1b]0;"): - result = result.split(b"\x1b\\")[-1] - - if self.TracebackMarker in result: - errorIndex = result.find(self.TracebackMarker) - out, err = result[:errorIndex], result[errorIndex:] - else: - out = result - err = b"" - - self.__blockReadyRead = False - return out, err + return b"", b"" def executeAsync(self, commandsList, submitMode): """ @@ -434,61 +152,18 @@ @param commandsList list of commands to be execute on the device @type list of str - @param submitMode mode to be used to submit the commands - @type str (one of 'raw' or 'paste') + @param submitMode mode to be used to submit the commands (one of 'raw' + or 'paste') + @type str + @exception NotImplementedError raised to indicate that this method needs to + be implemented in a derived class @exception ValueError raised to indicate an unknown submit mode """ - if submitMode not in ("raw", "paste"): - raise ValueError("Illegal submit mode given ({0})".format(submitMode)) - - if submitMode == "raw": - startSequence = [ # sequence of commands to enter raw mode - b"\x02", # Ctrl-B: exit raw repl (just in case) - b"\r\x03\x03\x03", # Ctrl-C three times: interrupt any running program - b"\r\x01", # Ctrl-A: enter raw REPL - b'print("\\n")\r', - ] - endSequence = [ - b"\r", - b"\x04", - ] - self.__executeAsyncRaw( - startSequence - + [c.encode("utf-8") + b"\r" for c in commandsList] - + endSequence - ) - elif submitMode == "paste": - self.__executeAsyncPaste(commandsList) - - def __executeAsyncRaw(self, commandsList): - """ - Private method to execute a series of commands over a period of time - without returning any result (asynchronous execution). + raise NotImplementedError( + "This method needs to be implemented in a derived class." + ) - @param commandsList list of commands to be execute on the device - @type list of bytes - """ - if commandsList: - command = commandsList.pop(0) - self.__serial.write(command) - QTimer.singleShot(2, lambda: self.__executeAsyncRaw(commandsList)) - else: - self.__rawOff() - self.executeAsyncFinished.emit() - - def __executeAsyncPaste(self, commandsList): - """ - Private method to execute a series of commands over a period of time - without returning any result (asynchronous execution). - - @param commandsList list of commands to be execute on the device - @type list of str - """ - self.__blockReadyRead = True - self.__pasteOn() - command = b"\n".join(c.encode("utf-8)") for c in commandsList) - self.__serial.write(command) - self.__serial.readUntil(command) - self.__blockReadyRead = False - self.__pasteOff() - self.executeAsyncFinished.emit + if submitMode not in ("raw", "paste"): + raise ValueError( + "Unsupported submit mode given ('{0}').".format(submitMode) + )