src/eric7/MicroPython/MicroPythonDeviceInterface.py

Mon, 27 Feb 2023 17:43:51 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Mon, 27 Feb 2023 17:43:51 +0100
branch
mpy_network
changeset 9821
6b1b06d74532
parent 9810
39d3b227358c
child 9826
9340ce7fb12f
permissions
-rw-r--r--

Fixed some code formatting issues.

# -*- coding: utf-8 -*-

# Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing some file system commands for MicroPython.
"""

from PyQt6.QtCore import (
    QCoreApplication,
    QEventLoop,
    QObject,
    QThread,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)

from eric7 import Preferences

from .MicroPythonSerialPort import MicroPythonSerialPort


class MicroPythonDeviceInterface(QObject):
    """
    Class implementing an interface to talk to a connected MicroPython device.

    @signal executeAsyncFinished() emitted to indicate the end of an
        asynchronously executed list of commands (e.g. a script)
    @signal dataReceived(data) emitted to send data received via the serial
        connection for further processing
    """

    executeAsyncFinished = pyqtSignal()
    dataReceived = pyqtSignal(bytes)

    PasteModePrompt = b"=== "
    TracebackMarker = b"Traceback (most recent call last):"

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        self.__repl = parent

        self.__blockReadyRead = False

        self.__serial = MicroPythonSerialPort(
            timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
        )
        self.__serial.readyRead.connect(self.__readSerial)

    @pyqtSlot()
    def __readSerial(self):
        """
        Private slot to read all available serial data and emit it with the
        "dataReceived" signal for further processing.
        """
        if not self.__blockReadyRead:
            data = bytes(self.__serial.readAll())
            self.dataReceived.emit(data)

    @pyqtSlot()
    def connectToDevice(self, port):
        """
        Public slot to start the manager.

        @param port name of the port to be used
        @type str
        @return flag indicating success
        @rtype bool
        """
        return self.__serial.openSerialLink(port)

    @pyqtSlot()
    def disconnectFromDevice(self):
        """
        Public slot to stop the thread.
        """
        self.__serial.closeSerialLink()

    def isConnected(self):
        """
        Public method to get the connection status.

        @return flag indicating the connection status
        @rtype bool
        """
        return self.__serial.isConnected()

    @pyqtSlot()
    def handlePreferencesChanged(self):
        """
        Public slot to handle a change of the preferences.
        """
        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))

    def write(self, data):
        """
        Public method to write data to the connected device.

        @param data data to be written
        @type bytes or bytearray
        """
        self.__serial.isConnected() and self.__serial.write(data)

    def __pasteOn(self):
        """
        Private method to switch the connected device to 'paste' mode.

        Note: switching to paste mode is done with synchronous writes.

        @return flag indicating success
        @rtype bool
        """
        if not self.__serial:
            return False

        pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== "

        self.__serial.clear()  # clear any buffered output before entering paste mode
        self.__serial.write(b"\x02")  # end raw mode if required
        written = self.__serial.waitForBytesWritten(500)
        # time out after 500ms if device is not responding
        if not written:
            return False
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            written = self.__serial.waitForBytesWritten(500)
            # time out after 500ms if device is not responding
            if not written:
                return False
            QThread.msleep(10)
        self.__serial.readAll()  # read all data and discard it
        self.__serial.write(b"\r\x05")  # send CTRL-E to enter paste mode
        self.__serial.readUntil(pasteMessage)

        if self.__serial.hasTimedOut():
            # it timed out; try it again and than fail
            self.__serial.write(b"\r\x05")  # send CTRL-E again
            self.__serial.readUntil(pasteMessage)
            if self.__serial.hasTimedOut():
                return False

        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        )
        self.__serial.readAll()  # read all data and discard it
        return True

    def __pasteOff(self):
        """
        Private method to switch 'paste' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x04")  # send CTRL-D to cancel paste mode

    def __rawOn(self):
        """
        Private method to switch the connected device to 'raw' mode.

        Note: switching to raw mode is done with synchronous writes.

        @return flag indicating success
        @rtype bool
        """
        if not self.__serial:
            return False

        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"

        self.__serial.write(b"\x02")  # end raw mode if required
        written = self.__serial.waitForBytesWritten(500)
        # time out after 500ms if device is not responding
        if not written:
            return False
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            written = self.__serial.waitForBytesWritten(500)
            # time out after 500ms if device is not responding
            if not written:
                return False
            QThread.msleep(10)
        self.__serial.readAll()  # read all data and discard it
        self.__serial.write(b"\r\x01")  # send CTRL-A to enter raw mode
        self.__serial.readUntil(rawReplMessage)
        if self.__serial.hasTimedOut():
            # it timed out; try it again and than fail
            self.__serial.write(b"\r\x01")  # send CTRL-A again
            self.__serial.readUntil(rawReplMessage)
            if self.__serial.hasTimedOut():
                return False

        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        )
        self.__serial.readAll()  # read all data and discard it
        return True

    def __rawOff(self):
        """
        Private method to switch 'raw' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x02")  # send CTRL-B to cancel raw mode
            self.__serial.readUntil(b">>> ")  # read until Python prompt
            self.__serial.readAll()  # read all data and discard it

    def probeDevice(self):
        """
        Public method to check the device is responding.

        If the device has not been flashed with a MicroPython formware, the
        probe will fail.

        @return flag indicating a communicating MicroPython device
        @rtype bool
        """
        if not self.__serial:
            return False

        if not self.__serial.isConnected():
            return False

        # switch on raw mode
        self.__blockReadyRead = True
        ok = self.__rawOn()
        if not ok:
            self.__blockReadyRead = False
            return False

        # switch off raw mode
        QThread.msleep(10)
        self.__rawOff()
        self.__blockReadyRead = False

        return True

    def execute(self, commands, *, mode="raw", timeout=0):
        """
        Public method to send commands to the connected device and return the
        result.

        If no serial connection is available, empty results will be returned.

        @param commands list of commands to be executed
        @type str or list of str
        @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to
            'raw')
        @type str
        @keyparam timeout per command timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        @exception ValueError raised in case of an unsupported submit mode
        """
        if mode not in ("paste", "raw"):
            raise ValueError("Unsupported submit mode given ('{0}').".format(mode))

        if mode == "raw":
            return self.__execute_raw(commands, timeout=timeout)
        elif mode == "paste":
            return self.__execute_paste(commands, timeout=timeout)
        else:
            # just in case
            return b"", b""

    def __execute_raw(self, commands, timeout=0):
        """
        Private method to send commands to the connected device using 'raw REPL' mode
        and return the result.

        If no serial connection is available, empty results will be returned.

        @param commands list of commands to be executed
        @type str or list of str
        @param timeout per command timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""

        if not self.__serial.isConnected():
            return b"", b"Device not connected or not switched on."

        result = bytearray()
        err = b""

        if isinstance(commands, str):
            commands = [commands]

        # switch on raw mode
        self.__blockReadyRead = True
        ok = self.__rawOn()
        if not ok:
            self.__blockReadyRead = False
            return (b"", b"Could not switch to raw mode. Is the device switched on?")

        # send commands
        QThread.msleep(10)
        for command in commands:
            if command:
                commandBytes = command.encode("utf-8")
                self.__serial.write(commandBytes + b"\x04")
                QCoreApplication.processEvents(
                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
                )
                ok = self.__serial.readUntil(b"OK")
                if ok != b"OK":
                    self.__blockReadyRead = False
                    return (
                        b"",
                        "Expected 'OK', got '{0}', followed by '{1}'".format(
                            ok, self.__serial.readAll()
                        ).encode("utf-8"),
                    )

                # read until prompt
                response = self.__serial.readUntil(b"\x04>", timeout=timeout)
                if self.__serial.hasTimedOut():
                    self.__blockReadyRead = False
                    return b"", b"Timeout while processing commands."
                if b"\x04" in response[:-2]:
                    # split stdout, stderr
                    out, err = response[:-2].split(b"\x04")
                    result += out
                else:
                    err = b"invalid response received: " + response
                if err:
                    result = b""
                    break

        # switch off raw mode
        QThread.msleep(10)
        self.__rawOff()
        self.__blockReadyRead = False

        return bytes(result), err

    def __execute_paste(self, commands, timeout=0):
        """
        Private method to send commands to the connected device using 'paste' mode
        and return the result.

        If no serial connection is available, empty results will be returned.

        @param commands list of commands to be executed
        @type str or list of str
        @param timeout per command timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""

        if not self.__serial.isConnected():
            return b"", b"Device not connected or not switched on."

        if isinstance(commands, list):
            commands = "\n".join(commands)

        # switch on paste mode
        self.__blockReadyRead = True
        ok = self.__pasteOn()
        if not ok:
            self.__blockReadyRead = False
            return (b"", b"Could not switch to paste mode. Is the device switched on?")

        # send commands
        QThread.msleep(10)
        for command in commands.splitlines(keepends=True):
            # send the data as single lines
            commandBytes = command.encode("utf-8")
            self.__serial.write(commandBytes)
            QCoreApplication.processEvents(
                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
            )
            QThread.msleep(10)
            ok = self.__serial.readUntil(commandBytes)
            if ok != commandBytes:
                self.__blockReadyRead = False
                return (
                    b"",
                    "Expected '{0}', got '{1}', followed by '{2}'".format(
                        commandBytes, ok, self.__serial.readAll()
                    ).encode("utf-8"),
                )

        # switch off paste mode causing the commands to be executed
        self.__pasteOff()
        QThread.msleep(10)
        # read until Python prompt
        result = (
            self.__serial.readUntil(b">>> ", timeout=timeout)
            .replace(b">>> ", b"")
            .strip()
        )
        if self.__serial.hasTimedOut():
            self.__blockReadyRead = False
            return b"", b"Timeout while processing commands."

        # get rid of any OSD string
        if result.startswith(b"\x1b]0;"):
            result = result.split(b"\x1b\\")[-1]

        if self.TracebackMarker in result:
            errorIndex = result.find(self.TracebackMarker)
            out, err = result[:errorIndex], result[errorIndex:]
        else:
            out = result
            err = b""

        self.__blockReadyRead = False
        return out, err

    def executeAsync(self, commandsList):
        """
        Public method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        @param commandsList list of commands to be execute on the device
        @type list of bytes
        """
        if commandsList:
            command = commandsList.pop(0)
            self.__serial.write(command)
            QTimer.singleShot(2, lambda: self.executeAsync(commandsList))
        else:
            self.executeAsyncFinished.emit()

    def executeAsyncPaste(self, commandsList):
        """
        Public method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        @param commandsList list of commands to be execute on the device
        @type list of bytes
        """
        if commandsList:
            self.__blockReadyRead = True
            command = commandsList.pop(0)
            if command == "@PasteOn@":
                self.__pasteOn()
            else:
                self.__serial.write(command)
                self.__serial.readUntil(command)
            QTimer.singleShot(2, lambda: self.executeAsyncPaste(commandsList))
        else:
            self.__blockReadyRead = False
            self.__pasteOff()
            self.executeAsyncFinished.emit()

eric ide

mercurial