Mon, 22 Jul 2019 20:17:33 +0200
MicroPython: continued implementing the file manager widget.
# -*- coding: utf-8 -*- # Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing some file system commands for MicroPython. """ from __future__ import unicode_literals import ast import time import stat import os from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject, QThread from .MicroPythonSerialPort import MicroPythonSerialPort class MicroPythonFileSystem(QObject): """ Class implementing some file system commands for MicroPython. Some FTP like commands are provided to perform operations on the file system of a connected MicroPython device. Supported commands are: <ul> <li>ls: directory listing</li> <li>lls: directory listing with meta data</li> <li>cd: change directory</li> <li>pwd: get the current directory</li> <li>put: copy a file to the connected device</li> <li>get: get a file from the connected device</li> <li>rm: remove a file from the connected device</li> <li>mkdir: create a new directory</li> <li>rmdir: remove an empty directory</li> <li>version: get version info about MicroPython</li> </ul> """ def __init__(self, parent=None): """ Constructor @param parent reference to the parent object @type QObject """ super(MicroPythonFileSystem, self).__init__(parent) self.__serial = None def setSerial(self, serial): """ Public method to set the serial port to be used. Note: The serial port should be initialized and open already. @param serial open serial port @type MicroPythonSerialPort """ self.__serial = serial def __rawOn(self): """ Private method to switch the connected device to 'raw' mode. Note: switching to raw mode is done with synchroneous writes. """ if not self.__serial: return rawReplMessage = b"raw REPL; CTRL-B to exit\r\n" softRebootMessage = b"soft reboot\r\n" self.__serial.write(b"\x02") # end raw mode if required self.__serial.waitForBytesWritten() for _i in range(3): # CTRL-C three times to break out of loops self.__serial.write(b"\r\x03") self.__serial.waitForBytesWritten() QThread.msleep(10) self.__serial.readAll() # read all data and discard it self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode self.__serial.readUntil(rawReplMessage) self.__serial.write(b"\x04") # send CTRL-D to soft reset self.__serial.readUntil(softRebootMessage) # some MicroPython devices seem to need to be convinced in some # special way data = self.__serial.readUntil(rawReplMessage) if not data.endswith(rawReplMessage): self.__serial.write(b"\r\x01") # send CTRL-A again self.__serial.readUntil(rawReplMessage) self.__serial.readAll() # read all data and discard it def __rawOff(self): """ Private method to switch 'raw' mode off. """ if self.__serial: self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode def __execute(self, commands): """ Private method to send commands to the connected device and return the result. If no serial connection is available, empty results will be returned. @param commands list of commands to be executed @type str @return tuple containing stdout and stderr output of the device @rtype tuple of (bytes, bytes) """ if not self.__serial: return b"", b"" result = bytearray() err = b"" self.__rawOn() QThread.msleep(10) for command in commands: if command: commandBytes = command.encode("utf-8") self.__serial.write(commandBytes + b"\x04") # read until prompt response = self.__serial.readUntil(b"\x04>") # split stdout, stderr out, err = response[2:-2].split(b"\x04") result += out if err: return b"", err QThread.msleep(10) self.__rawOff() return bytes(result), err def __shortError(self, error): """ Private method to create a shortened error message. @param error verbose error message @type bytes @return shortened error message @rtype str """ if error: decodedError = error.decode("utf-8") try: return decodedError.split["\r\n"][-2] except Exception: return decodedError return self.tr("Detected an error without indications.") ################################################################## ## Methods below implement the file system commands ################################################################## def ls(self, dirname=""): """ Public method to get a directory listing of the connected device. @param dirname name of the directory to be listed @type str @return tuple containg the directory listing @rtype tuple of str @exception IOError raised to indicate an issue with the device """ commands = [ "import os", "print(os.listdir('{0}'))".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return ast.literal_eval(out.decode("utf-8")) def lls(self, dirname=""): """ Public method to get a long directory listing of the connected device including meta data. @param dirname name of the directory to be listed @type str @return list containing the the directory listing with tuple entries of the name and and a tuple of mode, size and time @rtype tuple of str @exception IOError raised to indicate an issue with the device """ commands = [ "import os", "\n".join([ "def stat(filename):", " try:", " rstat = os.lstat(filename)", " except:", " rstat = os.stat(filename)", " return tuple(rstat)", ]), "\n".join([ "def listdir_stat(dirname):", " try:", " files = os.listdir(dirname)", " except OSError:", " return []", " if dirname in ('', '/'):", " return list((f, stat(f)) for f in files)", " return list((f, stat(dirname + '/' + f)) for f in files)", ]), "print(listdir_stat('{0}'))".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) fileslist = ast.literal_eval(out.decode("utf-8")) return [(f, (s[0], s[6], s[8])) for f, s in fileslist] def cd(self, dirname): """ Public method to change the current directory on the connected device. @param dirname directory to change to @type str @exception IOError raised to indicate an issue with the device """ assert dirname commands = [ "import os", "os.chdir('{0}')".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def pwd(self): """ Public method to get the current directory of the connected device. @return current directory @rtype str @exception IOError raised to indicate an issue with the device """ commands = [ "import os", "print(os.getcwd())", ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return out.decode("utf-8").strip() def rm(self, filename): """ Public method to remove a file from the connected device. @param filename name of the file to be removed @type str @exception IOError raised to indicate an issue with the device """ assert filename commands = [ "import os", "os.remove('{0}')".format(filename), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def mkdir(self, dirname): """ Public method to create a new directory. @param dirname name of the directory to create @type str @exception IOError raised to indicate an issue with the device """ assert dirname commands = [ "import os", "os.mkdir('{0}')".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def rmdir(self, dirname): """ Public method to remove a directory. @param dirname name of the directory to be removed @type str @exception IOError raised to indicate an issue with the device """ assert dirname commands = [ "import os", "os.rmdir('{0}')".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def put(self, hostFileName, deviceFileName=None): """ Public method to copy a local file to the connected device. @param hostFileName name of the file to be copied @type str @param deviceFileName name of the file to copy to @type str @return flag indicating success @rtype bool @exception IOError raised to indicate an issue with the device """ if not os.path.isfile(hostFileName): raise IOError("No such file: {0}".format(hostFileName)) with open(hostFileName, "rb") as hostFile: content = hostFile.read() if not deviceFileName: deviceFileName = os.path.basename(hostFileName) commands = [ "fd = open('{0}', 'wb')".format(deviceFileName), "f = fd.write", ] while content: chunk = content[:64] commands.append("f(" + repr(chunk) + ")") content = content[64:] commands.append("fd.close()") out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return True def get(self, deviceFileName, hostFileName=None): """ Public method to copy a file from the connected device. @param deviceFileName name of the file to copy @type str @param hostFileName name of the file to copy to @type str @return flag indicating success @rtype bool @exception IOError raised to indicate an issue with the device """ if not hostFileName: hostFileName = deviceFileName commands = [ "\n".join([ "try:", " from microbit import uart as u", "except ImportError:", " try:", " from machine import UART", " u = UART(0, {0})".format(115200), " except Exception:", " try:", " from sys import stdout as u", " except Exception:", " raise Exception('Could not find UART module in" " device.')", ]), "f = open('{0}', 'rb')".format(deviceFileName), "r = f.read", "result = True", "\n".join([ "while result:", " result = r(32)" " if result:", " u.write(result)", ]), "f.close()", ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) # write the received bytes to the local file with open(hostFileName, "wb") as hostFile: hostFile.write(out) return True # TODO: add rsync function def version(self): """ Public method to get the MicroPython version information of the connected device. @return dictionary containing the version information @rtype dict @exception ValueError raised to indicate that the device might not be running MicroPython or there was an issue parsing the output """ commands = [ "import os", "print(os.uname())", ] try: out, err = self.__execute(commands) if err: raise ValueError(self.__shortError(err)) except ValueError: # just re-raise it raise except Exception: # Raise a value error to indicate being unable to find something # on the device that will return parseable information about the # version. It doesn't matter what the error is, it just needs to # report a failure with the expected ValueError exception. raise ValueError("Unable to determine version information.") rawOutput = out.decode("utf-8").strip() rawOutput = rawOutput[1:-1] items = rawOutput.split(",") result = {} for item in items: key, value = item.strip().split("=") result[key.strip()] = value.strip()[1:-1] return result def syncTime(self): """ Public method to set the time of the connected device to the local computer's time. @exception IOError raised to indicate an issue with the device """ now = time.localtime(time.time()) commands = [ "\n".join([ "def set_time(rtc_time):", " rtc = None", " try:", # Pyboard (it doesn't have machine.RTC()) " import pyb", " rtc = pyb.RTC()", " rtc.datetime(rtc_time)", " except:", " try:", " import machine", " rtc = machine.RTC()", " try:", # ESP8266 uses rtc.datetime() " rtc.datetime(rtc_time)", " except:", # ESP32 uses rtc.init() " rtc.init(rtc_time)", " except:", " pass", ]), "set_time({0})".format((now.tm_year, now.tm_mon, now.tm_mday, now.tm_wday + 1, now.tm_hour, now.tm_min, now.tm_sec, 0)) ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) class MicroPythonFileManager(QObject): """ Class implementing an interface to the device file system commands with some additional sugar. @signal longListFiles(result) emitted with a tuple of tuples containing the name, mode, size and time for each directory entry @signal currentDir(dirname) emitted to report the current directory of the device @signal getFileDone(deviceFile, localFile) emitted after the file was fetched from the connected device and written to the local file system @signal putFileDone(localFile, deviceFile) emitted after the file was copied to the connected device @signal deleteFileDone(deviceFile) emitted after the file has been deleted on the connected device @signal longListFilesFailed(exc) emitted with a failure message to indicate a failed long listing operation @signal currentDirFailed(exc) emitted with a failure message to indicate that the current directory is not available @signal getFileFailed(exc) emitted with a failure message to indicate that the file could not be fetched @signal putFileFailed(exc) emitted with a failure message to indicate that the file could not be copied @signal deleteFileFailed(exc) emitted with a failure message to indicate that the file could not be deleted on the device """ longListFiles = pyqtSignal(tuple) currentDir = pyqtSignal(str) getFileDone = pyqtSignal(str, str) putFileDone = pyqtSignal(str, str) deleteFileDone = pyqtSignal(str) longListFilesFailed = pyqtSignal(str) currentDirFailed = pyqtSignal(str) getFileFailed = pyqtSignal(str) putFileFailed = pyqtSignal(str) deleteFileFailed = pyqtSignal(str) def __init__(self, port, parent=None): """ Constructor @param port port name of the device @type str @param parent reference to the parent object @type QObject """ super(MicroPythonFileManager, self).__init__(parent) self.__serialPort = port self.__serial = MicroPythonSerialPort(parent=self) self.__fs = MicroPythonFileSystem(parent=self) @pyqtSlot() def connect(self): """ Public slot to start the manager. """ self.__serial.openSerialLink(self.__serialPort) self.__fs.setSerial(self.__serial) @pyqtSlot() def disconnect(self): """ Public slot to stop the thread. """ self.__serial.closeSerialLink() @pyqtSlot(str) def lls(self, dirname): """ Public slot to get a long listing of the given directory. @param dirname name of the directory to list @type str """ try: filesList = self.__fs.lls(dirname) result = [(decoratedName(name, mode), mode2string(mode), str(size), mtime2string(time)) for name, (mode, size, time) in filesList] self.longListFiles.emit(tuple(result)) except Exception as exc: self.longListFilesFailed.emit(str(exc)) @pyqtSlot() def pwd(self): """ Public slot to get the current directory of the device. """ try: pwd = self.__fs.pwd() self.currentDir.emit(pwd) except Exception as exc: self.currentDirFailed.emit(str(exc)) @pyqtSlot(str) @pyqtSlot(str, str) def get(self, deviceFileName, hostFileName=""): """ Public slot to get a file from the connected device. @param deviceFileName name of the file on the device @type str @param hostFileName name of the local file @type str """ if hostFileName and os.path.isdir(hostFileName): # only a local directory was given hostFileName = os.path.join(hostFileName, os.path.basename(deviceFileName)) try: self.__fs.get(deviceFileName, hostFileName) self.getFileDone.emit(deviceFileName, hostFileName) except Exception as exc: self.getFileFailed.emit(str(exc)) @pyqtSlot(str) @pyqtSlot(str, str) def put(self, hostFileName, deviceFileName=""): """ Public slot to put a file onto the device. @param hostFileName name of the local file @type str @param deviceFileName name of the file on the connected device @type str """ try: self.__fs.put(hostFileName, deviceFileName) self.putFileDone(hostFileName, deviceFileName) except Exception as exc: self.putFileFailed(str(exc)) @pyqtSlot(str) def delete(self, deviceFileName): """ Public slot to delete a file on the device. @param deviceFileName name of the file on the connected device @type str """ try: self.__fs.rm(deviceFileName) self.deleteFileDone.emit(deviceFileName) except Exception as exc: self.deleteFileFailed(str(exc)) ################################################################## ## Utility methods below ################################################################## def mtime2string(mtime): """ Function to convert a time value to a string representation. @param mtime time value @type int @return string representation of the given time @rtype str """ return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime)) def mode2string(mode): """ Function to convert a mode value to a string representation. @param mode mode value @type int @return string representation of the given mode value @rtype str """ return stat.filemode(mode) def decoratedName(name, mode, isDir=False): """ Function to decorate the given name according to the given mode. @param name file or directory name @type str @param mode mode value @type int @param isDir flag indicating that name is a directory @type bool @return decorated file or directory name @rtype str """ if stat.S_ISDIR(mode) or isDir: # append a '/' for directories return name + "/" else: # no change return name