--- a/eric6/MicroPython/MicroPythonFileSystem.py Sat Jul 20 14:47:24 2019 +0200 +++ b/eric6/MicroPython/MicroPythonFileSystem.py Sat Jul 20 14:48:09 2019 +0200 @@ -9,7 +9,11 @@ from __future__ import unicode_literals -from PyQt5.QtCore import QObject +import ast +import time +import stat + +from PyQt5.QtCore import QObject, QThread class MicroPythonFileSystem(QObject): @@ -25,6 +29,9 @@ <li>pwd: get the current directory</li> <li>put: copy a file to the connected device</li> <li>get: get a file from the connected device</li> + <li>rm: remove a file from the connected device</li> + <li>mkdir: create a new directory</li> + <li>rmdir: remove an empty directory</li> </ul> """ def __init__(self, parent=None): @@ -35,35 +42,193 @@ @type QObject """ super(MicroPythonFileSystem, self).__init__(parent) + + self.__serial = None - def ls(self): + def setSerial(self, serial): + """ + Public method to set the serial port to be used. + + Note: The serial port should be initialized and open already. + + @param serial open serial port + @type MicroPythonSerialPort + """ + self.__serial = serial + + def __rawOn(self): + """ + Private method to switch the connected device to 'raw' mode. + + Note: switching to raw mode is done with synchroneous writes. + """ + if not self.__serial: + return + + rawReplMessage = b"raw REPL; CTRL-B to exit\r\n" + softRebootMessage = b"soft reboot\r\n" + + self.__serial.write(b"\x02") # end raw mode if required + self.__serial.waitForBytesWritten() + for _i in range(3): + # CTRL-C three times to break out of loops + self.__serial.write(b"\r\x03") + self.__serial.waitForBytesWritten() + QThread.msleep(10) + self.__serial.readAll() # read all data and discard it + self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode + self.__serial.readUntil(rawReplMessage) + self.__serial.write(b"\x04") # send CTRL-D to soft reset + self.__serial.readUntil(softRebootMessage) + + # some MicroPython devices seem to need to be convinced in some + # special way + data = self.__serial.readUntil(rawReplMessage) + if not data.endswith(rawReplMessage): + self.__serial.write(b"\r\x01") # send CTRL-A again + self.__serial.readUntil(rawReplMessage) + self.__serial.readAll() # read all data and discard it + + def __rawOff(self): + """ + Private method to switch 'raw' mode off. + """ + if self.__serial: + self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode + + def __execute(self, commands): + """ + Private method to send commands to the connected device and return the + result. + + If no serial connection is available, empty results will be returned. + + @param commands list of commands to be executed + @type str + @return tuple containing stdout and stderr output of the device + @rtype tuple of (bytes, bytes) + """ + if not self.__serial: + return b"", b"" + + result = bytearray() + err = b"" + + self.__rawOn() + QThread.msleep(10) + for command in commands: + if command: + commandBytes = command.encode("utf-8") + self.__serial.write(commandBytes + b"\x04") + # read until prompt + response = self.__serial.readUntil(b"\x04>") + # split stdout, stderr + out, err = response[2:-2].split(b"\x04") + result += out + if err: + return b"", err + QThread.msleep(10) + self.__rawOff() + + return bytes(result), err + + def __shortError(self, error): + """ + Private method to create a shortened error message. + + @param error verbose error message + @type bytes + @return shortened error message + @rtype str + """ + if error: + decodedError = error.decode("utf-8") + try: + return decodedError.split["\r\n"][-2] + except Exception: + return decodedError + return self.tr("Detected an error without indications.") + + ################################################################## + ## Methods below implement the file system commands + ################################################################## + + def ls(self, dirname=""): """ Public method to get a directory listing of the connected device. + @param dirname name of the directory to be listed + @type str @return tuple containg the directory listing @rtype tuple of str + @exception IOError raised to indicate an issue with the device """ - # TODO: not implemented yet + commands = [ + "import os", + "print(os.listdir('{0}'))".format(dirname), + ] + out, err = self.__execute(commands) + if err: + raise IOError(self.__shortError(err)) + return ast.literal_eval(out.decode("utf-8")) - def lls(self): + def lls(self, dirname=""): """ Public method to get a long directory listing of the connected device including meta data. - @return tuple containg the the directory listing with tuple entries - containing the name, size, time and mode + @param dirname name of the directory to be listed + @type str + @return list containing the the directory listing with tuple entries + of the name and and a tuple of mode, size and time @rtype tuple of str + @exception IOError raised to indicate an issue with the device """ - # TODO: not implemented yet + commands = [ + "import os", + "\n".join([ + "def stat(filename):", + " try:", + " rstat = os.lstat(filename)", + " except:", + " rstat = os.stat(filename)", + " return tuple(rstat)", + ]), + "\n".join([ + "def listdir_stat(dirname):", + " try:", + " files = os.listdir(dirname)", + " except OSError:", + " return []", + " if dirname in ('', '/'):", + " return list((f, stat(f)) for f in files)", + " return list((f, stat(dirname + '/' + f)) for f in files)", + ]), + "print(listdir_stat('{0}'))".format(dirname), + ] + out, err = self.__execute(commands) + if err: + raise IOError(self.__shortError(err)) + fileslist = ast.literal_eval(out.decode("utf-8")) + return [(f, (s[0], s[6], s[8])) for f, s in fileslist] - def cd(self, path): + def cd(self, dirname): """ Public method to change the current directory on the connected device. - @param path directory to change to + @param dirname directory to change to @type str + @exception IOError raised to indicate an issue with the device """ - # TODO: not implemented yet + assert dirname + + commands = [ + "import os", + "os.chdir('{0}')".format(dirname), + ] + out, err = self.__execute(commands) + if err: + raise IOError(self.__shortError(err)) def pwd(self): """ @@ -71,8 +236,70 @@ @return current directory @rtype str + @exception IOError raised to indicate an issue with the device """ - # TODO: not implemented yet + commands = [ + "import os", + "print(os.getcwd())", + ] + out, err = self.__execute(commands) + if err: + raise IOError(self.__shortError(err)) + return out.decode("utf-8").strip() + + def rm(self, filename): + """ + Public method to remove a file from the connected device. + + @param filename name of the file to be removed + @type str + @exception IOError raised to indicate an issue with the device + """ + assert filename + + commands = [ + "import os", + "os.remove('{0}')".format(filename), + ] + out, err = self.__execute(commands) + if err: + raise IOError(self.__shortError(err)) + + def mkdir(self, dirname): + """ + Public method to create a new directory. + + @param dirname name of the directory to create + @type str + @exception IOError raised to indicate an issue with the device + """ + assert dirname + + commands = [ + "import os", + "os.mkdir('{0}')".format(dirname), + ] + out, err = self.__execute(commands) + if err: + raise IOError(self.__shortError(err)) + + def rmdir(self, dirname): + """ + Public method to remove a directory. + + @param dirname name of the directory to be removed + @type str + @exception IOError raised to indicate an issue with the device + """ + assert dirname + + commands = [ + "import os", + "os.rmdir('{0}')".format(dirname), + ] + out, err = self.__execute(commands) + if err: + raise IOError(self.__shortError(err)) def put(self, hostFileName, deviceFileName): """ @@ -84,6 +311,7 @@ @type str @return flag indicating success @rtype bool + @exception IOError raised to indicate an issue with the device """ # TODO: not implemented yet @@ -97,5 +325,63 @@ @type str @return flag indicating success @rtype bool + @exception IOError raised to indicate an issue with the device """ # TODO: not implemented yet + + ################################################################## + ## Utility methods below + ################################################################## + + def mtime2string(self, mtime): + """ + Public method to convert a time value to a string representation. + + @param mtime time value + @type int + @return string representation of the given time + @rtype str + """ + return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime)) + + def mode2string(self, mode): + """ + Public method to convert a mode value to a string representation. + + @param mode mode value + @type int + @return string representation of the given mode value + @rtype str + """ + return stat.filemode(mode) +# TODO: remove this +## +##if __name__ == "__main__": +## from PyQt5.QtCore import QCoreApplication, QTimer +## from MicroPythonSerialPort import MicroPythonSerialPort +## +## app = QCoreApplication([]) +## +## serial = MicroPythonSerialPort() +## serial.openSerialLink("/dev/ttyUSB0") +## fs = MicroPythonFileSystem() +## fs.setSerial(serial) +## +## def tf(): +## fs.cd("/flash") +## print(fs.pwd()) +## fs.cd("odroid_go") +## print(fs.pwd()) +## ll = fs.lls() +## print(ll) +## for f, (m, s, t) in ll: +## print(fs.mode2string(m), s, fs.mtime2string(t), f) +## fs.cd("..") +## print(fs.pwd()) +## ll = fs.lls("odroid_go") +## print(ll) +## for f, (m, s, t) in ll: +## print(fs.mode2string(m), s, fs.mtime2string(t), f) +## +## QTimer.singleShot(0, tf) +## app.exec_()