src/eric7/MicroPython/MicroPythonDeviceInterface.py

Tue, 14 Feb 2023 18:10:30 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Tue, 14 Feb 2023 18:10:30 +0100
branch
eric7
changeset 9765
6378da868bb0
parent 9764
src/eric7/MicroPython/MicroPythonCommandsInterface.py@57496966803c
child 9766
f0e22f3a5878
permissions
-rw-r--r--

Reorganized the MicroPython code even more.

# -*- coding: utf-8 -*-

# Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing some file system commands for MicroPython.
"""

from PyQt6.QtCore import (
    QCoreApplication,
    QEventLoop,
    QObject,
    QThread,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)

from eric7 import Preferences

from .MicroPythonSerialPort import MicroPythonSerialPort


class MicroPythonDeviceInterface(QObject):
    """
    Class implementing some file system commands for MicroPython.

    Commands are provided to perform operations on the file system of a
    connected MicroPython device. Supported commands are:
    <ul>
    <li>ls: directory listing</li>
    <li>lls: directory listing with meta data</li>
    <li>cd: change directory</li>
    <li>pwd: get the current directory</li>
    <li>put: copy a file to the connected device</li>
    <li>putData: write data to a file of the connected device</li>
    <li>get: get a file from the connected device</li>
    <li>getData: read data of a file of the connected device</li>
    <li>rm: remove a file from the connected device</li>
    <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash)</li>
    <li>mkdir: create a new directory</li>
    <li>rmdir: remove an empty directory</li>
    <li>fileSystemInfo: get information about the file system</li>
    </ul>

    There are additional non file system commands.
    <ul>
    <li>getBoardData: get information about the connected board</li>
    <li>getDeviceData: get version info about MicroPython and some implementation
        information</li>
    <li>getModules: get a list of built-in modules</li>
    <li>getTime: get the current time</li>
    <li>syncTime: synchronize the time of the connected device</li>
    <li>showTime: show the current time of the connected device</li>
    </ul>

    @signal executeAsyncFinished() emitted to indicate the end of an
        asynchronously executed list of commands (e.g. a script)
    @signal dataReceived(data) emitted to send data received via the serial
        connection for further processing
    """

    executeAsyncFinished = pyqtSignal()
    dataReceived = pyqtSignal(bytes)

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        self.__repl = parent

        # While this flag is set, incoming serial data is consumed by the
        # synchronous command methods and not forwarded via 'dataReceived'.
        self.__blockReadyRead = False

        self.__serial = MicroPythonSerialPort(
            timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
        )
        self.__serial.readyRead.connect(self.__readSerial)

    @pyqtSlot()
    def __readSerial(self):
        """
        Private slot to read all available serial data and emit it with the
        "dataReceived" signal for further processing.

        Nothing is read or emitted while a synchronous command method has
        blocked the forwarding.
        """
        if not self.__blockReadyRead:
            data = bytes(self.__serial.readAll())
            self.dataReceived.emit(data)

    @pyqtSlot(str)
    def connectToDevice(self, port):
        """
        Public slot to open the serial link to the device.

        @param port name of the port to be used
        @type str
        @return flag indicating success
        @rtype bool
        """
        return self.__serial.openSerialLink(port)

    @pyqtSlot()
    def disconnectFromDevice(self):
        """
        Public slot to close the serial link to the device.
        """
        self.__serial.closeSerialLink()

    def isConnected(self):
        """
        Public method to get the connection status.

        @return flag indicating the connection status
        @rtype bool
        """
        return self.__serial.isConnected()

    @pyqtSlot()
    def handlePreferencesChanged(self):
        """
        Public slot to handle a change of the preferences.

        The serial timeout is re-read from the preferences and applied to the
        serial port.
        """
        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))

    def write(self, data):
        """
        Public method to write data to the connected device.

        The data is silently dropped, if the serial link is not connected.

        @param data data to be written
        @type bytes or bytearray
        """
        if self.__serial.isConnected():
            self.__serial.write(data)

    def __rawOn(self):
        """
        Private method to switch the connected device to 'raw' mode.

        Note: switching to raw mode is done with synchronous writes.

        @return flag indicating success
        @rtype bool
        """
        if not self.__serial:
            return False

        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"

        self.__serial.write(b"\x02")  # end raw mode if required
        written = self.__serial.waitForBytesWritten(500)
        # time out after 500ms if device is not responding
        if not written:
            return False
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            written = self.__serial.waitForBytesWritten(500)
            # time out after 500ms if device is not responding
            if not written:
                return False
            QThread.msleep(10)
        self.__serial.readAll()  # read all data and discard it
        self.__serial.write(b"\r\x01")  # send CTRL-A to enter raw mode
        self.__serial.readUntil(rawReplMessage)
        if self.__serial.hasTimedOut():
            # it timed out; try it again and then fail
            self.__serial.write(b"\r\x01")  # send CTRL-A again
            self.__serial.readUntil(rawReplMessage)
            if self.__serial.hasTimedOut():
                return False

        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        )
        self.__serial.readAll()  # read all data and discard it
        return True

    def __rawOff(self):
        """
        Private method to switch 'raw' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x02")  # send CTRL-B to cancel raw mode
            self.__serial.readUntil(b">>> ")  # read until Python prompt
            self.__serial.readAll()  # read all data and discard it

    def probeDevice(self):
        """
        Public method to check the device is responding.

        If the device has not been flashed with a MicroPython firmware, the
        probe will fail.

        @return flag indicating a communicating MicroPython device
        @rtype bool
        """
        if not self.__serial:
            return False

        if not self.__serial.isConnected():
            return False

        # Keep the 'readyRead' handler quiet while probing; the 'finally'
        # clause guarantees that data forwarding is re-enabled on every path.
        self.__blockReadyRead = True
        try:
            # switch on raw mode
            if not self.__rawOn():
                return False

            # switch off raw mode
            QThread.msleep(10)
            self.__rawOff()
        finally:
            self.__blockReadyRead = False

        return True

    def execute(self, commands):
        """
        Public method to send commands to the connected device and return the
        result.

        If no serial connection is available, empty results will be returned.
        Execution stops at the first command reporting an error; in this case
        the collected output is discarded and only the error is returned.

        @param commands list of commands to be executed
        @type str or list of str
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""

        if not self.__serial.isConnected():
            return b"", b"Device not connected or not switched on."

        result = bytearray()
        err = b""

        if isinstance(commands, str):
            commands = [commands]

        # Keep the 'readyRead' handler quiet while talking to the device. The
        # 'finally' clause re-enables data forwarding on every exit path
        # (including the early error returns and unexpected exceptions).
        self.__blockReadyRead = True
        try:
            # switch on raw mode
            if not self.__rawOn():
                return (
                    b"",
                    b"Could not switch to raw mode. Is the device switched on?",
                )

            # send commands
            QThread.msleep(10)
            for command in commands:
                if command:
                    commandBytes = command.encode("utf-8")
                    # CTRL-D terminates the command in raw mode
                    self.__serial.write(commandBytes + b"\x04")
                    QCoreApplication.processEvents(
                        QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
                    )
                    ok = self.__serial.readUntil(b"OK")
                    if ok != b"OK":
                        return (
                            b"",
                            "Expected 'OK', got '{0}', followed by '{1}'".format(
                                ok, self.__serial.readAll()
                            ).encode("utf-8"),
                        )

                    # read until prompt
                    response = self.__serial.readUntil(b"\x04>")
                    if self.__serial.hasTimedOut():
                        return b"", b"Timeout while processing commands."
                    if b"\x04" in response[:-2]:
                        # split stdout, stderr
                        out, err = response[:-2].split(b"\x04")
                        result += out
                    else:
                        err = b"invalid response received: " + response
                    if err:
                        result = b""
                        break

            # switch off raw mode
            QThread.msleep(10)
            self.__rawOff()
        finally:
            self.__blockReadyRead = False

        return bytes(result), err

    def executeAsync(self, commandsList):
        """
        Public method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        Note: The given list is consumed, i.e. each command is removed from
        it when it is sent. The 'executeAsyncFinished' signal is emitted once
        the list is empty.

        @param commandsList list of commands to be executed on the device
        @type list of bytes
        """
        if commandsList:
            command = commandsList.pop(0)
            self.__serial.write(command)
            # send the next command after a short delay
            QTimer.singleShot(2, lambda: self.executeAsync(commandsList))
        else:
            self.executeAsyncFinished.emit()

eric ide

mercurial