--- a/src/eric7/MicroPython/Devices/DeviceBase.py Tue Feb 14 11:09:49 2023 +0100 +++ b/src/eric7/MicroPython/Devices/DeviceBase.py Tue Feb 14 18:10:30 2023 +0100 @@ -8,9 +8,11 @@ class. """ +import ast import contextlib import copy import os +import time from PyQt6.QtCore import QObject, pyqtSlot from PyQt6.QtWidgets import QInputDialog @@ -39,6 +41,7 @@ super().__init__(parent) self._deviceType = deviceType + self._interface = microPythonWidget.deviceInterface() self.microPython = microPythonWidget self._deviceData = {} # dictionary with essential device data @@ -56,7 +59,7 @@ if connected: with contextlib.suppress(OSError): - self._deviceData = self.microPython.commandsInterface().getDeviceData() + self._deviceData = self.__getDeviceData() def getDeviceType(self): """ @@ -97,6 +100,18 @@ ) return False + def hasCircuitPython(self): + """ + Public method to check, if the connected device is flashed with CircuitPython. + + @return flag indicating CircuitPython + @rtype bool + """ + return ( + self.checkDeviceData() + and self._deviceData["mpy_name"].lower() == "circuitpython" + ) + def setButtons(self): """ Public method to enable the supported action buttons. @@ -267,7 +282,7 @@ commands.append(b"\x04") rawOff = [b"\x02", b"\x02"] commandSequence = rawOn + newLine + commands + rawOff - self.microPython.commandsInterface().executeAsync(commandSequence) + self._interface.executeAsync(commandSequence) @pyqtSlot() def handleDataFlood(self): @@ -360,3 +375,716 @@ @rtype list of tuple of (str, str) """ return [] + + ################################################################## + ## Methods below implement the file system commands + ################################################################## + + def _shortError(self, error): + """ + Protected method to create a shortened error message. + + @param error verbose error message + @type bytes + @return shortened error message + @rtype str + """ + if error: + decodedError = error.decode("utf-8") + try: + return decodedError.split["\r\n"][-2] + except Exception: + return decodedError + + return self.tr("Detected an error without indications.") + + def ls(self, dirname=""): + """ + Public method to get a directory listing of the connected device. + + @param dirname name of the directory to be listed + @type str + @return tuple containg the directory listing + @rtype tuple of str + @exception OSError raised to indicate an issue with the device + """ + command = """ +import os as __os_ +print(__os_.listdir('{0}')) +del __os_ +""".format( + dirname + ) + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + return ast.literal_eval(out.decode("utf-8")) + + def lls(self, dirname="", fullstat=False, showHidden=False): + """ + Public method to get a long directory listing of the connected device + including meta data. + + @param dirname name of the directory to be listed + @type str + @param fullstat flag indicating to return the full stat() tuple + @type bool + @param showHidden flag indicating to show hidden files as well + @type bool + @return list containing the directory listing with tuple entries of + the name and and a tuple of mode, size and time (if fullstat is + false) or the complete stat() tuple. 'None' is returned in case the + directory doesn't exist. + @rtype tuple of (str, tuple) + @exception OSError raised to indicate an issue with the device + """ + command = """ +import os as __os_ + +def is_visible(filename, showHidden): + return showHidden or (filename[0] != '.' and filename[-1] != '~') + +def stat(filename): + try: + rstat = __os_.lstat(filename) + except: + rstat = __os_.stat(filename) + return tuple(rstat) + +def listdir_stat(dirname, showHidden): + try: + files = __os_.listdir(dirname) + except OSError: + return [] + if dirname in ('', '/'): + return list((f, stat(f)) for f in files if is_visible(f, showHidden)) + return list( + (f, stat(dirname + '/' + f)) for f in files if is_visible(f, showHidden) + ) + +print(listdir_stat('{0}', {1})) +del __os_, stat, listdir_stat, is_visible +""".format( + dirname, showHidden + ) + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + fileslist = ast.literal_eval(out.decode("utf-8")) + if fileslist is None: + return None + else: + if fullstat: + return fileslist + else: + return [(f, (s[0], s[6], s[8])) for f, s in fileslist] + + def cd(self, dirname): + """ + Public method to change the current directory on the connected device. + + @param dirname directory to change to + @type str + @exception OSError raised to indicate an issue with the device + """ + if dirname: + command = """ +import os as __os_ +__os_.chdir('{0}') +del __os_ +""".format( + dirname + ) + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + + def pwd(self): + """ + Public method to get the current directory of the connected device. + + @return current directory + @rtype str + @exception OSError raised to indicate an issue with the device + """ + command = """ +import os as __os_ +print(__os_.getcwd()) +del __os_ +""" + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + return out.decode("utf-8").strip() + + def rm(self, filename): + """ + Public method to remove a file from the connected device. + + @param filename name of the file to be removed + @type str + @exception OSError raised to indicate an issue with the device + """ + if filename: + command = """ +import os as __os_ +__os_.remove('{0}') +del __os_ +""".format( + filename + ) + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + + def rmrf(self, name, recursive=False, force=False): + """ + Public method to remove a file or directory recursively. + + @param name of the file or directory to remove + @type str + @param recursive flag indicating a recursive deletion + @type bool + @param force flag indicating to ignore errors + @type bool + @return flag indicating success + @rtype bool + @exception OSError raised to indicate an issue with the device + """ + if name: + command = """ +import os as __os_ + +def remove_file(name, recursive=False, force=False): + try: + mode = __os_.stat(name)[0] + if mode & 0x4000 != 0: + if recursive: + for file in __os_.listdir(name): + success = remove_file(name + '/' + file, recursive, force) + if not success and not force: + return False + __os_.rmdir(name) + else: + if not force: + return False + else: + __os_.remove(name) + except: + if not force: + return False + return True + +print(remove_file('{0}', {1}, {2})) +del __os_, remove_file +""".format( + name, recursive, force + ) + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + return ast.literal_eval(out.decode("utf-8")) + + return False + + def mkdir(self, dirname): + """ + Public method to create a new directory. + + @param dirname name of the directory to create + @type str + @exception OSError raised to indicate an issue with the device + """ + if dirname: + command = """ +import os as __os_ +__os_.mkdir('{0}') +del __os_ +""".format( + dirname + ) + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + + def rmdir(self, dirname): + """ + Public method to remove a directory. + + @param dirname name of the directory to be removed + @type str + @exception OSError raised to indicate an issue with the device + """ + if dirname: + command = """ +import os as __os_ +__os_.rmdir('{0}') +del __os_ +""".format( + dirname + ) + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + + def put(self, hostFileName, deviceFileName=None): + """ + Public method to copy a local file to the connected device. + + @param hostFileName name of the file to be copied + @type str + @param deviceFileName name of the file to copy to + @type str + @return flag indicating success + @rtype bool + @exception OSError raised to indicate an issue with the device + """ + if not os.path.isfile(hostFileName): + raise OSError("No such file: {0}".format(hostFileName)) + + if not deviceFileName: + deviceFileName = os.path.basename(hostFileName) + + with open(hostFileName, "rb") as hostFile: + content = hostFile.read() + + return self.putData(deviceFileName, content) + + def putData(self, deviceFileName, content): + """ + Public method to write the given data to the connected device. + + @param deviceFileName name of the file to write to + @type str + @param content data to write + @type bytes + @return flag indicating success + @rtype bool + @exception OSError raised to indicate an issue with the device + """ + if not deviceFileName: + raise OSError("Missing device file name") + + # convert eol '\r' + content = content.replace(b"\r\n", b"\r") + content = content.replace(b"\n", b"\r") + + commands = [ + "fd = open('{0}', 'wb')".format(deviceFileName), + "f = fd.write", + ] + while content: + chunk = content[:64] + commands.append("f(" + repr(chunk) + ")") + content = content[64:] + commands.extend( + [ + "fd.close()", + "del f, fd", + ] + ) + command = "\n".join(commands) + + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + return True + + def get(self, deviceFileName, hostFileName=None): + """ + Public method to copy a file from the connected device. + + @param deviceFileName name of the file to copy + @type str + @param hostFileName name of the file to copy to + @type str + @return flag indicating success + @rtype bool + @exception OSError raised to indicate an issue with the device + """ + if not deviceFileName: + raise OSError("Missing device file name") + + if not hostFileName: + hostFileName = deviceFileName + + out = self.getData(deviceFileName) + with open(hostFileName, "wb") as hostFile: + hostFile.write(out) + + return True + + def getData(self, deviceFileName): + """ + Public method to read data from the connected device. + + @param deviceFileName name of the file to read from + @type str + @return data read from the device + @rtype bytes + @exception OSError raised to indicate an issue with the device + """ + if not deviceFileName: + raise OSError("Missing device file name") + + command = """ +def send_data(): + try: + from microbit import uart as u + except ImportError: + try: + from sys import stdout as u + except ImportError: + try: + from machine import UART + u = UART(0, 115200) + except Exception: + raise Exception('Could not find UART module in device.') + f = open('{0}', 'rb') + r = f.read + result = True + while result: + result = r(32) + if result: + u.write(result) + f.close() + +send_data() +del send_data +""".format( + deviceFileName + ) + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + + # write the received bytes to the local file + # convert eol to "\n" + out = out.replace(b"\r\n", b"\n") + out = out.replace(b"\r", b"\n") + + return out + + def fileSystemInfo(self): + """ + Public method to obtain information about the currently mounted file + systems. + + @return tuple of tuples containing the file system name, the total + size, the used size and the free size + @rtype tuple of tuples of (str, int, int, int) + @exception OSError raised to indicate an issue with the device + """ + command = """ +import os as __os_ + +def fsinfo(): + infolist = [] + info = __os_.statvfs('/') + if info[0] == 0: + fsnames = __os_.listdir('/') + for fs in fsnames: + fs = '/' + fs + infolist.append((fs, __os_.statvfs(fs))) + else: + infolist.append(('/', info)) + return infolist + +print(fsinfo()) +del __os_, fsinfo +""" + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + infolist = ast.literal_eval(out.decode("utf-8")) + if infolist is None: + return None + else: + filesystemInfos = [] + for fs, info in infolist: + totalSize = info[2] * info[1] + freeSize = info[4] * info[1] + usedSize = totalSize - freeSize + filesystemInfos.append((fs, totalSize, usedSize, freeSize)) + + return tuple(filesystemInfos) + + ################################################################## + ## board information related methods below + ################################################################## + + def __getDeviceData(self): + """ + Private method to get some essential data for the connected board. + + @return dictionary containing the determined data + @rtype dict + @exception OSError raised to indicate an issue with the device + """ + command = """ +res = {} + +import os as __os_ +uname = __os_.uname() +res['sysname'] = uname.sysname +res['nodename'] = uname.nodename +res['release'] = uname.release +res['version'] = uname.version +res['machine'] = uname.machine + +import sys as __sys_ +res['py_platform'] = __sys_.platform +res['py_version'] = __sys_.version + +try: + res['mpy_name'] = __sys_.implementation.name +except AttributeError: + res['mpy_name'] = 'unknown' + +try: + res['mpy_version'] = '.'.join((str(i) for i in __sys_.implementation.version)) +except AttributeError: + res['mpy_version'] = 'unknown' + +try: + import pimoroni as __pimoroni_ + res['mpy_variant'] = 'Pimoroni' + del __pimoroni_ +except ImportError: + res['mpy_variant'] = '' + +print(res) +del res, uname, __os_, __sys_ +""" + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + return ast.literal_eval(out.decode("utf-8")) + + def getBoardInformation(self): + """ + Public method to get some information data of the connected board. + + @return dictionary containing the determined data + @rtype dict + @exception OSError raised to indicate an issue with the device + """ + command = """ +res = {} + +import gc as __gc_ +__gc_.enable() +__gc_.collect() +mem_alloc = __gc_.mem_alloc() +mem_free = __gc_.mem_free() +mem_total = mem_alloc + mem_free +res['mem_total_kb'] = mem_total / 1024.0 +res['mem_used_kb'] = mem_alloc / 1024.0 +res['mem_used_pc'] = mem_alloc / mem_total * 100.0 +res['mem_free_kb'] = mem_free / 1024.0 +res['mem_free_pc'] = mem_free / mem_total * 100.0 +del __gc_, mem_alloc, mem_free, mem_total + +import os as __os_ +uname = __os_.uname() +res['sysname'] = uname.sysname +res['nodename'] = uname.nodename +res['release'] = uname.release +res['version'] = uname.version +res['machine'] = uname.machine + +import sys as __sys_ +res['py_platform'] = __sys_.platform +res['py_version'] = __sys_.version + +try: + res['mpy_name'] = __sys_.implementation.name +except AttributeError: + res['mpy_name'] = 'unknown' +try: + res['mpy_version'] = '.'.join((str(i) for i in __sys_.implementation.version)) +except AttributeError: + res['mpy_version'] = 'unknown' +try: + import pimoroni as __pimoroni_ + res['mpy_variant'] = 'Pimoroni' + del __pimoroni_ +except ImportError: + res['mpy_variant'] = '' + +try: + stat_ = __os_.statvfs('/flash') + res['flash_info_available'] = True + res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0 + res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0 + res['flash_used_kb'] = res['flash_total_kb'] - res['flash_free_kb'] + res['flash_free_pc'] = res['flash_free_kb'] / res['flash_total_kb'] * 100.0 + res['flash_used_pc'] = res['flash_used_kb'] / res['flash_total_kb'] * 100.0 + del stat_ +except AttributeError: + res['flash_info_available'] = False + +try: + import machine as __mc_ + if isinstance(__mc_.freq(), tuple): + res['mc_frequency_mhz'] = __mc_.freq()[0] / 1000000.0 + else: + res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0 + res['mc_id'] = ':'.join(['{0:X}'.format(x) for x in __mc_.unique_id()]) + del __mc_ +except ImportError: + try: + import microcontroller as __mc_ + res['mc_frequency_mhz'] = __mc_.cpu.frequency / 1000000.0 + res['mc_temp_c'] = __mc_.cpu.temperature + res['mc_id'] = ':'.join(['{0:X}'.format(x) for x in __mc_.cpu.uid]) + del __mc_ + except ImportError: + res['mc_frequency'] = None + res['mc_temp'] = None + +try: + import ulab as __ulab_ + res['ulab'] = __ulab_.__version__ + del __ulab_ +except ImportError: + res['ulab'] = None + +print(res) +del res, __os_, __sys_ +""" + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + return ast.literal_eval(out.decode("utf-8")) + + def getModules(self): + """ + Public method to show a list of modules built into the firmware. + + @return list of builtin modules + @rtype list of str + @exception OSError raised to indicate an issue with the device + """ + commands = ["help('modules')"] + out, err = self._interface.execute(commands) + if err: + raise OSError(self._shortError(err)) + + modules = [] + for line in out.decode("utf-8").splitlines()[:-1]: + modules.extend(line.split()) + return modules + + ################################################################## + ## time related methods below + ################################################################## + + def getTime(self): + """ + Public method to get the current time of the device. + + @return time of the device + @rtype str + @exception OSError raised to indicate an issue with the device + """ + command = """ +try: + import rtc as __rtc_ + print( + '{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}' + .format(*__rtc_.RTC().datetime[:6]) + ) + del __rtc_ +except: + import time as __time_ + try: + print(__time_.strftime('%Y-%m-%d %H:%M:%S', __time_.localtime())) + except AttributeError: + tm = __time_.localtime() + print( + '{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}' + .format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]) + ) + del tm + del __time_ +""" + out, err = self._interface.execute(command) + if err: + if b"NotImplementedError" in err: + return "<unsupported> <unsupported>" + raise OSError(self._shortError(err)) + return out.decode("utf-8").strip() + + def _getSetTimeCode(self): + """ + Protected method to get the device code to set the time. + + Note: This method must be implemented in the various device specific + subclasses. + + @return code to be executed on the connected device to set the time + @rtype str + """ + # rtc_time[0] - year 4 digit + # rtc_time[1] - month 1..12 + # rtc_time[2] - day 1..31 + # rtc_time[3] - weekday 1..7 1=Monday + # rtc_time[4] - hour 0..23 + # rtc_time[5] - minute 0..59 + # rtc_time[6] - second 0..59 + # rtc_time[7] - yearday 1..366 + # rtc_time[8] - isdst 0, 1, or -1 + if self.hasCircuitPython(): + # CircuitPython is handled here in order to not duplicate the code in all + # specific boards able to be flashed with CircuitPython or MicroPython + return """ +def set_time(rtc_time): + import rtc + import time + clock = rtc.RTC() + clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3], rtc_time[7], rtc_time[8]) + clock.datetime = time.struct_time(clock_time) +""" + else: + return "" + + def syncTime(self, deviceType, hasCPy=False): + """ + Public method to set the time of the connected device to the local + computer's time. + + @param deviceType type of board to sync time to + @type str + @param hasCPy flag indicating that the device has CircuitPython loadede + (defaults to False) + @type bool + @exception OSError raised to indicate an issue with the device + """ + setTimeCode = self._getSetTimeCode() + if setTimeCode: + now = time.localtime(time.time()) + command = """{0} +set_time({1}) +del set_time +""".format( + setTimeCode, + ( + now.tm_year, + now.tm_mon, + now.tm_mday, + now.tm_wday + 1, + now.tm_hour, + now.tm_min, + now.tm_sec, + now.tm_yday, + now.tm_isdst, + ), + ) + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + + +# +# eflag: noqa = M613