src/eric7/MicroPython/MicroPythonCommandsInterface.py

Mon, 13 Feb 2023 17:55:13 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Mon, 13 Feb 2023 17:55:13 +0100
branch
eric7
changeset 9760
11245ac9c258
parent 9755
1a09700229e7
child 9764
57496966803c
permissions
-rw-r--r--

MicroPython
- Extended the file manager to be able to load device files into an editor and save them back to the device.

# -*- coding: utf-8 -*-

# Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing some file system commands for MicroPython.
"""

import ast
import os
import time

from PyQt6.QtCore import (
    QCoreApplication,
    QEventLoop,
    QObject,
    QThread,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)

from eric7 import Preferences

from .MicroPythonSerialPort import MicroPythonSerialPort


class MicroPythonCommandsInterface(QObject):
    """
    Class implementing some file system commands for MicroPython.

    Commands are provided to perform operations on the file system of a
    connected MicroPython device. Supported commands are:
    <ul>
    <li>ls: directory listing</li>
    <li>lls: directory listing with meta data</li>
    <li>cd: change directory</li>
    <li>pwd: get the current directory</li>
    <li>put: copy a file to the connected device</li>
    <li>get: get a file from the connected device</li>
    <li>rm: remove a file from the connected device</li>
    <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash)
    <li>mkdir: create a new directory</li>
    <li>rmdir: remove an empty directory</li>
    </ul>

    There are additional commands related to time and version.
    <ul>
    <li>version: get version info about MicroPython</li>
    <li>getImplementation: get some implementation information</li>
    <li>syncTime: synchronize the time of the connected device</li>
    <li>showTime: show the current time of the connected device</li>
    </ul>

    @signal executeAsyncFinished() emitted to indicate the end of an
        asynchronously executed list of commands (e.g. a script)
    @signal dataReceived(data) emitted to send data received via the serial
        connection for further processing
    """

    executeAsyncFinished = pyqtSignal()
    dataReceived = pyqtSignal(bytes)

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        self.__repl = parent

        self.__blockReadyRead = False

        self.__serial = MicroPythonSerialPort(
            timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
        )
        self.__serial.readyRead.connect(self.__readSerial)

    @pyqtSlot()
    def __readSerial(self):
        """
        Private slot to read all available serial data and emit it with the
        "dataReceived" signal for further processing.
        """
        if not self.__blockReadyRead:
            data = bytes(self.__serial.readAll())
            self.dataReceived.emit(data)

    @pyqtSlot()
    def connectToDevice(self, port):
        """
        Public slot to start the manager.

        @param port name of the port to be used
        @type str
        @return flag indicating success
        @rtype bool
        """
        return self.__serial.openSerialLink(port)

    @pyqtSlot()
    def disconnectFromDevice(self):
        """
        Public slot to stop the thread.
        """
        self.__serial.closeSerialLink()

    def isConnected(self):
        """
        Public method to get the connection status.

        @return flag indicating the connection status
        @rtype bool
        """
        return self.__serial.isConnected()

    @pyqtSlot()
    def handlePreferencesChanged(self):
        """
        Public slot to handle a change of the preferences.
        """
        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))

    def write(self, data):
        """
        Public method to write data to the connected device.

        @param data data to be written
        @type bytes or bytearray
        """
        self.__serial.isConnected() and self.__serial.write(data)

    def __rawOn(self):
        """
        Private method to switch the connected device to 'raw' mode.

        Note: switching to raw mode is done with synchronous writes.

        @return flag indicating success
        @@rtype bool
        """
        if not self.__serial:
            return False

        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"

        self.__serial.write(b"\x02")  # end raw mode if required
        written = self.__serial.waitForBytesWritten(500)
        # time out after 500ms if device is not responding
        if not written:
            return False
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            written = self.__serial.waitForBytesWritten(500)
            # time out after 500ms if device is not responding
            if not written:
                return False
            QThread.msleep(10)
        self.__serial.readAll()  # read all data and discard it
        self.__serial.write(b"\r\x01")  # send CTRL-A to enter raw mode
        self.__serial.readUntil(rawReplMessage)
        if self.__serial.hasTimedOut():
            # it timed out; try it again and than fail
            self.__serial.write(b"\r\x01")  # send CTRL-A again
            self.__serial.readUntil(rawReplMessage)
            if self.__serial.hasTimedOut():
                return False

        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        )
        self.__serial.readAll()  # read all data and discard it
        return True

    def __rawOff(self):
        """
        Private method to switch 'raw' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x02")  # send CTRL-B to cancel raw mode
            self.__serial.readUntil(b">>> ")  # read until Python prompt
            self.__serial.readAll()  # read all data and discard it

    def probeDevice(self):
        """
        Public method to check the device is responding.

        If the device has not been flashed with a MicroPython formware, the
        probe will fail.

        @return flag indicating a communicating MicroPython device
        @rtype bool
        """
        if not self.__serial:
            return False

        if not self.__serial.isConnected():
            return False

        # switch on raw mode
        self.__blockReadyRead = True
        ok = self.__rawOn()
        if not ok:
            self.__blockReadyRead = False
            return False

        # switch off raw mode
        QThread.msleep(10)
        self.__rawOff()
        self.__blockReadyRead = False

        return True

    def execute(self, commands):
        """
        Public method to send commands to the connected device and return the
        result.

        If no serial connection is available, empty results will be returned.

        @param commands list of commands to be executed
        @type str
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""

        if not self.__serial.isConnected():
            return b"", b"Device not connected or not switched on."

        result = bytearray()
        err = b""

        # switch on raw mode
        self.__blockReadyRead = True
        ok = self.__rawOn()
        if not ok:
            self.__blockReadyRead = False
            return (b"", b"Could not switch to raw mode. Is the device switched on?")

        # send commands
        QThread.msleep(10)
        for command in commands:
            if command:
                commandBytes = command.encode("utf-8")
                self.__serial.write(commandBytes + b"\x04")
                QCoreApplication.processEvents(
                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
                )
                ok = self.__serial.readUntil(b"OK")
                if ok != b"OK":
                    return (
                        b"",
                        "Expected 'OK', got '{0}', followed by '{1}'".format(
                            ok, self.__serial.readAll()
                        ).encode("utf-8"),
                    )

                # read until prompt
                response = self.__serial.readUntil(b"\x04>")
                if self.__serial.hasTimedOut():
                    self.__blockReadyRead = False
                    return b"", b"Timeout while processing commands."
                if b"\x04" in response[:-2]:
                    # split stdout, stderr
                    out, err = response[:-2].split(b"\x04")
                    result += out
                else:
                    err = b"invalid response received: " + response
                if err:
                    self.__blockReadyRead = False
                    return b"", err

        # switch off raw mode
        QThread.msleep(10)
        self.__rawOff()
        self.__blockReadyRead = False

        return bytes(result), err

    def executeAsync(self, commandsList):
        """
        Public method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        @param commandsList list of commands to be execute on the device
        @type list of bytes
        """

        def remainingTask(commands):
            self.executeAsync(commands)

        if commandsList:
            command = commandsList[0]
            self.__serial.write(command)
            remainder = commandsList[1:]
            QTimer.singleShot(2, lambda: remainingTask(remainder))
        else:
            self.executeAsyncFinished.emit()

    def __shortError(self, error):
        """
        Private method to create a shortened error message.

        @param error verbose error message
        @type bytes
        @return shortened error message
        @rtype str
        """
        if error:
            decodedError = error.decode("utf-8")
            try:
                return decodedError.split["\r\n"][-2]
            except Exception:
                return decodedError
        return self.tr("Detected an error without indications.")

    # TODO: move these methods to the devices.
    ##################################################################
    ## Methods below implement the file system commands
    ##################################################################

    def ls(self, dirname=""):
        """
        Public method to get a directory listing of the connected device.

        @param dirname name of the directory to be listed
        @type str
        @return tuple containg the directory listing
        @rtype tuple of str
        @exception OSError raised to indicate an issue with the device
        """
        commands = (
            # BBC micro:bit does not support directories
            [
                "import os as __os_",
                "print(__os_.listdir())",
                "del __os_",
            ]
            if self.__repl.isMicrobit()
            else [
                "import os as __os_",
                "print(__os_.listdir('{0}'))".format(dirname),
                "del __os_",
            ]
        )
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        return ast.literal_eval(out.decode("utf-8"))

    def lls(self, dirname="", fullstat=False, showHidden=False):
        """
        Public method to get a long directory listing of the connected device
        including meta data.

        @param dirname name of the directory to be listed
        @type str
        @param fullstat flag indicating to return the full stat() tuple
        @type bool
        @param showHidden flag indicating to show hidden files as well
        @type bool
        @return list containing the directory listing with tuple entries of
            the name and and a tuple of mode, size and time (if fullstat is
            false) or the complete stat() tuple. 'None' is returned in case the
            directory doesn't exist.
        @rtype tuple of (str, tuple)
        @exception OSError raised to indicate an issue with the device
        """
        commands = (
            # BBC micro:bit does not support directories
            [
                "import os as __os_",
                "\n".join(
                    [
                        "def is_visible(filename, showHidden):",
                        "    return showHidden or "
                        "(filename[0] != '.' and filename[-1] != '~')",
                    ]
                ),
                "\n".join(
                    [
                        "def stat(filename):",
                        "    size = __os_.size(filename)",
                        "    return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)",
                    ]
                ),
                "\n".join(
                    [
                        "def listdir_stat(showHidden):",
                        "    files = __os_.listdir()",
                        "    return list((f, stat(f)) for f in files if"
                        " is_visible(f,showHidden))",
                    ]
                ),
                "print(listdir_stat({0}))".format(showHidden),
                "del __os_, stat, listdir_stat, is_visible",
            ]
            if self.__repl.isMicrobit()
            else [
                "import os as __os_",
                "\n".join(
                    [
                        "def is_visible(filename, showHidden):",
                        "    return showHidden or "
                        "(filename[0] != '.' and filename[-1] != '~')",
                    ]
                ),
                "\n".join(
                    [
                        "def stat(filename):",
                        "    try:",
                        "        rstat = __os_.lstat(filename)",
                        "    except:",
                        "        rstat = __os_.stat(filename)",
                        "    return tuple(rstat)",
                    ]
                ),
                "\n".join(
                    [
                        "def listdir_stat(dirname, showHidden):",
                        "    try:",
                        "        files = __os_.listdir(dirname)",
                        "    except OSError:",
                        "        return []",
                        "    if dirname in ('', '/'):",
                        "        return list((f, stat(f)) for f in files if"
                        " is_visible(f, showHidden))",
                        "    return list((f, stat(dirname + '/' + f))"
                        " for f in files if is_visible(f, showHidden))",
                    ]
                ),
                "print(listdir_stat('{0}', {1}))".format(dirname, showHidden),
                "del __os_, stat, listdir_stat, is_visible",
            ]
        )
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        fileslist = ast.literal_eval(out.decode("utf-8"))
        if fileslist is None:
            return None
        else:
            if fullstat:
                return fileslist
            else:
                return [(f, (s[0], s[6], s[8])) for f, s in fileslist]

    def cd(self, dirname):
        """
        Public method to change the current directory on the connected device.

        @param dirname directory to change to
        @type str
        @exception OSError raised to indicate an issue with the device
        """
        if dirname:
            commands = [
                "import os as __os_",
                "__os_.chdir('{0}')".format(dirname),
                "del __os_",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))

    def pwd(self):
        """
        Public method to get the current directory of the connected device.

        @return current directory
        @rtype str
        @exception OSError raised to indicate an issue with the device
        """
        if self.__repl.isMicrobit():
            # BBC micro:bit does not support directories
            return ""

        commands = [
            "import os as __os_",
            "print(__os_.getcwd())",
            "del __os_",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        return out.decode("utf-8").strip()

    def rm(self, filename):
        """
        Public method to remove a file from the connected device.

        @param filename name of the file to be removed
        @type str
        @exception OSError raised to indicate an issue with the device
        """
        if filename:
            commands = [
                "import os as __os_",
                "__os_.remove('{0}')".format(filename),
                "del __os_",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))

    def rmrf(self, name, recursive=False, force=False):
        """
        Public method to remove a file or directory recursively.

        @param name of the file or directory to remove
        @type str
        @param recursive flag indicating a recursive deletion
        @type bool
        @param force flag indicating to ignore errors
        @type bool
        @return flag indicating success
        @rtype bool
        @exception OSError raised to indicate an issue with the device
        """
        if name:
            commands = [
                "import os as __os_",
                "\n".join(
                    [
                        "def remove_file(name, recursive=False, force=False):",
                        "    try:",
                        "        mode = __os_.stat(name)[0]",
                        "        if mode & 0x4000 != 0:",
                        "            if recursive:",
                        "                for file in __os_.listdir(name):",
                        "                    success = remove_file("
                        "name + '/' + file, recursive, force)",
                        "                    if not success and not force:",
                        "                        return False",
                        "                __os_.rmdir(name)",
                        "            else:",
                        "                if not force:",
                        "                    return False",
                        "        else:",
                        "            __os_.remove(name)",
                        "    except:",
                        "        if not force:",
                        "            return False",
                        "    return True",
                    ]
                ),
                "print(remove_file('{0}', {1}, {2}))".format(name, recursive, force),
                "del __os_, remove_file",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))
            return ast.literal_eval(out.decode("utf-8"))

        return False

    def mkdir(self, dirname):
        """
        Public method to create a new directory.

        @param dirname name of the directory to create
        @type str
        @exception OSError raised to indicate an issue with the device
        """
        if dirname:
            commands = [
                "import os as __os_",
                "__os_.mkdir('{0}')".format(dirname),
                "del __os_",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))

    def rmdir(self, dirname):
        """
        Public method to remove a directory.

        @param dirname name of the directory to be removed
        @type str
        @exception OSError raised to indicate an issue with the device
        """
        if dirname:
            commands = [
                "import os as __os_",
                "__os_.rmdir('{0}')".format(dirname),
                "del __os_",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))

    def put(self, hostFileName, deviceFileName=None):
        """
        Public method to copy a local file to the connected device.

        @param hostFileName name of the file to be copied
        @type str
        @param deviceFileName name of the file to copy to
        @type str
        @return flag indicating success
        @rtype bool
        @exception OSError raised to indicate an issue with the device
        """
        if not os.path.isfile(hostFileName):
            raise OSError("No such file: {0}".format(hostFileName))

        if not deviceFileName:
            deviceFileName = os.path.basename(hostFileName)

        with open(hostFileName, "rb") as hostFile:
            content = hostFile.read()

        return self.putData(deviceFileName, content)

    def putData(self, deviceFileName, content):
        """
        Public method to write the given data to the connected device.

        @param deviceFileName name of the file to write to
        @type str
        @param content data to write
        @type bytes
        @return flag indicating success
        @rtype bool
        @exception OSError raised to indicate an issue with the device
        """
        if not deviceFileName:
            raise OSError("Missing device file name")

        # convert eol '\r'
        content = content.replace(b"\r\n", b"\r")
        content = content.replace(b"\n", b"\r")

        commands = [
            "fd = open('{0}', 'wb')".format(deviceFileName),
            "f = fd.write",
        ]
        while content:
            chunk = content[:64]
            commands.append("f(" + repr(chunk) + ")")
            content = content[64:]
        commands.extend(
            [
                "fd.close()",
                "del f, fd",
            ]
        )

        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        return True

    def get(self, deviceFileName, hostFileName=None):
        """
        Public method to copy a file from the connected device.

        @param deviceFileName name of the file to copy
        @type str
        @param hostFileName name of the file to copy to
        @type str
        @return flag indicating success
        @rtype bool
        @exception OSError raised to indicate an issue with the device
        """
        if not deviceFileName:
            raise OSError("Missing device file name")

        if not hostFileName:
            hostFileName = deviceFileName

        out = self.getData(deviceFileName)
        with open(hostFileName, "wb") as hostFile:
            hostFile.write(out)

        return True

    def getData(self, deviceFileName):
        """
        Public method to read data from the connected device.

        @param deviceFileName name of the file to read from
        @type str
        @return data read from the device
        @rtype bytes
        @exception OSError raised to indicate an issue with the device
        """
        if not deviceFileName:
            raise OSError("Missing device file name")

        commands = [
            "\n".join(
                [
                    "def send_data():",
                    "    try:",
                    "        from microbit import uart as u",
                    "    except ImportError:",
                    "        try:",
                    "            from sys import stdout as u",
                    "        except ImportError:",
                    "            try:",
                    "                from machine import UART",
                    "                u = UART(0, {0})".format(115200),
                    "            except Exception:",
                    "                raise Exception('Could not find UART module"
                    " in device.')",
                    "    f = open('{0}', 'rb')".format(deviceFileName),
                    "    r = f.read",
                    "    result = True",
                    "    while result:",
                    "        result = r(32)",
                    "        if result:",
                    "            u.write(result)",
                    "    f.close()",
                ]
            ),
            "send_data()",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))

        # write the received bytes to the local file
        # convert eol to "\n"
        out = out.replace(b"\r\n", b"\n")
        out = out.replace(b"\r", b"\n")

        return out

    def fileSystemInfo(self):
        """
        Public method to obtain information about the currently mounted file
        systems.

        @return tuple of tuples containing the file system name, the total
            size, the used size and the free size
        @rtype tuple of tuples of (str, int, int, int)
        @exception OSError raised to indicate an issue with the device
        """
        commands = [
            "import os as __os_",
            "\n".join(
                [
                    "def fsinfo():",
                    "    infolist = []",
                    "    info = __os_.statvfs('/')",
                    "    if info[0] == 0:",
                    # assume it is just mount points
                    "        fsnames = __os_.listdir('/')",
                    "        for fs in fsnames:",
                    "            fs = '/' + fs",
                    "            infolist.append((fs, __os_.statvfs(fs)))",
                    "    else:",
                    "        infolist.append(('/', info))",
                    "    return infolist",
                ]
            ),
            "print(fsinfo())",
            "del __os_, fsinfo",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        infolist = ast.literal_eval(out.decode("utf-8"))
        if infolist is None:
            return None
        else:
            filesystemInfos = []
            for fs, info in infolist:
                totalSize = info[2] * info[1]
                freeSize = info[4] * info[1]
                usedSize = totalSize - freeSize
                filesystemInfos.append((fs, totalSize, usedSize, freeSize))

        return tuple(filesystemInfos)

    ##################################################################
    ## non-filesystem related methods below
    ##################################################################

    def getDeviceData(self):
        """
        Public method to get some essential data for the connected board.

        @return dictionary containing the determined data
        @rtype dict
        @exception OSError raised to indicate an issue with the device
        """
        commands = [
            "res = {}",  # __IGNORE_WARNING_M613__
            "import os as __os_",
            "uname = __os_.uname()",
            "res['sysname'] = uname.sysname",
            "res['nodename'] = uname.nodename",
            "res['release'] = uname.release",
            "res['version'] = uname.version",
            "res['machine'] = uname.machine",
            "import sys as __sys_",
            "res['py_platform'] = __sys_.platform",
            "res['py_version'] = __sys_.version",
            "\n".join(
                [
                    "try:",
                    "    res['mpy_name'] = __sys_.implementation.name",
                    "except AttributeError:",
                    "    res['mpy_name'] = 'unknown'",
                ]
            ),
            "\n".join(
                [
                    "try:",
                    "    res['mpy_version'] = '.'.join((str(i) for i in"
                    " __sys_.implementation.version))",
                    "except AttributeError:",
                    "    res['mpy_version'] = 'unknown'",
                ]
            ),
            "\n".join(
                [
                    "try:",
                    "    import pimoroni as __pimoroni_",
                    "    res['mpy_variant'] = 'Pimoroni'",
                    "    del __pimoroni_",
                    "except ImportError:",
                    "    res['mpy_variant'] = ''",
                ]
            ),
            "print(res)",
            "del res, __os_, __sys_",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        return ast.literal_eval(out.decode("utf-8"))

    def getBoardInformation(self):
        """
        Public method to get some information data of the connected board.

        @return dictionary containing the determined data
        @rtype dict
        @exception OSError raised to indicate an issue with the device
        """
        commands = [
            "res = {}",  # __IGNORE_WARNING_M613__
            "import gc as __gc_",
            "__gc_.enable()",
            "__gc_.collect()",
            "mem_alloc = __gc_.mem_alloc()",
            "mem_free = __gc_.mem_free()",
            "mem_total = mem_alloc + mem_free",
            "res['mem_total_kb'] = mem_total / 1024.0",
            "res['mem_used_kb'] = mem_alloc / 1024.0",
            "res['mem_used_pc'] = mem_alloc / mem_total * 100.0",
            "res['mem_free_kb'] = mem_free / 1024.0",
            "res['mem_free_pc'] = mem_free / mem_total * 100.0",
            "del __gc_, mem_alloc, mem_free, mem_total",
            "import os as __os_",
            "uname = __os_.uname()",
            "res['sysname'] = uname.sysname",
            "res['nodename'] = uname.nodename",
            "res['release'] = uname.release",
            "res['version'] = uname.version",
            "res['machine'] = uname.machine",
            "import sys as __sys_",
            "res['py_platform'] = __sys_.platform",
            "res['py_version'] = __sys_.version",
            "\n".join(
                [
                    "try:",
                    "    res['mpy_name'] = __sys_.implementation.name",
                    "except AttributeError:",
                    "    res['mpy_name'] = 'unknown'",
                ]
            ),
            "\n".join(
                [
                    "try:",
                    "    res['mpy_version'] = '.'.join((str(i) for i in"
                    " __sys_.implementation.version))",
                    "except AttributeError:",
                    "    res['mpy_version'] = 'unknown'",
                ]
            ),
            "\n".join(
                [
                    "try:",
                    "    import pimoroni as __pimoroni_",
                    "    res['mpy_variant'] = 'Pimoroni'",
                    "    del __pimoroni_",
                    "except ImportError:",
                    "    res['mpy_variant'] = ''",
                ]
            ),
            "\n".join(
                [
                    "try:",
                    "    stat_ = __os_.statvfs('/flash')",
                    "    res['flash_info_available'] = True",
                    "    res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0",
                    "    res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0",
                    "    res['flash_used_kb'] = res['flash_total_kb'] -"
                    " res['flash_free_kb']",
                    "    res['flash_free_pc'] = res['flash_free_kb'] /"
                    " res['flash_total_kb'] * 100.0",
                    "    res['flash_used_pc'] = res['flash_used_kb'] /"
                    " res['flash_total_kb'] * 100.0",
                    "    del stat_",
                    "except AttributeError:",
                    "    res['flash_info_available'] = False",
                ]
            ),
            "\n".join(
                [
                    "try:",
                    "    import machine as __mc_",
                    "    if isinstance(__mc_.freq(), tuple):",
                    "        res['mc_frequency_mhz'] = __mc_.freq()[0] / 1000000.0",
                    "    else:",
                    "       res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0",
                    "    res['mc_id'] = ':'.join(['{0:X}'.format(x)"
                    " for x in __mc_.unique_id()])",
                    "    del __mc_",
                    "except ImportError:",
                    "\n".join(
                        [
                            "    try:",
                            "        import microcontroller as __mc_",
                            "        res['mc_frequency_mhz'] = __mc_.cpu.frequency"
                            " / 1000000.0",
                            "        res['mc_temp_c'] = __mc_.cpu.temperature",
                            "        res['mc_id'] = ':'.join(['{0:X}'.format(x)"
                            " for x in __mc_.cpu.uid])",
                            "        del __mc_",
                            "    except ImportError:",
                            "        res['mc_frequency'] = None",
                            "        res['mc_temp'] = None",
                        ]
                    ),
                ]
            ),
            "\n".join(
                [
                    "try:",
                    "    import ulab as __ulab_",
                    "    res['ulab'] = __ulab_.__version__",
                    "    del __ulab_",
                    "except ImportError:",
                    "    res['ulab'] = None",
                ]
            ),
            "print(res)",
            "del res, __os_, __sys_",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        return ast.literal_eval(out.decode("utf-8"))

    def syncTime(self, deviceType, hasCPy=False):
        """
        Public method to set the time of the connected device to the local
        computer's time.

        @param deviceType type of board to sync time to
        @type str
        @param hasCPy flag indicating that the device has CircuitPython loadede
            (defaults to False)
        @type bool
        @exception OSError raised to indicate an issue with the device
        """
        # rtc_time[0] - year    4 digit
        # rtc_time[1] - month   1..12
        # rtc_time[2] - day     1..31
        # rtc_time[3] - weekday 1..7 1=Monday
        # rtc_time[4] - hour    0..23
        # rtc_time[5] - minute  0..59
        # rtc_time[6] - second  0..59
        # rtc_time[7] - yearday 1..366
        # rtc_time[8] - isdst   0, 1, or -1
        if deviceType == "circuitpython" or hasCPy:
            set_time = "\n".join(
                [
                    "def set_time(rtc_time):",
                    "    import rtc",
                    "    import time",
                    "    clock = rtc.RTC()",
                    "    clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3],"
                    " rtc_time[7], rtc_time[8])",
                    "    clock.datetime = time.struct_time(clock_time)",
                ]
            )
        elif deviceType == "pyboard":
            # Pyboard (pyboard doesn't have machine.RTC()).
            # The pyb.RTC.datetime function takes the arguments in the
            # order: (year, month, day, weekday, hour, minute, second,
            # subseconds)
            # http://docs.micropython.org/en/latest/library/pyb.RTC.html
            # #pyb.RTC.datetime
            set_time = "\n".join(
                [
                    "def set_time(rtc_time):",
                    "    import pyb",
                    "    rtc = pyb.RTC()",
                    "    rtc.datetime(rtc_time[:7] + (0,))",
                ]
            )
        elif deviceType == "teensy":
            # The pyb.RTC.datetime function takes the arguments in the
            # order: (year, month, day, weekday, hour, minute, second,
            # subseconds)
            # https://docs.micropython.org/en/latest/library/machine.RTC.html
            # #machine-rtc
            set_time = "\n".join(
                [
                    "def set_time(rtc_time):",
                    "    import machine",
                    "    rtc = machine.RTC()",
                    "    rtc.init(rtc_time[:7] + (0,))",
                ]
            )
        elif deviceType == "esp":
            # The machine.RTC documentation was incorrect and doesn't agree
            # with the code, so no link is presented here. The order of the
            # arguments is the same as the pyboard except for LoBo MPy.
            set_time = "\n".join(
                [
                    "def set_time(rtc_time):",
                    "    import machine",
                    "    rtc = machine.RTC()",
                    "    try:",  # ESP8266 may use rtc.datetime()
                    "        rtc.datetime(rtc_time[:7] + (0,))",
                    "    except Exception:",  # ESP32 uses rtc.init()
                    "        import os",
                    "        if 'LoBo' in os.uname()[0]:",  # LoBo MPy
                    "            clock_time = rtc_time[:3] +"
                    " rtc_time[4:7] + (rtc_time[3], rtc_time[7])",
                    "        else:",
                    "            clock_time = rtc_time[:7] + (0,)",
                    "        rtc.init(clock_time)",
                ]
            )
        elif deviceType in ("bbc_microbit", "calliope"):
            # BBC micro:bit and Calliope mini with MicroPython don't support
            # time commands.
            return
        elif deviceType == "rp2040":
            # Raspberry Pi Pico (RP2040) - machine.RTC doesn't exist
            set_time = "\n".join(
                [
                    "def set_time(rtc_time):",
                    "    setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |"
                    " rtc_time[2]",
                    "    setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |"
                    " rtc_time[5] << 8 | rtc_time[6]",
                    "    machine.mem32[0x4005c004] = setup_0",
                    "    machine.mem32[0x4005c008] = setup_1",
                    "    machine.mem32[0x4005c00c] |= 0x10",
                ]
            )
        elif deviceType == "pycom":
            # PyCom's machine.RTC takes its arguments in a slightly
            # different order than the official machine.RTC.
            # (year, month, day, hour, minute, second[, microsecond[,
            # tzinfo]])
            # https://docs.pycom.io/firmwareapi/pycom/machine/rtc/
            # #rtc-init-datetime-none-source-rtc-internal-rc
            set_time = "\n".join(
                [
                    "def set_time(rtc_time):",
                    "    import pycom",
                    "    rtc_time2 = rtc_time[:3] + rtc_time[4:7]",
                    "    import machine",
                    "    rtc = machine.RTC()",
                    "    rtc.init(rtc_time2)",
                ]
            )
        else:
            # no set_time() support for generic boards
            return

        now = time.localtime(time.time())
        commands = [
            set_time,
            "set_time({0})".format(
                (
                    now.tm_year,
                    now.tm_mon,
                    now.tm_mday,
                    now.tm_wday + 1,
                    now.tm_hour,
                    now.tm_min,
                    now.tm_sec,
                    now.tm_yday,
                    now.tm_isdst,
                )
            ),
            "del set_time",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))

    def getTime(self):
        """
        Public method to get the current time of the device.

        @return time of the device
        @rtype str
        @exception OSError raised to indicate an issue with the device
        """
        commands = [
            "\n".join(
                [
                    "try:",
                    "    import rtc as __rtc_",
                    "    print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'"
                    ".format(*__rtc_.RTC().datetime[:6]))",
                    "    del __rtc_",
                    "except:",
                    "    import time as __time_",
                    "    try:",
                    "        print(__time_.strftime('%Y-%m-%d %H:%M:%S',"
                    # __IGNORE_WARNING_M601__
                    " __time_.localtime()))",
                    "    except AttributeError:",
                    "        tm = __time_.localtime()",
                    "        print('{0:04d}-{1:02d}-{2:02d}"
                    " {3:02d}:{4:02d}:{5:02d}'"
                    ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))",
                    "        del tm",
                    "    del __time_",
                ]
            ),
        ]
        out, err = self.execute(commands)
        if err:
            if b"NotImplementedError" in err:
                return "&lt;unsupported&gt; &lt;unsupported&gt;"
            raise OSError(self.__shortError(err))
        return out.decode("utf-8").strip()

    def getModules(self):
        """
        Public method to show a list of modules built into the firmware.

        @return list of builtin modules
        @rtype list of str
        @exception OSError raised to indicate an issue with the device
        """
        commands = ["help('modules')"]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))

        modules = []
        for line in out.decode("utf-8").splitlines()[:-1]:
            modules.extend(line.split())
        return modules

eric ide

mercurial