src/eric7/MicroPython/MicroPythonDeviceInterface.py

branch
mpy_network
changeset 9990
54c614d91eff
parent 9989
286c2a21f36f
child 10008
c5bcafe3485c
--- a/src/eric7/MicroPython/MicroPythonDeviceInterface.py	Thu Apr 27 17:59:09 2023 +0200
+++ b/src/eric7/MicroPython/MicroPythonDeviceInterface.py	Fri Apr 28 12:07:41 2023 +0200
@@ -1,25 +1,13 @@
 # -*- coding: utf-8 -*-
 
-# Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
+# Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de>
 #
 
 """
-Module implementing some file system commands for MicroPython.
+Module  implementing an interface base class to talk to a connected MicroPython device.
 """
 
-from PyQt6.QtCore import (
-    QCoreApplication,
-    QEventLoop,
-    QObject,
-    QThread,
-    QTimer,
-    pyqtSignal,
-    pyqtSlot,
-)
-
-from eric7 import Preferences
-
-from .MicroPythonSerialPort import MicroPythonSerialPort
+from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
 
 
 class MicroPythonDeviceInterface(QObject):
@@ -28,16 +16,13 @@
 
     @signal executeAsyncFinished() emitted to indicate the end of an
         asynchronously executed list of commands (e.g. a script)
-    @signal dataReceived(data) emitted to send data received via the serial
-        connection for further processing
+    @signal dataReceived(data) emitted to send data received via the connection
+        for further processing
     """
 
     executeAsyncFinished = pyqtSignal()
     dataReceived = pyqtSignal(bytes)
 
-    PasteModePrompt = b"=== "
-    TracebackMarker = b"Traceback (most recent call last):"
-
     def __init__(self, parent=None):
         """
         Constructor
@@ -47,43 +32,35 @@
         """
         super().__init__(parent)
 
-        self.__repl = parent
-
-        self.__blockReadyRead = False
-
-        self.__serial = MicroPythonSerialPort(
-            timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
-        )
-        self.__serial.readyRead.connect(self.__readSerial)
-
     @pyqtSlot()
-    def __readSerial(self):
+    def connectToDevice(self, connection):
         """
-        Private slot to read all available serial data and emit it with the
-        "dataReceived" signal for further processing.
-        """
-        if not self.__blockReadyRead:
-            data = bytes(self.__serial.readAll())
-            self.dataReceived.emit(data)
+        Public slot to connect to the device.
 
-    @pyqtSlot()
-    def connectToDevice(self, port):
-        """
-        Public slot to start the manager.
-
-        @param port name of the port to be used
+        @param connection name of the connection to be used
         @type str
         @return flag indicating success
         @rtype bool
+        @exception NotImplementedError raised to indicate that this method needs to
+            be implemented in a derived class
         """
-        return self.__serial.openSerialLink(port)
+        raise NotImplementedError(
+            "This method needs to be implemented in a derived class."
+        )
+
+        return False
 
     @pyqtSlot()
     def disconnectFromDevice(self):
         """
-        Public slot to stop the thread.
+        Public slot to disconnect from the device.
+
+        @exception NotImplementedError raised to indicate that this method needs to
+            be implemented in a derived class
         """
-        self.__serial.closeSerialLink()
+        raise NotImplementedError(
+            "This method needs to be implemented in a derived class."
+        )
 
     def isConnected(self):
         """
@@ -91,15 +68,21 @@
 
         @return flag indicating the connection status
         @rtype bool
+        @exception NotImplementedError raised to indicate that this method needs to
+            be implemented in a derived class
         """
-        return self.__serial.isConnected()
+        raise NotImplementedError(
+            "This method needs to be implemented in a derived class."
+        )
+
+        return False
 
     @pyqtSlot()
     def handlePreferencesChanged(self):
         """
         Public slot to handle a change of the preferences.
         """
-        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))
+        pass
 
     def write(self, data):
         """
@@ -107,149 +90,37 @@
 
         @param data data to be written
         @type bytes or bytearray
