Sat, 20 Jul 2019 14:48:09 +0200
Started to implement the device file system interface.
# -*- coding: utf-8 -*- # Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing some file system commands for MicroPython. """ from __future__ import unicode_literals import ast import time import stat from PyQt5.QtCore import QObject, QThread class MicroPythonFileSystem(QObject): """ Class implementing some file system commands for MicroPython. Some FTP like commands are provided to perform operations on the file system of a connected MicroPython device. Supported commands are: <ul> <li>ls: directory listing</li> <li>lls: directory listing with meta data</li> <li>cd: change directory</li> <li>pwd: get the current directory</li> <li>put: copy a file to the connected device</li> <li>get: get a file from the connected device</li> <li>rm: remove a file from the connected device</li> <li>mkdir: create a new directory</li> <li>rmdir: remove an empty directory</li> </ul> """ def __init__(self, parent=None): """ Constructor @param parent reference to the parent object @type QObject """ super(MicroPythonFileSystem, self).__init__(parent) self.__serial = None def setSerial(self, serial): """ Public method to set the serial port to be used. Note: The serial port should be initialized and open already. @param serial open serial port @type MicroPythonSerialPort """ self.__serial = serial def __rawOn(self): """ Private method to switch the connected device to 'raw' mode. Note: switching to raw mode is done with synchroneous writes. """ if not self.__serial: return rawReplMessage = b"raw REPL; CTRL-B to exit\r\n" softRebootMessage = b"soft reboot\r\n" self.__serial.write(b"\x02") # end raw mode if required self.__serial.waitForBytesWritten() for _i in range(3): # CTRL-C three times to break out of loops self.__serial.write(b"\r\x03") self.__serial.waitForBytesWritten() QThread.msleep(10) self.__serial.readAll() # read all data and discard it self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode self.__serial.readUntil(rawReplMessage) self.__serial.write(b"\x04") # send CTRL-D to soft reset self.__serial.readUntil(softRebootMessage) # some MicroPython devices seem to need to be convinced in some # special way data = self.__serial.readUntil(rawReplMessage) if not data.endswith(rawReplMessage): self.__serial.write(b"\r\x01") # send CTRL-A again self.__serial.readUntil(rawReplMessage) self.__serial.readAll() # read all data and discard it def __rawOff(self): """ Private method to switch 'raw' mode off. """ if self.__serial: self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode def __execute(self, commands): """ Private method to send commands to the connected device and return the result. If no serial connection is available, empty results will be returned. @param commands list of commands to be executed @type str @return tuple containing stdout and stderr output of the device @rtype tuple of (bytes, bytes) """ if not self.__serial: return b"", b"" result = bytearray() err = b"" self.__rawOn() QThread.msleep(10) for command in commands: if command: commandBytes = command.encode("utf-8") self.__serial.write(commandBytes + b"\x04") # read until prompt response = self.__serial.readUntil(b"\x04>") # split stdout, stderr out, err = response[2:-2].split(b"\x04") result += out if err: return b"", err QThread.msleep(10) self.__rawOff() return bytes(result), err def __shortError(self, error): """ Private method to create a shortened error message. @param error verbose error message @type bytes @return shortened error message @rtype str """ if error: decodedError = error.decode("utf-8") try: return decodedError.split["\r\n"][-2] except Exception: return decodedError return self.tr("Detected an error without indications.") ################################################################## ## Methods below implement the file system commands ################################################################## def ls(self, dirname=""): """ Public method to get a directory listing of the connected device. @param dirname name of the directory to be listed @type str @return tuple containg the directory listing @rtype tuple of str @exception IOError raised to indicate an issue with the device """ commands = [ "import os", "print(os.listdir('{0}'))".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return ast.literal_eval(out.decode("utf-8")) def lls(self, dirname=""): """ Public method to get a long directory listing of the connected device including meta data. @param dirname name of the directory to be listed @type str @return list containing the the directory listing with tuple entries of the name and and a tuple of mode, size and time @rtype tuple of str @exception IOError raised to indicate an issue with the device """ commands = [ "import os", "\n".join([ "def stat(filename):", " try:", " rstat = os.lstat(filename)", " except:", " rstat = os.stat(filename)", " return tuple(rstat)", ]), "\n".join([ "def listdir_stat(dirname):", " try:", " files = os.listdir(dirname)", " except OSError:", " return []", " if dirname in ('', '/'):", " return list((f, stat(f)) for f in files)", " return list((f, stat(dirname + '/' + f)) for f in files)", ]), "print(listdir_stat('{0}'))".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) fileslist = ast.literal_eval(out.decode("utf-8")) return [(f, (s[0], s[6], s[8])) for f, s in fileslist] def cd(self, dirname): """ Public method to change the current directory on the connected device. @param dirname directory to change to @type str @exception IOError raised to indicate an issue with the device """ assert dirname commands = [ "import os", "os.chdir('{0}')".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def pwd(self): """ Public method to get the current directory of the connected device. @return current directory @rtype str @exception IOError raised to indicate an issue with the device """ commands = [ "import os", "print(os.getcwd())", ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return out.decode("utf-8").strip() def rm(self, filename): """ Public method to remove a file from the connected device. @param filename name of the file to be removed @type str @exception IOError raised to indicate an issue with the device """ assert filename commands = [ "import os", "os.remove('{0}')".format(filename), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def mkdir(self, dirname): """ Public method to create a new directory. @param dirname name of the directory to create @type str @exception IOError raised to indicate an issue with the device """ assert dirname commands = [ "import os", "os.mkdir('{0}')".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def rmdir(self, dirname): """ Public method to remove a directory. @param dirname name of the directory to be removed @type str @exception IOError raised to indicate an issue with the device """ assert dirname commands = [ "import os", "os.rmdir('{0}')".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def put(self, hostFileName, deviceFileName): """ Public method to copy a local file to the connected device. @param hostFileName name of the file to be copied @type str @param deviceFileName name of the file to copy to @type str @return flag indicating success @rtype bool @exception IOError raised to indicate an issue with the device """ # TODO: not implemented yet def get(self, deviceFileName, hostFileName): """ Public method to copy a file from the connected device. @param deviceFileName name of the file to copy @type str @param hostFileName name of the file to copy to @type str @return flag indicating success @rtype bool @exception IOError raised to indicate an issue with the device """ # TODO: not implemented yet ################################################################## ## Utility methods below ################################################################## def mtime2string(self, mtime): """ Public method to convert a time value to a string representation. @param mtime time value @type int @return string representation of the given time @rtype str """ return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime)) def mode2string(self, mode): """ Public method to convert a mode value to a string representation. @param mode mode value @type int @return string representation of the given mode value @rtype str """ return stat.filemode(mode) # TODO: remove this ## ##if __name__ == "__main__": ## from PyQt5.QtCore import QCoreApplication, QTimer ## from MicroPythonSerialPort import MicroPythonSerialPort ## ## app = QCoreApplication([]) ## ## serial = MicroPythonSerialPort() ## serial.openSerialLink("/dev/ttyUSB0") ## fs = MicroPythonFileSystem() ## fs.setSerial(serial) ## ## def tf(): ## fs.cd("/flash") ## print(fs.pwd()) ## fs.cd("odroid_go") ## print(fs.pwd()) ## ll = fs.lls() ## print(ll) ## for f, (m, s, t) in ll: ## print(fs.mode2string(m), s, fs.mtime2string(t), f) ## fs.cd("..") ## print(fs.pwd()) ## ll = fs.lls("odroid_go") ## print(ll) ## for f, (m, s, t) in ll: ## print(fs.mode2string(m), s, fs.mtime2string(t), f) ## ## QTimer.singleShot(0, tf) ## app.exec_()