Started to implement the device file system interface. micropython

Sat, 20 Jul 2019 14:48:09 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 20 Jul 2019 14:48:09 +0200
branch
micropython
changeset 7070
3368ce0e7879
parent 7069
a09a30251d4e
child 7077
3b7475b7a1ef

Started to implement the device file system interface.

eric6/MicroPython/MicroPythonFileSystem.py file | annotate | diff | comparison | revisions
eric6/MicroPython/MicroPythonSerialPort.py file | annotate | diff | comparison | revisions
--- a/eric6/MicroPython/MicroPythonFileSystem.py	Sat Jul 20 14:47:24 2019 +0200
+++ b/eric6/MicroPython/MicroPythonFileSystem.py	Sat Jul 20 14:48:09 2019 +0200
@@ -9,7 +9,11 @@
 
 from __future__ import unicode_literals
 
-from PyQt5.QtCore import QObject
+import ast
+import time
+import stat
+
+from PyQt5.QtCore import QObject, QThread
 
 
 class MicroPythonFileSystem(QObject):
@@ -25,6 +29,9 @@
     <li>pwd: get the current directory</li>
     <li>put: copy a file to the connected device</li>
     <li>get: get a file from the connected device</li>
+    <li>rm: remove a file from the connected device</li>
+    <li>mkdir: create a new directory</li>
+    <li>rmdir: remove an empty directory</li>
     </ul>
     """
     def __init__(self, parent=None):
@@ -35,35 +42,193 @@
         @type QObject
         """
         super(MicroPythonFileSystem, self).__init__(parent)
+        
+        self.__serial = None
     
-    def ls(self):
+    def setSerial(self, serial):
+        """
+        Public method to set the serial port to be used.
+        
+        Note: The serial port should be initialized and open already.
+        
+        @param serial open serial port
+        @type MicroPythonSerialPort
+        """
+        self.__serial = serial
+    
+    def __rawOn(self):
+        """
+        Private method to switch the connected device to 'raw' mode.
+        
+        Note: switching to raw mode is done with synchroneous writes.
+        """
+        if not self.__serial:
+            return
+        
+        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n"
+        softRebootMessage = b"soft reboot\r\n"
+        
+        self.__serial.write(b"\x02")        # end raw mode if required
+        self.__serial.waitForBytesWritten()
+        for _i in range(3):
+            # CTRL-C three times to break out of loops
+            self.__serial.write(b"\r\x03")
+            self.__serial.waitForBytesWritten()
+            QThread.msleep(10)
+        self.__serial.readAll()             # read all data and discard it
+        self.__serial.write(b"\r\x01")      # send CTRL-A to enter raw mode
+        self.__serial.readUntil(rawReplMessage)
+        self.__serial.write(b"\x04")        # send CTRL-D to soft reset
+        self.__serial.readUntil(softRebootMessage)
+        
+        # some MicroPython devices seem to need to be convinced in some
+        # special way
+        data = self.__serial.readUntil(rawReplMessage)
+        if not data.endswith(rawReplMessage):
+            self.__serial.write(b"\r\x01")  # send CTRL-A again
+            self.__serial.readUntil(rawReplMessage)
+        self.__serial.readAll()             # read all data and discard it
+    
+    def __rawOff(self):
+        """
+        Private method to switch 'raw' mode off.
+        """
+        if self.__serial:
+            self.__serial.write(b"\x02")    # send CTRL-B to cancel raw mode
+    
+    def __execute(self, commands):
+        """
+        Private method to send commands to the connected device and return the
+        result.
+        
+        If no serial connection is available, empty results will be returned.
+        
+        @param commands list of commands to be executed
+        @type str
+        @return tuple containing stdout and stderr output of the device
+        @rtype tuple of (bytes, bytes)
+        """
+        if not self.__serial:
+            return b"", b""
+        
+        result = bytearray()
+        err = b""
+        
+        self.__rawOn()
+        QThread.msleep(10)
+        for command in commands:
+            if command:
+                commandBytes = command.encode("utf-8")
+                self.__serial.write(commandBytes + b"\x04")
+                # read until prompt
+                response = self.__serial.readUntil(b"\x04>")
+                # split stdout, stderr
+                out, err = response[2:-2].split(b"\x04")
+                result += out
+                if err:
+                    return b"", err
+        QThread.msleep(10)
+        self.__rawOff()
+        
+        return bytes(result), err
+    
+    def __shortError(self, error):
+        """
+        Private method to create a shortened error message.
+        
+        @param error verbose error message
+        @type bytes
+        @return shortened error message
+        @rtype str
+        """
+        if error:
+            decodedError = error.decode("utf-8")
+            try:
+                return decodedError.split["\r\n"][-2]
+            except Exception:
+                return decodedError
+        return self.tr("Detected an error without indications.")
+    
+    ##################################################################
+    ## Methods below implement the file system commands
+    ##################################################################
+    
+    def ls(self, dirname=""):
         """
         Public method to get a directory listing of the connected device.
         
+        @param dirname name of the directory to be listed
+        @type str
         @return tuple containg the directory listing
         @rtype tuple of str
+        @exception IOError raised to indicate an issue with the device
         """
