Fri, 26 Jul 2019 20:05:49 +0200
MicroPython: continued implementing the file manager widget.
# -*- coding: utf-8 -*- # Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing some file system commands for MicroPython. """ from __future__ import unicode_literals import ast import time import os import stat from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject, QThread from .MicroPythonSerialPort import MicroPythonSerialPort from .MicroPythonFileSystemUtilities import ( mtime2string, mode2string, decoratedName, listdirStat ) class MicroPythonFileSystem(QObject): """ Class implementing some file system commands for MicroPython. Commands are provided to perform operations on the file system of a connected MicroPython device. Supported commands are: <ul> <li>ls: directory listing</li> <li>lls: directory listing with meta data</li> <li>cd: change directory</li> <li>pwd: get the current directory</li> <li>put: copy a file to the connected device</li> <li>get: get a file from the connected device</li> <li>rm: remove a file from the connected device</li> <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash) <li>mkdir: create a new directory</li> <li>rmdir: remove an empty directory</li> </ul> There are additional commands related to time and version. <ul> <li>version: get version info about MicroPython</li> <li>syncTime: synchronize the time of the connected device</li> <li>showTime: show the current time of the connected device</li> </ul> """ def __init__(self, parent=None): """ Constructor @param parent reference to the parent object @type QObject """ super(MicroPythonFileSystem, self).__init__(parent) self.__serial = None def setSerial(self, serial): """ Public method to set the serial port to be used. Note: The serial port should be initialized and open already. @param serial open serial port @type MicroPythonSerialPort """ self.__serial = serial def __rawOn(self): """ Private method to switch the connected device to 'raw' mode. Note: switching to raw mode is done with synchronous writes. @return flag indicating success @@rtype bool """ if not self.__serial: return False rawReplMessage = b"raw REPL; CTRL-B to exit\r\n" softRebootMessage = b"soft reboot\r\n" self.__serial.write(b"\x02") # end raw mode if required self.__serial.waitForBytesWritten() for _i in range(3): # CTRL-C three times to break out of loops self.__serial.write(b"\r\x03") self.__serial.waitForBytesWritten() QThread.msleep(10) self.__serial.readAll() # read all data and discard it self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode self.__serial.readUntil(rawReplMessage) if self.__serial.hasTimedOut(): return False self.__serial.write(b"\x04") # send CTRL-D to soft reset self.__serial.readUntil(softRebootMessage) if self.__serial.hasTimedOut(): return False # some MicroPython devices seem to need to be convinced in some # special way data = self.__serial.readUntil(rawReplMessage) if self.__serial.hasTimedOut(): return False if not data.endswith(rawReplMessage): self.__serial.write(b"\r\x01") # send CTRL-A again self.__serial.readUntil(rawReplMessage) if self.__serial.hasTimedOut(): return False self.__serial.readAll() # read all data and discard it return True def __rawOff(self): """ Private method to switch 'raw' mode off. """ if self.__serial: self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode def __execute(self, commands): """ Private method to send commands to the connected device and return the result. If no serial connection is available, empty results will be returned. @param commands list of commands to be executed @type str @return tuple containing stdout and stderr output of the device @rtype tuple of (bytes, bytes) """ if not self.__serial: return b"", b"" result = bytearray() err = b"" ok = self.__rawOn() if not ok: return ( b"", b"Could not switch to raw mode. Is the device switched on?" ) QThread.msleep(10) for command in commands: if command: commandBytes = command.encode("utf-8") self.__serial.write(commandBytes + b"\x04") # read until prompt response = self.__serial.readUntil(b"\x04>") if self.__serial.hasTimedOut(): return b"", b"Timeout while processing commands." if b"\x04" in response[2:-2]: # split stdout, stderr out, err = response[2:-2].split(b"\x04") result += out else: err = b"invalid response received: " + response if err: return b"", err QThread.msleep(10) self.__rawOff() return bytes(result), err def __shortError(self, error): """ Private method to create a shortened error message. @param error verbose error message @type bytes @return shortened error message @rtype str """ if error: decodedError = error.decode("utf-8") try: return decodedError.split["\r\n"][-2] except Exception: return decodedError return self.tr("Detected an error without indications.") ################################################################## ## Methods below implement the file system commands ################################################################## def ls(self, dirname=""): """ Public method to get a directory listing of the connected device. @param dirname name of the directory to be listed @type str @return tuple containg the directory listing @rtype tuple of str @exception IOError raised to indicate an issue with the device """ commands = [ "import os", "print(os.listdir('{0}'))".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return ast.literal_eval(out.decode("utf-8")) def lls(self, dirname="", fullstat=False): """ Public method to get a long directory listing of the connected device including meta data. @param dirname name of the directory to be listed @type str @param fullstat flag indicating to return the full stat() tuple @type bool @return list containing the directory listing with tuple entries of the name and and a tuple of mode, size and time (if fullstat is false) or the complete stat() tuple. 'None' is returned in case the directory doesn't exist. @rtype tuple of (str, tuple) @exception IOError raised to indicate an issue with the device """ commands = [ "import os", "\n".join([ "def stat(filename):", " try:", " rstat = os.lstat(filename)", " except:", " rstat = os.stat(filename)", " return tuple(rstat)", ]), "\n".join([ "def listdir_stat(dirname):", " try:", " files = os.listdir(dirname)", " except OSError:", " return None", " if dirname in ('', '/'):", " return list((f, stat(f)) for f in files)", " return list((f, stat(dirname + '/' + f)) for f in files)", ]), "print(listdir_stat('{0}'))".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) fileslist = ast.literal_eval(out.decode("utf-8")) if fileslist is None: return None else: if fullstat: return fileslist else: return [(f, (s[0], s[6], s[8])) for f, s in fileslist] def cd(self, dirname): """ Public method to change the current directory on the connected device. @param dirname directory to change to @type str @exception IOError raised to indicate an issue with the device """ assert dirname commands = [ "import os", "os.chdir('{0}')".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def pwd(self): """ Public method to get the current directory of the connected device. @return current directory @rtype str @exception IOError raised to indicate an issue with the device """ commands = [ "import os", "print(os.getcwd())", ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return out.decode("utf-8").strip() def rm(self, filename): """ Public method to remove a file from the connected device. @param filename name of the file to be removed @type str @exception IOError raised to indicate an issue with the device """ assert filename commands = [ "import os", "os.remove('{0}')".format(filename), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def rmrf(self, name, recursive=False, force=False): """ Public method to remove a file or directory recursively. @param name of the file or directory to remove @type str @param recursive flag indicating a recursive deletion @type bool @param force flag indicating to ignore errors @type bool @return flag indicating success @rtype bool @exception IOError raised to indicate an issue with the device """ assert name commands = [ "import os", "\n".join([ "def remove_file(name, recursive=False, force=False):", " try:", " mode = os.stat(name)[0]", " if mode & 0x4000 != 0:", " if recursive:", " for file in os.listdir(name):", " success = remove_file(name + '/' + file," " recursive, force)", " if not success and not force:", " return False", " os.rmdir(name)", " else:", " if not force:", " return False", " else:", " os.remove(name)", " except:", " if not force:", " return False", " return True", ]), "print(remove_file('{0}', {1}, {2}))".format(name, recursive, force), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return ast.literal_eval(out.decode("utf-8")) def mkdir(self, dirname): """ Public method to create a new directory. @param dirname name of the directory to create @type str @exception IOError raised to indicate an issue with the device """ assert dirname commands = [ "import os", "os.mkdir('{0}')".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def rmdir(self, dirname): """ Public method to remove a directory. @param dirname name of the directory to be removed @type str @exception IOError raised to indicate an issue with the device """ assert dirname commands = [ "import os", "os.rmdir('{0}')".format(dirname), ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def put(self, hostFileName, deviceFileName=None): """ Public method to copy a local file to the connected device. @param hostFileName name of the file to be copied @type str @param deviceFileName name of the file to copy to @type str @return flag indicating success @rtype bool @exception IOError raised to indicate an issue with the device """ if not os.path.isfile(hostFileName): raise IOError("No such file: {0}".format(hostFileName)) with open(hostFileName, "rb") as hostFile: content = hostFile.read() # convert eol '\r' content = content.replace(b"\r\n", b"\r") content = content.replace(b"\n", b"\r") if not deviceFileName: deviceFileName = os.path.basename(hostFileName) commands = [ "fd = open('{0}', 'wb')".format(deviceFileName), "f = fd.write", ] while content: chunk = content[:64] commands.append("f(" + repr(chunk) + ")") content = content[64:] commands.append("fd.close()") out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return True def get(self, deviceFileName, hostFileName=None): """ Public method to copy a file from the connected device. @param deviceFileName name of the file to copy @type str @param hostFileName name of the file to copy to @type str @return flag indicating success @rtype bool @exception IOError raised to indicate an issue with the device """ if not hostFileName: hostFileName = deviceFileName commands = [ "\n".join([ "try:", " from microbit import uart as u", "except ImportError:", " try:", " from machine import UART", " u = UART(0, {0})".format(115200), " except Exception:", " try:", " from sys import stdout as u", " except Exception:", " raise Exception('Could not find UART module in" " device.')", ]), "f = open('{0}', 'rb')".format(deviceFileName), "r = f.read", "result = True", "\n".join([ "while result:", " result = r(32)", " if result:", " u.write(result)", ]), "f.close()", ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) # write the received bytes to the local file # convert eol to "\n" out = out.replace(b"\r\n", b"\n") out = out.replace(b"\r", b"\n") with open(hostFileName, "wb") as hostFile: hostFile.write(out) return True def version(self): """ Public method to get the MicroPython version information of the connected device. @return dictionary containing the version information @rtype dict @exception ValueError raised to indicate that the device might not be running MicroPython or there was an issue parsing the output """ commands = [ "import os", "print(os.uname())", ] try: out, err = self.__execute(commands) if err: raise ValueError(self.__shortError(err)) except ValueError: # just re-raise it raise except Exception: # Raise a value error to indicate being unable to find something # on the device that will return parseable information about the # version. It doesn't matter what the error is, it just needs to # report a failure with the expected ValueError exception. raise ValueError("Unable to determine version information.") rawOutput = out.decode("utf-8").strip() rawOutput = rawOutput[1:-1] items = rawOutput.split(",") result = {} for item in items: key, value = item.strip().split("=") result[key.strip()] = value.strip()[1:-1] return result def syncTime(self): """ Public method to set the time of the connected device to the local computer's time. @exception IOError raised to indicate an issue with the device """ now = time.localtime(time.time()) commands = [ "\n".join([ "def set_time(rtc_time):", " rtc = None", " try:", # Pyboard (it doesn't have machine.RTC()) " import pyb", " rtc = pyb.RTC()", " clock_time = rtc_time[:6] + (rtc_time[6] + 1, 0)", " rtc.datetime(clock_time)", " except:", " try:", " import machine", " rtc = machine.RTC()", " try:", # ESP8266 may use rtc.datetime() " clock_time = rtc_time[:6] +" " (rtc_time[6] + 1, 0)", " rtc.datetime(clock_time)", " except:", # ESP32 uses rtc.init() " rtc.init(rtc_time[:6])", " except:", " try:", " import rtc, time", " clock=rtc.RTC()", " clock.datetime = time.struct_time(rtc_time +" " (-1, -1))", " except:", " pass", ]), "set_time({0})".format((now.tm_year, now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec, now.tm_wday)) ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) def showTime(self): """ Public method to get the current time of the device. @return time of the device @rtype str @exception IOError raised to indicate an issue with the device """ commands = [ "import time", "print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))", # __IGNORE_WARNING_M601__ ] out, err = self.__execute(commands) if err: raise IOError(self.__shortError(err)) return out.decode("utf-8").strip() class MicroPythonFileManager(QObject): """ Class implementing an interface to the device file system commands with some additional sugar. @signal longListFiles(result) emitted with a tuple of tuples containing the name, mode, size and time for each directory entry @signal currentDir(dirname) emitted to report the current directory of the device @signal currentDirChanged(dirname) emitted to report back a change of the current directory @signal getFileDone(deviceFile, localFile) emitted after the file was fetched from the connected device and written to the local file system @signal putFileDone(localFile, deviceFile) emitted after the file was copied to the connected device @signal deleteFileDone(deviceFile) emitted after the file has been deleted on the connected device @signal rsyncDone(localName, deviceName) emitted after the rsync operation has been completed @signal rsyncProgressMessage(msg) emitted to send a message about what rsync is doing @signal removeDirectoryDone() emitted after a directory has been deleted @signal createDirectoryDone() emitted after a directory was created @signal synchTimeDone() emitted after the time was synchronizde to the device @signal showTimeDone(dateTime) emitted after the date and time was fetched from the connected device @signal showVersionDone(versionInfo) emitted after the version information was fetched from the connected device @signal error(exc) emitted with a failure message to indicate a failure during the most recent operation """ longListFiles = pyqtSignal(tuple) currentDir = pyqtSignal(str) currentDirChanged = pyqtSignal(str) getFileDone = pyqtSignal(str, str) putFileDone = pyqtSignal(str, str) deleteFileDone = pyqtSignal(str) rsyncDone = pyqtSignal(str, str) rsyncProgressMessage = pyqtSignal(str) removeDirectoryDone = pyqtSignal() createDirectoryDone = pyqtSignal() synchTimeDone = pyqtSignal() showTimeDone = pyqtSignal(str) showVersionDone = pyqtSignal(dict) error = pyqtSignal(str, str) def __init__(self, port, parent=None): """ Constructor @param port port name of the device @type str @param parent reference to the parent object @type QObject """ super(MicroPythonFileManager, self).__init__(parent) self.__serialPort = port self.__serial = MicroPythonSerialPort(parent=self) self.__fs = MicroPythonFileSystem(parent=self) @pyqtSlot() def connect(self): """ Public slot to start the manager. """ self.__serial.openSerialLink(self.__serialPort) self.__fs.setSerial(self.__serial) @pyqtSlot() def disconnect(self): """ Public slot to stop the thread. """ self.__serial.closeSerialLink() @pyqtSlot(str) def lls(self, dirname): """ Public slot to get a long listing of the given directory. @param dirname name of the directory to list @type str """ try: filesList = self.__fs.lls(dirname) result = [(decoratedName(name, mode), mode2string(mode), str(size), mtime2string(time)) for name, (mode, size, time) in filesList] self.longListFiles.emit(tuple(result)) except Exception as exc: self.error.emit("lls", str(exc)) @pyqtSlot() def pwd(self): """ Public slot to get the current directory of the device. """ try: pwd = self.__fs.pwd() self.currentDir.emit(pwd) except Exception as exc: self.error.emit("pwd", str(exc)) @pyqtSlot(str) def cd(self, dirname): """ Public slot to change the current directory of the device. @param dirname name of the desired current directory @type str """ try: self.__fs.cd(dirname) self.currentDirChanged.emit(dirname) except Exception as exc: self.error.emit("cd", str(exc)) @pyqtSlot(str) @pyqtSlot(str, str) def get(self, deviceFileName, hostFileName=""): """ Public slot to get a file from the connected device. @param deviceFileName name of the file on the device @type str @param hostFileName name of the local file @type str """ if hostFileName and os.path.isdir(hostFileName): # only a local directory was given hostFileName = os.path.join(hostFileName, os.path.basename(deviceFileName)) try: self.__fs.get(deviceFileName, hostFileName) self.getFileDone.emit(deviceFileName, hostFileName) except Exception as exc: self.error.emit("get", str(exc)) @pyqtSlot(str) @pyqtSlot(str, str) def put(self, hostFileName, deviceFileName=""): """ Public slot to put a file onto the device. @param hostFileName name of the local file @type str @param deviceFileName name of the file on the connected device @type str """ try: self.__fs.put(hostFileName, deviceFileName) self.putFileDone.emit(hostFileName, deviceFileName) except Exception as exc: self.error.emit("put", str(exc)) @pyqtSlot(str) def delete(self, deviceFileName): """ Public slot to delete a file on the device. @param deviceFileName name of the file on the connected device @type str """ try: self.__fs.rm(deviceFileName) self.deleteFileDone.emit(deviceFileName) except Exception as exc: self.error.emit("delete", str(exc)) def __rsync(self, hostDirectory, deviceDirectory, mirror=True): """ Private method to synchronize a local directory to the device. @param hostDirectory name of the local directory @type str @param deviceDirectory name of the directory on the device @type str @param mirror flag indicating to mirror the local directory to the device directory @type bool @return list of errors @rtype list of str """ errors = [] if not os.path.isdir(hostDirectory): return [self.tr( "The given name '{0}' is not a directory or does not exist.") .format(hostDirectory) ] self.rsyncProgressMessage.emit( self.tr("Synchronizing <b>{0}</b>.").format(deviceDirectory) ) sourceDict = {} sourceFiles = listdirStat(hostDirectory) for name, nstat in sourceFiles: sourceDict[name] = nstat destinationDict = {} try: destinationFiles = self.__fs.lls(deviceDirectory, fullstat=True) except Exception as exc: return [str(exc)] if destinationFiles is None: # the destination directory does not exist try: self.__fs.mkdir(deviceDirectory) except Exception as exc: return [str(exc)] else: for name, nstat in destinationFiles: destinationDict[name] = nstat destinationSet = set(destinationDict.keys()) sourceSet = set(sourceDict.keys()) toAdd = sourceSet - destinationSet # add to dev toDelete = destinationSet - sourceSet # delete from dev toUpdate = destinationSet.intersection(sourceSet) # update files for sourceBasename in toAdd: # name exists in source but not in device sourceFilename = os.path.join(hostDirectory, sourceBasename) destFilename = deviceDirectory + "/" + sourceBasename self.rsyncProgressMessage.emit( self.tr("Adding <b>{0}</b>...").format(destFilename)) if os.path.isfile(sourceFilename): try: self.__fs.put(sourceFilename, destFilename) except Exception as exc: # just note issues but ignore them otherwise errors.append(str(exc)) if os.path.isdir(sourceFilename): # recurse errs = self.__rsync(sourceFilename, destFilename, mirror=mirror) # just note issues but ignore them otherwise errors.extend(errs) if mirror: for destBasename in toDelete: # name exists in device but not local, delete destFilename = deviceDirectory + "/" + destBasename self.rsyncProgressMessage.emit( self.tr("Removing <b>{0}</b>...").format(destFilename)) try: self.__fs.rmrf(destFilename, recursive=True, force=True) except Exception as exc: # just note issues but ignore them otherwise errors.append(str(exc)) for sourceBasename in toUpdate: # names exist in both; do an update sourceStat = sourceDict[sourceBasename] destStat = destinationDict[sourceBasename] sourceFilename = os.path.join(hostDirectory, sourceBasename) destFilename = deviceDirectory + "/" + sourceBasename destMode = destStat[0] if os.path.isdir(sourceFilename): if stat.S_ISDIR(destMode): # both are directories => recurs errs = self.__rsync(sourceFilename, destFilename, mirror=mirror) # just note issues but ignore them otherwise errors.extend(errs) else: self.rsyncProgressMessage.emit( self.tr("Source <b>{0}</b> is a directory and" " destination <b>{1}</b> is a file. Ignoring" " it.") .format(sourceFilename, destFilename) ) else: if stat.S_ISDIR(destMode): self.rsyncProgressMessage.emit( self.tr("Source <b>{0}</b> is a file and destination" " <b>{1}</b> is a directory. Ignoring it.") .format(sourceFilename, destFilename) ) else: if sourceStat[8] > destStat[8]: # mtime self.rsyncProgressMessage.emit( self.tr("Updating <b>{0}</b>...") .format(destFilename) ) try: self.__fs.put(sourceFilename, destFilename) except Exception as exc: errors.append(str(exc)) self.rsyncProgressMessage.emit( self.tr("Done synchronizing <b>{0}</b>.").format(deviceDirectory) ) return errors @pyqtSlot(str, str) @pyqtSlot(str, str, bool) def rsync(self, hostDirectory, deviceDirectory, mirror=True): """ Public slot to synchronize a local directory to the device. @param hostDirectory name of the local directory @type str @param deviceDirectory name of the directory on the device @type str @param mirror flag indicating to mirror the local directory to the device directory @type bool """ errors = self.__rsync(hostDirectory, deviceDirectory, mirror=mirror) if errors: self.error.emit("rsync", "\n".join(errors)) self.rsyncDone.emit(hostDirectory, deviceDirectory) @pyqtSlot(str) def mkdir(self, dirname): """ Public slot to create a new directory. @param dirname name of the directory to create @type str """ try: self.__fs.mkdir(dirname) self.createDirectoryDone.emit() except Exception as exc: self.error.emit("mkdir", str(exc)) @pyqtSlot(str) @pyqtSlot(str, bool) def rmdir(self, dirname, recursive=False): """ Public slot to (recursively) remove a directory. @param dirname name of the directory to be removed @type str @param recursive flag indicating a recursive removal @type bool """ try: if recursive: self.__fs.rmrf(dirname, recursive=True, force=True) else: self.__fs.rmdir(dirname) self.removeDirectoryDone.emit() except Exception as exc: self.error.emit("rmdir", str(exc)) ################################################################## ## some non-filesystem related methods below ################################################################## @pyqtSlot() def synchronizeTime(self): """ Public slot to set the time of the connected device to the local computer's time. """ try: self.__fs.syncTime() self.synchTimeDone.emit() except Exception as exc: self.error.emit("rmdir", str(exc)) @pyqtSlot() def showTime(self): """ Public slot to get the current date and time of the device. """ try: dt = self.__fs.showTime() self.showTimeDone.emit(dt) except Exception as exc: self.error.emit("showTime", str(exc)) @pyqtSlot() def showVersion(self): """ Public slot to get the version info for the MicroPython run by the connected device. """ try: versionInfo = self.__fs.version() self.showVersionDone.emit(versionInfo) except Exception as exc: self.error.emit("showVersion", str(exc))