src/eric7/MicroPython/MicroPythonSerialDeviceInterface.py

branch
mpy_network
changeset 9990
54c614d91eff
parent 9989
286c2a21f36f
child 10008
c5bcafe3485c
diff -r 286c2a21f36f -r 54c614d91eff src/eric7/MicroPython/MicroPythonSerialDeviceInterface.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/eric7/MicroPython/MicroPythonSerialDeviceInterface.py	Fri Apr 28 12:07:41 2023 +0200
@@ -0,0 +1,479 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module  implementing an interface to talk to a connected MicroPython device via
+a serial link.
+"""
+
+from PyQt6.QtCore import QCoreApplication, QEventLoop, QThread, QTimer, pyqtSlot
+
+from eric7 import Preferences
+
+from .MicroPythonDeviceInterface import MicroPythonDeviceInterface
+from .MicroPythonSerialPort import MicroPythonSerialPort
+
+
+class MicroPythonSerialDeviceInterface(MicroPythonDeviceInterface):
+    """
+    Class implementing an interface to talk to a connected MicroPython device via
+    a serial link.
+    """
+
+    PasteModePrompt = b"=== "
+    TracebackMarker = b"Traceback (most recent call last):"
+
+    def __init__(self, parent=None):
+        """
+        Constructor
+
+        @param parent reference to the parent object
+        @type QObject
+        """
+        super().__init__(parent)
+
+        self.__blockReadyRead = False
+
+        self.__serial = MicroPythonSerialPort(
+            timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
+        )
+        self.__serial.readyRead.connect(self.__readSerial)
+
+    @pyqtSlot()
+    def __readSerial(self):
+        """
+        Private slot to read all available serial data and emit it with the
+        "dataReceived" signal for further processing.
+        """
+        if not self.__blockReadyRead:
+            data = bytes(self.__serial.readAll())
+            self.dataReceived.emit(data)
+
+    @pyqtSlot()
+    def connectToDevice(self, connection):
+        """
+        Public slot to connect to the device.
+
+        @param connection name of the connection to be used
+        @type str
+        @return flag indicating success
+        @rtype bool
+        """
+        return self.__serial.openSerialLink(connection)
+
+    @pyqtSlot()
+    def disconnectFromDevice(self):
+        """
+        Public slot to disconnect from the device.
+        """
+        self.__serial.closeSerialLink()
+
+    def isConnected(self):
+        """
+        Public method to get the connection status.
+
+        @return flag indicating the connection status
+        @rtype bool
+        """
+        return self.__serial.isConnected()
+
+    @pyqtSlot()
+    def handlePreferencesChanged(self):
+        """
+        Public slot to handle a change of the preferences.
+        """
+        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))
+
+    def write(self, data):
+        """
+        Public method to write data to the connected device.
+
+        @param data data to be written
+        @type bytes or bytearray
+        """
+        self.__serial.isConnected() and self.__serial.write(data)
+
+    def __pasteOn(self):
+        """
+        Private method to switch the connected device to 'paste' mode.
+
+        Note: switching to paste mode is done with synchronous writes.
+
+        @return flag indicating success
+        @rtype bool
+        """
+        if not self.__serial:
+            return False
+
+        pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== "
+
+        self.__serial.clear()  # clear any buffered output before entering paste mode
+        self.__serial.write(b"\x02")  # end raw mode if required
+        written = self.__serial.waitForBytesWritten(500)
+        # time out after 500ms if device is not responding
+        if not written:
+            return False
+        for _i in range(3):
+            # CTRL-C three times to break out of loops
+            self.__serial.write(b"\r\x03")
+            written = self.__serial.waitForBytesWritten(500)
+            # time out after 500ms if device is not responding
+            if not written:
+                return False
+            QThread.msleep(10)
+        self.__serial.readAll()  # read all data and discard it
+        self.__serial.write(b"\r\x05")  # send CTRL-E to enter paste mode
+        self.__serial.readUntil(pasteMessage)
+
+        if self.__serial.hasTimedOut():
+            # it timed out; try it again and than fail
+            self.__serial.write(b"\r\x05")  # send CTRL-E again
+            self.__serial.readUntil(pasteMessage)
+            if self.__serial.hasTimedOut():
+                return False
+
+        QCoreApplication.processEvents(
+            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+        )
+        self.__serial.readAll()  # read all data and discard it
+        return True
+
+    def __pasteOff(self):
+        """
+        Private method to switch 'paste' mode off.
+        """
+        if self.__serial:
+            self.__serial.write(b"\x04")  # send CTRL-D to cancel paste mode
+
+    def __rawOn(self):
+        """
+        Private method to switch the connected device to 'raw' mode.
+
+        Note: switching to raw mode is done with synchronous writes.
+
+        @return flag indicating success
+        @rtype bool
+        """
+        if not self.__serial:
+            return False
+
+        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"
+
+        self.__serial.write(b"\x02")  # end raw mode if required
+        written = self.__serial.waitForBytesWritten(500)
+        # time out after 500ms if device is not responding
+        if not written:
+            return False
+        for _i in range(3):
+            # CTRL-C three times to break out of loops
+            self.__serial.write(b"\r\x03")
+            written = self.__serial.waitForBytesWritten(500)
+            # time out after 500ms if device is not responding
+            if not written:
+                return False
+            QThread.msleep(10)
+        self.__serial.readAll()  # read all data and discard it
+        self.__serial.write(b"\r\x01")  # send CTRL-A to enter raw mode
+        self.__serial.readUntil(rawReplMessage)
+        if self.__serial.hasTimedOut():
+            # it timed out; try it again and than fail
+            self.__serial.write(b"\r\x01")  # send CTRL-A again
+            self.__serial.readUntil(rawReplMessage)
+            if self.__serial.hasTimedOut():
+                return False
+
+        QCoreApplication.processEvents(
+            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+        )
+        self.__serial.readAll()  # read all data and discard it
+        return True
+
+    def __rawOff(self):
+        """
+        Private method to switch 'raw' mode off.
+        """
+        if self.__serial:
+            self.__serial.write(b"\x02")  # send CTRL-B to cancel raw mode
+            self.__serial.readUntil(b">>> ")  # read until Python prompt
+            self.__serial.readAll()  # read all data and discard it
+
+    def probeDevice(self):
+        """
+        Public method to check the device is responding.
+
+        If the device has not been flashed with a MicroPython firmware, the
+        probe will fail.
+
+        @return flag indicating a communicating MicroPython device
+        @rtype bool
+        """
+        if not self.__serial:
+            return False
+
+        if not self.__serial.isConnected():
+            return False
+
+        # switch on raw mode
+        self.__blockReadyRead = True
+        ok = self.__pasteOn()
+        if not ok:
+            self.__blockReadyRead = False
+            return False
+
+        # switch off raw mode
+        QThread.msleep(10)
+        self.__pasteOff()
+        self.__blockReadyRead = False
+
+        return True
+
+    def execute(self, commands, *, mode="raw", timeout=0):
+        """
+        Public method to send commands to the connected device and return the
+        result.
+
+        If no serial connection is available, empty results will be returned.
+
+        @param commands list of commands to be executed
+        @type str or list of str
+        @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to
+            'raw')
+        @type str
+        @keyparam timeout per command timeout in milliseconds (0 for configured default)
+            (defaults to 0)
+        @type int (optional)
+        @return tuple containing stdout and stderr output of the device
+        @rtype tuple of (bytes, bytes)
+        @exception ValueError raised in case of an unsupported submit mode
+        """
+        if mode not in ("paste", "raw"):
+            raise ValueError("Unsupported submit mode given ('{0}').".format(mode))
+
+        if mode == "raw":
+            return self.__execute_raw(commands, timeout=timeout)
+        elif mode == "paste":
+            return self.__execute_paste(commands, timeout=timeout)
+        else:
+            # just in case
+            return b"", b""
+
+    def __execute_raw(self, commands, timeout=0):
+        """
+        Private method to send commands to the connected device using 'raw REPL' mode
+        and return the result.
+
+        If no serial connection is available, empty results will be returned.
+
+        @param commands list of commands to be executed
+        @type str or list of str
+        @param timeout per command timeout in milliseconds (0 for configured default)
+            (defaults to 0)
+        @type int (optional)
+        @return tuple containing stdout and stderr output of the device
+        @rtype tuple of (bytes, bytes)
+        """
+        if not self.__serial:
+            return b"", b""
+
+        if not self.__serial.isConnected():
+            return b"", b"Device not connected or not switched on."
+
+        result = bytearray()
+        err = b""
+
+        if isinstance(commands, str):
+            commands = [commands]
+
+        # switch on raw mode
+        self.__blockReadyRead = True
+        ok = self.__rawOn()
+        if not ok:
+            self.__blockReadyRead = False
+            return (b"", b"Could not switch to raw mode. Is the device switched on?")
+
+        # send commands
+        QThread.msleep(10)
+        for command in commands:
+            if command:
+                commandBytes = command.encode("utf-8")
+                self.__serial.write(commandBytes + b"\x04")
+                QCoreApplication.processEvents(
+                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+                )
+                ok = self.__serial.readUntil(b"OK")
+                if ok != b"OK":
+                    self.__blockReadyRead = False
+                    return (
+                        b"",
+                        "Expected 'OK', got '{0}', followed by '{1}'".format(
+                            ok, self.__serial.readAll()
+                        ).encode("utf-8"),
+                    )
+
+                # read until prompt
+                response = self.__serial.readUntil(b"\x04>", timeout=timeout)
+                if self.__serial.hasTimedOut():
+                    self.__blockReadyRead = False
+                    return b"", b"Timeout while processing commands."
+                if b"\x04" in response[:-2]:
+                    # split stdout, stderr
+                    out, err = response[:-2].split(b"\x04")
+                    result += out
+                else:
+                    err = b"invalid response received: " + response
+                if err:
+                    result = b""
+                    break
+
+        # switch off raw mode
+        QThread.msleep(10)
+        self.__rawOff()
+        self.__blockReadyRead = False
+
+        return bytes(result), err
+
+    def __execute_paste(self, commands, timeout=0):
+        """
+        Private method to send commands to the connected device using 'paste' mode
+        and return the result.
+
+        If no serial connection is available, empty results will be returned.
+
+        @param commands list of commands to be executed
+        @type str or list of str
+        @param timeout per command timeout in milliseconds (0 for configured default)
+            (defaults to 0)
+        @type int (optional)
+        @return tuple containing stdout and stderr output of the device
+        @rtype tuple of (bytes, bytes)
+        """
+        if not self.__serial:
+            return b"", b""
+
+        if not self.__serial.isConnected():
+            return b"", b"Device not connected or not switched on."
+
+        if isinstance(commands, list):
+            commands = "\n".join(commands)
+
+        # switch on paste mode
+        self.__blockReadyRead = True
+        ok = self.__pasteOn()
+        if not ok:
+            self.__blockReadyRead = False
+            return (b"", b"Could not switch to paste mode. Is the device switched on?")
+
+        # send commands
+        QThread.msleep(10)
+        for command in commands.splitlines(keepends=True):
+            # send the data as single lines
+            commandBytes = command.encode("utf-8")
+            self.__serial.write(commandBytes)
+            QCoreApplication.processEvents(
+                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+            )
+            QThread.msleep(10)
+            ok = self.__serial.readUntil(commandBytes)
+            if ok != commandBytes:
+                self.__blockReadyRead = False
+                return (
+                    b"",
+                    "Expected '{0}', got '{1}', followed by '{2}'".format(
+                        commandBytes, ok, self.__serial.readAll()
+                    ).encode("utf-8"),
+                )
+
+        # switch off paste mode causing the commands to be executed
+        self.__pasteOff()
+        QThread.msleep(10)
+        # read until Python prompt
+        result = (
+            self.__serial.readUntil(b">>> ", timeout=timeout)
+            .replace(b">>> ", b"")
+            .strip()
+        )
+        if self.__serial.hasTimedOut():
+            self.__blockReadyRead = False
+            return b"", b"Timeout while processing commands."
+
+        # get rid of any OSD string
+        if result.startswith(b"\x1b]0;"):
+            result = result.split(b"\x1b\\")[-1]
+
+        if self.TracebackMarker in result:
+            errorIndex = result.find(self.TracebackMarker)
+            out, err = result[:errorIndex], result[errorIndex:]
+        else:
+            out = result
+            err = b""
+
+        self.__blockReadyRead = False
+        return out, err
+
+    def executeAsync(self, commandsList, submitMode):
+        """
+        Public method to execute a series of commands over a period of time
+        without returning any result (asynchronous execution).
+
+        @param commandsList list of commands to be execute on the device
+        @type list of str
+        @param submitMode mode to be used to submit the commands
+        @type str (one of 'raw' or 'paste')
+        @exception ValueError raised to indicate an unknown submit mode
+        """
+        if submitMode not in ("raw", "paste"):
+            raise ValueError("Illegal submit mode given ({0})".format(submitMode))
+
+        if submitMode == "raw":
+            startSequence = [  # sequence of commands to enter raw mode
+                b"\x02",  # Ctrl-B: exit raw repl (just in case)
+                b"\r\x03\x03\x03",  # Ctrl-C three times: interrupt any running program
+                b"\r\x01",  # Ctrl-A: enter raw REPL
+                b'print("\\n")\r',
+            ]
+            endSequence = [
+                b"\r",
+                b"\x04",
+            ]
+            self.__executeAsyncRaw(
+                startSequence
+                + [c.encode("utf-8") + b"\r" for c in commandsList]
+                + endSequence
+            )
+        elif submitMode == "paste":
+            self.__executeAsyncPaste(commandsList)
+
+    def __executeAsyncRaw(self, commandsList):
+        """
+        Private method to execute a series of commands over a period of time
+        without returning any result (asynchronous execution).
+
+        @param commandsList list of commands to be execute on the device
+        @type list of bytes
+        """
+        if commandsList:
+            command = commandsList.pop(0)
+            self.__serial.write(command)
+            QTimer.singleShot(2, lambda: self.__executeAsyncRaw(commandsList))
+        else:
+            self.__rawOff()
+            self.executeAsyncFinished.emit()
+
+    def __executeAsyncPaste(self, commandsList):
+        """
+        Private method to execute a series of commands over a period of time
+        without returning any result (asynchronous execution).
+
+        @param commandsList list of commands to be execute on the device
+        @type list of str
+        """
+        self.__blockReadyRead = True
+        self.__pasteOn()
+        command = b"\n".join(c.encode("utf-8)") for c in commandsList)
+        self.__serial.write(command)
+        self.__serial.readUntil(command)
+        self.__blockReadyRead = False
+        self.__pasteOff()
+        self.executeAsyncFinished.emit

eric ide

mercurial