-        """
-        self.__serial.isConnected() and self.__serial.write(data)
-
-    def __pasteOn(self):
-        """
-        Private method to switch the connected device to 'paste' mode.
-
-        Note: switching to paste mode is done with synchronous writes.
-
-        @return flag indicating success
-        @rtype bool
-        """
-        if not self.__serial:
-            return False
-
-        pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== "
-
-        self.__serial.clear()  # clear any buffered output before entering paste mode
-        self.__serial.write(b"\x02")  # end raw mode if required
-        written = self.__serial.waitForBytesWritten(500)
-        # time out after 500ms if device is not responding
-        if not written:
-            return False
-        for _i in range(3):
-            # CTRL-C three times to break out of loops
-            self.__serial.write(b"\r\x03")
-            written = self.__serial.waitForBytesWritten(500)
-            # time out after 500ms if device is not responding
-            if not written:
-                return False
-            QThread.msleep(10)
-        self.__serial.readAll()  # read all data and discard it
-        self.__serial.write(b"\r\x05")  # send CTRL-E to enter paste mode
-        self.__serial.readUntil(pasteMessage)
-
-        if self.__serial.hasTimedOut():
-            # it timed out; try it again and than fail
-            self.__serial.write(b"\r\x05")  # send CTRL-E again
-            self.__serial.readUntil(pasteMessage)
-            if self.__serial.hasTimedOut():
-                return False
-
-        QCoreApplication.processEvents(
-            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
-        )
-        self.__serial.readAll()  # read all data and discard it
-        return True
-
-    def __pasteOff(self):
-        """
-        Private method to switch 'paste' mode off.
+        @exception NotImplementedError raised to indicate that this method needs to
+            be implemented in a derived class
         """
-        if self.__serial:
-            self.__serial.write(b"\x04")  # send CTRL-D to cancel paste mode
-
-    def __rawOn(self):
-        """
-        Private method to switch the connected device to 'raw' mode.
-
-        Note: switching to raw mode is done with synchronous writes.
-
-        @return flag indicating success
-        @rtype bool
-        """
-        if not self.__serial:
-            return False
-
-        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"
-
-        self.__serial.write(b"\x02")  # end raw mode if required
-        written = self.__serial.waitForBytesWritten(500)
-        # time out after 500ms if device is not responding
-        if not written:
-            return False
-        for _i in range(3):
-            # CTRL-C three times to break out of loops
-            self.__serial.write(b"\r\x03")
-            written = self.__serial.waitForBytesWritten(500)
-            # time out after 500ms if device is not responding
-            if not written:
-                return False
-            QThread.msleep(10)
-        self.__serial.readAll()  # read all data and discard it
-        self.__serial.write(b"\r\x01")  # send CTRL-A to enter raw mode
-        self.__serial.readUntil(rawReplMessage)
-        if self.__serial.hasTimedOut():
-            # it timed out; try it again and than fail
-            self.__serial.write(b"\r\x01")  # send CTRL-A again
-            self.__serial.readUntil(rawReplMessage)
-            if self.__serial.hasTimedOut():
-                return False
-
-        QCoreApplication.processEvents(
-            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+        raise NotImplementedError(
+            "This method needs to be implemented in a derived class."
         )
-        self.__serial.readAll()  # read all data and discard it
-        return True
-
-    def __rawOff(self):
-        """
-        Private method to switch 'raw' mode off.
-        """
-        if self.__serial:
-            self.__serial.write(b"\x02")  # send CTRL-B to cancel raw mode
-            self.__serial.readUntil(b">>> ")  # read until Python prompt
-            self.__serial.readAll()  # read all data and discard it
 
     def probeDevice(self):
         """
         Public method to check the device is responding.
 
-        If the device has not been flashed with a MicroPython formware, the
+        If the device has not been flashed with a MicroPython firmware, the
         probe will fail.
 
         @return flag indicating a communicating MicroPython device
         @rtype bool
+        @exception NotImplementedError raised to indicate that this method needs to
+            be implemented in a derived class
         """
-        if not self.__serial:
-            return False
-
-        if not self.__serial.isConnected():
-            return False
+        raise NotImplementedError(
+            "This method needs to be implemented in a derived class."
+        )
 
-        # switch on raw mode
-        self.__blockReadyRead = True
-        ok = self.__pasteOn()
-        if not ok:
-            self.__blockReadyRead = False
-            return False
-
-        # switch off raw mode
-        QThread.msleep(10)
-        self.__pasteOff()
-        self.__blockReadyRead = False
-
-        return True
+        return False
 
     def execute(self, commands, *, mode="raw", timeout=0):
         """
         Public method to send commands to the connected device and return the
         result.
 
