src/eric7/MicroPython/MicroPythonDeviceInterface.py

branch
mpy_network
changeset 9799
a79430a8811d
parent 9775
c6806d24468b
child 9810
39d3b227358c
--- a/src/eric7/MicroPython/MicroPythonDeviceInterface.py	Fri Feb 24 18:36:43 2023 +0100
+++ b/src/eric7/MicroPython/MicroPythonDeviceInterface.py	Sat Feb 25 19:18:07 2023 +0100
@@ -35,6 +35,9 @@
     executeAsyncFinished = pyqtSignal()
     dataReceived = pyqtSignal(bytes)
 
+    PasteModePrompt = b"=== "
+    TracebackMarker = b"Traceback (most recent call last):"
+
     def __init__(self, parent=None):
         """
         Constructor
@@ -107,6 +110,56 @@
         """
         self.__serial.isConnected() and self.__serial.write(data)
 
+    def __pasteOn(self):
+        """
+        Private method to switch the connected device to 'paste' mode.
+
+        Note: switching to paste mode is done with synchronous writes.
+
+        @return flag indicating success
+        @rtype bool
+        """
+        if not self.__serial:
+            return False
+
+        pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== "
+        self.__serial.write(b"\x02")  # end raw mode if required
+        written = self.__serial.waitForBytesWritten(500)
+        # time out after 500ms if device is not responding
+        if not written:
+            return False
+        for _i in range(3):
+            # CTRL-C three times to break out of loops
+            self.__serial.write(b"\r\x03")
+            written = self.__serial.waitForBytesWritten(500)
+            # time out after 500ms if device is not responding
+            if not written:
+                return False
+            QThread.msleep(10)
+        self.__serial.readAll()  # read all data and discard it
+        self.__serial.write(b"\r\x05")  # send CTRL-E to enter paste mode
+        self.__serial.readUntil(pasteMessage)
+
+        if self.__serial.hasTimedOut():
+            # it timed out; try it again and than fail
+            self.__serial.write(b"\r\x05")  # send CTRL-E again
+            self.__serial.readUntil(pasteMessage)
+            if self.__serial.hasTimedOut():
+                return False
+
+        QCoreApplication.processEvents(
+            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+        )
+        self.__serial.readAll()  # read all data and discard it
+        return True
+
+    def __pasteOff(self):
+        """
+        Private method to switch 'paste' mode off.
+        """
+        if self.__serial:
+            self.__serial.write(b"\x04")  # send CTRL-D to cancel paste mode
+
     def __rawOn(self):
         """
         Private method to switch the connected device to 'raw' mode.
@@ -114,7 +167,7 @@
         Note: switching to raw mode is done with synchronous writes.
 
         @return flag indicating success
-        @@rtype bool
+        @rtype bool
         """
         if not self.__serial:
             return False
@@ -189,7 +242,7 @@
 
         return True
 