-        # TODO: not implemented yet
+        commands = [
+            "import os",
+            "print(os.listdir('{0}'))".format(dirname),
+        ]
+        out, err = self.__execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        return ast.literal_eval(out.decode("utf-8"))
     
-    def lls(self):
+    def lls(self, dirname=""):
         """
         Public method to get a long directory listing of the connected device
         including meta data.
         
-        @return tuple containg the the directory listing with tuple entries
-            containing the name, size, time and mode
+        @param dirname name of the directory to be listed
+        @type str
+        @return list containing the the directory listing with tuple entries
+            of the name and and a tuple of mode, size and time
         @rtype tuple of str
+        @exception IOError raised to indicate an issue with the device
         """
-        # TODO: not implemented yet
+        commands = [
+            "import os",
+            "\n".join([
+                "def stat(filename):",
+                "    try:",
+                "        rstat = os.lstat(filename)",
+                "    except:",
+                "        rstat = os.stat(filename)",
+                "    return tuple(rstat)",
+            ]),
+            "\n".join([
+                "def listdir_stat(dirname):",
+                "    try:",
+                "        files = os.listdir(dirname)",
+                "    except OSError:",
+                "        return []",
+                "    if dirname in ('', '/'):",
+                "        return list((f, stat(f)) for f in files)",
+                "    return list((f, stat(dirname + '/' + f)) for f in files)",
+            ]),
+            "print(listdir_stat('{0}'))".format(dirname),
+        ]
+        out, err = self.__execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        fileslist = ast.literal_eval(out.decode("utf-8"))
+        return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
     
-    def cd(self, path):
+    def cd(self, dirname):
         """
         Public method to change the current directory on the connected device.
         
-        @param path directory to change to
+        @param dirname directory to change to
         @type str
+        @exception IOError raised to indicate an issue with the device
         """
-        # TODO: not implemented yet
+        assert dirname
+        
+        commands = [
+            "import os",
+            "os.chdir('{0}')".format(dirname),
+        ]
+        out, err = self.__execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
     
     def pwd(self):
         """
@@ -71,8 +236,70 @@
         
         @return current directory
         @rtype str
+        @exception IOError raised to indicate an issue with the device
         """
-        # TODO: not implemented yet
+        commands = [
+            "import os",
+            "print(os.getcwd())",
+        ]
+        out, err = self.__execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+        return out.decode("utf-8").strip()
+     
+    def rm(self, filename):
+        """
+        Public method to remove a file from the connected device.
+        
+        @param filename name of the file to be removed
+        @type str
+        @exception IOError raised to indicate an issue with the device
+        """
+        assert filename
+        
+        commands = [
+            "import os",
+            "os.remove('{0}')".format(filename),
+        ]
+        out, err = self.__execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+    
+    def mkdir(self, dirname):
+        """
+        Public method to create a new directory.
+        
+        @param dirname name of the directory to create
+        @type str
+        @exception IOError raised to indicate an issue with the device
+        """
+        assert dirname
+   
+        commands = [
+            "import os",
+            "os.mkdir('{0}')".format(dirname),
+        ]
+        out, err = self.__execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
+    
+    def rmdir(self, dirname):
+        """
+        Public method to remove a directory.
+        
+        @param dirname name of the directory to be removed
+        @type str
+        @exception IOError raised to indicate an issue with the device
+        """
+        assert dirname
+   
+        commands = [
+            "import os",
+            "os.rmdir('{0}')".format(dirname),
+        ]
+        out, err = self.__execute(commands)
+        if err:
+            raise IOError(self.__shortError(err))
     
     def put(self, hostFileName, deviceFileName):
         """
@@ -84,6 +311,7 @@
         @type str
         @return flag indicating success
         @rtype bool
+        @exception IOError raised to indicate an issue with the device
         """
         # TODO: not implemented yet
     
@@ -97,5 +325,63 @@
         @type str
         @return flag indicating success
         @rtype bool