-        If no serial connection is available, empty results will be returned.
+        If no connection is available, empty results will be returned.
 
         @param commands list of commands to be executed
         @type str or list of str
@@ -261,171 +132,18 @@
         @type int (optional)
         @return tuple containing stdout and stderr output of the device
         @rtype tuple of (bytes, bytes)
+        @exception NotImplementedError raised to indicate that this method needs to
+            be implemented in a derived class
         @exception ValueError raised in case of an unsupported submit mode
         """
+        raise NotImplementedError(
+            "This method needs to be implemented in a derived class."
+        )
+
         if mode not in ("paste", "raw"):
             raise ValueError("Unsupported submit mode given ('{0}').".format(mode))
 
-        if mode == "raw":
-            return self.__execute_raw(commands, timeout=timeout)
-        elif mode == "paste":
-            return self.__execute_paste(commands, timeout=timeout)
-        else:
-            # just in case
-            return b"", b""
-
-    def __execute_raw(self, commands, timeout=0):
-        """
-        Private method to send commands to the connected device using 'raw REPL' mode
-        and return the result.
-
-        If no serial connection is available, empty results will be returned.
-
-        @param commands list of commands to be executed
-        @type str or list of str
-        @param timeout per command timeout in milliseconds (0 for configured default)
-            (defaults to 0)
-        @type int (optional)
-        @return tuple containing stdout and stderr output of the device
-        @rtype tuple of (bytes, bytes)
-        """
-        if not self.__serial:
-            return b"", b""
-
-        if not self.__serial.isConnected():
-            return b"", b"Device not connected or not switched on."
-
-        result = bytearray()
-        err = b""
-
-        if isinstance(commands, str):
-            commands = [commands]
-
-        # switch on raw mode
-        self.__blockReadyRead = True
-        ok = self.__rawOn()
-        if not ok:
-            self.__blockReadyRead = False
-            return (b"", b"Could not switch to raw mode. Is the device switched on?")
-
-        # send commands
-        QThread.msleep(10)
-        for command in commands:
-            if command:
-                commandBytes = command.encode("utf-8")
-                self.__serial.write(commandBytes + b"\x04")
-                QCoreApplication.processEvents(
-                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
-                )
-                ok = self.__serial.readUntil(b"OK")
-                if ok != b"OK":
-                    self.__blockReadyRead = False
-                    return (
-                        b"",
-                        "Expected 'OK', got '{0}', followed by '{1}'".format(
-                            ok, self.__serial.readAll()
-                        ).encode("utf-8"),
-                    )
-
-                # read until prompt
-                response = self.__serial.readUntil(b"\x04>", timeout=timeout)
-                if self.__serial.hasTimedOut():
-                    self.__blockReadyRead = False
-                    return b"", b"Timeout while processing commands."
-                if b"\x04" in response[:-2]:
-                    # split stdout, stderr
-                    out, err = response[:-2].split(b"\x04")
-                    result += out
-                else:
-                    err = b"invalid response received: " + response
-                if err:
-                    result = b""
-                    break
-
-        # switch off raw mode
-        QThread.msleep(10)
-        self.__rawOff()
-        self.__blockReadyRead = False
-
-        return bytes(result), err
-
-    def __execute_paste(self, commands, timeout=0):
-        """
-        Private method to send commands to the connected device using 'paste' mode
-        and return the result.
-
-        If no serial connection is available, empty results will be returned.
-
-        @param commands list of commands to be executed
-        @type str or list of str
-        @param timeout per command timeout in milliseconds (0 for configured default)
-            (defaults to 0)
-        @type int (optional)
-        @return tuple containing stdout and stderr output of the device
-        @rtype tuple of (bytes, bytes)
-        """
-        if not self.__serial:
-            return b"", b""
-
-        if not self.__serial.isConnected():
-            return b"", b"Device not connected or not switched on."
-
-        if isinstance(commands, list):
-            commands = "\n".join(commands)
-
-        # switch on paste mode
-        self.__blockReadyRead = True
-        ok = self.__pasteOn()
-        if not ok:
-            self.__blockReadyRead = False
-            return (b"", b"Could not switch to paste mode. Is the device switched on?")
-
-        # send commands
-        QThread.msleep(10)
-        for command in commands.splitlines(keepends=True):
-            # send the data as single lines
-            commandBytes = command.encode("utf-8")
-            self.__serial.write(commandBytes)
-            QCoreApplication.processEvents(
-                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
-            )
-            QThread.msleep(10)
-            ok = self.__serial.readUntil(commandBytes)
-            if ok != commandBytes:
-                self.__blockReadyRead = False
-                return (
-                    b"",
-                    "Expected '{0}', got '{1}', followed by '{2}'".format(
-                        commandBytes, ok, self.__serial.readAll()
-                    ).encode("utf-8"),
-                )
-
-        # switch off paste mode causing the commands to be executed
-        self.__pasteOff()
-        QThread.msleep(10)
-        # read until Python prompt
-        result = (
-            self.__serial.readUntil(b">>> ", timeout=timeout)
-            .replace(b">>> ", b"")
-            .strip()
-        )
-        if self.__serial.hasTimedOut():
-            self.__blockReadyRead = False
-            return b"", b"Timeout while processing commands."
-
-        # get rid of any OSD string
-        if result.startswith(b"\x1b]0;"):
-            result = result.split(b"\x1b\\")[-1]
-
-        if self.TracebackMarker in result:
-            errorIndex = result.find(self.TracebackMarker)
-            out, err = result[:errorIndex], result[errorIndex:]
-        else:
-            out = result
-            err = b""
-
-        self.__blockReadyRead = False
-        return out, err
+        return b"", b""
 
     def executeAsync(self, commandsList, submitMode):
         """
