diff -r 3fc8dfeb6ebe -r b99e7fd55fd3 src/eric7/MicroPython/MicroPythonCommandsInterface.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/eric7/MicroPython/MicroPythonCommandsInterface.py Thu Jul 07 11:23:56 2022 +0200 @@ -0,0 +1,994 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2019 - 2022 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing some file system commands for MicroPython. +""" + +import ast +import time +import os + +from PyQt6.QtCore import ( + pyqtSlot, pyqtSignal, QObject, QThread, QTimer, QCoreApplication, + QEventLoop +) + +from .MicroPythonSerialPort import MicroPythonSerialPort + +import Preferences + + +class MicroPythonCommandsInterface(QObject): + """ + Class implementing some file system commands for MicroPython. + + Commands are provided to perform operations on the file system of a + connected MicroPython device. Supported commands are: + <ul> + <li>ls: directory listing</li> + <li>lls: directory listing with meta data</li> + <li>cd: change directory</li> + <li>pwd: get the current directory</li> + <li>put: copy a file to the connected device</li> + <li>get: get a file from the connected device</li> + <li>rm: remove a file from the connected device</li> + <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash) + <li>mkdir: create a new directory</li> + <li>rmdir: remove an empty directory</li> + </ul> + + There are additional commands related to time and version. + <ul> + <li>version: get version info about MicroPython</li> + <li>getImplementation: get some implementation information</li> + <li>syncTime: synchronize the time of the connected device</li> + <li>showTime: show the current time of the connected device</li> + </ul> + + @signal executeAsyncFinished() emitted to indicate the end of an + asynchronously executed list of commands (e.g. a script) + @signal dataReceived(data) emitted to send data received via the serial + connection for further processing + """ + executeAsyncFinished = pyqtSignal() + dataReceived = pyqtSignal(bytes) + + def __init__(self, parent=None): + """ + Constructor + + @param parent reference to the parent object + @type QObject + """ + super().__init__(parent) + + self.__repl = parent + + self.__blockReadyRead = False + + self.__serial = MicroPythonSerialPort( + timeout=Preferences.getMicroPython("SerialTimeout"), + parent=self) + self.__serial.readyRead.connect(self.__readSerial) + + @pyqtSlot() + def __readSerial(self): + """ + Private slot to read all available serial data and emit it with the + "dataReceived" signal for further processing. + """ + if not self.__blockReadyRead: + data = bytes(self.__serial.readAll()) + self.dataReceived.emit(data) + + @pyqtSlot() + def connectToDevice(self, port): + """ + Public slot to start the manager. + + @param port name of the port to be used + @type str + @return flag indicating success + @rtype bool + """ + return self.__serial.openSerialLink(port) + + @pyqtSlot() + def disconnectFromDevice(self): + """ + Public slot to stop the thread. + """ + self.__serial.closeSerialLink() + + def isConnected(self): + """ + Public method to get the connection status. + + @return flag indicating the connection status + @rtype bool + """ + return self.__serial.isConnected() + + @pyqtSlot() + def handlePreferencesChanged(self): + """ + Public slot to handle a change of the preferences. + """ + self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout")) + + def write(self, data): + """ + Public method to write data to the connected device. + + @param data data to be written + @type bytes or bytearray + """ + self.__serial.isConnected() and self.__serial.write(data) + + def __rawOn(self): + """ + Private method to switch the connected device to 'raw' mode. + + Note: switching to raw mode is done with synchronous writes. + + @return flag indicating success + @@rtype bool + """ + if not self.__serial: + return False + + rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>" + + self.__serial.write(b"\x02") # end raw mode if required + written = self.__serial.waitForBytesWritten(500) + # time out after 500ms if device is not responding + if not written: + return False + for _i in range(3): + # CTRL-C three times to break out of loops + self.__serial.write(b"\r\x03") + written = self.__serial.waitForBytesWritten(500) + # time out after 500ms if device is not responding + if not written: + return False + QThread.msleep(10) + self.__serial.readAll() # read all data and discard it + self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode + self.__serial.readUntil(rawReplMessage) + if self.__serial.hasTimedOut(): + # it timed out; try it again and than fail + self.__serial.write(b"\r\x01") # send CTRL-A again + self.__serial.readUntil(rawReplMessage) + if self.__serial.hasTimedOut(): + return False + + QCoreApplication.processEvents( + QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents) + self.__serial.readAll() # read all data and discard it + return True + + def __rawOff(self): + """ + Private method to switch 'raw' mode off. + """ + if self.__serial: + self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode + self.__serial.readUntil(b">>> ") # read until Python prompt + self.__serial.readAll() # read all data and discard it + + def execute(self, commands): + """ + Public method to send commands to the connected device and return the + result. + + If no serial connection is available, empty results will be returned. + + @param commands list of commands to be executed + @type str + @return tuple containing stdout and stderr output of the device + @rtype tuple of (bytes, bytes) + """ + if not self.__serial: + return b"", b"" + + if not self.__serial.isConnected(): + return b"", b"Device not connected or not switched on." + + result = bytearray() + err = b"" + + # switch on raw mode + self.__blockReadyRead = True + ok = self.__rawOn() + if not ok: + self.__blockReadyRead = False + return ( + b"", + b"Could not switch to raw mode. Is the device switched on?" + ) + + # send commands + QThread.msleep(10) + for command in commands: + if command: + commandBytes = command.encode("utf-8") + self.__serial.write(commandBytes + b"\x04") + QCoreApplication.processEvents( + QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents) + ok = self.__serial.readUntil(b"OK") + if ok != b"OK": + return ( + b"", + "Expected 'OK', got '{0}', followed by '{1}'".format( + ok, self.__serial.readAll()).encode("utf-8") + ) + + # read until prompt + response = self.__serial.readUntil(b"\x04>") + if self.__serial.hasTimedOut(): + self.__blockReadyRead = False + return b"", b"Timeout while processing commands." + if b"\x04" in response[:-2]: + # split stdout, stderr + out, err = response[:-2].split(b"\x04") + result += out + else: + err = b"invalid response received: " + response + if err: + self.__blockReadyRead = False + return b"", err + + # switch off raw mode + QThread.msleep(10) + self.__rawOff() + self.__blockReadyRead = False + + return bytes(result), err + + def executeAsync(self, commandsList): + """ + Public method to execute a series of commands over a period of time + without returning any result (asynchronous execution). + + @param commandsList list of commands to be execute on the device + @type list of bytes + """ + def remainingTask(commands): + self.executeAsync(commands) + + if commandsList: + command = commandsList[0] + self.__serial.write(command) + remainder = commandsList[1:] + QTimer.singleShot(2, lambda: remainingTask(remainder)) + else: + self.executeAsyncFinished.emit() + + def __shortError(self, error): + """ + Private method to create a shortened error message. + + @param error verbose error message + @type bytes + @return shortened error message + @rtype str + """ + if error: + decodedError = error.decode("utf-8") + try: + return decodedError.split["\r\n"][-2] + except Exception: + return decodedError + return self.tr("Detected an error without indications.") + + ################################################################## + ## Methods below implement the file system commands + ################################################################## + + def ls(self, dirname=""): + """ + Public method to get a directory listing of the connected device. + + @param dirname name of the directory to be listed + @type str + @return tuple containg the directory listing + @rtype tuple of str + @exception OSError raised to indicate an issue with the device + """ + commands = ( + # BBC micro:bit does not support directories + [ + "import os as __os_", + "print(__os_.listdir())", + "del __os_", + ] + if self.__repl.isMicrobit() else + [ + "import os as __os_", + "print(__os_.listdir('{0}'))".format(dirname), + "del __os_", + ] + ) + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + return ast.literal_eval(out.decode("utf-8")) + + def lls(self, dirname="", fullstat=False, showHidden=False): + """ + Public method to get a long directory listing of the connected device + including meta data. + + @param dirname name of the directory to be listed + @type str + @param fullstat flag indicating to return the full stat() tuple + @type bool + @param showHidden flag indicating to show hidden files as well + @type bool + @return list containing the directory listing with tuple entries of + the name and and a tuple of mode, size and time (if fullstat is + false) or the complete stat() tuple. 'None' is returned in case the + directory doesn't exist. + @rtype tuple of (str, tuple) + @exception OSError raised to indicate an issue with the device + """ + commands = ( + # BBC micro:bit does not support directories + [ + "import os as __os_", + "\n".join([ + "def is_visible(filename, showHidden):", + " return showHidden or " + "(filename[0] != '.' and filename[-1] != '~')", + ]), + "\n".join([ + "def stat(filename):", + " size = __os_.size(filename)", + " return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)" + ]), + "\n".join([ + "def listdir_stat(showHidden):", + " files = __os_.listdir()", + " return list((f, stat(f)) for f in files if" + " is_visible(f,showHidden))", + ]), + "print(listdir_stat({0}))".format(showHidden), + "del __os_, stat, listdir_stat, is_visible", + ] + if self.__repl.isMicrobit() else + [ + "import os as __os_", + "\n".join([ + "def is_visible(filename, showHidden):", + " return showHidden or " + "(filename[0] != '.' and filename[-1] != '~')", + ]), + "\n".join([ + "def stat(filename):", + " try:", + " rstat = __os_.lstat(filename)", + " except:", + " rstat = __os_.stat(filename)", + " return tuple(rstat)", + ]), + "\n".join([ + "def listdir_stat(dirname, showHidden):", + " try:", + " files = __os_.listdir(dirname)", + " except OSError:", + " return []", + " if dirname in ('', '/'):", + " return list((f, stat(f)) for f in files if" + " is_visible(f, showHidden))", + " return list((f, stat(dirname + '/' + f))" + " for f in files if is_visible(f, showHidden))", + ]), + "print(listdir_stat('{0}', {1}))".format(dirname, showHidden), + "del __os_, stat, listdir_stat, is_visible", + ] + ) + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + fileslist = ast.literal_eval(out.decode("utf-8")) + if fileslist is None: + return None + else: + if fullstat: + return fileslist + else: + return [(f, (s[0], s[6], s[8])) for f, s in fileslist] + + def cd(self, dirname): + """ + Public method to change the current directory on the connected device. + + @param dirname directory to change to + @type str + @exception OSError raised to indicate an issue with the device + """ + if dirname: + commands = [ + "import os as __os_", + "__os_.chdir('{0}')".format(dirname), + "del __os_", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + + def pwd(self): + """ + Public method to get the current directory of the connected device. + + @return current directory + @rtype str + @exception OSError raised to indicate an issue with the device + """ + if self.__repl.isMicrobit(): + # BBC micro:bit does not support directories + return "" + + commands = [ + "import os as __os_", + "print(__os_.getcwd())", + "del __os_", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + return out.decode("utf-8").strip() + + def rm(self, filename): + """ + Public method to remove a file from the connected device. + + @param filename name of the file to be removed + @type str + @exception OSError raised to indicate an issue with the device + """ + if filename: + commands = [ + "import os as __os_", + "__os_.remove('{0}')".format(filename), + "del __os_", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + + def rmrf(self, name, recursive=False, force=False): + """ + Public method to remove a file or directory recursively. + + @param name of the file or directory to remove + @type str + @param recursive flag indicating a recursive deletion + @type bool + @param force flag indicating to ignore errors + @type bool + @return flag indicating success + @rtype bool + @exception OSError raised to indicate an issue with the device + """ + if name: + commands = [ + "import os as __os_", + "\n".join([ + "def remove_file(name, recursive=False, force=False):", + " try:", + " mode = __os_.stat(name)[0]", + " if mode & 0x4000 != 0:", + " if recursive:", + " for file in __os_.listdir(name):", + " success = remove_file(" + "name + '/' + file, recursive, force)", + " if not success and not force:", + " return False", + " __os_.rmdir(name)", + " else:", + " if not force:", + " return False", + " else:", + " __os_.remove(name)", + " except:", + " if not force:", + " return False", + " return True", + ]), + "print(remove_file('{0}', {1}, {2}))".format(name, recursive, + force), + "del __os_, remove_file", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + return ast.literal_eval(out.decode("utf-8")) + + return False + + def mkdir(self, dirname): + """ + Public method to create a new directory. + + @param dirname name of the directory to create + @type str + @exception OSError raised to indicate an issue with the device + """ + if dirname: + commands = [ + "import os as __os_", + "__os_.mkdir('{0}')".format(dirname), + "del __os_", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + + def rmdir(self, dirname): + """ + Public method to remove a directory. + + @param dirname name of the directory to be removed + @type str + @exception OSError raised to indicate an issue with the device + """ + if dirname: + commands = [ + "import os as __os_", + "__os_.rmdir('{0}')".format(dirname), + "del __os_", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + + def put(self, hostFileName, deviceFileName=None): + """ + Public method to copy a local file to the connected device. + + @param hostFileName name of the file to be copied + @type str + @param deviceFileName name of the file to copy to + @type str + @return flag indicating success + @rtype bool + @exception OSError raised to indicate an issue with the device + """ + if not os.path.isfile(hostFileName): + raise OSError("No such file: {0}".format(hostFileName)) + + with open(hostFileName, "rb") as hostFile: + content = hostFile.read() + # convert eol '\r' + content = content.replace(b"\r\n", b"\r") + content = content.replace(b"\n", b"\r") + + if not deviceFileName: + deviceFileName = os.path.basename(hostFileName) + + commands = [ + "fd = open('{0}', 'wb')".format(deviceFileName), + "f = fd.write", + ] + while content: + chunk = content[:64] + commands.append("f(" + repr(chunk) + ")") + content = content[64:] + commands.extend([ + "fd.close()", + "del f, fd", + ]) + + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + return True + + def get(self, deviceFileName, hostFileName=None): + """ + Public method to copy a file from the connected device. + + @param deviceFileName name of the file to copy + @type str + @param hostFileName name of the file to copy to + @type str + @return flag indicating success + @rtype bool + @exception OSError raised to indicate an issue with the device + """ + if not hostFileName: + hostFileName = deviceFileName + + commands = [ + "\n".join([ + "def send_data():", + " try:", + " from microbit import uart as u", + " except ImportError:", + " try:", + " from machine import UART", + " u = UART(0, {0})".format(115200), + " except Exception:", + " try:", + " from sys import stdout as u", + " except Exception:", + " raise Exception('Could not find UART module" + " in device.')", + " f = open('{0}', 'rb')".format(deviceFileName), + " r = f.read", + " result = True", + " while result:", + " result = r(32)", + " if result:", + " u.write(result)", + " f.close()", + ]), + "send_data()", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + + # write the received bytes to the local file + # convert eol to "\n" + out = out.replace(b"\r\n", b"\n") + out = out.replace(b"\r", b"\n") + with open(hostFileName, "wb") as hostFile: + hostFile.write(out) + return True + + def fileSystemInfo(self): + """ + Public method to obtain information about the currently mounted file + systems. + + @return tuple of tuples containing the file system name, the total + size, the used size and the free size + @rtype tuple of tuples of (str, int, int, int) + @exception OSError raised to indicate an issue with the device + """ + commands = [ + "import os as __os_", + "\n".join([ + "def fsinfo():", + " infolist = []", + " info = __os_.statvfs('/')", + " if info[0] == 0:", + # assume it is just mount points + " fsnames = __os_.listdir('/')", + " for fs in fsnames:", + " fs = '/' + fs", + " infolist.append((fs, __os_.statvfs(fs)))", + " else:", + " infolist.append(('/', info))", + " return infolist", + ]), + "print(fsinfo())", + "del __os_, fsinfo", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + infolist = ast.literal_eval(out.decode("utf-8")) + if infolist is None: + return None + else: + filesystemInfos = [] + for fs, info in infolist: + totalSize = info[2] * info[1] + freeSize = info[4] * info[1] + usedSize = totalSize - freeSize + filesystemInfos.append((fs, totalSize, usedSize, freeSize)) + + return tuple(filesystemInfos) + + ################################################################## + ## non-filesystem related methods below + ################################################################## + + def version(self): + """ + Public method to get the MicroPython version information of the + connected device. + + @return dictionary containing the version information + @rtype dict + @exception OSError raised to indicate an issue with the device + """ + commands = [ + "import os as __os_", + "print(__os_.uname())", + "del __os_", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + + rawOutput = out.decode("utf-8").strip() + rawOutput = rawOutput[1:-1] + items = rawOutput.split(",") + result = {} + for item in items: + key, value = item.strip().split("=") + result[key.strip()] = value.strip()[1:-1] + return result + + def getImplementation(self): + """ + Public method to get some implementation information of the connected + device. + + @return dictionary containing the implementation information + @rtype dict + @exception OSError raised to indicate an issue with the device + """ + commands = [ + "import sys as __sys_", + "res = {}", # __IGNORE_WARNING_M613__ + "\n".join([ + "try:", + " res['name'] = __sys_.implementation.name", + "except AttributeError:", + " res['name'] = 'unknown'", + ]), + "\n".join([ + "try:", + " res['version'] = '.'.join((str(i) for i in" + " __sys_.implementation.version))", + "except AttributeError:", + " res['version'] = 'unknown'", + ]), + "print(res)", + "del res, __sys_", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + return ast.literal_eval(out.decode("utf-8")) + + def getBoardInformation(self): + """ + Public method to get some information data of the connected board. + + @return dictionary containing the determined data + @rtype dict + @exception OSError raised to indicate an issue with the device + """ + commands = [ + "res = {}", # __IGNORE_WARNING_M613__ + + "import gc as __gc_", + "__gc_.enable()", + "__gc_.collect()", + "mem_alloc = __gc_.mem_alloc()", + "mem_free = __gc_.mem_free()", + "mem_total = mem_alloc + mem_free", + "res['mem_total_kb'] = mem_total / 1024.0", + "res['mem_used_kb'] = mem_alloc / 1024.0", + "res['mem_used_pc'] = mem_alloc / mem_total * 100.0", + "res['mem_free_kb'] = mem_free / 1024.0", + "res['mem_free_pc'] = mem_free / mem_total * 100.0", + "del __gc_, mem_alloc, mem_free, mem_total", + + "import os as __os_", + "uname = __os_.uname()", + "res['sysname'] = uname.sysname", + "res['nodename'] = uname.nodename", + "res['release'] = uname.release", + "res['version'] = uname.version", + "res['machine'] = uname.machine", + + "import sys as __sys_", + "res['py_platform'] = __sys_.platform", + "res['py_version'] = __sys_.version", + "\n".join([ + "try:", + " res['mpy_name'] = __sys_.implementation.name", + "except AttributeError:", + " res['mpy_name'] = 'unknown'", + ]), + "\n".join([ + "try:", + " res['mpy_version'] = '.'.join((str(i) for i in" + " __sys_.implementation.version))", + "except AttributeError:", + " res['mpy_version'] = 'unknown'", + ]), + + "stat_ = __os_.statvfs('/flash')", + "res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0", + "res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0", + "res['flash_used_kb'] = res['flash_total_kb'] -" + " res['flash_free_kb']", + "res['flash_free_pc'] = res['flash_free_kb'] /" + " res['flash_total_kb'] * 100.0", + "res['flash_used_pc'] = res['flash_used_kb'] /" + " res['flash_total_kb'] * 100.0", + + "\n".join([ + "try:", + " import machine as __mc_", + " res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0", + " res['mc_id'] = ':'.join(['{0:X}'.format(x)" + " for x in __mc_.unique_id()])", + " del __mc_", + "except ImportError:", + "\n".join([ + " try:", + " import microcontroller as __mc_", + " res['mc_frequency_mhz'] = __mc_.cpu.frequency" + " / 1000000.0", + " res['mc_temp_c'] = __mc_.cpu.temperature", + " res['mc_id'] = ':'.join(['{0:X}'.format(x)" + " for x in __mc_.cpu.uid])", + " del __mc_", + " except ImportError:", + " res['mc_frequency'] = None", + " res['mc_temp'] = None", + ]), + ]), + + "\n".join([ + "try:", + " import ulab as __ulab_", + " res['ulab'] = __ulab_.__version__", + " del __ulab_", + "except ImportError:", + " res['ulab'] = None", + ]), + + "print(res)", + "del res, stat_, __os_, __sys_", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + return ast.literal_eval(out.decode("utf-8")) + + def syncTime(self, deviceType): + """ + Public method to set the time of the connected device to the local + computer's time. + + @param deviceType type of board to sync time to + @type str + @exception OSError raised to indicate an issue with the device + """ + # rtc_time[0] - year 4 digit + # rtc_time[1] - month 1..12 + # rtc_time[2] - day 1..31 + # rtc_time[3] - weekday 1..7 1=Monday + # rtc_time[4] - hour 0..23 + # rtc_time[5] - minute 0..59 + # rtc_time[6] - second 0..59 + # rtc_time[7] - yearday 1..366 + # rtc_time[8] - isdst 0, 1, or -1 + if deviceType == "pyboard": + # Pyboard (pyboard doesn't have machine.RTC()). + # The pyb.RTC.datetime function takes the arguments in the + # order: (year, month, day, weekday, hour, minute, second, + # subseconds) + # http://docs.micropython.org/en/latest/library/pyb.RTC.html + # #pyb.RTC.datetime + set_time = "\n".join([ + "def set_time(rtc_time):", + " import pyb", + " rtc = pyb.RTC()", + " rtc.datetime(rtc_time[:7] + (0,))", + ]) + elif deviceType == "esp": + # The machine.RTC documentation was incorrect and doesn't agree + # with the code, so no link is presented here. The order of the + # arguments is the same as the pyboard except for LoBo MPy. + set_time = "\n".join([ + "def set_time(rtc_time):", + " import machine", + " rtc = machine.RTC()", + " try:", # ESP8266 may use rtc.datetime() + " rtc.datetime(rtc_time[:7] + (0,))", + " except Exception:", # ESP32 uses rtc.init() + " import os", + " if 'LoBo' in os.uname()[0]:", # LoBo MPy + " clock_time = rtc_time[:3] +" + " rtc_time[4:7] + (rtc_time[3], rtc_time[7])", + " else:", + " clock_time = rtc_time[:7] + (0,)", + " rtc.init(clock_time)", + ]) + elif deviceType == "circuitpython": + set_time = "\n".join([ + "def set_time(rtc_time):", + " import rtc", + " import time", + " clock = rtc.RTC()", + " clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3]," + " rtc_time[7], rtc_time[8])", + " clock.datetime = time.struct_time(clock_time)", + ]) + elif deviceType in ("bbc_microbit", "calliope"): + # BBC micro:bit and Calliope mini don't support time commands + return + elif deviceType == "rp2040": + # Raspberry Pi Pico (RP2040) - machine.RTC doesn't exist + set_time = "\n".join([ + "def set_time(rtc_time):", + " setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |" + " rtc_time[2]", + " setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |" + " rtc_time[5] << 8 | rtc_time[6]", + " machine.mem32[0x4005c004] = setup_0", + " machine.mem32[0x4005c008] = setup_1", + " machine.mem32[0x4005c00c] |= 0x10", + ]) + elif deviceType == "pycom": + # PyCom's machine.RTC takes its arguments in a slightly + # different order than the official machine.RTC. + # (year, month, day, hour, minute, second[, microsecond[, + # tzinfo]]) + # https://docs.pycom.io/firmwareapi/pycom/machine/rtc/ + # #rtc-init-datetime-none-source-rtc-internal-rc + set_time = "\n".join([ + "def set_time(rtc_time):", + " import pycom", + " rtc_time2 = rtc_time[:3] + rtc_time[4:7]", + " import machine", + " rtc = machine.RTC()", + " rtc.init(rtc_time2)", + ]) + else: + # no set_time() support for generic boards + return + + now = time.localtime(time.time()) + commands = [ + set_time, + "set_time({0})".format(( + now.tm_year, now.tm_mon, now.tm_mday, now.tm_wday + 1, + now.tm_hour, now.tm_min, now.tm_sec, now.tm_yday, now.tm_isdst + )), + "del set_time", + ] + out, err = self.execute(commands) + if err: + raise OSError(self.__shortError(err)) + + def getTime(self): + """ + Public method to get the current time of the device. + + @return time of the device + @rtype str + @exception OSError raised to indicate an issue with the device + """ + commands = [ + "\n".join([ + "try:", + " import rtc as __rtc_", + " print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'" + ".format(*__rtc_.RTC().datetime[:6]))", + " del __rtc_", + "except:", + " import time as __time_", + " try:", + " print(__time_.strftime('%Y-%m-%d %H:%M:%S'," + # __IGNORE_WARNING_M601__ + " __time_.localtime()))", + " except AttributeError:", + " tm = __time_.localtime()", + " print('{0:04d}-{1:02d}-{2:02d}" + " {3:02d}:{4:02d}:{5:02d}'" + ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))", + " del tm", + " del __time_" + ]), + ] + out, err = self.execute(commands) + if err: + if b"NotImplementedError" in err: + return "<unsupported> <unsupported>" + raise OSError(self.__shortError(err)) + return out.decode("utf-8").strip()