eric6/MicroPython/MicroPythonFileSystem.py

Wed, 24 Jul 2019 20:12:19 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Wed, 24 Jul 2019 20:12:19 +0200
branch
micropython
changeset 7082
ec199ef0cfc6
parent 7081
ed510767c096
child 7083
217862c28319
permissions
-rw-r--r--

MicroPython: continued implementing the file manager widget.

# -*- coding: utf-8 -*-

# Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing some file system commands for MicroPython.
"""

from __future__ import unicode_literals

import ast
import time
import os
import stat

from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject, QThread

from .MicroPythonSerialPort import MicroPythonSerialPort
from .MicroPythonFileSystemUtilities import (
    mtime2string, mode2string, decoratedName, listdirStat
)


class MicroPythonFileSystem(QObject):
    """
    Class implementing some file system commands for MicroPython.
    
    Commands are provided to perform operations on the file system of a
    connected MicroPython device. Supported commands are:
    <ul>
    <li>ls: directory listing</li>
    <li>lls: directory listing with meta data</li>
    <li>cd: change directory</li>
    <li>pwd: get the current directory</li>
    <li>put: copy a file to the connected device</li>
    <li>get: get a file from the connected device</li>
    <li>rm: remove a file from the connected device</li>
    <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash)
    <li>mkdir: create a new directory</li>
    <li>rmdir: remove an empty directory</li>
    </ul>
    
    There are additional commands related to time and version.
    <ul>
    <li>version: get version info about MicroPython</li>
    <li>syncTime: synchronize the time of the connected device</li>
    <li>showTime: show the current time of the connected device</li>
    </ul>
    """
    def __init__(self, parent=None):
        """
        Constructor
        
        @param parent reference to the parent object
        @type QObject
        """
        super(MicroPythonFileSystem, self).__init__(parent)
        
        self.__serial = None
    
    def setSerial(self, serial):
        """
        Public method to set the serial port to be used.
        
        Note: The serial port should be initialized and open already.
        
        @param serial open serial port
        @type MicroPythonSerialPort
        """
        self.__serial = serial
    
    def __rawOn(self):
        """
        Private method to switch the connected device to 'raw' mode.
        
        Note: switching to raw mode is done with synchronous writes.
        
        @return flag indicating success
        @@rtype bool
        """
        if not self.__serial:
            return False
        
        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n"
        softRebootMessage = b"soft reboot\r\n"
        
        self.__serial.write(b"\x02")        # end raw mode if required
        self.__serial.waitForBytesWritten()
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            self.__serial.waitForBytesWritten()
            QThread.msleep(10)
        self.__serial.readAll()             # read all data and discard it
        self.__serial.write(b"\r\x01")      # send CTRL-A to enter raw mode
        self.__serial.readUntil(rawReplMessage)
        if self.__serial.hasTimedOut():
            return False
        self.__serial.write(b"\x04")        # send CTRL-D to soft reset
        self.__serial.readUntil(softRebootMessage)
        if self.__serial.hasTimedOut():
            return False
        
        # some MicroPython devices seem to need to be convinced in some
        # special way
        data = self.__serial.readUntil(rawReplMessage)
        if self.__serial.hasTimedOut():
            return False
        if not data.endswith(rawReplMessage):
            self.__serial.write(b"\r\x01")  # send CTRL-A again
            self.__serial.readUntil(rawReplMessage)
            if self.__serial.hasTimedOut():
                return False
        self.__serial.readAll()             # read all data and discard it
        return True
    
    def __rawOff(self):
        """
        Private method to switch 'raw' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x02")    # send CTRL-B to cancel raw mode
    
    def __execute(self, commands):
        """
        Private method to send commands to the connected device and return the
        result.
        
        If no serial connection is available, empty results will be returned.
        
        @param commands list of commands to be executed
        @type str
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""
        
        result = bytearray()
        err = b""
        
        ok = self.__rawOn()
        if not ok:
            return (
                b"",
                b"Could not switch to raw mode. Is the device switched on?"
            )
        
        QThread.msleep(10)
        for command in commands:
            if command:
                commandBytes = command.encode("utf-8")
                self.__serial.write(commandBytes + b"\x04")
                # read until prompt
                response = self.__serial.readUntil(b"\x04>")
                if self.__serial.hasTimedOut():
                    return b"", b"Timeout while processing commands."
                if b"\x04" in response[2:-2]:
                    # split stdout, stderr
                    out, err = response[2:-2].split(b"\x04")
                    result += out
                else:
                    err = b"invalid response received: " + response
                if err:
                    return b"", err
        QThread.msleep(10)
        self.__rawOff()
        
        return bytes(result), err
    
    def __shortError(self, error):
        """
        Private method to create a shortened error message.
        
        @param error verbose error message
        @type bytes
        @return shortened error message
        @rtype str
        """
        if error:
            decodedError = error.decode("utf-8")
            try:
                return decodedError.split["\r\n"][-2]
            except Exception:
                return decodedError
        return self.tr("Detected an error without indications.")
    
    ##################################################################
    ## Methods below implement the file system commands
    ##################################################################
    
    def ls(self, dirname=""):
        """
        Public method to get a directory listing of the connected device.
        
        @param dirname name of the directory to be listed
        @type str
        @return tuple containg the directory listing
        @rtype tuple of str
        @exception IOError raised to indicate an issue with the device
        """
        commands = [
            "import os",
            "print(os.listdir('{0}'))".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        return ast.literal_eval(out.decode("utf-8"))
    
    def lls(self, dirname="", fullstat=False):
        """
        Public method to get a long directory listing of the connected device
        including meta data.
        
        @param dirname name of the directory to be listed
        @type str
        @param fullstat flag indicating to return the full stat() tuple
        @type bool
        @return list containing the directory listing with tuple entries of
            the name and and a tuple of mode, size and time (if fullstat is
            false) or the complete stat() tuple. 'None' is returned in case the
            directory doesn't exist.
        @rtype tuple of (str, tuple)
        @exception IOError raised to indicate an issue with the device
        """
        commands = [
            "import os",
            "\n".join([
                "def stat(filename):",
                "    try:",
                "        rstat = os.lstat(filename)",
                "    except:",
                "        rstat = os.stat(filename)",
                "    return tuple(rstat)",
            ]),
            "\n".join([
                "def listdir_stat(dirname):",
                "    try:",
                "        files = os.listdir(dirname)",
                "    except OSError:",
                "        return None",
                "    if dirname in ('', '/'):",
                "        return list((f, stat(f)) for f in files)",
                "    return list((f, stat(dirname + '/' + f)) for f in files)",
            ]),
            "print(listdir_stat('{0}'))".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        fileslist = ast.literal_eval(out.decode("utf-8"))
        if fileslist is None:
            return None
        else:
            if fullstat:
                return fileslist
            else:
                return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
    
    def cd(self, dirname):
        """
        Public method to change the current directory on the connected device.
        
        @param dirname directory to change to
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert dirname
        
        commands = [
            "import os",
            "os.chdir('{0}')".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def pwd(self):
        """
        Public method to get the current directory of the connected device.
        
        @return current directory
        @rtype str
        @exception IOError raised to indicate an issue with the device
        """
        commands = [
            "import os",
            "print(os.getcwd())",
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        return out.decode("utf-8").strip()
     
    def rm(self, filename):
        """
        Public method to remove a file from the connected device.
        
        @param filename name of the file to be removed
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert filename
        
        commands = [
            "import os",
            "os.remove('{0}')".format(filename),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def rmrf(self, name, recursive=False, force=False):
        """
        Public method to remove a file or directory recursively.
        
        @param name of the file or directory to remove
        @type str
        @param recursive flag indicating a recursive deletion
        @type bool
        @param force flag indicating to ignore errors
        @type bool
        @return flag indicating success
        @rtype bool
        @exception IOError raised to indicate an issue with the device
        """
        assert name
        
        commands = [
            "import os"
            "\n".join([
                "def remove_file(name, recursive=False, force=False):",
                "    try:",
                "        mode = os.stat(name)[0]",
                "        if mode & 0x4000 != 0:",
                "            if recursive:",
                "                for file in os.listdir(name):",
                "                    success = remove_file(name + '/' + file,"
                " recursive, force)",
                "                    if not success and not force:",
                "                        return False",
                "                os.rmdir(name)",
                "            else:",
                "                if not force:",
                "                    return False",
                "        else:",
                "            os.remove(name)",
                "    except:",
                "        if not force:",
                "            return False",
                "    return True",
            ]),
            "print(remove_file('{0}', {1}, {2}))".format(name, recursive,
                                                         force),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        return ast.literal_eval(out.decode("utf-8"))
    
    def mkdir(self, dirname):
        """
        Public method to create a new directory.
        
        @param dirname name of the directory to create
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert dirname
   
        commands = [
            "import os",
            "os.mkdir('{0}')".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def rmdir(self, dirname):
        """
        Public method to remove a directory.
        
        @param dirname name of the directory to be removed
        @type str
        @exception IOError raised to indicate an issue with the device
        """
        assert dirname
   
        commands = [
            "import os",
            "os.rmdir('{0}')".format(dirname),
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def put(self, hostFileName, deviceFileName=None):
        """
        Public method to copy a local file to the connected device.
        
        @param hostFileName name of the file to be copied
        @type str
        @param deviceFileName name of the file to copy to
        @type str
        @return flag indicating success
        @rtype bool
        @exception IOError raised to indicate an issue with the device
        """
        if not os.path.isfile(hostFileName):
            raise IOError("No such file: {0}".format(hostFileName))
        
        with open(hostFileName, "rb") as hostFile:
            content = hostFile.read()
        
        if not deviceFileName:
            deviceFileName = os.path.basename(hostFileName)
        
        commands = [
            "fd = open('{0}', 'wb')".format(deviceFileName),
            "f = fd.write",
        ]
        while content:
            chunk = content[:64]
            commands.append("f(" + repr(chunk) + ")")
            content = content[64:]
        commands.append("fd.close()")
        
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        return True
    
    def get(self, deviceFileName, hostFileName=None):
        """
        Public method to copy a file from the connected device.
        
        @param deviceFileName name of the file to copy
        @type str
        @param hostFileName name of the file to copy to
        @type str
        @return flag indicating success
        @rtype bool
        @exception IOError raised to indicate an issue with the device
        """
        if not hostFileName:
            hostFileName = deviceFileName
        
        commands = [
            "\n".join([
                "try:",
                "    from microbit import uart as u",
                "except ImportError:",
                "    try:",
                "        from machine import UART",
                "        u = UART(0, {0})".format(115200),
                "    except Exception:",
                "        try:",
                "            from sys import stdout as u",
                "        except Exception:",
                "            raise Exception('Could not find UART module in"
                " device.')",
            ]),
            "f = open('{0}', 'rb')".format(deviceFileName),
            "r = f.read",
            "result = True",
            "\n".join([
                "while result:",
                "    result = r(32)",
                "    if result:",
                "        u.write(result)",
            ]),
            "f.close()",
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        
        # write the received bytes to the local file
        with open(hostFileName, "wb") as hostFile:
            hostFile.write(out)
        return True
    
    def version(self):
        """
        Public method to get the MicroPython version information of the
        connected device.
        
        @return dictionary containing the version information
        @rtype dict
        @exception ValueError raised to indicate that the device might not be
            running MicroPython or there was an issue parsing the output
        """
        commands = [
            "import os",
            "print(os.uname())",
        ]
        try:
            out, err = self.__execute(commands)
            if err:
                raise ValueError(self.__shortError(err))
        except ValueError:
            # just re-raise it
            raise
        except Exception:
            # Raise a value error to indicate being unable to find something
            # on the device that will return parseable information about the
            # version. It doesn't matter what the error is, it just needs to
            # report a failure with the expected ValueError exception.
            raise ValueError("Unable to determine version information.")
        
        rawOutput = out.decode("utf-8").strip()
        rawOutput = rawOutput[1:-1]
        items = rawOutput.split(",")
        result = {}
        for item in items:
            key, value = item.strip().split("=")
            result[key.strip()] = value.strip()[1:-1]
        return result
    
    def syncTime(self):
        """
        Public method to set the time of the connected device to the local
        computer's time.
        
        @exception IOError raised to indicate an issue with the device
        """
        now = time.localtime(time.time())
        commands = [
            "\n".join([
                "def set_time(rtc_time):",
                "    rtc = None",
                "    try:",           # Pyboard (it doesn't have machine.RTC())
                "        import pyb",
                "        rtc = pyb.RTC()",
                "        clock_time = rtc_time[:6] + (rtc_time[6] + 1, 0)",
                "        rtc.datetime(clock_time)",
                "    except:",
                "        try:",
                "            import machine",
                "            rtc = machine.RTC()",
                "            try:",     # ESP8266 may use rtc.datetime()
                "                clock_time = rtc_time[:6] +"
                " (rtc_time[6] + 1, 0)",
                "                rtc.datetime(clock_time)",
                "            except:",  # ESP32 uses rtc.init()
                "                rtc.init(rtc_time[:6])",
                "        except:",
                "            try:",
                "                import rtc, time",
                "                clock=rtc.RTC()",
                "                clock.datetime = time.struct_time(rtc_time +"
                " (-1, -1))",
                "            except:",
                "                pass",
            ]),
            "set_time({0})".format((now.tm_year, now.tm_mon, now.tm_mday,
                                    now.tm_hour, now.tm_min, now.tm_sec,
                                    now.tm_wday))
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
    
    def showTime(self):
        """
        Public method to get the current time of the device.
        
        @return time of the device
        @rtype str
        @exception IOError raised to indicate an issue with the device
        """
        commands = [
            "import time",
            "print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))",
            # __IGNORE_WARNING_M601__
        ]
        out, err = self.__execute(commands)
        if err:
            raise IOError(self.__shortError(err))
        return out.decode("utf-8").strip()


class MicroPythonFileManager(QObject):
    """
    Class implementing an interface to the device file system commands with
    some additional sugar.
    
    @signal longListFiles(result) emitted with a tuple of tuples containing the
        name, mode, size and time for each directory entry
    @signal currentDir(dirname) emitted to report the current directory of the
        device
    @signal currentDirChanged(dirname) emitted to report back a change of the
        current directory
    @signal getFileDone(deviceFile, localFile) emitted after the file was
        fetched from the connected device and written to the local file system
    @signal putFileDone(localFile, deviceFile) emitted after the file was
        copied to the connected device
    @signal deleteFileDone(deviceFile) emitted after the file has been deleted
        on the connected device
    @signal rsyncDone(localName, deviceName) emitted after the rsync operation
        has been completed
    @signal rsyncMessages(list) emitted with a list of messages
    @signal removeDirectoryDone() emitted after a directory has been deleted
    @signal createDirectoryDone() emitted after a directory was created
    @signal synchTimeDone() emitted after the time was synchronizde to the
        device
    @signal showTimeDone(dateTime) emitted after the date and time was fetched
        from the connected device
    @signal showVersionDone(versionInfo) emitted after the version information
        was fetched from the connected device
    
    @signal error(exc) emitted with a failure message to indicate a failure
        during the most recent operation
    """
    longListFiles = pyqtSignal(tuple)
    currentDir = pyqtSignal(str)
    currentDirChanged = pyqtSignal(str)
    getFileDone = pyqtSignal(str, str)
    putFileDone = pyqtSignal(str, str)
    deleteFileDone = pyqtSignal(str)
    rsyncDone = pyqtSignal(str, str)
    rsyncMessages = pyqtSignal(list)
    removeDirectoryDone = pyqtSignal()
    createDirectoryDone = pyqtSignal()
    synchTimeDone = pyqtSignal()
    showTimeDone = pyqtSignal(str)
    showVersionDone = pyqtSignal(dict)
    
    error = pyqtSignal(str, str)
    
    def __init__(self, port, parent=None):
        """
        Constructor
        
        @param port port name of the device
        @type str
        @param parent reference to the parent object
        @type QObject
        """
        super(MicroPythonFileManager, self).__init__(parent)
        
        self.__serialPort = port
        self.__serial = MicroPythonSerialPort(parent=self)
        self.__fs = MicroPythonFileSystem(parent=self)
    
    @pyqtSlot()
    def connect(self):
        """
        Public slot to start the manager.
        """
        self.__serial.openSerialLink(self.__serialPort)
        self.__fs.setSerial(self.__serial)
    
    @pyqtSlot()
    def disconnect(self):
        """
        Public slot to stop the thread.
        """
        self.__serial.closeSerialLink()
    
    @pyqtSlot(str)
    def lls(self, dirname):
        """
        Public slot to get a long listing of the given directory.
        
        @param dirname name of the directory to list
        @type str
        """
        try:
            filesList = self.__fs.lls(dirname)
            result = [(decoratedName(name, mode),
                       mode2string(mode),
                       str(size),
                       mtime2string(time)) for
                      name, (mode, size, time) in filesList]
            self.longListFiles.emit(tuple(result))
        except Exception as exc:
            self.error.emit("lls", str(exc))
    
    @pyqtSlot()
    def pwd(self):
        """
        Public slot to get the current directory of the device.
        """
        try:
            pwd = self.__fs.pwd()
            self.currentDir.emit(pwd)
        except Exception as exc:
            self.error.emit("pwd", str(exc))
    
    @pyqtSlot(str)
    def cd(self, dirname):
        """
        Public slot to change the current directory of the device.
        
        @param dirname name of the desired current directory
        @type str
        """
        try:
            self.__fs.cd(dirname)
            self.currentDirChanged.emit(dirname)
        except Exception as exc:
            self.error.emit("cd", str(exc))
    
    @pyqtSlot(str)
    @pyqtSlot(str, str)
    def get(self, deviceFileName, hostFileName=""):
        """
        Public slot to get a file from the connected device.
        
        @param deviceFileName name of the file on the device
        @type str
        @param hostFileName name of the local file
        @type str
        """
        if hostFileName and os.path.isdir(hostFileName):
            # only a local directory was given
            hostFileName = os.path.join(hostFileName,
                                        os.path.basename(deviceFileName))
        try:
            self.__fs.get(deviceFileName, hostFileName)
            self.getFileDone.emit(deviceFileName, hostFileName)
        except Exception as exc:
            self.error.emit("get", str(exc))
    
    @pyqtSlot(str)
    @pyqtSlot(str, str)
    def put(self, hostFileName, deviceFileName=""):
        """
        Public slot to put a file onto the device.
        
        @param hostFileName name of the local file
        @type str
        @param deviceFileName name of the file on the connected device
        @type str
        """
        try:
            self.__fs.put(hostFileName, deviceFileName)
            self.putFileDone.emit(hostFileName, deviceFileName)
        except Exception as exc:
            self.error.emit("put", str(exc))
    
    @pyqtSlot(str)
    def delete(self, deviceFileName):
        """
        Public slot to delete a file on the device.
        
        @param deviceFileName name of the file on the connected device
        @type str
        """
        try:
            self.__fs.rm(deviceFileName)
            self.deleteFileDone.emit(deviceFileName)
        except Exception as exc:
            self.error.emit("delete", str(exc))
    
    def __rsync(self, hostDirectory, deviceDirectory, mirror=True):
        """
        Private method to synchronize a local directory to the device.
        
        @param hostDirectory name of the local directory
        @type str
        @param deviceDirectory name of the directory on the device
        @type str
        @param mirror flag indicating to mirror the local directory to
            the device directory
        @type bool
        @return tuple containing a list of messages and list of errors
        @rtype tuple of (list of str, list of str)
        """
        messages = []
        errors = []
        
        if not os.isdir(hostDirectory):
            return ([], [self.tr(
                "The given name '{0}' is not a directory or does not exist.")
                .format(hostDirectory)
            ])
        
        sourceDict = {}
        sourceFiles = listdirStat(hostDirectory)
        for name, nstat in sourceFiles:
            sourceDict[name] = nstat
        
        destinationDict = {}
        try:
            destinationFiles = self.__fs.lls(deviceDirectory, fullstat=True)
        except Exception as exc:
            return ([], [str(exc)])
        if destinationFiles is None:
            # the destination directory does not exist
            try:
                self.__fs.mkdir(deviceDirectory)
            except Exception as exc:
                return ([], [str(exc)])
        else:
            for name, nstat in destinationFiles:
                destinationDict[name] = nstat
        
        destinationSet = set(destinationDict.keys())
        sourceSet = set(sourceDict.keys())
        toAdd = sourceSet - destinationSet                  # add to dev
        toDelete = destinationSet - sourceSet               # delete from dev
        toUpdate = destinationSet.intersection(sourceSet)   # update files
        
        for sourceBasename in toAdd:
            # name exists in source but not in device
            sourceFilename = os.path.join(hostDirectory, sourceBasename)
            destFilename = deviceDirectory + "/" + sourceBasename
            if os.path.isfile(sourceFilename):
                try:
                    self.__fs.put(sourceFilename, destFilename)
                except Exception as exc:
                    messages.append(str(exc))
            if os.path.isdir(sourceFilename):
                # recurse
                msg, err = self.__rsync(sourceFilename, destFilename,
                                        mirror=mirror)
                messages.extend(msg)
                errors.extend(err)
        
        if mirror:
            for destBasename in toDelete:
                # name exists in device but not local, delete
                destFilename = deviceDirectory + "/" + destBasename
                try:
                    self.__fs.rmrf(destFilename, recursive=True, force=True)
                except Exception as exc:
                    # ignore errors here
                    messages.append(str(exc))
        
        for sourceBasename in toUpdate:
            # names exist in both; do an update
            sourceStat = sourceDict[sourceBasename]
            destStat = destinationDict[sourceBasename]
            sourceFilename = os.path.join(hostDirectory, sourceBasename)
            destFilename = deviceDirectory + "/" + sourceBasename
            destMode = destStat[0]
            if os.path.isdir(sourceFilename):
                if stat.S_ISDIR(destMode):
                    # both are directories => recurse
                    msg, err = self.__rsync(sourceFilename, destFilename,
                                            mirror=mirror)
                    messages.extend(msg)
                    errors.extend(err)
                else:
                    messages.append(self.tr(
                        "Source '{0}' is a directory and destination '{1}'"
                        " is a file. Ignoring it."
                    ).format(sourceFilename, destFilename))
            else:
                if stat.S_ISDIR(destMode):
                    messages.append(self.tr(
                        "Source '{0}' is a file and destination '{1}' is"
                        " a directory. Ignoring it."
                    ).format(sourceFilename, destFilename))
                else:
                    if sourceStat[8] > destStat[8]:     # mtime
                        messages.append(self.tr(
                            "'{0}' is newer than '{1}' - copying"
                        ).format(sourceFilename, destFilename))
                        try:
                            self.__fs.put(sourceFilename, destFilename)
                        except Exception as exc:
                            messages.append(str(exc))
        
        return messages, errors
    
    @pyqtSlot(str, str)
    @pyqtSlot(str, str, bool)
    def rsync(self, hostDirectory, deviceDirectory, mirror=True):
        """
        Public slot to synchronize a local directory to the device.
        
        @param hostDirectory name of the local directory
        @type str
        @param deviceDirectory name of the directory on the device
        @type str
        @param mirror flag indicating to mirror the local directory to
            the device directory
        @type bool
        """
        messages, errors = self.__rsync(hostDirectory, deviceDirectory,
                                        mirror=mirror)
        if errors:
            self.error.emit("rsync", "\n".join(errors))
        
        if messages:
            self.rsyncMessages.emit(messages)
        
        self.rsyncDone.emit(hostDirectory, deviceDirectory)
    
    @pyqtSlot(str)
    def mkdir(self, dirname):
        """
        Public slot to create a new directory.
        
        @param dirname name of the directory to create
        @type str
        """
        try:
            self.__fs.mkdir(dirname)
            self.createDirectoryDone.emit()
        except Exception as exc:
            self.error.emit("mkdir", str(exc))
    
    @pyqtSlot(str)
    @pyqtSlot(str, bool)
    def rmdir(self, dirname, recursive=False):
        """
        Public slot to (recursively) remove a directory.
        
        @param dirname name of the directory to be removed
        @type str
        @param recursive flag indicating a recursive removal
        @type bool
        """
        try:
            if recursive:
                self.__fs.rmrf(dirname, recursive=True, force=True)
            else:
                self.__fs.rmdir(dirname)
            self.removeDirectoryDone.emit()
        except Exception as exc:
            self.error.emit("rmdir", str(exc))
    
    ##################################################################
    ## some non-filesystem related methods below
    ##################################################################
    
    @pyqtSlot()
    def synchronizeTime(self):
        """
        Public slot to set the time of the connected device to the local
        computer's time.
        """
        try:
            self.__fs.syncTime()
            self.synchTimeDone.emit()
        except Exception as exc:
            self.error.emit("rmdir", str(exc))
    
    @pyqtSlot()
    def showTime(self):
        """
        Public slot to get the current date and time of the device.
        """
        try:
            dt = self.__fs.showTime()
            self.showTimeDone.emit(dt)
        except Exception as exc:
            self.error.emit("showTime", str(exc))
    
    @pyqtSlot()
    def showVersion(self):
        """
        Public slot to get the version info for the MicroPython run by the
        connected device.
        """
        try:
            versionInfo = self.__fs.version()
            self.showVersionDone.emit(versionInfo)
        except Exception as exc:
            self.error.emit("showVersion", str(exc))

eric ide

mercurial