@@ -434,61 +152,18 @@
 
         @param commandsList list of commands to be execute on the device
         @type list of str
-        @param submitMode mode to be used to submit the commands
-        @type str (one of 'raw' or 'paste')
+        @param submitMode mode to be used to submit the commands (one of 'raw'
+            or 'paste')
+        @type str
+        @exception NotImplementedError raised to indicate that this method needs to
+            be implemented in a derived class
         @exception ValueError raised to indicate an unknown submit mode
         """
-        if submitMode not in ("raw", "paste"):
-            raise ValueError("Illegal submit mode given ({0})".format(submitMode))
-
-        if submitMode == "raw":
-            startSequence = [  # sequence of commands to enter raw mode
-                b"\x02",  # Ctrl-B: exit raw repl (just in case)
-                b"\r\x03\x03\x03",  # Ctrl-C three times: interrupt any running program
-                b"\r\x01",  # Ctrl-A: enter raw REPL
-                b'print("\\n")\r',
-            ]
-            endSequence = [
-                b"\r",
-                b"\x04",
-            ]
-            self.__executeAsyncRaw(
-                startSequence
-                + [c.encode("utf-8") + b"\r" for c in commandsList]
-                + endSequence
-            )
-        elif submitMode == "paste":
-            self.__executeAsyncPaste(commandsList)
-
-    def __executeAsyncRaw(self, commandsList):
-        """
-        Private method to execute a series of commands over a period of time
-        without returning any result (asynchronous execution).
+        raise NotImplementedError(
+            "This method needs to be implemented in a derived class."
+        )
 
-        @param commandsList list of commands to be execute on the device
-        @type list of bytes
-        """
-        if commandsList:
-            command = commandsList.pop(0)
-            self.__serial.write(command)
-            QTimer.singleShot(2, lambda: self.__executeAsyncRaw(commandsList))
-        else:
-            self.__rawOff()
-            self.executeAsyncFinished.emit()
-
-    def __executeAsyncPaste(self, commandsList):
-        """
-        Private method to execute a series of commands over a period of time
-        without returning any result (asynchronous execution).
-
-        @param commandsList list of commands to be execute on the device
-        @type list of str
-        """
-        self.__blockReadyRead = True
-        self.__pasteOn()
-        command = b"\n".join(c.encode("utf-8)") for c in commandsList)
-        self.__serial.write(command)
-        self.__serial.readUntil(command)
-        self.__blockReadyRead = False
-        self.__pasteOff()
-        self.executeAsyncFinished.emit
+        if submitMode not in ("raw", "paste"):
+            raise ValueError(
+                "Unsupported submit mode given ('{0}').".format(submitMode)
+            )

eric ide

mercurial