eric6/MicroPython/MicroPythonFileSystem.py

Sat, 20 Jul 2019 14:48:09 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 20 Jul 2019 14:48:09 +0200
branch
micropython
changeset 7070
3368ce0e7879
parent 7067
3fc4082fc6ba
child 7077
3b7475b7a1ef
permissions
-rw-r--r--

Started to implement the device file system interface.

# -*- coding: utf-8 -*-

# Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing some file system commands for MicroPython.
"""

from __future__ import unicode_literals

import ast
import time
import stat

from PyQt5.QtCore import QObject, QThread


class MicroPythonFileSystem(QObject):
    """
    Class implementing some file system commands for MicroPython.
    
    Some FTP like commands are provided to perform operations on the file
    system of a connected MicroPython device. Supported commands are:
    <ul>
    <li>ls: directory listing</li>
    <li>lls: directory listing with meta data</li>
    <li>cd: change directory</li>
    <li>pwd: get the current directory</li>
    <li>put: copy a file to the connected device</li>
    <li>get: get a file from the connected device</li>
    <li>rm: remove a file from the connected device</li>
    <li>mkdir: create a new directory</li>
    <li>rmdir: remove an empty directory</li>
    </ul>
    """
    def __init__(self, parent=None):
        """
        Constructor
        
        The object starts without a serial connection; one has to be
        supplied via setSerial() before any command can reach a device.
        
        @param parent reference to the parent object
        @type QObject
        """
        super(MicroPythonFileSystem, self).__init__(parent)
        
        # serial port used to talk to the device; None until setSerial()
        self.__serial = None
    
    def setSerial(self, serial):
        """
        Public method to set the serial port to be used.
        
        The given object is just stored; it is neither opened nor configured
        by this method.
        
        Note: The serial port should be initialized and open already.
        
        @param serial open serial port
        @type MicroPythonSerialPort
        """
        self.__serial = serial
    
    def __rawOn(self):
        """
        Private method to switch the connected device to 'raw' mode.
        
        The sequence sent to the device is: CTRL-B (end a possibly active
        raw mode), three times CTRL-C (break out of running code), CTRL-A
        (enter raw mode) and CTRL-D (soft reset). If the raw mode banner is
        not seen after the soft reset, CTRL-A is sent a second time.
        Nothing is done if no serial port has been set.
        
        Note: the initial CTRL-B and CTRL-C writes are done synchronously,
        i.e. each one waits until the bytes have been written.
        """
        if not self.__serial:
            return
        
        # messages of the device this method waits for
        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n"
        softRebootMessage = b"soft reboot\r\n"
        
        self.__serial.write(b"\x02")        # end raw mode if required
        self.__serial.waitForBytesWritten()
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            self.__serial.waitForBytesWritten()
            QThread.msleep(10)
        self.__serial.readAll()             # read all data and discard it
        self.__serial.write(b"\r\x01")      # send CTRL-A to enter raw mode
        self.__serial.readUntil(rawReplMessage)
        self.__serial.write(b"\x04")        # send CTRL-D to soft reset
        self.__serial.readUntil(softRebootMessage)
        
        # some MicroPython devices seem to need to be convinced in some
        # special way
        # NOTE(review): the check below implies that readUntil() may return
        # data not ending with the expected message (presumably after a
        # timeout) - confirm against MicroPythonSerialPort
        data = self.__serial.readUntil(rawReplMessage)
        if not data.endswith(rawReplMessage):
            self.__serial.write(b"\r\x01")  # send CTRL-A again
            self.__serial.readUntil(rawReplMessage)
        self.__serial.readAll()             # read all data and discard it
    
    def __rawOff(self):
        """
        Private method to leave 'raw' mode again.
        
        Nothing is sent, if no serial port has been set.
        """
        if not self.__serial:
            return
        
        # CTRL-B cancels raw mode
        self.__serial.write(b"\x02")
    
    def __execute(self, commands):
        """
        Private method to send commands to the connected device and return the
        result.
        
        If no serial connection is available, empty results will be returned.
        Processing stops at the first command that reports an error or that
        yields a response not consisting of exactly a stdout and a stderr
        part. Raw mode is switched off again in all cases.
        
        @param commands list of commands to be executed
        @type list of str
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""
        
        result = bytearray()
        err = b""
        
        self.__rawOn()
        try:
            QThread.msleep(10)
            for command in commands:
                if not command:
                    # skip empty commands
                    continue
                
                commandBytes = command.encode("utf-8")
                # CTRL-D terminates the command
                self.__serial.write(commandBytes + b"\x04")
                # read until prompt
                response = self.__serial.readUntil(b"\x04>")
                # drop the two leading bytes and the trailing prompt, then
                # split stdout, stderr
                parts = response[2:-2].split(b"\x04")
                if len(parts) != 2:
                    # incomplete or garbled answer; report it as an error
                    # instead of failing with a ValueError
                    return (
                        b"",
                        b"invalid response received: " + bytes(response)
                    )
                
                out, err = parts
                result += out
                if err:
                    return b"", err
            QThread.msleep(10)
        finally:
            # never leave the device in raw mode, not even on errors
            self.__rawOff()
        
        return bytes(result), err
    
    def __shortError(self, error):
        """
        Private method to create a shortened error message.
        
        The shortened message is the last but one element of the message
        split at "\\r\\n", i.e. the last line of a message ending with a line
        break. Messages having less than two such elements are returned
        completely.
        
        @param error verbose error message
        @type bytes
        @return shortened error message
        @rtype str
        """
        if error:
            decodedError = error.decode("utf-8")
            try:
                # split() has to be called; subscripting the method itself
                # raised a TypeError that was swallowed silently
                return decodedError.split("\r\n")[-2]
            except IndexError:
                # no line break contained; there is nothing to shorten
                return decodedError
        return self.tr("Detected an error without indications.")
    
    ##################################################################
    ## Methods below implement the file system commands
    ##################################################################
    
    def ls(self, dirname=""):
        """
        Public method to get a directory listing of the connected device.
        
        An empty list is returned, if the device did not send any output
        (e.g. because no serial port has been set).
        
        @param dirname name of the directory to be listed
        @type str
        @return list containing the directory listing
        @rtype list of str
        @exception IOError raised to indicate an issue with the device
        """
        commands = [
            "import os",
            # the name is inserted via repr() so that quotes and backslashes
            # contained in it are escaped in the generated command
            "print(os.listdir({0!r}))".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        
        listing = out.decode("utf-8").strip()
        if not listing:
            # nothing to evaluate; literal_eval() would raise a SyntaxError
            return []
        return ast.literal_eval(listing)
    
    def lls(self, dirname=""):
        """
        Public method to get a long directory listing of the connected device
        including meta data.
        
        An empty list is returned, if the device did not send any output
        (e.g. because no serial port has been set) or if the directory could
        not be listed by the device.
        
        @param dirname name of the directory to be listed
        @type str
        @return list containing the directory listing with tuple entries
            of the name and a tuple of mode, size and time
        @rtype list of tuple of (str, tuple of (int, int, int))
        @exception IOError raised to indicate an issue with the device
        """
        commands = [
            "import os",
            # helper executed on the device: stat a single file
            "\n".join([
                "def stat(filename):",
                "    try:",
                "        rstat = os.lstat(filename)",
                "    except:",
                "        rstat = os.stat(filename)",
                "    return tuple(rstat)",
            ]),
            # helper executed on the device: list a directory with stat data
            # Entries of '/' are addressed as '/name' in order to not depend
            # on the current directory of the device; entries of '' are
            # addressed by their plain name.
            "\n".join([
                "def listdir_stat(dirname):",
                "    try:",
                "        files = os.listdir(dirname)",
                "    except OSError:",
                "        return []",
                "    if dirname in ('', '/'):",
                "        return list((f, stat(dirname + f)) for f in files)",
                "    return list((f, stat(dirname + '/' + f)) for f in files)",
            ]),
            # the name is inserted via repr() so that quotes and backslashes
            # contained in it are escaped in the generated command
            "print(listdir_stat({0!r}))".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        
        listing = out.decode("utf-8").strip()
        if not listing:
            # nothing to evaluate; literal_eval() would raise a SyntaxError
            return []
        
        fileslist = ast.literal_eval(listing)
        # reduce the stat tuple to the elements 0 (mode), 6 (size) and
        # 8 (time)
        return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
    
    def cd(self, dirname):
        """
        Public method to change the current directory on the connected device.
        
        @param dirname directory to change to (must not be empty)
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert dirname
        
        commands = [
            "import os",
            # the name is inserted via repr() so that quotes and backslashes
            # contained in it are escaped in the generated command
            "os.chdir({0!r})".format(dirname),
        ]
        _out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def pwd(self):
        """
        Public method to determine the current directory of the connected
        device.
        
        @return current directory
        @rtype str
        @exception IOError raised to indicate an issue with the device
        """
        out, err = self.__execute(["import os", "print(os.getcwd())"])
        if err:
            raise IOError(self.__shortError(err))
        
        # the device output carries surrounding whitespace, which is removed
        currentDirectory = out.decode("utf-8")
        return currentDirectory.strip()
     
    def rm(self, filename):
        """
        Public method to remove a file from the connected device.
        
        @param filename name of the file to be removed (must not be empty)
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert filename
        
        commands = [
            "import os",
            # the name is inserted via repr() so that quotes and backslashes
            # contained in it are escaped in the generated command
            "os.remove({0!r})".format(filename),
        ]
        _out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def mkdir(self, dirname):
        """
        Public method to create a new directory.
        
        @param dirname name of the directory to create (must not be empty)
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert dirname
        
        commands = [
            "import os",
            # the name is inserted via repr() so that quotes and backslashes
            # contained in it are escaped in the generated command
            "os.mkdir({0!r})".format(dirname),
        ]
        _out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def rmdir(self, dirname):
        """
        Public method to remove a directory.
        
        @param dirname name of the directory to be removed (must not be
            empty)
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert dirname
        
        commands = [
            "import os",
            # the name is inserted via repr() so that quotes and backslashes
            # contained in it are escaped in the generated command
            "os.rmdir({0!r})".format(dirname),
        ]
        _out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def put(self, hostFileName, deviceFileName):
        """
        Public method to copy a local file to the connected device.
        
        Note: This method is not implemented yet. At present it does nothing
        and returns None; the return value and exception given below
        describe the intended interface.
        
        @param hostFileName name of the file to be copied
        @type str
        @param deviceFileName name of the file to copy to
        @type str
        @return flag indicating success
        @rtype bool
        @exception IOError raised to indicate an issue with the device
        """
        # TODO: not implemented yet
    
    def get(self, deviceFileName, hostFileName):
        """
        Public method to copy a file from the connected device.
        
        Note: This method is not implemented yet. At present it does nothing
        and returns None; the return value and exception given below
        describe the intended interface.
        
        @param deviceFileName name of the file to copy
        @type str
        @param hostFileName name of the file to copy to
        @type str
        @return flag indicating success
        @rtype bool
        @exception IOError raised to indicate an issue with the device
        """
        # TODO: not implemented yet
    
    ##################################################################
    ## Utility methods below
    ##################################################################
    
    def mtime2string(self, mtime):
        """
        Public method to format a time value as a date and time string.
        
        The value is converted to local time and formatted as
        'YYYY-MM-DD HH:MM:SS'.
        
        @param mtime time value
        @type int
        @return string representation of the given time
        @rtype str
        """
        localTime = time.localtime(mtime)
        return time.strftime("%Y-%m-%d %H:%M:%S", localTime)
    
    def mode2string(self, mode):
        """
        Public method to format a mode value as a string.
        
        The conversion is delegated to stat.filemode().
        
        @param mode mode value
        @type int
        @return string representation of the given mode value
        @rtype str
        """
        modeString = stat.filemode(mode)
        return modeString
# TODO: remove this
##
##if __name__ == "__main__":
##    from PyQt5.QtCore import QCoreApplication, QTimer
##    from MicroPythonSerialPort import MicroPythonSerialPort
##    
##    app = QCoreApplication([])
##    
##    serial = MicroPythonSerialPort()
##    serial.openSerialLink("/dev/ttyUSB0")
##    fs = MicroPythonFileSystem()
##    fs.setSerial(serial)
##    
##    def tf():
##        fs.cd("/flash")
##        print(fs.pwd())
##        fs.cd("odroid_go")
##        print(fs.pwd())
##        ll = fs.lls()
##        print(ll)
##        for f, (m, s, t) in ll:
##            print(fs.mode2string(m), s, fs.mtime2string(t), f)
##        fs.cd("..")
##        print(fs.pwd())
##        ll = fs.lls("odroid_go")
##        print(ll)
##        for f, (m, s, t) in ll:
##            print(fs.mode2string(m), s, fs.mtime2string(t), f)
##    
##    QTimer.singleShot(0, tf)
##    app.exec_()

eric ide

mercurial