eric6/MicroPython/MicroPythonFileSystem.py

Mon, 22 Jul 2019 20:17:33 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Mon, 22 Jul 2019 20:17:33 +0200
branch
micropython
changeset 7080
9a3adf033f90
parent 7077
3b7475b7a1ef
child 7081
ed510767c096
permissions
-rw-r--r--

MicroPython: continued implementing the file manager widget.

# -*- coding: utf-8 -*-

# Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing some file system commands for MicroPython.
"""

from __future__ import unicode_literals

import ast
import time
import stat
import os

from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject, QThread

from .MicroPythonSerialPort import MicroPythonSerialPort


class MicroPythonFileSystem(QObject):
    """
    Class implementing some file system commands for MicroPython.
    
    Some FTP like commands are provided to perform operations on the file
    system of a connected MicroPython device. Supported commands are:
    <ul>
    <li>ls: directory listing</li>
    <li>lls: directory listing with meta data</li>
    <li>cd: change directory</li>
    <li>pwd: get the current directory</li>
    <li>put: copy a file to the connected device</li>
    <li>get: get a file from the connected device</li>
    <li>rm: remove a file from the connected device</li>
    <li>mkdir: create a new directory</li>
    <li>rmdir: remove an empty directory</li>
    <li>version: get version info about MicroPython</li>
    </ul>
    
    All commands are sent as Python source code to the device through the
    serial port set via setSerial(). The code is executed while the device
    is switched to 'raw' mode.
    """
    def __init__(self, parent=None):
        """
        Constructor
        
        @param parent reference to the parent object
        @type QObject
        """
        super(MicroPythonFileSystem, self).__init__(parent)
        
        # no serial port until setSerial() has been called
        self.__serial = None
    
    def setSerial(self, serial):
        """
        Public method to set the serial port to be used.
        
        Note: The serial port should be initialized and open already.
        
        @param serial open serial port
        @type MicroPythonSerialPort
        """
        self.__serial = serial
    
    def __rawOn(self):
        """
        Private method to switch the connected device to 'raw' mode.
        
        Note: switching to raw mode is done with synchronous writes.
        """
        if not self.__serial:
            return
        
        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n"
        softRebootMessage = b"soft reboot\r\n"
        
        self.__serial.write(b"\x02")        # end raw mode if required
        self.__serial.waitForBytesWritten()
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            self.__serial.waitForBytesWritten()
            QThread.msleep(10)
        self.__serial.readAll()             # read all data and discard it
        self.__serial.write(b"\r\x01")      # send CTRL-A to enter raw mode
        self.__serial.readUntil(rawReplMessage)
        self.__serial.write(b"\x04")        # send CTRL-D to soft reset
        self.__serial.readUntil(softRebootMessage)
        
        # some MicroPython devices seem to need to be convinced in some
        # special way
        data = self.__serial.readUntil(rawReplMessage)
        if not data.endswith(rawReplMessage):
            self.__serial.write(b"\r\x01")  # send CTRL-A again
            self.__serial.readUntil(rawReplMessage)
        self.__serial.readAll()             # read all data and discard it
    
    def __rawOff(self):
        """
        Private method to switch 'raw' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x02")    # send CTRL-B to cancel raw mode
    
    def __execute(self, commands):
        """
        Private method to send commands to the connected device and return the
        result.
        
        If no serial connection is available, empty results will be returned.
        Execution stops at the first command that produces error output. Raw
        mode is switched off again in every case, i.e. after an error as well.
        
        @param commands list of commands to be executed
        @type list of str
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""
        
        result = bytearray()
        err = b""
        
        self.__rawOn()
        QThread.msleep(10)
        try:
            for command in commands:
                if command:
                    commandBytes = command.encode("utf-8")
                    # CTRL-D terminates the command
                    self.__serial.write(commandBytes + b"\x04")
                    # read until prompt
                    response = self.__serial.readUntil(b"\x04>")
                    # split stdout, stderr; the first two and the last two
                    # bytes are dropped (presumably the acknowledgement sent
                    # by the device and the terminating prompt)
                    out, err = response[2:-2].split(b"\x04")
                    result += out
                    if err:
                        return b"", err
        finally:
            # always leave raw mode, even if a command failed or the
            # response could not be parsed
            QThread.msleep(10)
            self.__rawOff()
        
        return bytes(result), err
    
    def __shortError(self, error):
        """
        Private method to create a shortened error message.
        
        The shortened message is the second to last element of the error
        text split at line ends. If there is no such element, the complete
        decoded text is returned.
        
        @param error verbose error message
        @type bytes
        @return shortened error message
        @rtype str
        """
        if error:
            decodedError = error.decode("utf-8")
            try:
                return decodedError.split("\r\n")[-2]
            except IndexError:
                # just one line, nothing to shorten
                return decodedError
        return self.tr("Detected an error without indications.")
    
    ##################################################################
    ## Methods below implement the file system commands
    ##################################################################
    
    def ls(self, dirname=""):
        """
        Public method to get a directory listing of the connected device.
        
        @param dirname name of the directory to be listed
        @type str
        @return object containing the directory listing as printed by the
            device (result of os.listdir())
        @rtype list of str
        @exception IOError raised to indicate an issue with the device
        """
        commands = [
            "import os",
            "print(os.listdir('{0}'))".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        # the device printed a Python literal; parse it safely
        return ast.literal_eval(out.decode("utf-8"))
    
    def lls(self, dirname=""):
        """
        Public method to get a long directory listing of the connected device
        including meta data.
        
        @param dirname name of the directory to be listed
        @type str
        @return list containing the directory listing with tuple entries
            of the name and a tuple of mode, size and time
        @rtype list of tuple of (str, tuple of (int, int, int))
        @exception IOError raised to indicate an issue with the device
        """
        commands = [
            "import os",
            "\n".join([
                "def stat(filename):",
                "    try:",
                "        rstat = os.lstat(filename)",
                "    except:",
                "        rstat = os.stat(filename)",
                "    return tuple(rstat)",
            ]),
            "\n".join([
                "def listdir_stat(dirname):",
                "    try:",
                "        files = os.listdir(dirname)",
                "    except OSError:",
                "        return []",
                "    if dirname in ('', '/'):",
                "        return list((f, stat(f)) for f in files)",
                "    return list((f, stat(dirname + '/' + f)) for f in files)",
            ]),
            "print(listdir_stat('{0}'))".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        fileslist = ast.literal_eval(out.decode("utf-8"))
        # pick elements 0, 6 and 8 of the stat tuple (mode, size and time)
        return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
    
    def cd(self, dirname):
        """
        Public method to change the current directory on the connected device.
        
        @param dirname directory to change to
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert dirname
        
        commands = [
            "import os",
            "os.chdir('{0}')".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def pwd(self):
        """
        Public method to get the current directory of the connected device.
        
        @return current directory
        @rtype str
        @exception IOError raised to indicate an issue with the device
        """
        commands = [
            "import os",
            "print(os.getcwd())",
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        return out.decode("utf-8").strip()
    
    def rm(self, filename):
        """
        Public method to remove a file from the connected device.
        
        @param filename name of the file to be removed
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert filename
        
        commands = [
            "import os",
            "os.remove('{0}')".format(filename),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def mkdir(self, dirname):
        """
        Public method to create a new directory.
        
        @param dirname name of the directory to create
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert dirname
        
        commands = [
            "import os",
            "os.mkdir('{0}')".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def rmdir(self, dirname):
        """
        Public method to remove a directory.
        
        @param dirname name of the directory to be removed
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert dirname
        
        commands = [
            "import os",
            "os.rmdir('{0}')".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def put(self, hostFileName, deviceFileName=None):
        """
        Public method to copy a local file to the connected device.
        
        The file content is transferred in chunks of 64 bytes, each written
        by a separate command.
        
        @param hostFileName name of the file to be copied
        @type str
        @param deviceFileName name of the file to copy to (defaults to the
            base name of the local file)
        @type str
        @return flag indicating success
        @rtype bool
        @exception IOError raised to indicate an issue with the device
        """
        if not os.path.isfile(hostFileName):
            raise IOError("No such file: {0}".format(hostFileName))
        
        with open(hostFileName, "rb") as hostFile:
            content = hostFile.read()
        
        if not deviceFileName:
            deviceFileName = os.path.basename(hostFileName)
        
        commands = [
            "fd = open('{0}', 'wb')".format(deviceFileName),
            "f = fd.write",
        ]
        # step through the content by offset instead of re-slicing the
        # remaining data for each chunk
        for offset in range(0, len(content), 64):
            chunk = content[offset:offset + 64]
            commands.append("f(" + repr(chunk) + ")")
        commands.append("fd.close()")
        
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        return True
    
    def get(self, deviceFileName, hostFileName=None):
        """
        Public method to copy a file from the connected device.
        
        @param deviceFileName name of the file to copy
        @type str
        @param hostFileName name of the file to copy to (defaults to the
            name of the device file)
        @type str
        @return flag indicating success
        @rtype bool
        @exception IOError raised to indicate an issue with the device
        """
        if not hostFileName:
            hostFileName = deviceFileName
        
        commands = [
            # find an object 'u' offering a write() method for raw bytes
            "\n".join([
                "try:",
                "    from microbit import uart as u",
                "except ImportError:",
                "    try:",
                "        from machine import UART",
                "        u = UART(0, {0})".format(115200),
                "    except Exception:",
                "        try:",
                "            from sys import stdout as u",
                "        except Exception:",
                "            raise Exception('Could not find UART module in"
                " device.')",
            ]),
            "f = open('{0}', 'rb')".format(deviceFileName),
            "r = f.read",
            "result = True",
            # send the file in chunks of 32 bytes until nothing is left
            "\n".join([
                "while result:",
                "    result = r(32)",
                "    if result:",
                "        u.write(result)",
            ]),
            "f.close()",
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        
        # write the received bytes to the local file
        with open(hostFileName, "wb") as hostFile:
            hostFile.write(out)
        return True
    
    # TODO: add rsync function
    
    def version(self):
        """
        Public method to get the MicroPython version information of the
        connected device.
        
        @return dictionary containing the version information
        @rtype dict
        @exception ValueError raised to indicate that the device might not be
            running MicroPython or there was an issue parsing the output
        """
        commands = [
            "import os",
            "print(os.uname())",
        ]
        try:
            out, err = self.__execute(commands)
            if err:
                raise ValueError(self.__shortError(err))
        except ValueError:
            # just re-raise it
            raise
        except Exception:
            # Raise a value error to indicate being unable to find something
            # on the device that will return parseable information about the
            # version. It doesn't matter what the error is, it just needs to
            # report a failure with the expected ValueError exception.
            raise ValueError("Unable to determine version information.")
        
        # the output is expected to look like "(key='value', key='value')";
        # strip the parentheses and the quotes around each value
        rawOutput = out.decode("utf-8").strip()
        rawOutput = rawOutput[1:-1]
        items = rawOutput.split(",")
        result = {}
        for item in items:
            key, value = item.strip().split("=")
            result[key.strip()] = value.strip()[1:-1]
        return result
    
    def syncTime(self):
        """
        Public method to set the time of the connected device to the local
        computer's time.
        
        @exception IOError raised to indicate an issue with the device
        """
        now = time.localtime(time.time())
        commands = [
            "\n".join([
                "def set_time(rtc_time):",
                "    rtc = None",
                "    try:",           # Pyboard (it doesn't have machine.RTC())
                "        import pyb",
                "        rtc = pyb.RTC()",
                "        rtc.datetime(rtc_time)",
                "    except:",
                "        try:",
                "            import machine",
                "            rtc = machine.RTC()",
                "            try:",     # ESP8266 uses rtc.datetime()
                "                rtc.datetime(rtc_time)",
                "            except:",  # ESP32 uses rtc.init()
                "                rtc.init(rtc_time)",
                "        except:",
                "            pass",
            ]),
            "set_time({0})".format((now.tm_year, now.tm_mon, now.tm_mday,
                                    now.tm_wday + 1, now.tm_hour, now.tm_min,
                                    now.tm_sec, 0))
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))


class MicroPythonFileManager(QObject):
    """
    Class implementing an interface to the device file system commands with
    some additional sugar.
    
    Each slot reports its outcome through a pair of signals: a '...Done' (or
    result) signal on success and a '...Failed' signal carrying the text of
    the exception otherwise.
    
    @signal longListFiles(result) emitted with a tuple of tuples containing the
        name, mode, size and time for each directory entry
    @signal currentDir(dirname) emitted to report the current directory of the
        device
    @signal getFileDone(deviceFile, localFile) emitted after the file was
        fetched from the connected device and written to the local file system
    @signal putFileDone(localFile, deviceFile) emitted after the file was
        copied to the connected device
    @signal deleteFileDone(deviceFile) emitted after the file has been deleted
        on the connected device
    
    @signal longListFilesFailed(exc) emitted with a failure message to indicate
        a failed long listing operation
    @signal currentDirFailed(exc) emitted with a failure message to indicate
        that the current directory is not available
    @signal getFileFailed(exc) emitted with a failure message to indicate that
        the file could not be fetched
    @signal putFileFailed(exc) emitted with a failure message to indicate that
        the file could not be copied
    @signal deleteFileFailed(exc) emitted with a failure message to indicate
        that the file could not be deleted on the device
    """
    longListFiles = pyqtSignal(tuple)
    currentDir = pyqtSignal(str)
    getFileDone = pyqtSignal(str, str)
    putFileDone = pyqtSignal(str, str)
    deleteFileDone = pyqtSignal(str)
    
    longListFilesFailed = pyqtSignal(str)
    currentDirFailed = pyqtSignal(str)
    getFileFailed = pyqtSignal(str)
    putFileFailed = pyqtSignal(str)
    deleteFileFailed = pyqtSignal(str)
    
    def __init__(self, port, parent=None):
        """
        Constructor
        
        @param port port name of the device
        @type str
        @param parent reference to the parent object
        @type QObject
        """
        super(MicroPythonFileManager, self).__init__(parent)
        
        self.__serialPort = port
        self.__serial = MicroPythonSerialPort(parent=self)
        self.__fs = MicroPythonFileSystem(parent=self)
    
    @pyqtSlot()
    def connect(self):
        """
        Public slot to start the manager.
        
        The serial link to the device is opened and handed over to the file
        system object.
        """
        self.__serial.openSerialLink(self.__serialPort)
        self.__fs.setSerial(self.__serial)
    
    @pyqtSlot()
    def disconnect(self):
        """
        Public slot to stop the manager by closing the serial link.
        """
        self.__serial.closeSerialLink()
    
    @pyqtSlot(str)
    def lls(self, dirname):
        """
        Public slot to get a long listing of the given directory.
        
        @param dirname name of the directory to list
        @type str
        """
        try:
            filesList = self.__fs.lls(dirname)
            # 'mtime' is used as the name in order not to shadow the 'time'
            # module
            result = [(decoratedName(name, mode),
                       mode2string(mode),
                       str(size),
                       mtime2string(mtime)) for
                      name, (mode, size, mtime) in filesList]
            self.longListFiles.emit(tuple(result))
        except Exception as exc:
            self.longListFilesFailed.emit(str(exc))
    
    @pyqtSlot()
    def pwd(self):
        """
        Public slot to get the current directory of the device.
        """
        try:
            pwd = self.__fs.pwd()
            self.currentDir.emit(pwd)
        except Exception as exc:
            self.currentDirFailed.emit(str(exc))
    
    @pyqtSlot(str)
    @pyqtSlot(str, str)
    def get(self, deviceFileName, hostFileName=""):
        """
        Public slot to get a file from the connected device.
        
        @param deviceFileName name of the file on the device
        @type str
        @param hostFileName name of the local file (if this is an existing
            directory, the file is stored there using the base name of the
            device file)
        @type str
        """
        if hostFileName and os.path.isdir(hostFileName):
            # only a local directory was given
            hostFileName = os.path.join(hostFileName,
                                        os.path.basename(deviceFileName))
        try:
            self.__fs.get(deviceFileName, hostFileName)
            self.getFileDone.emit(deviceFileName, hostFileName)
        except Exception as exc:
            self.getFileFailed.emit(str(exc))
    
    @pyqtSlot(str)
    @pyqtSlot(str, str)
    def put(self, hostFileName, deviceFileName=""):
        """
        Public slot to put a file onto the device.
        
        @param hostFileName name of the local file
        @type str
        @param deviceFileName name of the file on the connected device
        @type str
        """
        try:
            self.__fs.put(hostFileName, deviceFileName)
            # signals have to be emitted; they cannot be called directly
            self.putFileDone.emit(hostFileName, deviceFileName)
        except Exception as exc:
            self.putFileFailed.emit(str(exc))
    
    @pyqtSlot(str)
    def delete(self, deviceFileName):
        """
        Public slot to delete a file on the device.
        
        @param deviceFileName name of the file on the connected device
        @type str
        """
        try:
            self.__fs.rm(deviceFileName)
            self.deleteFileDone.emit(deviceFileName)
        except Exception as exc:
            self.deleteFileFailed.emit(str(exc))

##################################################################
## Utility functions below
##################################################################


def mtime2string(mtime):
    """
    Function to format a time value as a date and time string.
    
    The value is converted to local time and formatted as
    'YYYY-MM-DD hh:mm:ss'.
    
    @param mtime time value
    @type int
    @return string representation of the given time
    @rtype str
    """
    localTime = time.localtime(mtime)
    return time.strftime("%Y-%m-%d %H:%M:%S", localTime)


def mode2string(mode):
    """
    Function to format a file mode value as a permissions string.
    
    The conversion is delegated to stat.filemode().
    
    @param mode mode value
    @type int
    @return string representation of the given mode value
    @rtype str
    """
    modeString = stat.filemode(mode)
    return modeString


def decoratedName(name, mode, isDir=False):
    """
    Function to mark directory names with a trailing slash.
    
    A name is treated as a directory, if the mode value says so or if the
    isDir flag is set. All other names are returned unchanged.
    
    @param name file or directory name
    @type str
    @param mode mode value
    @type int
    @param isDir flag indicating that name is a directory
    @type bool
    @return decorated file or directory name
    @rtype str
    """
    isDirectory = stat.S_ISDIR(mode) or isDir
    return name + "/" if isDirectory else name

eric ide

mercurial