+        @exception IOError raised to indicate an issue with the device
         """
         # TODO: not implemented yet
+    
+    ##################################################################
+    ## Utility methods below
+    ##################################################################
+    
+    def mtime2string(self, mtime):
+        """
+        Public method to convert a time value to a string representation.
+        
+        @param mtime time value
+        @type int
+        @return string representation of the given time
+        @rtype str
+        """
+        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime))
+    
+    def mode2string(self, mode):
+        """
+        Public method to convert a mode value to a string representation.
+        
+        @param mode mode value
+        @type int
+        @return string representation of the given mode value
+        @rtype str
+        """
+        return stat.filemode(mode)
+# TODO: remove this
+##
+##if __name__ == "__main__":
+##    from PyQt5.QtCore import QCoreApplication, QTimer
+##    from MicroPythonSerialPort import MicroPythonSerialPort
+##    
+##    app = QCoreApplication([])
+##    
+##    serial = MicroPythonSerialPort()
+##    serial.openSerialLink("/dev/ttyUSB0")
+##    fs = MicroPythonFileSystem()
+##    fs.setSerial(serial)
+##    
+##    def tf():
+##        fs.cd("/flash")
+##        print(fs.pwd())
+##        fs.cd("odroid_go")
+##        print(fs.pwd())
+##        ll = fs.lls()
+##        print(ll)
+##        for f, (m, s, t) in ll:
+##            print(fs.mode2string(m), s, fs.mtime2string(t), f)
+##        fs.cd("..")
+##        print(fs.pwd())
+##        ll = fs.lls("odroid_go")
+##        print(ll)
+##        for f, (m, s, t) in ll:
+##            print(fs.mode2string(m), s, fs.mtime2string(t), f)
+##    
+##    QTimer.singleShot(0, tf)
+##    app.exec_()
--- a/eric6/MicroPython/MicroPythonSerialPort.py	Sat Jul 20 14:47:24 2019 +0200
+++ b/eric6/MicroPython/MicroPythonSerialPort.py	Sat Jul 20 14:48:09 2019 +0200
@@ -4,29 +4,43 @@
 #
 
 """
-Module implementing a QSerialPort with additional functionality.
+Module implementing a QSerialPort with additional functionality for
+MicroPython devices.
 """
 
 from __future__ import unicode_literals
 
-from PyQt5.QtCore import QIODevice
+from PyQt5.QtCore import QIODevice, QTime
 from PyQt5.QtSerialPort import QSerialPort
 
 
 class MicroPythonSerialPort(QSerialPort):
     """
-    Class implementing a QSerialPort with additional functionality
+    Class implementing a QSerialPort with additional functionality for
+    MicroPython devices.
     """
-    def __init__(self, parent=None):
+    def __init__(self, timeout=10000, parent=None):
         """
         Constructor
         
+        @param timeout timout in milliseconds to be set
+        @type int
         @param parent reference to the parent object
         @type QObject
         """
         super(MicroPythonSerialPort, self).__init__(parent)
         
         self.__connected = False
+        self.__timeout = timeout      # 10s default timeout
+    
+    def setTimeout(self, timeout):
+        """
+        Public method to set the timeout for device operations.
+        
+        @param timeout timout in milliseconds to be set
+        @type int
+        """
+        self.__timeout = timeout
     
     def openSerialLink(self, port):
         """
@@ -70,7 +84,7 @@
         return self.__connected
     
     def readUntil(self, expected=b"\n", size=None):
-        """
+        r"""
         Public method to read data until an expected sequence is found
         (default: \n) or a specific size is exceeded.
         
@@ -81,19 +95,27 @@
         @return bytes read from the device including the expected sequence
         @rtype bytes
         """
+        from PyQt5.QtCore import QCoreApplication, QEventLoop
         data = bytearray()
+        
+        t = QTime()
+        t.start()
         while True:
-            if self.waitForReadyRead():
-                c = bytes(self.read(1))
-                if c:
-                    data += c
-                    if data.endswith(expected):
-                        break
-                    if size is not None and len(data) >= size:
-                        break
-                else:
+            # TODO: check if this is still needed when used with a QThread
+            QCoreApplication.processEvents(QEventLoop.ExcludeUserInputEvents)
+##            if self.waitForReadyRead(self.__timeout):
+            c = bytes(self.read(1))
+            if c:
+                data += c
+                if data.endswith(expected):
                     break
-            else:
+                if size is not None and len(data) >= size:
+                    break
+#            else:
+#                break
+            if t.elapsed() > self.__timeout:
                 break
+##            else:
+##                break
         
         return bytes(data)

eric ide

mercurial