--- a/src/eric7/MicroPython/MicroPythonCommandsInterface.py Wed Jul 13 11:16:20 2022 +0200 +++ b/src/eric7/MicroPython/MicroPythonCommandsInterface.py Wed Jul 13 14:55:47 2022 +0200 @@ -12,8 +12,13 @@ import os from PyQt6.QtCore import ( - pyqtSlot, pyqtSignal, QObject, QThread, QTimer, QCoreApplication, - QEventLoop + pyqtSlot, + pyqtSignal, + QObject, + QThread, + QTimer, + QCoreApplication, + QEventLoop, ) from .MicroPythonSerialPort import MicroPythonSerialPort @@ -24,7 +29,7 @@ class MicroPythonCommandsInterface(QObject): """ Class implementing some file system commands for MicroPython. - + Commands are provided to perform operations on the file system of a connected MicroPython device. Supported commands are: <ul> @@ -39,7 +44,7 @@ <li>mkdir: create a new directory</li> <li>rmdir: remove an empty directory</li> </ul> - + There are additional commands related to time and version. <ul> <li>version: get version info about MicroPython</li> @@ -47,33 +52,34 @@ <li>syncTime: synchronize the time of the connected device</li> <li>showTime: show the current time of the connected device</li> </ul> - + @signal executeAsyncFinished() emitted to indicate the end of an asynchronously executed list of commands (e.g. a script) @signal dataReceived(data) emitted to send data received via the serial connection for further processing """ + executeAsyncFinished = pyqtSignal() dataReceived = pyqtSignal(bytes) - + def __init__(self, parent=None): """ Constructor - + @param parent reference to the parent object @type QObject """ super().__init__(parent) - + self.__repl = parent - + self.__blockReadyRead = False - + self.__serial = MicroPythonSerialPort( - timeout=Preferences.getMicroPython("SerialTimeout"), - parent=self) + timeout=Preferences.getMicroPython("SerialTimeout"), parent=self + ) self.__serial.readyRead.connect(self.__readSerial) - + @pyqtSlot() def __readSerial(self): """ @@ -83,66 +89,66 @@ if not self.__blockReadyRead: data = bytes(self.__serial.readAll()) self.dataReceived.emit(data) - + @pyqtSlot() def connectToDevice(self, port): """ Public slot to start the manager. - + @param port name of the port to be used @type str @return flag indicating success @rtype bool """ return self.__serial.openSerialLink(port) - + @pyqtSlot() def disconnectFromDevice(self): """ Public slot to stop the thread. """ self.__serial.closeSerialLink() - + def isConnected(self): """ Public method to get the connection status. - + @return flag indicating the connection status @rtype bool """ return self.__serial.isConnected() - + @pyqtSlot() def handlePreferencesChanged(self): """ Public slot to handle a change of the preferences. """ self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout")) - + def write(self, data): """ Public method to write data to the connected device. - + @param data data to be written @type bytes or bytearray """ self.__serial.isConnected() and self.__serial.write(data) - + def __rawOn(self): """ Private method to switch the connected device to 'raw' mode. - + Note: switching to raw mode is done with synchronous writes. - + @return flag indicating success @@rtype bool """ if not self.__serial: return False - + rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>" - - self.__serial.write(b"\x02") # end raw mode if required + + self.__serial.write(b"\x02") # end raw mode if required written = self.__serial.waitForBytesWritten(500) # time out after 500ms if device is not responding if not written: @@ -155,8 +161,8 @@ if not written: return False QThread.msleep(10) - self.__serial.readAll() # read all data and discard it - self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode + self.__serial.readAll() # read all data and discard it + self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode self.__serial.readUntil(rawReplMessage) if self.__serial.hasTimedOut(): # it timed out; try it again and than fail @@ -164,28 +170,29 @@ self.__serial.readUntil(rawReplMessage) if self.__serial.hasTimedOut(): return False - + QCoreApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents) - self.__serial.readAll() # read all data and discard it + QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents + ) + self.__serial.readAll() # read all data and discard it return True - + def __rawOff(self): """ Private method to switch 'raw' mode off. """ if self.__serial: - self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode + self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode self.__serial.readUntil(b">>> ") # read until Python prompt - self.__serial.readAll() # read all data and discard it - + self.__serial.readAll() # read all data and discard it + def execute(self, commands): """ Public method to send commands to the connected device and return the result. - + If no serial connection is available, empty results will be returned. - + @param commands list of commands to be executed @type str @return tuple containing stdout and stderr output of the device @@ -193,23 +200,20 @@ """ if not self.__serial: return b"", b"" - + if not self.__serial.isConnected(): return b"", b"Device not connected or not switched on." - + result = bytearray() err = b"" - + # switch on raw mode self.__blockReadyRead = True ok = self.__rawOn() if not ok: self.__blockReadyRead = False - return ( - b"", - b"Could not switch to raw mode. Is the device switched on?" - ) - + return (b"", b"Could not switch to raw mode. Is the device switched on?") + # send commands QThread.msleep(10) for command in commands: @@ -217,15 +221,17 @@ commandBytes = command.encode("utf-8") self.__serial.write(commandBytes + b"\x04") QCoreApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents) + QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents + ) ok = self.__serial.readUntil(b"OK") if ok != b"OK": return ( b"", "Expected 'OK', got '{0}', followed by '{1}'".format( - ok, self.__serial.readAll()).encode("utf-8") + ok, self.__serial.readAll() + ).encode("utf-8"), ) - + # read until prompt response = self.__serial.readUntil(b"\x04>") if self.__serial.hasTimedOut(): @@ -240,25 +246,26 @@ if err: self.__blockReadyRead = False return b"", err - + # switch off raw mode QThread.msleep(10) self.__rawOff() self.__blockReadyRead = False - + return bytes(result), err - + def executeAsync(self, commandsList): """ Public method to execute a series of commands over a period of time without returning any result (asynchronous execution). - + @param commandsList list of commands to be execute on the device @type list of bytes """ + def remainingTask(commands): self.executeAsync(commands) - + if commandsList: command = commandsList[0] self.__serial.write(command) @@ -266,11 +273,11 @@ QTimer.singleShot(2, lambda: remainingTask(remainder)) else: self.executeAsyncFinished.emit() - + def __shortError(self, error): """ Private method to create a shortened error message. - + @param error verbose error message @type bytes @return shortened error message @@ -283,15 +290,15 @@ except Exception: return decodedError return self.tr("Detected an error without indications.") - + ################################################################## ## Methods below implement the file system commands ################################################################## - + def ls(self, dirname=""): """ Public method to get a directory listing of the connected device. - + @param dirname name of the directory to be listed @type str @return tuple containg the directory listing @@ -305,8 +312,8 @@ "print(__os_.listdir())", "del __os_", ] - if self.__repl.isMicrobit() else - [ + if self.__repl.isMicrobit() + else [ "import os as __os_", "print(__os_.listdir('{0}'))".format(dirname), "del __os_", @@ -316,12 +323,12 @@ if err: raise OSError(self.__shortError(err)) return ast.literal_eval(out.decode("utf-8")) - + def lls(self, dirname="", fullstat=False, showHidden=False): """ Public method to get a long directory listing of the connected device including meta data. - + @param dirname name of the directory to be listed @type str @param fullstat flag indicating to return the full stat() tuple @@ -339,53 +346,65 @@ # BBC micro:bit does not support directories [ "import os as __os_", - "\n".join([ - "def is_visible(filename, showHidden):", - " return showHidden or " - "(filename[0] != '.' and filename[-1] != '~')", - ]), - "\n".join([ - "def stat(filename):", - " size = __os_.size(filename)", - " return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)" - ]), - "\n".join([ - "def listdir_stat(showHidden):", - " files = __os_.listdir()", - " return list((f, stat(f)) for f in files if" - " is_visible(f,showHidden))", - ]), + "\n".join( + [ + "def is_visible(filename, showHidden):", + " return showHidden or " + "(filename[0] != '.' and filename[-1] != '~')", + ] + ), + "\n".join( + [ + "def stat(filename):", + " size = __os_.size(filename)", + " return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)", + ] + ), + "\n".join( + [ + "def listdir_stat(showHidden):", + " files = __os_.listdir()", + " return list((f, stat(f)) for f in files if" + " is_visible(f,showHidden))", + ] + ), "print(listdir_stat({0}))".format(showHidden), "del __os_, stat, listdir_stat, is_visible", ] - if self.__repl.isMicrobit() else - [ + if self.__repl.isMicrobit() + else [ "import os as __os_", - "\n".join([ - "def is_visible(filename, showHidden):", - " return showHidden or " - "(filename[0] != '.' and filename[-1] != '~')", - ]), - "\n".join([ - "def stat(filename):", - " try:", - " rstat = __os_.lstat(filename)", - " except:", - " rstat = __os_.stat(filename)", - " return tuple(rstat)", - ]), - "\n".join([ - "def listdir_stat(dirname, showHidden):", - " try:", - " files = __os_.listdir(dirname)", - " except OSError:", - " return []", - " if dirname in ('', '/'):", - " return list((f, stat(f)) for f in files if" - " is_visible(f, showHidden))", - " return list((f, stat(dirname + '/' + f))" - " for f in files if is_visible(f, showHidden))", - ]), + "\n".join( + [ + "def is_visible(filename, showHidden):", + " return showHidden or " + "(filename[0] != '.' and filename[-1] != '~')", + ] + ), + "\n".join( + [ + "def stat(filename):", + " try:", + " rstat = __os_.lstat(filename)", + " except:", + " rstat = __os_.stat(filename)", + " return tuple(rstat)", + ] + ), + "\n".join( + [ + "def listdir_stat(dirname, showHidden):", + " try:", + " files = __os_.listdir(dirname)", + " except OSError:", + " return []", + " if dirname in ('', '/'):", + " return list((f, stat(f)) for f in files if" + " is_visible(f, showHidden))", + " return list((f, stat(dirname + '/' + f))" + " for f in files if is_visible(f, showHidden))", + ] + ), "print(listdir_stat('{0}', {1}))".format(dirname, showHidden), "del __os_, stat, listdir_stat, is_visible", ] @@ -401,11 +420,11 @@ return fileslist else: return [(f, (s[0], s[6], s[8])) for f, s in fileslist] - + def cd(self, dirname): """ Public method to change the current directory on the connected device. - + @param dirname directory to change to @type str @exception OSError raised to indicate an issue with the device @@ -419,11 +438,11 @@ out, err = self.execute(commands) if err: raise OSError(self.__shortError(err)) - + def pwd(self): """ Public method to get the current directory of the connected device. - + @return current directory @rtype str @exception OSError raised to indicate an issue with the device @@ -431,7 +450,7 @@ if self.__repl.isMicrobit(): # BBC micro:bit does not support directories return "" - + commands = [ "import os as __os_", "print(__os_.getcwd())", @@ -441,11 +460,11 @@ if err: raise OSError(self.__shortError(err)) return out.decode("utf-8").strip() - + def rm(self, filename): """ Public method to remove a file from the connected device. - + @param filename name of the file to be removed @type str @exception OSError raised to indicate an issue with the device @@ -459,11 +478,11 @@ out, err = self.execute(commands) if err: raise OSError(self.__shortError(err)) - + def rmrf(self, name, recursive=False, force=False): """ Public method to remove a file or directory recursively. - + @param name of the file or directory to remove @type str @param recursive flag indicating a recursive deletion @@ -477,43 +496,44 @@ if name: commands = [ "import os as __os_", - "\n".join([ - "def remove_file(name, recursive=False, force=False):", - " try:", - " mode = __os_.stat(name)[0]", - " if mode & 0x4000 != 0:", - " if recursive:", - " for file in __os_.listdir(name):", - " success = remove_file(" - "name + '/' + file, recursive, force)", - " if not success and not force:", - " return False", - " __os_.rmdir(name)", - " else:", - " if not force:", - " return False", - " else:", - " __os_.remove(name)", - " except:", - " if not force:", - " return False", - " return True", - ]), - "print(remove_file('{0}', {1}, {2}))".format(name, recursive, - force), + "\n".join( + [ + "def remove_file(name, recursive=False, force=False):", + " try:", + " mode = __os_.stat(name)[0]", + " if mode & 0x4000 != 0:", + " if recursive:", + " for file in __os_.listdir(name):", + " success = remove_file(" + "name + '/' + file, recursive, force)", + " if not success and not force:", + " return False", + " __os_.rmdir(name)", + " else:", + " if not force:", + " return False", + " else:", + " __os_.remove(name)", + " except:", + " if not force:", + " return False", + " return True", + ] + ), + "print(remove_file('{0}', {1}, {2}))".format(name, recursive, force), "del __os_, remove_file", ] out, err = self.execute(commands) if err: raise OSError(self.__shortError(err)) return ast.literal_eval(out.decode("utf-8")) - + return False - + def mkdir(self, dirname): """ Public method to create a new directory. - + @param dirname name of the directory to create @type str @exception OSError raised to indicate an issue with the device @@ -527,11 +547,11 @@ out, err = self.execute(commands) if err: raise OSError(self.__shortError(err)) - + def rmdir(self, dirname): """ Public method to remove a directory. - + @param dirname name of the directory to be removed @type str @exception OSError raised to indicate an issue with the device @@ -545,11 +565,11 @@ out, err = self.execute(commands) if err: raise OSError(self.__shortError(err)) - + def put(self, hostFileName, deviceFileName=None): """ Public method to copy a local file to the connected device. - + @param hostFileName name of the file to be copied @type str @param deviceFileName name of the file to copy to @@ -560,16 +580,16 @@ """ if not os.path.isfile(hostFileName): raise OSError("No such file: {0}".format(hostFileName)) - + with open(hostFileName, "rb") as hostFile: content = hostFile.read() # convert eol '\r' content = content.replace(b"\r\n", b"\r") content = content.replace(b"\n", b"\r") - + if not deviceFileName: deviceFileName = os.path.basename(hostFileName) - + commands = [ "fd = open('{0}', 'wb')".format(deviceFileName), "f = fd.write", @@ -578,20 +598,22 @@ chunk = content[:64] commands.append("f(" + repr(chunk) + ")") content = content[64:] - commands.extend([ - "fd.close()", - "del f, fd", - ]) - + commands.extend( + [ + "fd.close()", + "del f, fd", + ] + ) + out, err = self.execute(commands) if err: raise OSError(self.__shortError(err)) return True - + def get(self, deviceFileName, hostFileName=None): """ Public method to copy a file from the connected device. - + @param deviceFileName name of the file to copy @type str @param hostFileName name of the file to copy to @@ -602,37 +624,39 @@ """ if not hostFileName: hostFileName = deviceFileName - + commands = [ - "\n".join([ - "def send_data():", - " try:", - " from microbit import uart as u", - " except ImportError:", - " try:", - " from machine import UART", - " u = UART(0, {0})".format(115200), - " except Exception:", - " try:", - " from sys import stdout as u", - " except Exception:", - " raise Exception('Could not find UART module" - " in device.')", - " f = open('{0}', 'rb')".format(deviceFileName), - " r = f.read", - " result = True", - " while result:", - " result = r(32)", - " if result:", - " u.write(result)", - " f.close()", - ]), + "\n".join( + [ + "def send_data():", + " try:", + " from microbit import uart as u", + " except ImportError:", + " try:", + " from machine import UART", + " u = UART(0, {0})".format(115200), + " except Exception:", + " try:", + " from sys import stdout as u", + " except Exception:", + " raise Exception('Could not find UART module" + " in device.')", + " f = open('{0}', 'rb')".format(deviceFileName), + " r = f.read", + " result = True", + " while result:", + " result = r(32)", + " if result:", + " u.write(result)", + " f.close()", + ] + ), "send_data()", ] out, err = self.execute(commands) if err: raise OSError(self.__shortError(err)) - + # write the received bytes to the local file # convert eol to "\n" out = out.replace(b"\r\n", b"\n") @@ -640,12 +664,12 @@ with open(hostFileName, "wb") as hostFile: hostFile.write(out) return True - + def fileSystemInfo(self): """ Public method to obtain information about the currently mounted file systems. - + @return tuple of tuples containing the file system name, the total size, the used size and the free size @rtype tuple of tuples of (str, int, int, int) @@ -653,20 +677,22 @@ """ commands = [ "import os as __os_", - "\n".join([ - "def fsinfo():", - " infolist = []", - " info = __os_.statvfs('/')", - " if info[0] == 0:", - # assume it is just mount points - " fsnames = __os_.listdir('/')", - " for fs in fsnames:", - " fs = '/' + fs", - " infolist.append((fs, __os_.statvfs(fs)))", - " else:", - " infolist.append(('/', info))", - " return infolist", - ]), + "\n".join( + [ + "def fsinfo():", + " infolist = []", + " info = __os_.statvfs('/')", + " if info[0] == 0:", + # assume it is just mount points + " fsnames = __os_.listdir('/')", + " for fs in fsnames:", + " fs = '/' + fs", + " infolist.append((fs, __os_.statvfs(fs)))", + " else:", + " infolist.append(('/', info))", + " return infolist", + ] + ), "print(fsinfo())", "del __os_, fsinfo", ] @@ -683,18 +709,18 @@ freeSize = info[4] * info[1] usedSize = totalSize - freeSize filesystemInfos.append((fs, totalSize, usedSize, freeSize)) - + return tuple(filesystemInfos) - + ################################################################## ## non-filesystem related methods below ################################################################## - + def version(self): """ Public method to get the MicroPython version information of the connected device. - + @return dictionary containing the version information @rtype dict @exception OSError raised to indicate an issue with the device @@ -707,7 +733,7 @@ out, err = self.execute(commands) if err: raise OSError(self.__shortError(err)) - + rawOutput = out.decode("utf-8").strip() rawOutput = rawOutput[1:-1] items = rawOutput.split(",") @@ -716,32 +742,36 @@ key, value = item.strip().split("=") result[key.strip()] = value.strip()[1:-1] return result - + def getImplementation(self): """ Public method to get some implementation information of the connected device. - + @return dictionary containing the implementation information @rtype dict @exception OSError raised to indicate an issue with the device """ commands = [ "import sys as __sys_", - "res = {}", # __IGNORE_WARNING_M613__ - "\n".join([ - "try:", - " res['name'] = __sys_.implementation.name", - "except AttributeError:", - " res['name'] = 'unknown'", - ]), - "\n".join([ - "try:", - " res['version'] = '.'.join((str(i) for i in" - " __sys_.implementation.version))", - "except AttributeError:", - " res['version'] = 'unknown'", - ]), + "res = {}", # __IGNORE_WARNING_M613__ + "\n".join( + [ + "try:", + " res['name'] = __sys_.implementation.name", + "except AttributeError:", + " res['name'] = 'unknown'", + ] + ), + "\n".join( + [ + "try:", + " res['version'] = '.'.join((str(i) for i in" + " __sys_.implementation.version))", + "except AttributeError:", + " res['version'] = 'unknown'", + ] + ), "print(res)", "del res, __sys_", ] @@ -749,18 +779,17 @@ if err: raise OSError(self.__shortError(err)) return ast.literal_eval(out.decode("utf-8")) - + def getBoardInformation(self): """ Public method to get some information data of the connected board. - + @return dictionary containing the determined data @rtype dict @exception OSError raised to indicate an issue with the device """ commands = [ - "res = {}", # __IGNORE_WARNING_M613__ - + "res = {}", # __IGNORE_WARNING_M613__ "import gc as __gc_", "__gc_.enable()", "__gc_.collect()", @@ -773,7 +802,6 @@ "res['mem_free_kb'] = mem_free / 1024.0", "res['mem_free_pc'] = mem_free / mem_total * 100.0", "del __gc_, mem_alloc, mem_free, mem_total", - "import os as __os_", "uname = __os_.uname()", "res['sysname'] = uname.sysname", @@ -781,66 +809,70 @@ "res['release'] = uname.release", "res['version'] = uname.version", "res['machine'] = uname.machine", - "import sys as __sys_", "res['py_platform'] = __sys_.platform", "res['py_version'] = __sys_.version", - "\n".join([ - "try:", - " res['mpy_name'] = __sys_.implementation.name", - "except AttributeError:", - " res['mpy_name'] = 'unknown'", - ]), - "\n".join([ - "try:", - " res['mpy_version'] = '.'.join((str(i) for i in" - " __sys_.implementation.version))", - "except AttributeError:", - " res['mpy_version'] = 'unknown'", - ]), - + "\n".join( + [ + "try:", + " res['mpy_name'] = __sys_.implementation.name", + "except AttributeError:", + " res['mpy_name'] = 'unknown'", + ] + ), + "\n".join( + [ + "try:", + " res['mpy_version'] = '.'.join((str(i) for i in" + " __sys_.implementation.version))", + "except AttributeError:", + " res['mpy_version'] = 'unknown'", + ] + ), "stat_ = __os_.statvfs('/flash')", "res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0", "res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0", - "res['flash_used_kb'] = res['flash_total_kb'] -" - " res['flash_free_kb']", + "res['flash_used_kb'] = res['flash_total_kb'] -" " res['flash_free_kb']", "res['flash_free_pc'] = res['flash_free_kb'] /" " res['flash_total_kb'] * 100.0", "res['flash_used_pc'] = res['flash_used_kb'] /" " res['flash_total_kb'] * 100.0", - - "\n".join([ - "try:", - " import machine as __mc_", - " res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0", - " res['mc_id'] = ':'.join(['{0:X}'.format(x)" - " for x in __mc_.unique_id()])", - " del __mc_", - "except ImportError:", - "\n".join([ - " try:", - " import microcontroller as __mc_", - " res['mc_frequency_mhz'] = __mc_.cpu.frequency" - " / 1000000.0", - " res['mc_temp_c'] = __mc_.cpu.temperature", - " res['mc_id'] = ':'.join(['{0:X}'.format(x)" - " for x in __mc_.cpu.uid])", - " del __mc_", - " except ImportError:", - " res['mc_frequency'] = None", - " res['mc_temp'] = None", - ]), - ]), - - "\n".join([ - "try:", - " import ulab as __ulab_", - " res['ulab'] = __ulab_.__version__", - " del __ulab_", - "except ImportError:", - " res['ulab'] = None", - ]), - + "\n".join( + [ + "try:", + " import machine as __mc_", + " res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0", + " res['mc_id'] = ':'.join(['{0:X}'.format(x)" + " for x in __mc_.unique_id()])", + " del __mc_", + "except ImportError:", + "\n".join( + [ + " try:", + " import microcontroller as __mc_", + " res['mc_frequency_mhz'] = __mc_.cpu.frequency" + " / 1000000.0", + " res['mc_temp_c'] = __mc_.cpu.temperature", + " res['mc_id'] = ':'.join(['{0:X}'.format(x)" + " for x in __mc_.cpu.uid])", + " del __mc_", + " except ImportError:", + " res['mc_frequency'] = None", + " res['mc_temp'] = None", + ] + ), + ] + ), + "\n".join( + [ + "try:", + " import ulab as __ulab_", + " res['ulab'] = __ulab_.__version__", + " del __ulab_", + "except ImportError:", + " res['ulab'] = None", + ] + ), "print(res)", "del res, stat_, __os_, __sys_", ] @@ -848,12 +880,12 @@ if err: raise OSError(self.__shortError(err)) return ast.literal_eval(out.decode("utf-8")) - + def syncTime(self, deviceType): """ Public method to set the time of the connected device to the local computer's time. - + @param deviceType type of board to sync time to @type str @exception OSError raised to indicate an issue with the device @@ -874,56 +906,64 @@ # subseconds) # http://docs.micropython.org/en/latest/library/pyb.RTC.html # #pyb.RTC.datetime - set_time = "\n".join([ - "def set_time(rtc_time):", - " import pyb", - " rtc = pyb.RTC()", - " rtc.datetime(rtc_time[:7] + (0,))", - ]) + set_time = "\n".join( + [ + "def set_time(rtc_time):", + " import pyb", + " rtc = pyb.RTC()", + " rtc.datetime(rtc_time[:7] + (0,))", + ] + ) elif deviceType == "esp": # The machine.RTC documentation was incorrect and doesn't agree # with the code, so no link is presented here. The order of the # arguments is the same as the pyboard except for LoBo MPy. - set_time = "\n".join([ - "def set_time(rtc_time):", - " import machine", - " rtc = machine.RTC()", - " try:", # ESP8266 may use rtc.datetime() - " rtc.datetime(rtc_time[:7] + (0,))", - " except Exception:", # ESP32 uses rtc.init() - " import os", - " if 'LoBo' in os.uname()[0]:", # LoBo MPy - " clock_time = rtc_time[:3] +" - " rtc_time[4:7] + (rtc_time[3], rtc_time[7])", - " else:", - " clock_time = rtc_time[:7] + (0,)", - " rtc.init(clock_time)", - ]) + set_time = "\n".join( + [ + "def set_time(rtc_time):", + " import machine", + " rtc = machine.RTC()", + " try:", # ESP8266 may use rtc.datetime() + " rtc.datetime(rtc_time[:7] + (0,))", + " except Exception:", # ESP32 uses rtc.init() + " import os", + " if 'LoBo' in os.uname()[0]:", # LoBo MPy + " clock_time = rtc_time[:3] +" + " rtc_time[4:7] + (rtc_time[3], rtc_time[7])", + " else:", + " clock_time = rtc_time[:7] + (0,)", + " rtc.init(clock_time)", + ] + ) elif deviceType == "circuitpython": - set_time = "\n".join([ - "def set_time(rtc_time):", - " import rtc", - " import time", - " clock = rtc.RTC()", - " clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3]," - " rtc_time[7], rtc_time[8])", - " clock.datetime = time.struct_time(clock_time)", - ]) + set_time = "\n".join( + [ + "def set_time(rtc_time):", + " import rtc", + " import time", + " clock = rtc.RTC()", + " clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3]," + " rtc_time[7], rtc_time[8])", + " clock.datetime = time.struct_time(clock_time)", + ] + ) elif deviceType in ("bbc_microbit", "calliope"): # BBC micro:bit and Calliope mini don't support time commands return elif deviceType == "rp2040": # Raspberry Pi Pico (RP2040) - machine.RTC doesn't exist - set_time = "\n".join([ - "def set_time(rtc_time):", - " setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |" - " rtc_time[2]", - " setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |" - " rtc_time[5] << 8 | rtc_time[6]", - " machine.mem32[0x4005c004] = setup_0", - " machine.mem32[0x4005c008] = setup_1", - " machine.mem32[0x4005c00c] |= 0x10", - ]) + set_time = "\n".join( + [ + "def set_time(rtc_time):", + " setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |" + " rtc_time[2]", + " setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |" + " rtc_time[5] << 8 | rtc_time[6]", + " machine.mem32[0x4005c004] = setup_0", + " machine.mem32[0x4005c008] = setup_1", + " machine.mem32[0x4005c00c] |= 0x10", + ] + ) elif deviceType == "pycom": # PyCom's machine.RTC takes its arguments in a slightly # different order than the official machine.RTC. @@ -931,60 +971,73 @@ # tzinfo]]) # https://docs.pycom.io/firmwareapi/pycom/machine/rtc/ # #rtc-init-datetime-none-source-rtc-internal-rc - set_time = "\n".join([ - "def set_time(rtc_time):", - " import pycom", - " rtc_time2 = rtc_time[:3] + rtc_time[4:7]", - " import machine", - " rtc = machine.RTC()", - " rtc.init(rtc_time2)", - ]) + set_time = "\n".join( + [ + "def set_time(rtc_time):", + " import pycom", + " rtc_time2 = rtc_time[:3] + rtc_time[4:7]", + " import machine", + " rtc = machine.RTC()", + " rtc.init(rtc_time2)", + ] + ) else: # no set_time() support for generic boards return - + now = time.localtime(time.time()) commands = [ set_time, - "set_time({0})".format(( - now.tm_year, now.tm_mon, now.tm_mday, now.tm_wday + 1, - now.tm_hour, now.tm_min, now.tm_sec, now.tm_yday, now.tm_isdst - )), + "set_time({0})".format( + ( + now.tm_year, + now.tm_mon, + now.tm_mday, + now.tm_wday + 1, + now.tm_hour, + now.tm_min, + now.tm_sec, + now.tm_yday, + now.tm_isdst, + ) + ), "del set_time", ] out, err = self.execute(commands) if err: raise OSError(self.__shortError(err)) - + def getTime(self): """ Public method to get the current time of the device. - + @return time of the device @rtype str @exception OSError raised to indicate an issue with the device """ commands = [ - "\n".join([ - "try:", - " import rtc as __rtc_", - " print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'" - ".format(*__rtc_.RTC().datetime[:6]))", - " del __rtc_", - "except:", - " import time as __time_", - " try:", - " print(__time_.strftime('%Y-%m-%d %H:%M:%S'," - # __IGNORE_WARNING_M601__ - " __time_.localtime()))", - " except AttributeError:", - " tm = __time_.localtime()", - " print('{0:04d}-{1:02d}-{2:02d}" - " {3:02d}:{4:02d}:{5:02d}'" - ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))", - " del tm", - " del __time_" - ]), + "\n".join( + [ + "try:", + " import rtc as __rtc_", + " print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'" + ".format(*__rtc_.RTC().datetime[:6]))", + " del __rtc_", + "except:", + " import time as __time_", + " try:", + " print(__time_.strftime('%Y-%m-%d %H:%M:%S'," + # __IGNORE_WARNING_M601__ + " __time_.localtime()))", + " except AttributeError:", + " tm = __time_.localtime()", + " print('{0:04d}-{1:02d}-{2:02d}" + " {3:02d}:{4:02d}:{5:02d}'" + ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))", + " del tm", + " del __time_", + ] + ), ] out, err = self.execute(commands) if err: