src/eric7/MicroPython/MicroPythonCommandsInterface.py

branch
eric7
changeset 9221
bf71ee032bb4
parent 9209
b99e7fd55fd3
child 9413
80c06d472826
--- a/src/eric7/MicroPython/MicroPythonCommandsInterface.py	Wed Jul 13 11:16:20 2022 +0200
+++ b/src/eric7/MicroPython/MicroPythonCommandsInterface.py	Wed Jul 13 14:55:47 2022 +0200
@@ -12,8 +12,13 @@
 import os
 
 from PyQt6.QtCore import (
-    pyqtSlot, pyqtSignal, QObject, QThread, QTimer, QCoreApplication,
-    QEventLoop
+    pyqtSlot,
+    pyqtSignal,
+    QObject,
+    QThread,
+    QTimer,
+    QCoreApplication,
+    QEventLoop,
 )
 
 from .MicroPythonSerialPort import MicroPythonSerialPort
@@ -24,7 +29,7 @@
 class MicroPythonCommandsInterface(QObject):
     """
     Class implementing some file system commands for MicroPython.
-    
+
     Commands are provided to perform operations on the file system of a
     connected MicroPython device. Supported commands are:
     <ul>
@@ -39,7 +44,7 @@
     <li>mkdir: create a new directory</li>
     <li>rmdir: remove an empty directory</li>
     </ul>
-    
+
     There are additional commands related to time and version.
     <ul>
     <li>version: get version info about MicroPython</li>
@@ -47,33 +52,34 @@
     <li>syncTime: synchronize the time of the connected device</li>
     <li>showTime: show the current time of the connected device</li>
     </ul>
-    
+
     @signal executeAsyncFinished() emitted to indicate the end of an
         asynchronously executed list of commands (e.g. a script)
     @signal dataReceived(data) emitted to send data received via the serial
         connection for further processing
     """
+
     executeAsyncFinished = pyqtSignal()
     dataReceived = pyqtSignal(bytes)
-    
+
     def __init__(self, parent=None):
         """
         Constructor
-        
+
         @param parent reference to the parent object
         @type QObject
         """
         super().__init__(parent)
-        
+
         self.__repl = parent
-        
+
         self.__blockReadyRead = False
-        
+
         self.__serial = MicroPythonSerialPort(
-            timeout=Preferences.getMicroPython("SerialTimeout"),
-            parent=self)
+            timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
+        )
         self.__serial.readyRead.connect(self.__readSerial)
-    
+
     @pyqtSlot()
     def __readSerial(self):
         """
@@ -83,66 +89,66 @@
         if not self.__blockReadyRead:
             data = bytes(self.__serial.readAll())
             self.dataReceived.emit(data)
-    
+
     @pyqtSlot()
     def connectToDevice(self, port):
         """
         Public slot to start the manager.
-        
+
         @param port name of the port to be used
         @type str
         @return flag indicating success
         @rtype bool
         """
         return self.__serial.openSerialLink(port)
-    
+
     @pyqtSlot()
     def disconnectFromDevice(self):
         """
         Public slot to stop the thread.
         """
         self.__serial.closeSerialLink()
-    
+
     def isConnected(self):
         """
         Public method to get the connection status.
-        
+
         @return flag indicating the connection status
         @rtype bool
         """
         return self.__serial.isConnected()
-    
+
     @pyqtSlot()
     def handlePreferencesChanged(self):
         """
         Public slot to handle a change of the preferences.
         """
         self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))
-    
+
     def write(self, data):
         """
         Public method to write data to the connected device.
-        
+
         @param data data to be written
         @type bytes or bytearray
         """
         self.__serial.isConnected() and self.__serial.write(data)
-    
+
     def __rawOn(self):
         """
         Private method to switch the connected device to 'raw' mode.
-        
+
         Note: switching to raw mode is done with synchronous writes.
-        
+
         @return flag indicating success
         @@rtype bool
         """
         if not self.__serial:
             return False
-        
+
         rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"
-        
-        self.__serial.write(b"\x02")        # end raw mode if required
+
+        self.__serial.write(b"\x02")  # end raw mode if required
         written = self.__serial.waitForBytesWritten(500)
         # time out after 500ms if device is not responding
         if not written:
@@ -155,8 +161,8 @@
             if not written:
                 return False
             QThread.msleep(10)
-        self.__serial.readAll()             # read all data and discard it
-        self.__serial.write(b"\r\x01")      # send CTRL-A to enter raw mode
+        self.__serial.readAll()  # read all data and discard it
+        self.__serial.write(b"\r\x01")  # send CTRL-A to enter raw mode
         self.__serial.readUntil(rawReplMessage)
         if self.__serial.hasTimedOut():
             # it timed out; try it again and than fail
@@ -164,28 +170,29 @@
             self.__serial.readUntil(rawReplMessage)
             if self.__serial.hasTimedOut():
                 return False
-        
+
         QCoreApplication.processEvents(
-            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
-        self.__serial.readAll()             # read all data and discard it
+            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+        )
+        self.__serial.readAll()  # read all data and discard it
         return True
-    
+
     def __rawOff(self):
         """
         Private method to switch 'raw' mode off.
         """
         if self.__serial:
-            self.__serial.write(b"\x02")      # send CTRL-B to cancel raw mode
+            self.__serial.write(b"\x02")  # send CTRL-B to cancel raw mode
             self.__serial.readUntil(b">>> ")  # read until Python prompt
-            self.__serial.readAll()           # read all data and discard it
-    
+            self.__serial.readAll()  # read all data and discard it
+
     def execute(self, commands):
         """
         Public method to send commands to the connected device and return the
         result.
-        
+
         If no serial connection is available, empty results will be returned.
-        
+
         @param commands list of commands to be executed
         @type str
         @return tuple containing stdout and stderr output of the device
@@ -193,23 +200,20 @@
         """
         if not self.__serial:
             return b"", b""
-        
+
         if not self.__serial.isConnected():
             return b"", b"Device not connected or not switched on."
-        
+
         result = bytearray()
         err = b""
-        
+
         # switch on raw mode
         self.__blockReadyRead = True
         ok = self.__rawOn()
         if not ok:
             self.__blockReadyRead = False
-            return (
-                b"",
-                b"Could not switch to raw mode. Is the device switched on?"
-            )
-        
+            return (b"", b"Could not switch to raw mode. Is the device switched on?")
+
         # send commands
         QThread.msleep(10)
         for command in commands:
@@ -217,15 +221,17 @@
                 commandBytes = command.encode("utf-8")
                 self.__serial.write(commandBytes + b"\x04")
                 QCoreApplication.processEvents(
-                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
+                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
+                )
                 ok = self.__serial.readUntil(b"OK")
                 if ok != b"OK":
                     return (
                         b"",
                         "Expected 'OK', got '{0}', followed by '{1}'".format(
-                            ok, self.__serial.readAll()).encode("utf-8")
+                            ok, self.__serial.readAll()
+                        ).encode("utf-8"),
                     )
-                
+
                 # read until prompt
                 response = self.__serial.readUntil(b"\x04>")
                 if self.__serial.hasTimedOut():
@@ -240,25 +246,26 @@
                 if err:
                     self.__blockReadyRead = False
                     return b"", err
-        
+
         # switch off raw mode
         QThread.msleep(10)
         self.__rawOff()
         self.__blockReadyRead = False
-        
+
         return bytes(result), err
-    
+
     def executeAsync(self, commandsList):
         """
         Public method to execute a series of commands over a period of time
         without returning any result (asynchronous execution).
-        
+
         @param commandsList list of commands to be execute on the device
         @type list of bytes
         """
+
         def remainingTask(commands):
             self.executeAsync(commands)
-        
+
         if commandsList:
             command = commandsList[0]
             self.__serial.write(command)
@@ -266,11 +273,11 @@
             QTimer.singleShot(2, lambda: remainingTask(remainder))
         else:
             self.executeAsyncFinished.emit()
-    
+
     def __shortError(self, error):
         """
         Private method to create a shortened error message.
-        
+
         @param error verbose error message
         @type bytes
         @return shortened error message
@@ -283,15 +290,15 @@
             except Exception:
                 return decodedError
         return self.tr("Detected an error without indications.")
-    
+
     ##################################################################
     ## Methods below implement the file system commands
     ##################################################################
-    
+
     def ls(self, dirname=""):
         """
         Public method to get a directory listing of the connected device.
-        
+
         @param dirname name of the directory to be listed
         @type str
         @return tuple containg the directory listing
@@ -305,8 +312,8 @@
                 "print(__os_.listdir())",
                 "del __os_",
             ]
-            if self.__repl.isMicrobit() else
-            [
+            if self.__repl.isMicrobit()
+            else [
                 "import os as __os_",
                 "print(__os_.listdir('{0}'))".format(dirname),
                 "del __os_",
@@ -316,12 +323,12 @@
         if err:
             raise OSError(self.__shortError(err))
         return ast.literal_eval(out.decode("utf-8"))
-    
+
     def lls(self, dirname="", fullstat=False, showHidden=False):
         """
         Public method to get a long directory listing of the connected device
         including meta data.
-        
+
         @param dirname name of the directory to be listed
         @type str
         @param fullstat flag indicating to return the full stat() tuple
@@ -339,53 +346,65 @@
             # BBC micro:bit does not support directories
             [
                 "import os as __os_",
-                "\n".join([
-                    "def is_visible(filename, showHidden):",
-                    "    return showHidden or "
-                    "(filename[0] != '.' and filename[-1] != '~')",
-                ]),
-                "\n".join([
-                    "def stat(filename):",
-                    "    size = __os_.size(filename)",
-                    "    return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)"
-                ]),
-                "\n".join([
-                    "def listdir_stat(showHidden):",
-                    "    files = __os_.listdir()",
-                    "    return list((f, stat(f)) for f in files if"
-                    " is_visible(f,showHidden))",
-                ]),
+                "\n".join(
+                    [
+                        "def is_visible(filename, showHidden):",
+                        "    return showHidden or "
+                        "(filename[0] != '.' and filename[-1] != '~')",
+                    ]
+                ),
+                "\n".join(
+                    [
+                        "def stat(filename):",
+                        "    size = __os_.size(filename)",
+                        "    return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)",
+                    ]
+                ),
+                "\n".join(
+                    [
+                        "def listdir_stat(showHidden):",
+                        "    files = __os_.listdir()",
+                        "    return list((f, stat(f)) for f in files if"
+                        " is_visible(f,showHidden))",
+                    ]
+                ),
                 "print(listdir_stat({0}))".format(showHidden),
                 "del __os_, stat, listdir_stat, is_visible",
             ]
-            if self.__repl.isMicrobit() else
-            [
+            if self.__repl.isMicrobit()
+            else [
                 "import os as __os_",
-                "\n".join([
-                    "def is_visible(filename, showHidden):",
-                    "    return showHidden or "
-                    "(filename[0] != '.' and filename[-1] != '~')",
-                ]),
-                "\n".join([
-                    "def stat(filename):",
-                    "    try:",
-                    "        rstat = __os_.lstat(filename)",
-                    "    except:",
-                    "        rstat = __os_.stat(filename)",
-                    "    return tuple(rstat)",
-                ]),
-                "\n".join([
-                    "def listdir_stat(dirname, showHidden):",
-                    "    try:",
-                    "        files = __os_.listdir(dirname)",
-                    "    except OSError:",
-                    "        return []",
-                    "    if dirname in ('', '/'):",
-                    "        return list((f, stat(f)) for f in files if"
-                    " is_visible(f, showHidden))",
-                    "    return list((f, stat(dirname + '/' + f))"
-                    " for f in files if is_visible(f, showHidden))",
-                ]),
+                "\n".join(
+                    [
+                        "def is_visible(filename, showHidden):",
+                        "    return showHidden or "
+                        "(filename[0] != '.' and filename[-1] != '~')",
+                    ]
+                ),
+                "\n".join(
+                    [
+                        "def stat(filename):",
+                        "    try:",
+                        "        rstat = __os_.lstat(filename)",
+                        "    except:",
+                        "        rstat = __os_.stat(filename)",
+                        "    return tuple(rstat)",
+                    ]
+                ),
+                "\n".join(
+                    [
+                        "def listdir_stat(dirname, showHidden):",
+                        "    try:",
+                        "        files = __os_.listdir(dirname)",
+                        "    except OSError:",
+                        "        return []",
+                        "    if dirname in ('', '/'):",
+                        "        return list((f, stat(f)) for f in files if"
+                        " is_visible(f, showHidden))",
+                        "    return list((f, stat(dirname + '/' + f))"
+                        " for f in files if is_visible(f, showHidden))",
+                    ]
+                ),
                 "print(listdir_stat('{0}', {1}))".format(dirname, showHidden),
                 "del __os_, stat, listdir_stat, is_visible",
             ]
@@ -401,11 +420,11 @@
                 return fileslist
             else:
                 return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
-    
+
     def cd(self, dirname):
         """
         Public method to change the current directory on the connected device.
-        
+
         @param dirname directory to change to
         @type str
         @exception OSError raised to indicate an issue with the device
@@ -419,11 +438,11 @@
             out, err = self.execute(commands)
             if err:
                 raise OSError(self.__shortError(err))
-    
+
     def pwd(self):
         """
         Public method to get the current directory of the connected device.
-        
+
         @return current directory
         @rtype str
         @exception OSError raised to indicate an issue with the device
@@ -431,7 +450,7 @@
         if self.__repl.isMicrobit():
             # BBC micro:bit does not support directories
             return ""
-        
+
         commands = [
             "import os as __os_",
             "print(__os_.getcwd())",
@@ -441,11 +460,11 @@
         if err:
             raise OSError(self.__shortError(err))
         return out.decode("utf-8").strip()
-    
+
     def rm(self, filename):
         """
         Public method to remove a file from the connected device.
-        
+
         @param filename name of the file to be removed
         @type str
         @exception OSError raised to indicate an issue with the device
@@ -459,11 +478,11 @@
             out, err = self.execute(commands)
             if err:
                 raise OSError(self.__shortError(err))
-    
+
     def rmrf(self, name, recursive=False, force=False):
         """
         Public method to remove a file or directory recursively.
-        
+
         @param name of the file or directory to remove
         @type str
         @param recursive flag indicating a recursive deletion
@@ -477,43 +496,44 @@
         if name:
             commands = [
                 "import os as __os_",
-                "\n".join([
-                    "def remove_file(name, recursive=False, force=False):",
-                    "    try:",
-                    "        mode = __os_.stat(name)[0]",
-                    "        if mode & 0x4000 != 0:",
-                    "            if recursive:",
-                    "                for file in __os_.listdir(name):",
-                    "                    success = remove_file("
-                    "name + '/' + file, recursive, force)",
-                    "                    if not success and not force:",
-                    "                        return False",
-                    "                __os_.rmdir(name)",
-                    "            else:",
-                    "                if not force:",
-                    "                    return False",
-                    "        else:",
-                    "            __os_.remove(name)",
-                    "    except:",
-                    "        if not force:",
-                    "            return False",
-                    "    return True",
-                ]),
-                "print(remove_file('{0}', {1}, {2}))".format(name, recursive,
-                                                             force),
+                "\n".join(
+                    [
+                        "def remove_file(name, recursive=False, force=False):",
+                        "    try:",
+                        "        mode = __os_.stat(name)[0]",
+                        "        if mode & 0x4000 != 0:",
+                        "            if recursive:",
+                        "                for file in __os_.listdir(name):",
+                        "                    success = remove_file("
+                        "name + '/' + file, recursive, force)",
+                        "                    if not success and not force:",
+                        "                        return False",
+                        "                __os_.rmdir(name)",
+                        "            else:",
+                        "                if not force:",
+                        "                    return False",
+                        "        else:",
+                        "            __os_.remove(name)",
+                        "    except:",
+                        "        if not force:",
+                        "            return False",
+                        "    return True",
+                    ]
+                ),
+                "print(remove_file('{0}', {1}, {2}))".format(name, recursive, force),
                 "del __os_, remove_file",
             ]
             out, err = self.execute(commands)
             if err:
                 raise OSError(self.__shortError(err))
             return ast.literal_eval(out.decode("utf-8"))
-        
+
         return False
-    
+
     def mkdir(self, dirname):
         """
         Public method to create a new directory.
-        
+
         @param dirname name of the directory to create
         @type str
         @exception OSError raised to indicate an issue with the device
@@ -527,11 +547,11 @@
             out, err = self.execute(commands)
             if err:
                 raise OSError(self.__shortError(err))
-    
+
     def rmdir(self, dirname):
         """
         Public method to remove a directory.
-        
+
         @param dirname name of the directory to be removed
         @type str
         @exception OSError raised to indicate an issue with the device
@@ -545,11 +565,11 @@
             out, err = self.execute(commands)
             if err:
                 raise OSError(self.__shortError(err))
-    
+
     def put(self, hostFileName, deviceFileName=None):
         """
         Public method to copy a local file to the connected device.
-        
+
         @param hostFileName name of the file to be copied
         @type str
         @param deviceFileName name of the file to copy to
@@ -560,16 +580,16 @@
         """
         if not os.path.isfile(hostFileName):
             raise OSError("No such file: {0}".format(hostFileName))
-        
+
         with open(hostFileName, "rb") as hostFile:
             content = hostFile.read()
             # convert eol '\r'
             content = content.replace(b"\r\n", b"\r")
             content = content.replace(b"\n", b"\r")
-        
+
         if not deviceFileName:
             deviceFileName = os.path.basename(hostFileName)
-        
+
         commands = [
             "fd = open('{0}', 'wb')".format(deviceFileName),
             "f = fd.write",
@@ -578,20 +598,22 @@
             chunk = content[:64]
             commands.append("f(" + repr(chunk) + ")")
             content = content[64:]
-        commands.extend([
-            "fd.close()",
-            "del f, fd",
-        ])
-        
+        commands.extend(
+            [
+                "fd.close()",
+                "del f, fd",
+            ]
+        )
+
         out, err = self.execute(commands)
         if err:
             raise OSError(self.__shortError(err))
         return True
-    
+
     def get(self, deviceFileName, hostFileName=None):
         """
         Public method to copy a file from the connected device.
-        
+
         @param deviceFileName name of the file to copy
         @type str
         @param hostFileName name of the file to copy to
@@ -602,37 +624,39 @@
         """
         if not hostFileName:
             hostFileName = deviceFileName
-        
+
         commands = [
-            "\n".join([
-                "def send_data():",
-                "    try:",
-                "        from microbit import uart as u",
-                "    except ImportError:",
-                "        try:",
-                "            from machine import UART",
-                "            u = UART(0, {0})".format(115200),
-                "        except Exception:",
-                "            try:",
-                "                from sys import stdout as u",
-                "            except Exception:",
-                "                raise Exception('Could not find UART module"
-                " in device.')",
-                "    f = open('{0}', 'rb')".format(deviceFileName),
-                "    r = f.read",
-                "    result = True",
-                "    while result:",
-                "        result = r(32)",
-                "        if result:",
-                "            u.write(result)",
-                "    f.close()",
-            ]),
+            "\n".join(
+                [
+                    "def send_data():",
+                    "    try:",
+                    "        from microbit import uart as u",
+                    "    except ImportError:",
+                    "        try:",
+                    "            from machine import UART",
+                    "            u = UART(0, {0})".format(115200),
+                    "        except Exception:",
+                    "            try:",
+                    "                from sys import stdout as u",
+                    "            except Exception:",
+                    "                raise Exception('Could not find UART module"
+                    " in device.')",
+                    "    f = open('{0}', 'rb')".format(deviceFileName),
+                    "    r = f.read",
+                    "    result = True",
+                    "    while result:",
+                    "        result = r(32)",
+                    "        if result:",
+                    "            u.write(result)",
+                    "    f.close()",
+                ]
+            ),
             "send_data()",
         ]
         out, err = self.execute(commands)
         if err:
             raise OSError(self.__shortError(err))
-        
+
         # write the received bytes to the local file
         # convert eol to "\n"
         out = out.replace(b"\r\n", b"\n")
@@ -640,12 +664,12 @@
         with open(hostFileName, "wb") as hostFile:
             hostFile.write(out)
         return True
-    
+
     def fileSystemInfo(self):
         """
         Public method to obtain information about the currently mounted file
         systems.
-        
+
         @return tuple of tuples containing the file system name, the total
             size, the used size and the free size
         @rtype tuple of tuples of (str, int, int, int)
@@ -653,20 +677,22 @@
         """
         commands = [
             "import os as __os_",
-            "\n".join([
-                "def fsinfo():",
-                "    infolist = []",
-                "    info = __os_.statvfs('/')",
-                "    if info[0] == 0:",
-                # assume it is just mount points
-                "        fsnames = __os_.listdir('/')",
-                "        for fs in fsnames:",
-                "            fs = '/' + fs",
-                "            infolist.append((fs, __os_.statvfs(fs)))",
-                "    else:",
-                "        infolist.append(('/', info))",
-                "    return infolist",
-            ]),
+            "\n".join(
+                [
+                    "def fsinfo():",
+                    "    infolist = []",
+                    "    info = __os_.statvfs('/')",
+                    "    if info[0] == 0:",
+                    # assume it is just mount points
+                    "        fsnames = __os_.listdir('/')",
+                    "        for fs in fsnames:",
+                    "            fs = '/' + fs",
+                    "            infolist.append((fs, __os_.statvfs(fs)))",
+                    "    else:",
+                    "        infolist.append(('/', info))",
+                    "    return infolist",
+                ]
+            ),
             "print(fsinfo())",
             "del __os_, fsinfo",
         ]
@@ -683,18 +709,18 @@
                 freeSize = info[4] * info[1]
                 usedSize = totalSize - freeSize
                 filesystemInfos.append((fs, totalSize, usedSize, freeSize))
-        
+
         return tuple(filesystemInfos)
-    
+
     ##################################################################
     ## non-filesystem related methods below
     ##################################################################
-    
+
     def version(self):
         """
         Public method to get the MicroPython version information of the
         connected device.
-        
+
         @return dictionary containing the version information
         @rtype dict
         @exception OSError raised to indicate an issue with the device
@@ -707,7 +733,7 @@
         out, err = self.execute(commands)
         if err:
             raise OSError(self.__shortError(err))
-        
+
         rawOutput = out.decode("utf-8").strip()
         rawOutput = rawOutput[1:-1]
         items = rawOutput.split(",")
@@ -716,32 +742,36 @@
             key, value = item.strip().split("=")
             result[key.strip()] = value.strip()[1:-1]
         return result
-    
+
     def getImplementation(self):
         """
         Public method to get some implementation information of the connected
         device.
-        
+
         @return dictionary containing the implementation information
         @rtype dict
         @exception OSError raised to indicate an issue with the device
         """
         commands = [
             "import sys as __sys_",
-            "res = {}",                             # __IGNORE_WARNING_M613__
-            "\n".join([
-                "try:",
-                "    res['name'] = __sys_.implementation.name",
-                "except AttributeError:",
-                "    res['name'] = 'unknown'",
-            ]),
-            "\n".join([
-                "try:",
-                "    res['version'] = '.'.join((str(i) for i in"
-                " __sys_.implementation.version))",
-                "except AttributeError:",
-                "    res['version'] = 'unknown'",
-            ]),
+            "res = {}",  # __IGNORE_WARNING_M613__
+            "\n".join(
+                [
+                    "try:",
+                    "    res['name'] = __sys_.implementation.name",
+                    "except AttributeError:",
+                    "    res['name'] = 'unknown'",
+                ]
+            ),
+            "\n".join(
+                [
+                    "try:",
+                    "    res['version'] = '.'.join((str(i) for i in"
+                    " __sys_.implementation.version))",
+                    "except AttributeError:",
+                    "    res['version'] = 'unknown'",
+                ]
+            ),
             "print(res)",
             "del res, __sys_",
         ]
@@ -749,18 +779,17 @@
         if err:
             raise OSError(self.__shortError(err))
         return ast.literal_eval(out.decode("utf-8"))
-    
+
     def getBoardInformation(self):
         """
         Public method to get some information data of the connected board.
-        
+
         @return dictionary containing the determined data
         @rtype dict
         @exception OSError raised to indicate an issue with the device
         """
         commands = [
-            "res = {}",                             # __IGNORE_WARNING_M613__
-            
+            "res = {}",  # __IGNORE_WARNING_M613__
             "import gc as __gc_",
             "__gc_.enable()",
             "__gc_.collect()",
@@ -773,7 +802,6 @@
             "res['mem_free_kb'] = mem_free / 1024.0",
             "res['mem_free_pc'] = mem_free / mem_total * 100.0",
             "del __gc_, mem_alloc, mem_free, mem_total",
-            
             "import os as __os_",
             "uname = __os_.uname()",
             "res['sysname'] = uname.sysname",
@@ -781,66 +809,70 @@
             "res['release'] = uname.release",
             "res['version'] = uname.version",
             "res['machine'] = uname.machine",
-            
             "import sys as __sys_",
             "res['py_platform'] = __sys_.platform",
             "res['py_version'] = __sys_.version",
-            "\n".join([
-                "try:",
-                "    res['mpy_name'] = __sys_.implementation.name",
-                "except AttributeError:",
-                "    res['mpy_name'] = 'unknown'",
-            ]),
-            "\n".join([
-                "try:",
-                "    res['mpy_version'] = '.'.join((str(i) for i in"
-                " __sys_.implementation.version))",
-                "except AttributeError:",
-                "    res['mpy_version'] = 'unknown'",
-            ]),
-            
+            "\n".join(
+                [
+                    "try:",
+                    "    res['mpy_name'] = __sys_.implementation.name",
+                    "except AttributeError:",
+                    "    res['mpy_name'] = 'unknown'",
+                ]
+            ),
+            "\n".join(
+                [
+                    "try:",
+                    "    res['mpy_version'] = '.'.join((str(i) for i in"
+                    " __sys_.implementation.version))",
+                    "except AttributeError:",
+                    "    res['mpy_version'] = 'unknown'",
+                ]
+            ),
             "stat_ = __os_.statvfs('/flash')",
             "res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0",
             "res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0",
-            "res['flash_used_kb'] = res['flash_total_kb'] -"
-            " res['flash_free_kb']",
+            "res['flash_used_kb'] = res['flash_total_kb'] -" " res['flash_free_kb']",
             "res['flash_free_pc'] = res['flash_free_kb'] /"
             " res['flash_total_kb'] * 100.0",
             "res['flash_used_pc'] = res['flash_used_kb'] /"
             " res['flash_total_kb'] * 100.0",
-            
-            "\n".join([
-                "try:",
-                "    import machine as __mc_",
-                "    res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0",
-                "    res['mc_id'] = ':'.join(['{0:X}'.format(x)"
-                " for x in __mc_.unique_id()])",
-                "    del __mc_",
-                "except ImportError:",
-                "\n".join([
-                    "    try:",
-                    "        import microcontroller as __mc_",
-                    "        res['mc_frequency_mhz'] = __mc_.cpu.frequency"
-                    " / 1000000.0",
-                    "        res['mc_temp_c'] = __mc_.cpu.temperature",
-                    "        res['mc_id'] = ':'.join(['{0:X}'.format(x)"
-                    " for x in __mc_.cpu.uid])",
-                    "        del __mc_",
-                    "    except ImportError:",
-                    "        res['mc_frequency'] = None",
-                    "        res['mc_temp'] = None",
-                ]),
-            ]),
-            
-            "\n".join([
-                "try:",
-                "    import ulab as __ulab_",
-                "    res['ulab'] = __ulab_.__version__",
-                "    del __ulab_",
-                "except ImportError:",
-                "    res['ulab'] = None",
-            ]),
-            
+            "\n".join(
+                [
+                    "try:",
+                    "    import machine as __mc_",
+                    "    res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0",
+                    "    res['mc_id'] = ':'.join(['{0:X}'.format(x)"
+                    " for x in __mc_.unique_id()])",
+                    "    del __mc_",
+                    "except ImportError:",
+                    "\n".join(
+                        [
+                            "    try:",
+                            "        import microcontroller as __mc_",
+                            "        res['mc_frequency_mhz'] = __mc_.cpu.frequency"
+                            " / 1000000.0",
+                            "        res['mc_temp_c'] = __mc_.cpu.temperature",
+                            "        res['mc_id'] = ':'.join(['{0:X}'.format(x)"
+                            " for x in __mc_.cpu.uid])",
+                            "        del __mc_",
+                            "    except ImportError:",
+                            "        res['mc_frequency'] = None",
+                            "        res['mc_temp'] = None",
+                        ]
+                    ),
+                ]
+            ),
+            "\n".join(
+                [
+                    "try:",
+                    "    import ulab as __ulab_",
+                    "    res['ulab'] = __ulab_.__version__",
+                    "    del __ulab_",
+                    "except ImportError:",
+                    "    res['ulab'] = None",
+                ]
+            ),
             "print(res)",
             "del res, stat_, __os_, __sys_",
         ]
@@ -848,12 +880,12 @@
         if err:
             raise OSError(self.__shortError(err))
         return ast.literal_eval(out.decode("utf-8"))
-    
+
     def syncTime(self, deviceType):
         """
         Public method to set the time of the connected device to the local
         computer's time.
-        
+
         @param deviceType type of board to sync time to
         @type str
         @exception OSError raised to indicate an issue with the device
@@ -874,56 +906,64 @@
             # subseconds)
             # http://docs.micropython.org/en/latest/library/pyb.RTC.html
             # #pyb.RTC.datetime
-            set_time = "\n".join([
-                "def set_time(rtc_time):",
-                "    import pyb",
-                "    rtc = pyb.RTC()",
-                "    rtc.datetime(rtc_time[:7] + (0,))",
-            ])
+            set_time = "\n".join(
+                [
+                    "def set_time(rtc_time):",
+                    "    import pyb",
+                    "    rtc = pyb.RTC()",
+                    "    rtc.datetime(rtc_time[:7] + (0,))",
+                ]
+            )
         elif deviceType == "esp":
             # The machine.RTC documentation was incorrect and doesn't agree
             # with the code, so no link is presented here. The order of the
             # arguments is the same as the pyboard except for LoBo MPy.
-            set_time = "\n".join([
-                "def set_time(rtc_time):",
-                "    import machine",
-                "    rtc = machine.RTC()",
-                "    try:",                 # ESP8266 may use rtc.datetime()
-                "        rtc.datetime(rtc_time[:7] + (0,))",
-                "    except Exception:",    # ESP32 uses rtc.init()
-                "        import os",
-                "        if 'LoBo' in os.uname()[0]:",      # LoBo MPy
-                "            clock_time = rtc_time[:3] +"
-                " rtc_time[4:7] + (rtc_time[3], rtc_time[7])",
-                "        else:",
-                "            clock_time = rtc_time[:7] + (0,)",
-                "        rtc.init(clock_time)",
-            ])
+            set_time = "\n".join(
+                [
+                    "def set_time(rtc_time):",
+                    "    import machine",
+                    "    rtc = machine.RTC()",
+                    "    try:",  # ESP8266 may use rtc.datetime()
+                    "        rtc.datetime(rtc_time[:7] + (0,))",
+                    "    except Exception:",  # ESP32 uses rtc.init()
+                    "        import os",
+                    "        if 'LoBo' in os.uname()[0]:",  # LoBo MPy
+                    "            clock_time = rtc_time[:3] +"
+                    " rtc_time[4:7] + (rtc_time[3], rtc_time[7])",
+                    "        else:",
+                    "            clock_time = rtc_time[:7] + (0,)",
+                    "        rtc.init(clock_time)",
+                ]
+            )
         elif deviceType == "circuitpython":
-            set_time = "\n".join([
-                "def set_time(rtc_time):",
-                "    import rtc",
-                "    import time",
-                "    clock = rtc.RTC()",
-                "    clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3],"
-                " rtc_time[7], rtc_time[8])",
-                "    clock.datetime = time.struct_time(clock_time)",
-            ])
+            set_time = "\n".join(
+                [
+                    "def set_time(rtc_time):",
+                    "    import rtc",
+                    "    import time",
+                    "    clock = rtc.RTC()",
+                    "    clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3],"
+                    " rtc_time[7], rtc_time[8])",
+                    "    clock.datetime = time.struct_time(clock_time)",
+                ]
+            )
         elif deviceType in ("bbc_microbit", "calliope"):
             # BBC micro:bit and Calliope mini don't support time commands
             return
         elif deviceType == "rp2040":
             # Raspberry Pi Pico (RP2040) - machine.RTC doesn't exist
-            set_time = "\n".join([
-                "def set_time(rtc_time):",
-                "    setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |"
-                " rtc_time[2]",
-                "    setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |"
-                " rtc_time[5] << 8 | rtc_time[6]",
-                "    machine.mem32[0x4005c004] = setup_0",
-                "    machine.mem32[0x4005c008] = setup_1",
-                "    machine.mem32[0x4005c00c] |= 0x10",
-            ])
+            set_time = "\n".join(
+                [
+                    "def set_time(rtc_time):",
+                    "    setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |"
+                    " rtc_time[2]",
+                    "    setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |"
+                    " rtc_time[5] << 8 | rtc_time[6]",
+                    "    machine.mem32[0x4005c004] = setup_0",
+                    "    machine.mem32[0x4005c008] = setup_1",
+                    "    machine.mem32[0x4005c00c] |= 0x10",
+                ]
+            )
         elif deviceType == "pycom":
             # PyCom's machine.RTC takes its arguments in a slightly
             # different order than the official machine.RTC.
@@ -931,60 +971,73 @@
             # tzinfo]])
             # https://docs.pycom.io/firmwareapi/pycom/machine/rtc/
             # #rtc-init-datetime-none-source-rtc-internal-rc
-            set_time = "\n".join([
-                "def set_time(rtc_time):",
-                "    import pycom",
-                "    rtc_time2 = rtc_time[:3] + rtc_time[4:7]",
-                "    import machine",
-                "    rtc = machine.RTC()",
-                "    rtc.init(rtc_time2)",
-            ])
+            set_time = "\n".join(
+                [
+                    "def set_time(rtc_time):",
+                    "    import pycom",
+                    "    rtc_time2 = rtc_time[:3] + rtc_time[4:7]",
+                    "    import machine",
+                    "    rtc = machine.RTC()",
+                    "    rtc.init(rtc_time2)",
+                ]
+            )
         else:
             # no set_time() support for generic boards
             return
-        
+
         now = time.localtime(time.time())
         commands = [
             set_time,
-            "set_time({0})".format((
-                now.tm_year, now.tm_mon, now.tm_mday, now.tm_wday + 1,
-                now.tm_hour, now.tm_min, now.tm_sec, now.tm_yday, now.tm_isdst
-            )),
+            "set_time({0})".format(
+                (
+                    now.tm_year,
+                    now.tm_mon,
+                    now.tm_mday,
+                    now.tm_wday + 1,
+                    now.tm_hour,
+                    now.tm_min,
+                    now.tm_sec,
+                    now.tm_yday,
+                    now.tm_isdst,
+                )
+            ),
             "del set_time",
         ]
         out, err = self.execute(commands)
         if err:
             raise OSError(self.__shortError(err))
-    
+
     def getTime(self):
         """
         Public method to get the current time of the device.
-        
+
         @return time of the device
         @rtype str
         @exception OSError raised to indicate an issue with the device
         """
         commands = [
-            "\n".join([
-                "try:",
-                "    import rtc as __rtc_",
-                "    print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'"
-                ".format(*__rtc_.RTC().datetime[:6]))",
-                "    del __rtc_",
-                "except:",
-                "    import time as __time_",
-                "    try:",
-                "        print(__time_.strftime('%Y-%m-%d %H:%M:%S',"
-                # __IGNORE_WARNING_M601__
-                " __time_.localtime()))",
-                "    except AttributeError:",
-                "        tm = __time_.localtime()",
-                "        print('{0:04d}-{1:02d}-{2:02d}"
-                " {3:02d}:{4:02d}:{5:02d}'"
-                ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))",
-                "        del tm",
-                "    del __time_"
-            ]),
+            "\n".join(
+                [
+                    "try:",
+                    "    import rtc as __rtc_",
+                    "    print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'"
+                    ".format(*__rtc_.RTC().datetime[:6]))",
+                    "    del __rtc_",
+                    "except:",
+                    "    import time as __time_",
+                    "    try:",
+                    "        print(__time_.strftime('%Y-%m-%d %H:%M:%S',"
+                    # __IGNORE_WARNING_M601__
+                    " __time_.localtime()))",
+                    "    except AttributeError:",
+                    "        tm = __time_.localtime()",
+                    "        print('{0:04d}-{1:02d}-{2:02d}"
+                    " {3:02d}:{4:02d}:{5:02d}'"
+                    ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))",
+                    "        del tm",
+                    "    del __time_",
+                ]
+            ),
         ]
         out, err = self.execute(commands)
         if err:

eric ide

mercurial