eric6/MicroPython/MicroPythonCommandsInterface.py

branch
micropython
changeset 7095
8e10acb1cd85
child 7102
5e77aa4671e6
diff -r d5f340dfb986 -r 8e10acb1cd85 eric6/MicroPython/MicroPythonCommandsInterface.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/eric6/MicroPython/MicroPythonCommandsInterface.py	Mon Jul 29 20:20:18 2019 +0200
@@ -0,0 +1,734 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing some file system commands for MicroPython.
+"""
+
+from __future__ import unicode_literals
+
+import ast
+import time
+import os
+
+from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject, QThread, QTimer
+
+from .MicroPythonSerialPort import MicroPythonSerialPort
+
+import Preferences
+
+
+class MicroPythonCommandsInterface(QObject):
+    """
+    Class implementing some file system commands for MicroPython.
+    
+    Commands are provided to perform operations on the file system of a
+    connected MicroPython device. Supported commands are:
+    <ul>
+    <li>ls: directory listing</li>
+    <li>lls: directory listing with meta data</li>
+    <li>cd: change directory</li>
+    <li>pwd: get the current directory</li>
+    <li>put: copy a file to the connected device</li>
+    <li>get: get a file from the connected device</li>
+    <li>rm: remove a file from the connected device</li>
+    <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash)
+    <li>mkdir: create a new directory</li>
+    <li>rmdir: remove an empty directory</li>
+    </ul>
+    
+    There are additional commands related to time and version.
+    <ul>
+    <li>version: get version info about MicroPython</li>
+    <li>getImplementation: get some implementation information</li>
+    <li>syncTime: synchronize the time of the connected device</li>
+    <li>showTime: show the current time of the connected device</li>
+    </ul>
+    
+    @signal executeAsyncFinished() emitted to indicate the end of an
+        asynchronously executed list of commands (e.g. a script)
+    @signal dataReceived(data) emitted to send data received via the serial
+        connection for further processing
+    """
+    executeAsyncFinished = pyqtSignal()
+    dataReceived = pyqtSignal(bytes)
+    
+    def __init__(self, parent=None):
+        """
+        Constructor
+        
+        @param parent reference to the parent object
+        @type QObject
+        """
+        super(MicroPythonCommandsInterface, self).__init__(parent)
+        
+        self.__blockReadyRead = False
+        
+        self.__serial = MicroPythonSerialPort(
+            timeout=Preferences.getMicroPython("SerialTimeout"),
+            parent=self)
+        self.__serial.readyRead.connect(self.__readSerial)
+    
+    @pyqtSlot()
+    def __readSerial(self):
+        """
+        Private slot to read all available serial data and emit it with the
+        "dataReceived" signal for further processing.
+        """
+        if not self.__blockReadyRead:
+            data = bytes(self.__serial.readAll())
+            self.dataReceived.emit(data)
+    
+    @pyqtSlot()
+    def connectToDevice(self, port):
+        """
+        Public slot to start the manager.
+        
+        @param port name of the port to be used
+        @type str
+        @return flag indicating success
+        @rtype bool
+        """
+        return self.__serial.openSerialLink(port)
+    
+    @pyqtSlot()
+    def disconnectFromDevice(self):
+        """
+        Public slot to stop the thread.
+        """
+        self.__serial.closeSerialLink()
+    
+    def isConnected(self):
+        """
+        Public method to get the connection status.
+        
+        @return flag indicating the connection status
+        @rtype bool
+        """
+        return self.__serial.isConnected()
+    
+    @pyqtSlot()
+    def handlePreferencesChanged(self):
+        """
+        Public slot to handle a change of the preferences.
+        """
+        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))
+    
+    def write(self, data):
+        """
+        Public method to write data to the connected device.
+        
+        @param data data to be written
+        @type bytes or bytearray
+        """
+        self.__serial.isConnected() and self.__serial.write(data)
+    
+    def __rawOn(self):
+        """
+        Private method to switch the connected device to 'raw' mode.
+        
+        Note: switching to raw mode is done with synchronous writes.
+        
+        @return flag indicating success
+        @@rtype bool
+        """
+        if not self.__serial:
+            return False
+        
+        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n"
+        softRebootMessage = b"soft reboot\r\n"
+        
+        self.__serial.write(b"\x02")        # end raw mode if required
+        self.__serial.waitForBytesWritten()
+        for _i in range(3):
+            # CTRL-C three times to break out of loops
+            self.__serial.write(b"\r\x03")
+            self.__serial.waitForBytesWritten()
+            QThread.msleep(10)
+        self.__serial.readAll()             # read all data and discard it
+        self.__serial.write(b"\r\x01")      # send CTRL-A to enter raw mode
+        self.__serial.readUntil(rawReplMessage)
+        if self.__serial.hasTimedOut():
+            return False
+        self.__serial.write(b"\x04")        # send CTRL-D to soft reset
+        self.__serial.readUntil(softRebootMessage)
+        if self.__serial.hasTimedOut():
+            return False
+        
+        # some MicroPython devices seem to need to be convinced in some
+        # special way
+        data = self.__serial.readUntil(rawReplMessage)
+        if self.__serial.hasTimedOut():
+            return False
+        if not data.endswith(rawReplMessage):
+            self.__serial.write(b"\r\x01")  # send CTRL-A again
+            self.__serial.readUntil(rawReplMessage)
+            if self.__serial.hasTimedOut():
+                return False
+        self.__serial.readAll()             # read all data and discard it
+        return True
+    
+    def __rawOff(self):
+        """
+        Private method to switch 'raw' mode off.
+        """
+        if self.__serial:
+            self.__serial.write(b"\x02")    # send CTRL-B to cancel raw mode
+    
+    def execute(self, commands):
+        """
+        Public method to send commands to the connected device and return the
+        result.
+        
+        If no serial connection is available, empty results will be returned.
+        
+        @param commands list of commands to be executed
+        @type str
+        @return tuple containing stdout and stderr output of the device
+        @rtype tuple of (bytes, bytes)
+        """
+        if not self.__serial:
+            return b"", b""
+        
+        if not self.__serial.isConnected():
+            return b"", b"Device not connected or not switched on."
+        
+        result = bytearray()
+        err = b""
+        
+        self.__blockReadyRead = True
+        ok = self.__rawOn()
+        if not ok:
+            self.__blockReadyRead = False
+            return (
+                b"",
+                b"Could not switch to raw mode. Is the device switched on?"
+            )
+        
+        QThread.msleep(10)
+        for command in commands:
+            if command:
+                commandBytes = command.encode("utf-8")
+                self.__serial.write(commandBytes + b"\x04")
+                # read until prompt
+                response = self.__serial.readUntil(b"\x04>")
+                if self.__serial.hasTimedOut():
+                    self.__blockReadyRead = False
+                    return b"", b"Timeout while processing commands."
+                if b"\x04" in response[2:-2]:
+                    # split stdout, stderr
+                    out, err = response[2:-2].split(b"\x04")
+                    result += out
+                else:
+                    err = b"invalid response received: " + response
+                if err:
+                    self.__blockReadyRead = False
+                    return b"", err
+        QThread.msleep(10)
+        self.__rawOff()
+        self.__blockReadyRead = False
+        
+        return bytes(result), err
+    
+    def executeAsync(self, commandsList):
+        """
+        Public method to execute a series of commands over a period of time
+        without returning any result (asynchronous execution).
+        
+        @param commandsList list of commands to be execute on the device
+        @type list of bytes
+        """
+        def remainingTask(commands):
+            self.executeAsync(commands)
+        
+        if commandsList:
+            command = commandsList[0]
+            self.__serial.write(command)
+            remainder = commandsList[1:]
+            QTimer.singleShot(2, lambda: remainingTask(remainder))
+        else:
+            self.executeAsyncFinished.emit()
+    
+    def __shortError(self, error):
+        """
+        Private method to create a shortened error message.
+        
+        @param error verbose error message
+        @type bytes
+        @return shortened error message
+        @rtype str
+        """
+        if error:
+            decodedError = error.decode("utf-8")
+            try:
+                return decodedError.split["\r\n"][-2]
+            except Exception:
+                return decodedError
+        return self.tr("Detected an error without indications.")
+    
+    ##################################################################
+    ## Methods below implement the file system commands
+    ##################################################################
+    
+    def ls(self, dirname=""):
+        """
+        Public method to get a directory listing of the connected device.
+        
+        @param dirname name of the directory to be listed
+        @type str
+        @return tuple containg the directory listing
+        @rtype tuple of str
+        @exception IOError raised to indicate an issue with the device
+        """
+        commands = [
+            "import os",
+            "print(os.listdir('{0}'))".format(dirname),
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        return ast.literal_eval(out.decode("utf-8"))
+    
+    def lls(self, dirname="", fullstat=False):
+        """
+        Public method to get a long directory listing of the connected device
+        including meta data.
+        
+        @param dirname name of the directory to be listed
+        @type str
+        @param fullstat flag indicating to return the full stat() tuple
+        @type bool
+        @return list containing the directory listing with tuple entries of
+            the name and and a tuple of mode, size and time (if fullstat is
+            false) or the complete stat() tuple. 'None' is returned in case the
+            directory doesn't exist.
+        @rtype tuple of (str, tuple)
+        @exception IOError raised to indicate an issue with the device
+        """
+        commands = [
+            "import os",
+            "\n".join([
+                "def stat(filename):",
+                "    try:",
+                "        rstat = os.lstat(filename)",
+                "    except:",
+                "        rstat = os.stat(filename)",
+                "    return tuple(rstat)",
+            ]),
+            "\n".join([
+                "def listdir_stat(dirname):",
+                "    try:",
+                "        files = os.listdir(dirname)",
+                "    except OSError:",
+                "        return None",
+                "    if dirname in ('', '/'):",
+                "        return list((f, stat(f)) for f in files)",
+                "    return list((f, stat(dirname + '/' + f)) for f in files)",
+            ]),
+            "print(listdir_stat('{0}'))".format(dirname),
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        fileslist = ast.literal_eval(out.decode("utf-8"))
+        if fileslist is None:
+            return None
+        else:
+            if fullstat:
+                return fileslist
+            else:
+                return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
+    
+    def cd(self, dirname):
+        """
+        Public method to change the current directory on the connected device.
+        
+        @param dirname directory to change to
+        @type str
+        @exception IOError raised to indicate an issue with the device
+        """
+        assert dirname
+        
+        commands = [
+            "import os",
+            "os.chdir('{0}')".format(dirname),
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+    
+    def pwd(self):
+        """
+        Public method to get the current directory of the connected device.
+        
+        @return current directory
+        @rtype str
+        @exception IOError raised to indicate an issue with the device
+        """
+        commands = [
+            "import os",
+            "print(os.getcwd())",
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        return out.decode("utf-8").strip()
+     
+    def rm(self, filename):
+        """
+        Public method to remove a file from the connected device.
+        
+        @param filename name of the file to be removed
+        @type str
+        @exception IOError raised to indicate an issue with the device
+        """
+        assert filename
+        
+        commands = [
+            "import os",
+            "os.remove('{0}')".format(filename),
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+    
+    def rmrf(self, name, recursive=False, force=False):
+        """
+        Public method to remove a file or directory recursively.
+        
+        @param name of the file or directory to remove
+        @type str
+        @param recursive flag indicating a recursive deletion
+        @type bool
+        @param force flag indicating to ignore errors
+        @type bool
+        @return flag indicating success
+        @rtype bool
+        @exception IOError raised to indicate an issue with the device
+        """
+        assert name
+        
+        commands = [
+            "import os",
+            "\n".join([
+                "def remove_file(name, recursive=False, force=False):",
+                "    try:",
+                "        mode = os.stat(name)[0]",
+                "        if mode & 0x4000 != 0:",
+                "            if recursive:",
+                "                for file in os.listdir(name):",
+                "                    success = remove_file(name + '/' + file,"
+                " recursive, force)",
+                "                    if not success and not force:",
+                "                        return False",
+                "                os.rmdir(name)",
+                "            else:",
+                "                if not force:",
+                "                    return False",
+                "        else:",
+                "            os.remove(name)",
+                "    except:",
+                "        if not force:",
+                "            return False",
+                "    return True",
+            ]),
+            "print(remove_file('{0}', {1}, {2}))".format(name, recursive,
+                                                         force),
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        return ast.literal_eval(out.decode("utf-8"))
+    
+    def mkdir(self, dirname):
+        """
+        Public method to create a new directory.
+        
+        @param dirname name of the directory to create
+        @type str
+        @exception IOError raised to indicate an issue with the device
+        """
+        assert dirname
+   
+        commands = [
+            "import os",
+            "os.mkdir('{0}')".format(dirname),
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+    
+    def rmdir(self, dirname):
+        """
+        Public method to remove a directory.
+        
+        @param dirname name of the directory to be removed
+        @type str
+        @exception IOError raised to indicate an issue with the device
+        """
+        assert dirname
+   
+        commands = [
+            "import os",
+            "os.rmdir('{0}')".format(dirname),
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+    
+    def put(self, hostFileName, deviceFileName=None):
+        """
+        Public method to copy a local file to the connected device.
+        
+        @param hostFileName name of the file to be copied
+        @type str
+        @param deviceFileName name of the file to copy to
+        @type str
+        @return flag indicating success
+        @rtype bool
+        @exception IOError raised to indicate an issue with the device
+        """
+        if not os.path.isfile(hostFileName):
+            raise IOError("No such file: {0}".format(hostFileName))
+        
+        with open(hostFileName, "rb") as hostFile:
+            content = hostFile.read()
+            # convert eol '\r'
+            content = content.replace(b"\r\n", b"\r")
+            content = content.replace(b"\n", b"\r")
+        
+        if not deviceFileName:
+            deviceFileName = os.path.basename(hostFileName)
+        
+        commands = [
+            "fd = open('{0}', 'wb')".format(deviceFileName),
+            "f = fd.write",
+        ]
+        while content:
+            chunk = content[:64]
+            commands.append("f(" + repr(chunk) + ")")
+            content = content[64:]
+        commands.append("fd.close()")
+        
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        return True
+    
+    def get(self, deviceFileName, hostFileName=None):
+        """
+        Public method to copy a file from the connected device.
+        
+        @param deviceFileName name of the file to copy
+        @type str
+        @param hostFileName name of the file to copy to
+        @type str
+        @return flag indicating success
+        @rtype bool
+        @exception IOError raised to indicate an issue with the device
+        """
+        if not hostFileName:
+            hostFileName = deviceFileName
+        
+        commands = [
+            "\n".join([
+                "try:",
+                "    from microbit import uart as u",
+                "except ImportError:",
+                "    try:",
+                "        from machine import UART",
+                "        u = UART(0, {0})".format(115200),
+                "    except Exception:",
+                "        try:",
+                "            from sys import stdout as u",
+                "        except Exception:",
+                "            raise Exception('Could not find UART module in"
+                " device.')",
+            ]),
+            "f = open('{0}', 'rb')".format(deviceFileName),
+            "r = f.read",
+            "result = True",
+            "\n".join([
+                "while result:",
+                "    result = r(32)",
+                "    if result:",
+                "        u.write(result)",
+            ]),
+            "f.close()",
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        
+        # write the received bytes to the local file
+        # convert eol to "\n"
+        out = out.replace(b"\r\n", b"\n")
+        out = out.replace(b"\r", b"\n")
+        with open(hostFileName, "wb") as hostFile:
+            hostFile.write(out)
+        return True
+    
+    def fileSystemInfo(self):
+        """
+        Public method to obtain information about the currently mounted file
+        systems.
+        
+        @return tuple of tuples containing the file system name, the total
+            size, the used size and the free size
+        @rtype tuple of tuples of (str, int, int, int)
+        @exception IOError raised to indicate an issue with the device
+        """
+        commands = [
+            "import os",
+            "\n".join([
+                "def fsinfo():",
+                "    infolist = []",
+                "    fsnames = os.listdir('/')",
+                "    for fs in fsnames:",
+                "        fs = '/' + fs",
+                "        infolist.append((fs, os.statvfs(fs)))",
+                "    return infolist",
+            ]),
+            "print(fsinfo())",
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        infolist = ast.literal_eval(out.decode("utf-8"))
+        if infolist is None:
+            return None
+        else:
+            filesystemInfos = []
+            for fs, info in infolist:
+                totalSize = info[2] * info[1]
+                freeSize = info[4] * info[1]
+                usedSize = totalSize - freeSize
+                filesystemInfos.append((fs, totalSize, usedSize, freeSize))
+        
+        return tuple(filesystemInfos)
+    
+    ##################################################################
+    ## non-filesystem related methods below
+    ##################################################################
+    
+    def version(self):
+        """
+        Public method to get the MicroPython version information of the
+        connected device.
+        
+        @return dictionary containing the version information
+        @rtype dict
+        @exception IOError raised to indicate an issue with the device
+        """
+        commands = [
+            "import os",
+            "print(os.uname())",
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        
+        rawOutput = out.decode("utf-8").strip()
+        rawOutput = rawOutput[1:-1]
+        items = rawOutput.split(",")
+        result = {}
+        for item in items:
+            key, value = item.strip().split("=")
+            result[key.strip()] = value.strip()[1:-1]
+        return result
+    
+    def getImplementation(self):
+        """
+        Public method to get some implementation information of the connected
+        device.
+        
+        @return dictionary containing the implementation information
+        @rtype dict
+        @exception IOError raised to indicate an issue with the device
+        """
+        commands = [
+            "import sys",
+            "res = {}",                             # __IGNORE_WARNING_M613__
+            "\n".join([
+                "try:",
+                "    res['name'] = sys.implementation.name",
+                "except AttributeError:",
+                "    res['name'] = 'unknown'",
+            ]),
+            "\n".join([
+                "try:",
+                "    res['version'] = '.'.join((str(i) for i in"
+                " sys.implementation.version))",
+                "except AttributeError:",
+                "    res['version'] = 'unknown'",
+            ]),
+            "print(res)",
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        return ast.literal_eval(out.decode("utf-8"))
+    
+    def syncTime(self):
+        """
+        Public method to set the time of the connected device to the local
+        computer's time.
+        
+        @exception IOError raised to indicate an issue with the device
+        """
+        now = time.localtime(time.time())
+        commands = [
+            "\n".join([
+                "def set_time(rtc_time):",
+                "    rtc = None",
+                "    try:",           # Pyboard (it doesn't have machine.RTC())
+                "        import pyb",
+                "        rtc = pyb.RTC()",
+                "        clock_time = rtc_time[:6] + (rtc_time[6] + 1, 0)",
+                "        rtc.datetime(clock_time)",
+                "    except:",
+                "        try:",
+                "            import machine",
+                "            rtc = machine.RTC()",
+                "            try:",     # ESP8266 may use rtc.datetime()
+                "                clock_time = rtc_time[:6] +"
+                " (rtc_time[6] + 1, 0)",
+                "                rtc.datetime(clock_time)",
+                "            except:",  # ESP32 uses rtc.init()
+                "                rtc.init(rtc_time[:6])",
+                "        except:",
+                "            try:",
+                "                import rtc, time",
+                "                clock=rtc.RTC()",
+                "                clock.datetime = time.struct_time(rtc_time +"
+                " (-1, -1))",
+                "            except:",
+                "                pass",
+            ]),
+            "set_time({0})".format((now.tm_year, now.tm_mon, now.tm_mday,
+                                    now.tm_hour, now.tm_min, now.tm_sec,
+                                    now.tm_wday))
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+    
+    def showTime(self):
+        """
+        Public method to get the current time of the device.
+        
+        @return time of the device
+        @rtype str
+        @exception IOError raised to indicate an issue with the device
+        """
+        commands = [
+            "import time",
+            "print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))",
+            # __IGNORE_WARNING_M601__
+        ]
+        out, err = self.execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        return out.decode("utf-8").strip()

eric ide

mercurial