diff -r 57496966803c -r 6378da868bb0 src/eric7/MicroPython/MicroPythonCommandsInterface.py --- a/src/eric7/MicroPython/MicroPythonCommandsInterface.py Tue Feb 14 11:09:49 2023 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1163 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de> -# - -""" -Module implementing some file system commands for MicroPython. -""" - -import ast -import os -import time - -from PyQt6.QtCore import ( - QCoreApplication, - QEventLoop, - QObject, - QThread, - QTimer, - pyqtSignal, - pyqtSlot, -) - -from eric7 import Preferences - -from .MicroPythonSerialPort import MicroPythonSerialPort - - -class MicroPythonCommandsInterface(QObject): - """ - Class implementing some file system commands for MicroPython. - - Commands are provided to perform operations on the file system of a - connected MicroPython device. Supported commands are: - <ul> - <li>ls: directory listing</li> - <li>lls: directory listing with meta data</li> - <li>cd: change directory</li> - <li>pwd: get the current directory</li> - <li>put: copy a file to the connected device</li> - <li>get: get a file from the connected device</li> - <li>rm: remove a file from the connected device</li> - <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash) - <li>mkdir: create a new directory</li> - <li>rmdir: remove an empty directory</li> - </ul> - - There are additional commands related to time and version. - <ul> - <li>version: get version info about MicroPython</li> - <li>getImplementation: get some implementation information</li> - <li>syncTime: synchronize the time of the connected device</li> - <li>showTime: show the current time of the connected device</li> - </ul> - - @signal executeAsyncFinished() emitted to indicate the end of an - asynchronously executed list of commands (e.g. a script) - @signal dataReceived(data) emitted to send data received via the serial - connection for further processing - """ - - executeAsyncFinished = pyqtSignal() - dataReceived = pyqtSignal(bytes) - - def __init__(self, parent=None): - """ - Constructor - - @param parent reference to the parent object - @type QObject - """ - super().__init__(parent) - - self.__repl = parent - - self.__blockReadyRead = False - - self.__serial = MicroPythonSerialPort( - timeout=Preferences.getMicroPython("SerialTimeout"), parent=self - ) - self.__serial.readyRead.connect(self.__readSerial) - - @pyqtSlot() - def __readSerial(self): - """ - Private slot to read all available serial data and emit it with the - "dataReceived" signal for further processing. - """ - if not self.__blockReadyRead: - data = bytes(self.__serial.readAll()) - self.dataReceived.emit(data) - - @pyqtSlot() - def connectToDevice(self, port): - """ - Public slot to start the manager. - - @param port name of the port to be used - @type str - @return flag indicating success - @rtype bool - """ - return self.__serial.openSerialLink(port) - - @pyqtSlot() - def disconnectFromDevice(self): - """ - Public slot to stop the thread. - """ - self.__serial.closeSerialLink() - - def isConnected(self): - """ - Public method to get the connection status. - - @return flag indicating the connection status - @rtype bool - """ - return self.__serial.isConnected() - - @pyqtSlot() - def handlePreferencesChanged(self): - """ - Public slot to handle a change of the preferences. - """ - self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout")) - - def write(self, data): - """ - Public method to write data to the connected device. - - @param data data to be written - @type bytes or bytearray - """ - self.__serial.isConnected() and self.__serial.write(data) - - def __rawOn(self): - """ - Private method to switch the connected device to 'raw' mode. - - Note: switching to raw mode is done with synchronous writes. - - @return flag indicating success - @@rtype bool - """ - if not self.__serial: - return False - - rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>" - - self.__serial.write(b"\x02") # end raw mode if required - written = self.__serial.waitForBytesWritten(500) - # time out after 500ms if device is not responding - if not written: - return False - for _i in range(3): - # CTRL-C three times to break out of loops - self.__serial.write(b"\r\x03") - written = self.__serial.waitForBytesWritten(500) - # time out after 500ms if device is not responding - if not written: - return False - QThread.msleep(10) - self.__serial.readAll() # read all data and discard it - self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode - self.__serial.readUntil(rawReplMessage) - if self.__serial.hasTimedOut(): - # it timed out; try it again and than fail - self.__serial.write(b"\r\x01") # send CTRL-A again - self.__serial.readUntil(rawReplMessage) - if self.__serial.hasTimedOut(): - return False - - QCoreApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents - ) - self.__serial.readAll() # read all data and discard it - return True - - def __rawOff(self): - """ - Private method to switch 'raw' mode off. - """ - if self.__serial: - self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode - self.__serial.readUntil(b">>> ") # read until Python prompt - self.__serial.readAll() # read all data and discard it - - def probeDevice(self): - """ - Public method to check the device is responding. - - If the device has not been flashed with a MicroPython formware, the - probe will fail. - - @return flag indicating a communicating MicroPython device - @rtype bool - """ - if not self.__serial: - return False - - if not self.__serial.isConnected(): - return False - - # switch on raw mode - self.__blockReadyRead = True - ok = self.__rawOn() - if not ok: - self.__blockReadyRead = False - return False - - # switch off raw mode - QThread.msleep(10) - self.__rawOff() - self.__blockReadyRead = False - - return True - - def execute(self, commands): - """ - Public method to send commands to the connected device and return the - result. - - If no serial connection is available, empty results will be returned. - - @param commands list of commands to be executed - @type str - @return tuple containing stdout and stderr output of the device - @rtype tuple of (bytes, bytes) - """ - if not self.__serial: - return b"", b"" - - if not self.__serial.isConnected(): - return b"", b"Device not connected or not switched on." - - result = bytearray() - err = b"" - - # switch on raw mode - self.__blockReadyRead = True - ok = self.__rawOn() - if not ok: - self.__blockReadyRead = False - return (b"", b"Could not switch to raw mode. Is the device switched on?") - - # send commands - QThread.msleep(10) - for command in commands: - if command: - commandBytes = command.encode("utf-8") - self.__serial.write(commandBytes + b"\x04") - QCoreApplication.processEvents( - QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents - ) - ok = self.__serial.readUntil(b"OK") - if ok != b"OK": - return ( - b"", - "Expected 'OK', got '{0}', followed by '{1}'".format( - ok, self.__serial.readAll() - ).encode("utf-8"), - ) - - # read until prompt - response = self.__serial.readUntil(b"\x04>") - if self.__serial.hasTimedOut(): - self.__blockReadyRead = False - return b"", b"Timeout while processing commands." - if b"\x04" in response[:-2]: - # split stdout, stderr - out, err = response[:-2].split(b"\x04") - result += out - else: - err = b"invalid response received: " + response - if err: - result = b"" - break - - # switch off raw mode - QThread.msleep(10) - self.__rawOff() - self.__blockReadyRead = False - - return bytes(result), err - - def executeAsync(self, commandsList): - """ - Public method to execute a series of commands over a period of time - without returning any result (asynchronous execution). - - @param commandsList list of commands to be execute on the device - @type list of bytes - """ - - if commandsList: - command = commandsList.pop(0) - self.__serial.write(command) - QTimer.singleShot(2, lambda: self.executeAsync(commandsList)) - else: - self.executeAsyncFinished.emit() - - def __shortError(self, error): - """ - Private method to create a shortened error message. - - @param error verbose error message - @type bytes - @return shortened error message - @rtype str - """ - if error: - decodedError = error.decode("utf-8") - try: - return decodedError.split["\r\n"][-2] - except Exception: - return decodedError - return self.tr("Detected an error without indications.") - - # TODO: move these methods to the devices. - ################################################################## - ## Methods below implement the file system commands - ################################################################## - - def ls(self, dirname=""): - """ - Public method to get a directory listing of the connected device. - - @param dirname name of the directory to be listed - @type str - @return tuple containg the directory listing - @rtype tuple of str - @exception OSError raised to indicate an issue with the device - """ - commands = ( - # BBC micro:bit does not support directories - [ - "import os as __os_", - "print(__os_.listdir())", - "del __os_", - ] - if self.__repl.isMicrobit() - else [ - "import os as __os_", - "print(__os_.listdir('{0}'))".format(dirname), - "del __os_", - ] - ) - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - return ast.literal_eval(out.decode("utf-8")) - - def lls(self, dirname="", fullstat=False, showHidden=False): - """ - Public method to get a long directory listing of the connected device - including meta data. - - @param dirname name of the directory to be listed - @type str - @param fullstat flag indicating to return the full stat() tuple - @type bool - @param showHidden flag indicating to show hidden files as well - @type bool - @return list containing the directory listing with tuple entries of - the name and and a tuple of mode, size and time (if fullstat is - false) or the complete stat() tuple. 'None' is returned in case the - directory doesn't exist. - @rtype tuple of (str, tuple) - @exception OSError raised to indicate an issue with the device - """ - commands = ( - # BBC micro:bit does not support directories - [ - "import os as __os_", - "\n".join( - [ - "def is_visible(filename, showHidden):", - " return showHidden or " - "(filename[0] != '.' and filename[-1] != '~')", - ] - ), - "\n".join( - [ - "def stat(filename):", - " size = __os_.size(filename)", - " return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)", - ] - ), - "\n".join( - [ - "def listdir_stat(showHidden):", - " files = __os_.listdir()", - " return list((f, stat(f)) for f in files if" - " is_visible(f,showHidden))", - ] - ), - "print(listdir_stat({0}))".format(showHidden), - "del __os_, stat, listdir_stat, is_visible", - ] - if self.__repl.isMicrobit() - else [ - "import os as __os_", - "\n".join( - [ - "def is_visible(filename, showHidden):", - " return showHidden or " - "(filename[0] != '.' and filename[-1] != '~')", - ] - ), - "\n".join( - [ - "def stat(filename):", - " try:", - " rstat = __os_.lstat(filename)", - " except:", - " rstat = __os_.stat(filename)", - " return tuple(rstat)", - ] - ), - "\n".join( - [ - "def listdir_stat(dirname, showHidden):", - " try:", - " files = __os_.listdir(dirname)", - " except OSError:", - " return []", - " if dirname in ('', '/'):", - " return list((f, stat(f)) for f in files if" - " is_visible(f, showHidden))", - " return list((f, stat(dirname + '/' + f))" - " for f in files if is_visible(f, showHidden))", - ] - ), - "print(listdir_stat('{0}', {1}))".format(dirname, showHidden), - "del __os_, stat, listdir_stat, is_visible", - ] - ) - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - fileslist = ast.literal_eval(out.decode("utf-8")) - if fileslist is None: - return None - else: - if fullstat: - return fileslist - else: - return [(f, (s[0], s[6], s[8])) for f, s in fileslist] - - def cd(self, dirname): - """ - Public method to change the current directory on the connected device. - - @param dirname directory to change to - @type str - @exception OSError raised to indicate an issue with the device - """ - if dirname: - commands = [ - "import os as __os_", - "__os_.chdir('{0}')".format(dirname), - "del __os_", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - - def pwd(self): - """ - Public method to get the current directory of the connected device. - - @return current directory - @rtype str - @exception OSError raised to indicate an issue with the device - """ - if self.__repl.isMicrobit(): - # BBC micro:bit does not support directories - return "" - - commands = [ - "import os as __os_", - "print(__os_.getcwd())", - "del __os_", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - return out.decode("utf-8").strip() - - def rm(self, filename): - """ - Public method to remove a file from the connected device. - - @param filename name of the file to be removed - @type str - @exception OSError raised to indicate an issue with the device - """ - if filename: - commands = [ - "import os as __os_", - "__os_.remove('{0}')".format(filename), - "del __os_", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - - def rmrf(self, name, recursive=False, force=False): - """ - Public method to remove a file or directory recursively. - - @param name of the file or directory to remove - @type str - @param recursive flag indicating a recursive deletion - @type bool - @param force flag indicating to ignore errors - @type bool - @return flag indicating success - @rtype bool - @exception OSError raised to indicate an issue with the device - """ - if name: - commands = [ - "import os as __os_", - "\n".join( - [ - "def remove_file(name, recursive=False, force=False):", - " try:", - " mode = __os_.stat(name)[0]", - " if mode & 0x4000 != 0:", - " if recursive:", - " for file in __os_.listdir(name):", - " success = remove_file(" - "name + '/' + file, recursive, force)", - " if not success and not force:", - " return False", - " __os_.rmdir(name)", - " else:", - " if not force:", - " return False", - " else:", - " __os_.remove(name)", - " except:", - " if not force:", - " return False", - " return True", - ] - ), - "print(remove_file('{0}', {1}, {2}))".format(name, recursive, force), - "del __os_, remove_file", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - return ast.literal_eval(out.decode("utf-8")) - - return False - - def mkdir(self, dirname): - """ - Public method to create a new directory. - - @param dirname name of the directory to create - @type str - @exception OSError raised to indicate an issue with the device - """ - if dirname: - commands = [ - "import os as __os_", - "__os_.mkdir('{0}')".format(dirname), - "del __os_", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - - def rmdir(self, dirname): - """ - Public method to remove a directory. - - @param dirname name of the directory to be removed - @type str - @exception OSError raised to indicate an issue with the device - """ - if dirname: - commands = [ - "import os as __os_", - "__os_.rmdir('{0}')".format(dirname), - "del __os_", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - - def put(self, hostFileName, deviceFileName=None): - """ - Public method to copy a local file to the connected device. - - @param hostFileName name of the file to be copied - @type str - @param deviceFileName name of the file to copy to - @type str - @return flag indicating success - @rtype bool - @exception OSError raised to indicate an issue with the device - """ - if not os.path.isfile(hostFileName): - raise OSError("No such file: {0}".format(hostFileName)) - - if not deviceFileName: - deviceFileName = os.path.basename(hostFileName) - - with open(hostFileName, "rb") as hostFile: - content = hostFile.read() - - return self.putData(deviceFileName, content) - - def putData(self, deviceFileName, content): - """ - Public method to write the given data to the connected device. - - @param deviceFileName name of the file to write to - @type str - @param content data to write - @type bytes - @return flag indicating success - @rtype bool - @exception OSError raised to indicate an issue with the device - """ - if not deviceFileName: - raise OSError("Missing device file name") - - # convert eol '\r' - content = content.replace(b"\r\n", b"\r") - content = content.replace(b"\n", b"\r") - - commands = [ - "fd = open('{0}', 'wb')".format(deviceFileName), - "f = fd.write", - ] - while content: - chunk = content[:64] - commands.append("f(" + repr(chunk) + ")") - content = content[64:] - commands.extend( - [ - "fd.close()", - "del f, fd", - ] - ) - - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - return True - - def get(self, deviceFileName, hostFileName=None): - """ - Public method to copy a file from the connected device. - - @param deviceFileName name of the file to copy - @type str - @param hostFileName name of the file to copy to - @type str - @return flag indicating success - @rtype bool - @exception OSError raised to indicate an issue with the device - """ - if not deviceFileName: - raise OSError("Missing device file name") - - if not hostFileName: - hostFileName = deviceFileName - - out = self.getData(deviceFileName) - with open(hostFileName, "wb") as hostFile: - hostFile.write(out) - - return True - - def getData(self, deviceFileName): - """ - Public method to read data from the connected device. - - @param deviceFileName name of the file to read from - @type str - @return data read from the device - @rtype bytes - @exception OSError raised to indicate an issue with the device - """ - if not deviceFileName: - raise OSError("Missing device file name") - - commands = [ - "\n".join( - [ - "def send_data():", - " try:", - " from microbit import uart as u", - " except ImportError:", - " try:", - " from sys import stdout as u", - " except ImportError:", - " try:", - " from machine import UART", - " u = UART(0, {0})".format(115200), - " except Exception:", - " raise Exception('Could not find UART module" - " in device.')", - " f = open('{0}', 'rb')".format(deviceFileName), - " r = f.read", - " result = True", - " while result:", - " result = r(32)", - " if result:", - " u.write(result)", - " f.close()", - ] - ), - "send_data()", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - - # write the received bytes to the local file - # convert eol to "\n" - out = out.replace(b"\r\n", b"\n") - out = out.replace(b"\r", b"\n") - - return out - - def fileSystemInfo(self): - """ - Public method to obtain information about the currently mounted file - systems. - - @return tuple of tuples containing the file system name, the total - size, the used size and the free size - @rtype tuple of tuples of (str, int, int, int) - @exception OSError raised to indicate an issue with the device - """ - commands = [ - "import os as __os_", - "\n".join( - [ - "def fsinfo():", - " infolist = []", - " info = __os_.statvfs('/')", - " if info[0] == 0:", - # assume it is just mount points - " fsnames = __os_.listdir('/')", - " for fs in fsnames:", - " fs = '/' + fs", - " infolist.append((fs, __os_.statvfs(fs)))", - " else:", - " infolist.append(('/', info))", - " return infolist", - ] - ), - "print(fsinfo())", - "del __os_, fsinfo", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - infolist = ast.literal_eval(out.decode("utf-8")) - if infolist is None: - return None - else: - filesystemInfos = [] - for fs, info in infolist: - totalSize = info[2] * info[1] - freeSize = info[4] * info[1] - usedSize = totalSize - freeSize - filesystemInfos.append((fs, totalSize, usedSize, freeSize)) - - return tuple(filesystemInfos) - - ################################################################## - ## non-filesystem related methods below - ################################################################## - - def getDeviceData(self): - """ - Public method to get some essential data for the connected board. - - @return dictionary containing the determined data - @rtype dict - @exception OSError raised to indicate an issue with the device - """ - commands = [ - "res = {}", # __IGNORE_WARNING_M613__ - "import os as __os_", - "uname = __os_.uname()", - "res['sysname'] = uname.sysname", - "res['nodename'] = uname.nodename", - "res['release'] = uname.release", - "res['version'] = uname.version", - "res['machine'] = uname.machine", - "import sys as __sys_", - "res['py_platform'] = __sys_.platform", - "res['py_version'] = __sys_.version", - "\n".join( - [ - "try:", - " res['mpy_name'] = __sys_.implementation.name", - "except AttributeError:", - " res['mpy_name'] = 'unknown'", - ] - ), - "\n".join( - [ - "try:", - " res['mpy_version'] = '.'.join((str(i) for i in" - " __sys_.implementation.version))", - "except AttributeError:", - " res['mpy_version'] = 'unknown'", - ] - ), - "\n".join( - [ - "try:", - " import pimoroni as __pimoroni_", - " res['mpy_variant'] = 'Pimoroni'", - " del __pimoroni_", - "except ImportError:", - " res['mpy_variant'] = ''", - ] - ), - "print(res)", - "del res, __os_, __sys_", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - return ast.literal_eval(out.decode("utf-8")) - - def getBoardInformation(self): - """ - Public method to get some information data of the connected board. - - @return dictionary containing the determined data - @rtype dict - @exception OSError raised to indicate an issue with the device - """ - commands = [ - "res = {}", # __IGNORE_WARNING_M613__ - "import gc as __gc_", - "__gc_.enable()", - "__gc_.collect()", - "mem_alloc = __gc_.mem_alloc()", - "mem_free = __gc_.mem_free()", - "mem_total = mem_alloc + mem_free", - "res['mem_total_kb'] = mem_total / 1024.0", - "res['mem_used_kb'] = mem_alloc / 1024.0", - "res['mem_used_pc'] = mem_alloc / mem_total * 100.0", - "res['mem_free_kb'] = mem_free / 1024.0", - "res['mem_free_pc'] = mem_free / mem_total * 100.0", - "del __gc_, mem_alloc, mem_free, mem_total", - "import os as __os_", - "uname = __os_.uname()", - "res['sysname'] = uname.sysname", - "res['nodename'] = uname.nodename", - "res['release'] = uname.release", - "res['version'] = uname.version", - "res['machine'] = uname.machine", - "import sys as __sys_", - "res['py_platform'] = __sys_.platform", - "res['py_version'] = __sys_.version", - "\n".join( - [ - "try:", - " res['mpy_name'] = __sys_.implementation.name", - "except AttributeError:", - " res['mpy_name'] = 'unknown'", - ] - ), - "\n".join( - [ - "try:", - " res['mpy_version'] = '.'.join((str(i) for i in" - " __sys_.implementation.version))", - "except AttributeError:", - " res['mpy_version'] = 'unknown'", - ] - ), - "\n".join( - [ - "try:", - " import pimoroni as __pimoroni_", - " res['mpy_variant'] = 'Pimoroni'", - " del __pimoroni_", - "except ImportError:", - " res['mpy_variant'] = ''", - ] - ), - "\n".join( - [ - "try:", - " stat_ = __os_.statvfs('/flash')", - " res['flash_info_available'] = True", - " res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0", - " res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0", - " res['flash_used_kb'] = res['flash_total_kb'] -" - " res['flash_free_kb']", - " res['flash_free_pc'] = res['flash_free_kb'] /" - " res['flash_total_kb'] * 100.0", - " res['flash_used_pc'] = res['flash_used_kb'] /" - " res['flash_total_kb'] * 100.0", - " del stat_", - "except AttributeError:", - " res['flash_info_available'] = False", - ] - ), - "\n".join( - [ - "try:", - " import machine as __mc_", - " if isinstance(__mc_.freq(), tuple):", - " res['mc_frequency_mhz'] = __mc_.freq()[0] / 1000000.0", - " else:", - " res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0", - " res['mc_id'] = ':'.join(['{0:X}'.format(x)" - " for x in __mc_.unique_id()])", - " del __mc_", - "except ImportError:", - "\n".join( - [ - " try:", - " import microcontroller as __mc_", - " res['mc_frequency_mhz'] = __mc_.cpu.frequency" - " / 1000000.0", - " res['mc_temp_c'] = __mc_.cpu.temperature", - " res['mc_id'] = ':'.join(['{0:X}'.format(x)" - " for x in __mc_.cpu.uid])", - " del __mc_", - " except ImportError:", - " res['mc_frequency'] = None", - " res['mc_temp'] = None", - ] - ), - ] - ), - "\n".join( - [ - "try:", - " import ulab as __ulab_", - " res['ulab'] = __ulab_.__version__", - " del __ulab_", - "except ImportError:", - " res['ulab'] = None", - ] - ), - "print(res)", - "del res, __os_, __sys_", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - return ast.literal_eval(out.decode("utf-8")) - - def syncTime(self, deviceType, hasCPy=False): - """ - Public method to set the time of the connected device to the local - computer's time. - - @param deviceType type of board to sync time to - @type str - @param hasCPy flag indicating that the device has CircuitPython loadede - (defaults to False) - @type bool - @exception OSError raised to indicate an issue with the device - """ - # rtc_time[0] - year 4 digit - # rtc_time[1] - month 1..12 - # rtc_time[2] - day 1..31 - # rtc_time[3] - weekday 1..7 1=Monday - # rtc_time[4] - hour 0..23 - # rtc_time[5] - minute 0..59 - # rtc_time[6] - second 0..59 - # rtc_time[7] - yearday 1..366 - # rtc_time[8] - isdst 0, 1, or -1 - if deviceType == "circuitpython" or hasCPy: - set_time = "\n".join( - [ - "def set_time(rtc_time):", - " import rtc", - " import time", - " clock = rtc.RTC()", - " clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3]," - " rtc_time[7], rtc_time[8])", - " clock.datetime = time.struct_time(clock_time)", - ] - ) - elif deviceType == "pyboard": - # Pyboard (pyboard doesn't have machine.RTC()). - # The pyb.RTC.datetime function takes the arguments in the - # order: (year, month, day, weekday, hour, minute, second, - # subseconds) - # http://docs.micropython.org/en/latest/library/pyb.RTC.html - # #pyb.RTC.datetime - set_time = "\n".join( - [ - "def set_time(rtc_time):", - " import pyb", - " rtc = pyb.RTC()", - " rtc.datetime(rtc_time[:7] + (0,))", - ] - ) - elif deviceType == "teensy": - # The pyb.RTC.datetime function takes the arguments in the - # order: (year, month, day, weekday, hour, minute, second, - # subseconds) - # https://docs.micropython.org/en/latest/library/machine.RTC.html - # #machine-rtc - set_time = "\n".join( - [ - "def set_time(rtc_time):", - " import machine", - " rtc = machine.RTC()", - " rtc.init(rtc_time[:7] + (0,))", - ] - ) - elif deviceType == "esp": - # The machine.RTC documentation was incorrect and doesn't agree - # with the code, so no link is presented here. The order of the - # arguments is the same as the pyboard except for LoBo MPy. - set_time = "\n".join( - [ - "def set_time(rtc_time):", - " import machine", - " rtc = machine.RTC()", - " try:", # ESP8266 may use rtc.datetime() - " rtc.datetime(rtc_time[:7] + (0,))", - " except Exception:", # ESP32 uses rtc.init() - " import os", - " if 'LoBo' in os.uname()[0]:", # LoBo MPy - " clock_time = rtc_time[:3] +" - " rtc_time[4:7] + (rtc_time[3], rtc_time[7])", - " else:", - " clock_time = rtc_time[:7] + (0,)", - " rtc.init(clock_time)", - ] - ) - elif deviceType in ("bbc_microbit", "calliope"): - # BBC micro:bit and Calliope mini with MicroPython don't support - # time commands. - return - elif deviceType == "rp2040": - # Raspberry Pi Pico (RP2040) - machine.RTC doesn't exist - set_time = "\n".join( - [ - "def set_time(rtc_time):", - " setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |" - " rtc_time[2]", - " setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |" - " rtc_time[5] << 8 | rtc_time[6]", - " machine.mem32[0x4005c004] = setup_0", - " machine.mem32[0x4005c008] = setup_1", - " machine.mem32[0x4005c00c] |= 0x10", - ] - ) - elif deviceType == "pycom": - # PyCom's machine.RTC takes its arguments in a slightly - # different order than the official machine.RTC. - # (year, month, day, hour, minute, second[, microsecond[, - # tzinfo]]) - # https://docs.pycom.io/firmwareapi/pycom/machine/rtc/ - # #rtc-init-datetime-none-source-rtc-internal-rc - set_time = "\n".join( - [ - "def set_time(rtc_time):", - " import pycom", - " rtc_time2 = rtc_time[:3] + rtc_time[4:7]", - " import machine", - " rtc = machine.RTC()", - " rtc.init(rtc_time2)", - ] - ) - else: - # no set_time() support for generic boards - return - - now = time.localtime(time.time()) - commands = [ - set_time, - "set_time({0})".format( - ( - now.tm_year, - now.tm_mon, - now.tm_mday, - now.tm_wday + 1, - now.tm_hour, - now.tm_min, - now.tm_sec, - now.tm_yday, - now.tm_isdst, - ) - ), - "del set_time", - ] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - - def getTime(self): - """ - Public method to get the current time of the device. - - @return time of the device - @rtype str - @exception OSError raised to indicate an issue with the device - """ - commands = [ - "\n".join( - [ - "try:", - " import rtc as __rtc_", - " print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'" - ".format(*__rtc_.RTC().datetime[:6]))", - " del __rtc_", - "except:", - " import time as __time_", - " try:", - " print(__time_.strftime('%Y-%m-%d %H:%M:%S'," - # __IGNORE_WARNING_M601__ - " __time_.localtime()))", - " except AttributeError:", - " tm = __time_.localtime()", - " print('{0:04d}-{1:02d}-{2:02d}" - " {3:02d}:{4:02d}:{5:02d}'" - ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))", - " del tm", - " del __time_", - ] - ), - ] - out, err = self.execute(commands) - if err: - if b"NotImplementedError" in err: - return "<unsupported> <unsupported>" - raise OSError(self.__shortError(err)) - return out.decode("utf-8").strip() - - def getModules(self): - """ - Public method to show a list of modules built into the firmware. - - @return list of builtin modules - @rtype list of str - @exception OSError raised to indicate an issue with the device - """ - commands = ["help('modules')"] - out, err = self.execute(commands) - if err: - raise OSError(self.__shortError(err)) - - modules = [] - for line in out.decode("utf-8").splitlines()[:-1]: - modules.extend(line.split()) - return modules