src/eric7/MicroPython/MicroPythonWebreplDeviceInterface.py

branch
mpy_network
changeset 10008
c5bcafe3485c
child 10012
d649d500a9a1
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/eric7/MicroPython/MicroPythonWebreplDeviceInterface.py	Tue May 02 12:01:40 2023 +0200
@@ -0,0 +1,299 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module  implementing an interface to talk to a connected MicroPython device via
+a webrepl connection.
+"""
+
+from PyQt6.QtCore import QThread, pyqtSlot
+from PyQt6.QtWidgets import QInputDialog, QLineEdit
+
+from eric7 import Preferences
+from eric7.EricWidgets import EricMessageBox
+
+from .MicroPythonDeviceInterface import MicroPythonDeviceInterface
+from .MicroPythonWebreplSocket import MicroPythonWebreplSocket
+
+
+class MicroPythonWebreplDeviceInterface(MicroPythonDeviceInterface):
+    """
+    Class implementing an interface to talk to a connected MicroPython device via
+    a webrepl connection.
+    """
+
+    def __init__(self, parent=None):
+        """
+        Constructor
+
+        @param parent reference to the parent object
+        @type QObject
+        """
+        super().__init__(parent)
+
+        self.__blockReadyRead = False
+
+        self.__socket = MicroPythonWebreplSocket(
+            timeout=Preferences.getMicroPython("WebreplTimeout"), parent=self
+        )
+        self.__connected = False
+        self.__socket.readyRead.connect(self.__readSocket)
+
+    @pyqtSlot()
+    def __readSocket(self):
+        """
+        Private slot to read all available data and emit it with the
+        "dataReceived" signal for further processing.
+        """
+        if not self.__blockReadyRead:
+            data = bytes(self.__socket.readAll())
+            self.dataReceived.emit(data)
+
+    def __readAll(self):
+        """
+        Private method to read all data and emit it for further processing.
+        """
+        data = self.__socket.readAll()
+        self.dataReceived.emit(data)
+
+    @pyqtSlot()
+    def connectToDevice(self, connection):
+        """
+        Public slot to connect to the device.
+
+        @param connection name of the connection to be used in the form of an URL string
+            (ws://password@host:port)
+        @type str
+        @return flag indicating success
+        @rtype bool
+        """
+        connection = connection.replace("ws://", "")
+        try:
+            password, hostPort = connection.split("@", 1)
+        except ValueError:
+            password, hostPort = None, connection
+        if password is None:
+            password, ok = QInputDialog.getText(
+                None,
+                self.tr("WebRepl Password"),
+                self.tr("Enter the WebRepl password:"),
+                QLineEdit.EchoMode.Password,
+            )
+            if not ok:
+                return False
+
+        try:
+            host, port = hostPort.split(":", 1)
+            port = int(port)
+        except ValueError:
+            host, port = hostPort, 8266  # default port is 8266
+
+        self.__blockReadyRead = True
+        ok = self.__socket.connectToDevice(host, port)
+        if ok:
+            ok = self.__socket.login(password)
+            if not ok:
+                EricMessageBox.warning(
+                    None,
+                    self.tr("WebRepl Login"),
+                    self.tr(
+                        "The login to the selected device 'webrepl' failed. The given"
+                        " password may be incorrect."
+                    ),
+                )
+
+        self.__connected = ok
+        self.__blockReadyRead = False
+
+        return self.__connected
+
+    @pyqtSlot()
+    def disconnectFromDevice(self):
+        """
+        Public slot to disconnect from the device.
+        """
+        self.__socket.disconnect()
+        self.__connected = False
+
+    def isConnected(self):
+        """
+        Public method to get the connection status.
+
+        @return flag indicating the connection status
+        @rtype bool
+        """
+        return self.__connected
+
+    @pyqtSlot()
+    def handlePreferencesChanged(self):
+        """
+        Public slot to handle a change of the preferences.
+        """
+        self.__socket.setTimeout(Preferences.getMicroPython("WebreplTimeout"))
+
+    def write(self, data):
+        """
+        Public method to write data to the connected device.
+
+        @param data data to be written
+        @type bytes or bytearray
+        """
+        self.__connected and self.__socket.writeTextMessage(data)
+
+    def probeDevice(self):
+        """
+        Public method to check the device is responding.
+
+        If the device has not been flashed with a MicroPython firmware, the
+        probe will fail.
+
+        @return flag indicating a communicating MicroPython device
+        @rtype bool
+        """
+        if not self.__connected:
+            return False
+
+        # switch on paste mode
+        self.__blockReadyRead = True
+        ok = self.__pasteOn()
+        if not ok:
+            self.__blockReadyRead = False
+            return False
+
+        # switch off raw mode
+        QThread.msleep(10)
+        self.__pasteOff()
+        self.__blockReadyRead = False
+
+        return True
+
+    def execute(self, commands, *, mode="raw", timeout=0):
+        """
+        Public method to send commands to the connected device and return the
+        result.
+
+        @param commands list of commands to be executed
+        @type str or list of str
+        @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to
+            'raw'). This is ignored because webrepl always uses 'paste' mode.
+        @type str
+        @keyparam timeout per command timeout in milliseconds (0 for configured default)
+            (defaults to 0)
+        @type int (optional)
+        @return tuple containing stdout and stderr output of the device
+        @rtype tuple of (bytes, bytes)
+        """
+        if not self.__connected:
+            return b"", b"Device is not connected."
+
+        if isinstance(commands, list):
+            commands = "\n".join(commands)
+
+        # switch on paste mode
+        self.__blockReadyRead = True
+        ok = self.__pasteOn()
+        if not ok:
+            self.__blockReadyRead = False
+            return (b"", b"Could not switch to paste mode. Is the device switched on?")
+
+        # send commands
+        commandBytes = commands.encode("utf-8")
+        self.__socket.writeTextMessage(commandBytes)
+        ok = self.__socket.readUntil(commandBytes)
+        if ok != commandBytes:
+            self.__blockReadyRead = False
+            return (
+                b"",
+                "Expected '{0}', got '{1}', followed by '{2}'".format(
+                    commandBytes, ok, self.__socket.readAll()
+                ).encode("utf-8"),
+            )
+
+        # switch off paste mode causing the commands to be executed
+        self.__pasteOff()
+
+        # read until Python prompt
+        result = (
+            self.__socket.readUntil(b">>> ", timeout=timeout)
+            .replace(b">>> ", b"")
+            .strip()
+        )
+        if self.__socket.hasTimedOut():
+            self.__blockReadyRead = False
+            return b"", b"Timeout while processing commands."
+
+        # get rid of any OSD string
+        # TODO: emit the OSD data
+        if result.startswith(b"\x1b]0;"):
+            result = result.split(b"\x1b\\")[-1]
+
+        if self.TracebackMarker in result:
+            errorIndex = result.find(self.TracebackMarker)
+            out, err = result[:errorIndex], result[errorIndex:].replace(">>> ", "")
+        else:
+            out = result
+            err = b""
+
+        self.__blockReadyRead = False
+        return out, err
+
+    def executeAsync(self, commandsList, submitMode):
+        """
+        Public method to execute a series of commands over a period of time
+        without returning any result (asynchronous execution).
+
+        @param commandsList list of commands to be execute on the device
+        @type list of str
+        @param submitMode mode to be used to submit the commands
+        @type str (one of 'raw' or 'paste')
+        """
+        self.__blockReadyRead = True
+        self.__pasteOn()
+        command = b"\n".join(c.encode("utf-8)") for c in commandsList)
+        self.__socket.writeTextMessage(command)
+        self.__socket.readUntil(command)
+        self.__blockReadyRead = False
+        self.__pasteOff()
+        self.executeAsyncFinished.emit()
+
+    def __pasteOn(self):
+        """
+        Private method to switch the connected device to 'paste' mode.
+
+        Note: switching to paste mode is done with synchronous writes.
+
+        @return flag indicating success
+        @rtype bool
+        """
+        if not self.__connected:
+            return False
+
+        pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== "
+
+        self.__socket.writeTextMessage(b"\x02")  # end raw mode if required
+        for _i in range(3):
+            # CTRL-C three times to break out of loops
+            self.__socket.writeTextMessage(b"\r\x03")
+            # time out after 500ms if device is not responding
+        self.__socket.readAll()  # read all data and discard it
+        self.__socket.writeTextMessage(b"\r\x05")  # send CTRL-E to enter paste mode
+        self.__socket.readUntil(pasteMessage)
+
+        if self.__socket.hasTimedOut():
+            # it timed out; try it again and than fail
+            self.__socket.writeTextMessage(b"\r\x05")  # send CTRL-E again
+            self.__socket.readUntil(pasteMessage)
+            if self.__socket.hasTimedOut():
+                return False
+
+        self.__socket.readAll()  # read all data and discard it
+        return True
+
+    def __pasteOff(self):
+        """
+        Private method to switch 'paste' mode off.
+        """
+        if self.__connected:
+            self.__socket.writeTextMessage(b"\x04")  # send CTRL-D to cancel paste mode

eric ide

mercurial