-    def execute(self, commands, timeout=0):
+    def execute(self, commands, *, mode="raw", timeout=0):
         """
         Public method to send commands to the connected device and return the
         result.
@@ -198,6 +251,36 @@
 
         @param commands list of commands to be executed
         @type str or list of str
+        @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to
+            'raw')
+        @type str
+        @keyparam timeout per command timeout in milliseconds (0 for configured default)
+            (defaults to 0)
+        @type int (optional)
+        @return tuple containing stdout and stderr output of the device
+        @rtype tuple of (bytes, bytes)
+        @exception ValueError raised in case of an unsupported submit mode
+        """
+        if mode not in ("paste", "raw"):
+            raise ValueError("Unsupported submit mode given ('{0}').".format(mode))
+
+        if mode == "raw":
+            return self.__execute_raw(commands, timeout=timeout)
+        elif mode == "paste":
+            return self.__execute_paste(commands, timeout=timeout)
+        else:
+            # just in case
+            return b"", b""
+
+    def __execute_raw(self, commands, timeout=0):
+        """
+        Private method to send commands to the connected device using 'raw REPL' mode
+        and return the result.
+
+        If no serial connection is available, empty results will be returned.
+
+        @param commands list of commands to be executed
+        @type str or list of str
         @param timeout per command timeout in milliseconds (0 for configured default)
             (defaults to 0)
         @type int (optional)
@@ -234,6 +317,7 @@
                 )
                 ok = self.__serial.readUntil(b"OK")
                 if ok != b"OK":
+                    self.__blockReadyRead = False
                     return (
                         b"",
                         "Expected 'OK', got '{0}', followed by '{1}'".format(
@@ -263,6 +347,76 @@
 
         return bytes(result), err
 
+    def __execute_paste(self, commands, timeout=0):
+        """
+        Private method to send commands to the connected device using 'paste' mode
+        and return the result.
+
+        If no serial connection is available, empty results will be returned.
+
+        @param commands list of commands to be executed
+        @type str or list of str
+        @param timeout per command timeout in milliseconds (0 for configured default)
+            (defaults to 0)
+        @type int (optional)
+        @return tuple containing stdout and stderr output of the device
+        @rtype tuple of (bytes, bytes)
+        """
+        if not self.__serial:
+            return b"", b""
+
+        if not self.__serial.isConnected():
+            return b"", b"Device not connected or not switched on."
+
+        if isinstance(commands, list):
+            commands = "\n".join(commands)
+
+        # switch on raw mode
+        self.__blockReadyRead = True
+        ok = self.__pasteOn()
+        if not ok:
+            self.__blockReadyRead = False
+            return (b"", b"Could not switch to raw mode. Is the device switched on?")
+
+        # send commands
+        QThread.msleep(10)
+        for command in commands.splitlines(keepends=True):
+            # send the data as single lines
+            commandBytes = command.encode("utf-8")
+            self.__serial.write(commandBytes)
+            QCoreApplication.processEvents(
+                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+            )
+            QThread.msleep(10)
+            ok = self.__serial.readUntil(commandBytes)
+            if ok != commandBytes:
+                self.__blockReadyRead = False
+                return (
+                    b"",
+                    "Expected '{0}', got '{1}', followed by '{2}'".format(
+                        commandBytes, ok, self.__serial.readAll()
+                    ).encode("utf-8"),
+                )
+
+        # switch off paste mode causing the commands to be executed
+        self.__pasteOff()
+        QThread.msleep(10)
+        result = self.__serial.readUntil(b">>> ", timeout=timeout).replace(b">>> ", b"")
+        # read until Python prompt
+        if self.__serial.hasTimedOut():
+            self.__blockReadyRead = False
+            return b"", b"Timeout while processing commands."
+
+        if self.TracebackMarker in result:
+            errorIndex = result.find(self.TracebackMarker)
+            out, err = result[:errorIndex], result[errorIndex:]
+        else:
+            out = result.strip()
+            err = b""
+
+        self.__blockReadyRead = False
+        return out, err
+
     def executeAsync(self, commandsList):
         """
         Public method to execute a series of commands over a period of time
@@ -277,3 +431,25 @@
             QTimer.singleShot(2, lambda: self.executeAsync(commandsList))
         else:
             self.executeAsyncFinished.emit()
+
+    def executeAsyncPaste(self, commandsList):
+        """
+        Public method to execute a series of commands over a period of time
+        without returning any result (asynchronous execution).
+
+        @param commandsList list of commands to be execute on the device
+        @type list of bytes
+        """
+        if commandsList:
+            self.__blockReadyRead = True
+            command = commandsList.pop(0)
+            if command == "@PasteOn@":
+                self.__pasteOn()
+            else:
+                self.__serial.write(command)
+                self.__serial.readUntil(command)
+            QTimer.singleShot(2, lambda: self.executeAsyncPaste(commandsList))
+        else:
+            self.__blockReadyRead = False
+            self.__pasteOff()
+            self.executeAsyncFinished.emit()

eric ide

mercurial