src/eric7/MicroPython/MicroPythonDeviceInterface.py

branch
eric7
changeset 9765
6378da868bb0
parent 9764
57496966803c
child 9766
f0e22f3a5878
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/eric7/MicroPython/MicroPythonDeviceInterface.py	Tue Feb 14 18:10:30 2023 +0100
@@ -0,0 +1,305 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing some file system commands for MicroPython.
+"""
+
+from PyQt6.QtCore import (
+    QCoreApplication,
+    QEventLoop,
+    QObject,
+    QThread,
+    QTimer,
+    pyqtSignal,
+    pyqtSlot,
+)
+
+from eric7 import Preferences
+
+from .MicroPythonSerialPort import MicroPythonSerialPort
+
+
+class MicroPythonDeviceInterface(QObject):
+    """
+    Class implementing some file system commands for MicroPython.
+
+    Commands are provided to perform operations on the file system of a
+    connected MicroPython device. Supported commands are:
+    <ul>
+    <li>ls: directory listing</li>
+    <li>lls: directory listing with meta data</li>
+    <li>cd: change directory</li>
+    <li>pwd: get the current directory</li>
+    <li>put: copy a file to the connected device</li>
+    <li>putData: write data to a file of the connected device</li>
+    <li>get: get a file from the connected device</li>
+    <li>getData: read data of a file of the connected device</li>
+    <li>rm: remove a file from the connected device</li>
+    <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash)
+    <li>mkdir: create a new directory</li>
+    <li>rmdir: remove an empty directory</li>
+    <li>fileSystemInfo: get information about the file system
+    </ul>
+
+    There are additional non file systemcommands.
+    <ul>
+    <li>getBoardData: get information about the connected board</li>
+    <li>getDeviceData: get version info about MicroPython and some implementation
+        information</li>
+    <li>getModules: get a list of built-in modules</li>
+    <li>getTime: get the current time</li>
+    <li>syncTime: synchronize the time of the connected device</li>
+    <li>showTime: show the current time of the connected device</li>
+    </ul>
+
+    @signal executeAsyncFinished() emitted to indicate the end of an
+        asynchronously executed list of commands (e.g. a script)
+    @signal dataReceived(data) emitted to send data received via the serial
+        connection for further processing
+    """
+
+    executeAsyncFinished = pyqtSignal()
+    dataReceived = pyqtSignal(bytes)
+
+    def __init__(self, parent=None):
+        """
+        Constructor
+
+        @param parent reference to the parent object
+        @type QObject
+        """
+        super().__init__(parent)
+
+        self.__repl = parent
+
+        self.__blockReadyRead = False
+
+        self.__serial = MicroPythonSerialPort(
+            timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
+        )
+        self.__serial.readyRead.connect(self.__readSerial)
+
+    @pyqtSlot()
+    def __readSerial(self):
+        """
+        Private slot to read all available serial data and emit it with the
+        "dataReceived" signal for further processing.
+        """
+        if not self.__blockReadyRead:
+            data = bytes(self.__serial.readAll())
+            self.dataReceived.emit(data)
+
+    @pyqtSlot()
+    def connectToDevice(self, port):
+        """
+        Public slot to start the manager.
+
+        @param port name of the port to be used
+        @type str
+        @return flag indicating success
+        @rtype bool
+        """
+        return self.__serial.openSerialLink(port)
+
+    @pyqtSlot()
+    def disconnectFromDevice(self):
+        """
+        Public slot to stop the thread.
+        """
+        self.__serial.closeSerialLink()
+
+    def isConnected(self):
+        """
+        Public method to get the connection status.
+
+        @return flag indicating the connection status
+        @rtype bool
+        """
+        return self.__serial.isConnected()
+
+    @pyqtSlot()
+    def handlePreferencesChanged(self):
+        """
+        Public slot to handle a change of the preferences.
+        """
+        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))
+
+    def write(self, data):
+        """
+        Public method to write data to the connected device.
+
+        @param data data to be written
+        @type bytes or bytearray
+        """
+        self.__serial.isConnected() and self.__serial.write(data)
+
+    def __rawOn(self):
+        """
+        Private method to switch the connected device to 'raw' mode.
+
+        Note: switching to raw mode is done with synchronous writes.
+
+        @return flag indicating success
+        @@rtype bool
+        """
+        if not self.__serial:
+            return False
+
+        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"
+
+        self.__serial.write(b"\x02")  # end raw mode if required
+        written = self.__serial.waitForBytesWritten(500)
+        # time out after 500ms if device is not responding
+        if not written:
+            return False
+        for _i in range(3):
+            # CTRL-C three times to break out of loops
+            self.__serial.write(b"\r\x03")
+            written = self.__serial.waitForBytesWritten(500)
+            # time out after 500ms if device is not responding
+            if not written:
+                return False
+            QThread.msleep(10)
+        self.__serial.readAll()  # read all data and discard it
+        self.__serial.write(b"\r\x01")  # send CTRL-A to enter raw mode
+        self.__serial.readUntil(rawReplMessage)
+        if self.__serial.hasTimedOut():
+            # it timed out; try it again and than fail
+            self.__serial.write(b"\r\x01")  # send CTRL-A again
+            self.__serial.readUntil(rawReplMessage)
+            if self.__serial.hasTimedOut():
+                return False
+
+        QCoreApplication.processEvents(
+            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+        )
+        self.__serial.readAll()  # read all data and discard it
+        return True
+
+    def __rawOff(self):
+        """
+        Private method to switch 'raw' mode off.
+        """
+        if self.__serial:
+            self.__serial.write(b"\x02")  # send CTRL-B to cancel raw mode
+            self.__serial.readUntil(b">>> ")  # read until Python prompt
+            self.__serial.readAll()  # read all data and discard it
+
+    def probeDevice(self):
+        """
+        Public method to check the device is responding.
+
+        If the device has not been flashed with a MicroPython formware, the
+        probe will fail.
+
+        @return flag indicating a communicating MicroPython device
+        @rtype bool
+        """
+        if not self.__serial:
+            return False
+
+        if not self.__serial.isConnected():
+            return False
+
+        # switch on raw mode
+        self.__blockReadyRead = True
+        ok = self.__rawOn()
+        if not ok:
+            self.__blockReadyRead = False
+            return False
+
+        # switch off raw mode
+        QThread.msleep(10)
+        self.__rawOff()
+        self.__blockReadyRead = False
+
+        return True
+
+    def execute(self, commands):
+        """
+        Public method to send commands to the connected device and return the
+        result.
+
+        If no serial connection is available, empty results will be returned.
+
+        @param commands list of commands to be executed
+        @type str or list of str
+        @return tuple containing stdout and stderr output of the device
+        @rtype tuple of (bytes, bytes)
+        """
+        if not self.__serial:
+            return b"", b""
+
+        if not self.__serial.isConnected():
+            return b"", b"Device not connected or not switched on."
+
+        result = bytearray()
+        err = b""
+
+        if isinstance(commands, str):
+            commands = [commands]
+
+        # switch on raw mode
+        self.__blockReadyRead = True
+        ok = self.__rawOn()
+        if not ok:
+            self.__blockReadyRead = False
+            return (b"", b"Could not switch to raw mode. Is the device switched on?")
+
+        # send commands
+        QThread.msleep(10)
+        for command in commands:
+            if command:
+                commandBytes = command.encode("utf-8")
+                self.__serial.write(commandBytes + b"\x04")
+                QCoreApplication.processEvents(
+                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+                )
+                ok = self.__serial.readUntil(b"OK")
+                if ok != b"OK":
+                    return (
+                        b"",
+                        "Expected 'OK', got '{0}', followed by '{1}'".format(
+                            ok, self.__serial.readAll()
+                        ).encode("utf-8"),
+                    )
+
+                # read until prompt
+                response = self.__serial.readUntil(b"\x04>")
+                if self.__serial.hasTimedOut():
+                    self.__blockReadyRead = False
+                    return b"", b"Timeout while processing commands."
+                if b"\x04" in response[:-2]:
+                    # split stdout, stderr
+                    out, err = response[:-2].split(b"\x04")
+                    result += out
+                else:
+                    err = b"invalid response received: " + response
+                if err:
+                    result = b""
+                    break
+
+        # switch off raw mode
+        QThread.msleep(10)
+        self.__rawOff()
+        self.__blockReadyRead = False
+
+        return bytes(result), err
+
+    def executeAsync(self, commandsList):
+        """
+        Public method to execute a series of commands over a period of time
+        without returning any result (asynchronous execution).
+
+        @param commandsList list of commands to be execute on the device
+        @type list of bytes
+        """
+        if commandsList:
+            command = commandsList.pop(0)
+            self.__serial.write(command)
+            QTimer.singleShot(2, lambda: self.executeAsync(commandsList))
+        else:
+            self.executeAsyncFinished.emit()

eric ide

mercurial