src/eric7/MicroPython/MicroPythonSerialDeviceInterface.py

Sun, 16 Mar 2025 12:53:12 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sun, 16 Mar 2025 12:53:12 +0100
branch
eric7
changeset 11170
6d6199d668fb
parent 11090
f5f5f5803935
permissions
-rw-r--r--

Added the Adafruit Feather nRF52840 to the list of known NRF52 boards and changed the list of known CircuitPython boards to be more explicit with respect to Adafruit boards (i.e. VID 0x239A).

# -*- coding: utf-8 -*-

# Copyright (c) 2019 - 2025 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module  implementing an interface to talk to a connected MicroPython device via
a serial link.
"""

from PyQt6.QtCore import QCoreApplication, QEventLoop, QThread, QTimer, pyqtSlot

from eric7 import Preferences

from .MicroPythonDeviceInterface import MicroPythonDeviceInterface
from .MicroPythonSerialPort import MicroPythonSerialPort


class MicroPythonSerialDeviceInterface(MicroPythonDeviceInterface):
    """
    Class implementing an interface to talk to a connected MicroPython device via
    a serial link.
    """

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        self.__blockReadyRead = False

        self.__serial = MicroPythonSerialPort(
            timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
        )
        self.__serial.readyRead.connect(self.__readSerial)

    @pyqtSlot()
    def __readSerial(self):
        """
        Private slot to read all available serial data and emit it with the
        "dataReceived" signal for further processing.
        """
        if not self.__blockReadyRead:
            data = bytes(self.__serial.readAll())
            self.dataReceived.emit(data)

    def connectToDevice(self, connection):
        """
        Public method to connect to the device.

        @param connection name of the connection to be used
        @type str
        @return flag indicating success and an error message
        @rtype tuple of (bool, str)
        """
        return self.__serial.openSerialLink(connection)

    @pyqtSlot()
    def disconnectFromDevice(self):
        """
        Public slot to disconnect from the device.
        """
        self.__serial.closeSerialLink()

    def isConnected(self):
        """
        Public method to get the connection status.

        @return flag indicating the connection status
        @rtype bool
        """
        return self.__serial.isConnected()

    @pyqtSlot()
    def handlePreferencesChanged(self):
        """
        Public slot to handle a change of the preferences.
        """
        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))

    def write(self, data):
        """
        Public method to write data to the connected device.

        @param data data to be written
        @type bytes or bytearray
        """
        self.__serial.isConnected() and self.__serial.write(data)

    def __pasteOn(self):
        """
        Private method to switch the connected device to 'paste' mode.

        Note: switching to paste mode is done with synchronous writes.

        @return flag indicating success
        @rtype bool
        """
        if not self.__serial:
            return False

        pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== "

        self.__serial.clear()  # clear any buffered output before entering paste mode
        self.__serial.write(b"\x02")  # end raw mode if required
        written = self.__serial.waitForBytesWritten(500)
        # time out after 500ms if device is not responding
        if not written:
            return False
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            written = self.__serial.waitForBytesWritten(500)
            # time out after 500ms if device is not responding
            if not written:
                return False
            QThread.msleep(10)
        self.__serial.readAll()  # read all data and discard it
        self.__serial.write(b"\r\x05")  # send CTRL-E to enter paste mode
        self.__serial.readUntil(pasteMessage)

        if self.__serial.hasTimedOut():
            # it timed out; try it again and than fail
            self.__serial.write(b"\r\x05")  # send CTRL-E again
            self.__serial.readUntil(pasteMessage)
            if self.__serial.hasTimedOut():
                return False

        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        )
        self.__serial.readAll()  # read all data and discard it
        return True

    def __pasteOff(self):
        """
        Private method to switch 'paste' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x04")  # send CTRL-D to cancel paste mode

    def __rawOn(self):
        """
        Private method to switch the connected device to 'raw' mode.

        Note: switching to raw mode is done with synchronous writes.

        @return flag indicating success
        @rtype bool
        """
        if not self.__serial:
            return False

        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"

        self.__serial.write(b"\x02")  # end raw mode if required
        written = self.__serial.waitForBytesWritten(500)
        # time out after 500ms if device is not responding
        if not written:
            return False
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            written = self.__serial.waitForBytesWritten(500)
            # time out after 500ms if device is not responding
            if not written:
                return False
            QThread.msleep(10)
        self.__serial.readAll()  # read all data and discard it
        self.__serial.write(b"\r\x01")  # send CTRL-A to enter raw mode
        self.__serial.readUntil(rawReplMessage)
        if self.__serial.hasTimedOut():
            # it timed out; try it again and than fail
            self.__serial.write(b"\r\x01")  # send CTRL-A again
            self.__serial.readUntil(rawReplMessage)
            if self.__serial.hasTimedOut():
                return False

        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        )
        self.__serial.readAll()  # read all data and discard it
        return True

    def __rawOff(self):
        """
        Private method to switch 'raw' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x02")  # send CTRL-B to cancel raw mode
            self.__serial.readUntil(b">>> ")  # read until Python prompt
            self.__serial.readAll()  # read all data and discard it

    def probeDevice(self):
        """
        Public method to check the device is responding.

        If the device has not been flashed with a MicroPython firmware, the
        probe will fail.

        @return flag indicating a communicating MicroPython device
        @rtype bool
        """
        if not self.__serial:
            return False

        if not self.__serial.isConnected():
            return False

        # switch on paste mode
        self.__blockReadyRead = True
        ok = self.__pasteOn()
        if not ok:
            self.__blockReadyRead = False
            return False

        # switch off paste mode
        QThread.msleep(10)
        self.__pasteOff()
        self.__blockReadyRead = False

        return True

    def execute(self, commands, *, mode="raw", timeout=0):
        """
        Public method to send commands to the connected device and return the
        result.

        If no serial connection is available, empty results will be returned.

        @param commands list of commands to be executed
        @type str or list of str
        @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to
            'raw')
        @type str
        @keyparam timeout per command timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        @exception ValueError raised in case of an unsupported submit mode
        """
        if mode not in ("paste", "raw"):
            raise ValueError("Unsupported submit mode given ('{0}').".format(mode))

        if mode == "raw":
            return self.__execute_raw(commands, timeout=timeout)
        elif mode == "paste":
            return self.__execute_paste(commands, timeout=timeout)
        else:
            # just in case
            return b"", b""

    def __execute_raw(self, commands, timeout=0):
        """
        Private method to send commands to the connected device using 'raw REPL' mode
        and return the result.

        If no serial connection is available, empty results will be returned.

        @param commands list of commands to be executed
        @type str or list of str
        @param timeout per command timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""

        if not self.__serial.isConnected():
            return b"", b"Device not connected or not switched on."

        result = bytearray()
        err = b""

        if isinstance(commands, str):
            commands = [commands]

        # switch on raw mode
        self.__blockReadyRead = True
        ok = self.__rawOn()
        if not ok:
            self.__blockReadyRead = False
            return (b"", b"Could not switch to raw mode. Is the device switched on?")

        # send commands
        QThread.msleep(10)
        for command in commands:
            if command:
                commandBytes = command.encode("utf-8")
                self.__serial.write(commandBytes + b"\x04")
                QCoreApplication.processEvents(
                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
                )
                ok = self.__serial.readUntil(b"OK")
                if ok != b"OK":
                    self.__blockReadyRead = False
                    return (
                        b"",
                        "Expected 'OK', got '{0}', followed by '{1}'".format(
                            ok, self.__serial.readAll()
                        ).encode("utf-8"),
                    )

                # read until prompt
                response = self.__serial.readUntil(b"\x04>", timeout=timeout)
                if self.__serial.hasTimedOut():
                    out, err = b"", b"Timeout while processing commands."
                    break
                if b"\x04" in response[:-2]:
                    # split stdout, stderr
                    out, err = response[:-2].split(b"\x04")
                    result += out
                else:
                    err = b"invalid response received: " + response
                if err:
                    result = b""
                    break

        # switch off raw mode
        QThread.msleep(10)
        self.__rawOff()
        self.__blockReadyRead = False

        return bytes(result), err

    def __execute_paste(self, commands, timeout=0):
        """
        Private method to send commands to the connected device using 'paste' mode
        and return the result.

        If no serial connection is available, empty results will be returned.

        @param commands list of commands to be executed
        @type str or list of str
        @param timeout per command timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""

        if not self.__serial.isConnected():
            return b"", b"Device is not connected or not switched on."

        if isinstance(commands, list):
            commands = "\n".join(commands)

        # switch on paste mode
        self.__blockReadyRead = True
        ok = self.__pasteOn()
        if not ok:
            self.__blockReadyRead = False
            return (b"", b"Could not switch to paste mode. Is the device switched on?")

        # send commands
        QThread.msleep(10)
        for command in commands.splitlines(keepends=True):
            # send the data as single lines
            commandBytes = command.encode("utf-8")
            self.__serial.write(commandBytes)
            QCoreApplication.processEvents(
                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
            )
            QThread.msleep(10)
            ok = self.__serial.readUntil(commandBytes)
            if ok != commandBytes:
                self.__blockReadyRead = False
                return (
                    b"",
                    "Expected '{0}', got '{1}', followed by '{2}'".format(
                        commandBytes, ok, self.__serial.readAll()
                    ).encode("utf-8"),
                )

        # switch off paste mode causing the commands to be executed
        self.__pasteOff()
        QThread.msleep(10)
        # read until Python prompt
        result = (
            self.__serial.readUntil(b">>> ", timeout=timeout)
            .replace(b">>> ", b"")
            .strip()
        )
        if self.__serial.hasTimedOut():
            out, err = b"", b"Timeout while processing commands."
        else:
            # get rid of any OSD string and send it
            while result.startswith(b"\x1b]0;"):
                osd, result = result.split(b"\x1b\\", 1)
                self.osdInfo.emit(osd[4:].decode("utf-8"))

            if self.TracebackMarker in result:
                errorIndex = result.find(self.TracebackMarker)
                out, err = result[:errorIndex], result[errorIndex:]
            else:
                out = result
                err = b""

        self.__blockReadyRead = False
        return out, err

    def executeAsync(self, commandsList, submitMode):
        """
        Public method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        @param commandsList list of commands to be execute on the device
        @type list of str
        @param submitMode mode to be used to submit the commands
        @type str (one of 'raw' or 'paste')
        @exception ValueError raised to indicate an unknown submit mode
        """
        if submitMode not in ("raw", "paste"):
            raise ValueError("Illegal submit mode given ({0})".format(submitMode))

        if submitMode == "raw":
            startSequence = [  # sequence of commands to enter raw mode
                b"\x02",  # Ctrl-B: exit raw repl (just in case)
                b"\r\x03\x03\x03",  # Ctrl-C three times: interrupt any running program
                b"\r\x01",  # Ctrl-A: enter raw REPL
                b'print("\\n")\r',
            ]
            endSequence = [
                b"\r",
                b"\x04",
            ]
            self.__executeAsyncRaw(
                startSequence
                + [c.encode("utf-8") + b"\r" for c in commandsList]
                + endSequence
            )
        elif submitMode == "paste":
            self.__executeAsyncPaste(commandsList)

    def __executeAsyncRaw(self, commandsList):
        """
        Private method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        @param commandsList list of commands to be execute on the device
        @type list of bytes
        """
        if commandsList:
            command = commandsList.pop(0)
            self.__serial.write(command)
            QTimer.singleShot(2, lambda: self.__executeAsyncRaw(commandsList))
        else:
            self.__rawOff()
            self.executeAsyncFinished.emit()

    def __executeAsyncPaste(self, commandsList):
        """
        Private method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        @param commandsList list of commands to be execute on the device
        @type list of str
        """
        self.__blockReadyRead = True
        self.__pasteOn()
        command = b"\n".join(c.encode("utf-8)") for c in commandsList)
        self.__serial.write(command)
        self.__serial.readUntil(command)
        self.__blockReadyRead = False
        self.__pasteOff()
        self.executeAsyncFinished.emit()

eric ide

mercurial