src/eric7/MicroPython/Devices/DeviceBase.py

branch
eric7
changeset 9765
6378da868bb0
parent 9763
52f982c08301
child 9766
f0e22f3a5878
diff -r 57496966803c -r 6378da868bb0 src/eric7/MicroPython/Devices/DeviceBase.py
--- a/src/eric7/MicroPython/Devices/DeviceBase.py	Tue Feb 14 11:09:49 2023 +0100
+++ b/src/eric7/MicroPython/Devices/DeviceBase.py	Tue Feb 14 18:10:30 2023 +0100
@@ -8,9 +8,11 @@
 class.
 """
 
+import ast
 import contextlib
 import copy
 import os
+import time
 
 from PyQt6.QtCore import QObject, pyqtSlot
 from PyQt6.QtWidgets import QInputDialog
@@ -39,6 +41,7 @@
         super().__init__(parent)
 
         self._deviceType = deviceType
+        self._interface = microPythonWidget.deviceInterface()
         self.microPython = microPythonWidget
         self._deviceData = {}  # dictionary with essential device data
 
@@ -56,7 +59,7 @@
 
         if connected:
             with contextlib.suppress(OSError):
-                self._deviceData = self.microPython.commandsInterface().getDeviceData()
+                self._deviceData = self.__getDeviceData()
 
     def getDeviceType(self):
         """
@@ -97,6 +100,18 @@
             )
             return False
 
+    def hasCircuitPython(self):
+        """
+        Public method to check, if the connected device is flashed with CircuitPython.
+
+        @return flag indicating CircuitPython
+        @rtype bool
+        """
+        return (
+            self.checkDeviceData()
+            and self._deviceData["mpy_name"].lower() == "circuitpython"
+        )
+
     def setButtons(self):
         """
         Public method to enable the supported action buttons.
@@ -267,7 +282,7 @@
         commands.append(b"\x04")
         rawOff = [b"\x02", b"\x02"]
         commandSequence = rawOn + newLine + commands + rawOff
-        self.microPython.commandsInterface().executeAsync(commandSequence)
+        self._interface.executeAsync(commandSequence)
 
     @pyqtSlot()
     def handleDataFlood(self):
@@ -360,3 +375,716 @@
         @rtype list of tuple of (str, str)
         """
         return []
+
+    ##################################################################
+    ## Methods below implement the file system commands
+    ##################################################################
+
+    def _shortError(self, error):
+        """
+        Protected method to create a shortened error message.
+
+        @param error verbose error message
+        @type bytes
+        @return shortened error message
+        @rtype str
+        """
+        if error:
+            decodedError = error.decode("utf-8")
+            try:
+                return decodedError.split["\r\n"][-2]
+            except Exception:
+                return decodedError
+
+        return self.tr("Detected an error without indications.")
+
+    def ls(self, dirname=""):
+        """
+        Public method to get a directory listing of the connected device.
+
+        @param dirname name of the directory to be listed
+        @type str
+        @return tuple containg the directory listing
+        @rtype tuple of str
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+import os as __os_
+print(__os_.listdir('{0}'))
+del __os_
+""".format(
+            dirname
+        )
+        out, err = self._interface.execute(command)
+        if err:
+            raise OSError(self._shortError(err))
+        return ast.literal_eval(out.decode("utf-8"))
+
+    def lls(self, dirname="", fullstat=False, showHidden=False):
+        """
+        Public method to get a long directory listing of the connected device
+        including meta data.
+
+        @param dirname name of the directory to be listed
+        @type str
+        @param fullstat flag indicating to return the full stat() tuple
+        @type bool
+        @param showHidden flag indicating to show hidden files as well
+        @type bool
+        @return list containing the directory listing with tuple entries of
+            the name and and a tuple of mode, size and time (if fullstat is
+            false) or the complete stat() tuple. 'None' is returned in case the
+            directory doesn't exist.
+        @rtype tuple of (str, tuple)
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+import os as __os_
+
+def is_visible(filename, showHidden):
+    return showHidden or (filename[0] != '.' and filename[-1] != '~')
+
+def stat(filename):
+    try:
+        rstat = __os_.lstat(filename)
+    except:
+        rstat = __os_.stat(filename)
+    return tuple(rstat)
+
+def listdir_stat(dirname, showHidden):
+    try:
+        files = __os_.listdir(dirname)
+    except OSError:
+        return []
+    if dirname in ('', '/'):
+        return list((f, stat(f)) for f in files if is_visible(f, showHidden))
+    return list(
+        (f, stat(dirname + '/' + f)) for f in files if is_visible(f, showHidden)
+    )
+
+print(listdir_stat('{0}', {1}))
+del __os_, stat, listdir_stat, is_visible
+""".format(
+            dirname, showHidden
+        )
+        out, err = self._interface.execute(command)
+        if err:
+            raise OSError(self._shortError(err))
+        fileslist = ast.literal_eval(out.decode("utf-8"))
+        if fileslist is None:
+            return None
+        else:
+            if fullstat:
+                return fileslist
+            else:
+                return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
+
+    def cd(self, dirname):
+        """
+        Public method to change the current directory on the connected device.
+
+        @param dirname directory to change to
+        @type str
+        @exception OSError raised to indicate an issue with the device
+        """
+        if dirname:
+            command = """
+import os as __os_
+__os_.chdir('{0}')
+del __os_
+""".format(
+                dirname
+            )
+            out, err = self._interface.execute(command)
+            if err:
+                raise OSError(self._shortError(err))
+
+    def pwd(self):
+        """
+        Public method to get the current directory of the connected device.
+
+        @return current directory
+        @rtype str
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+import os as __os_
+print(__os_.getcwd())
+del __os_
+"""
+        out, err = self._interface.execute(command)
+        if err:
+            raise OSError(self._shortError(err))
+        return out.decode("utf-8").strip()
+
+    def rm(self, filename):
+        """
+        Public method to remove a file from the connected device.
+
+        @param filename name of the file to be removed
+        @type str
+        @exception OSError raised to indicate an issue with the device
+        """
+        if filename:
+            command = """
+import os as __os_
+__os_.remove('{0}')
+del __os_
+""".format(
+                filename
+            )
+            out, err = self._interface.execute(command)
+            if err:
+                raise OSError(self._shortError(err))
+
+    def rmrf(self, name, recursive=False, force=False):
+        """
+        Public method to remove a file or directory recursively.
+
+        @param name of the file or directory to remove
+        @type str
+        @param recursive flag indicating a recursive deletion
+        @type bool
+        @param force flag indicating to ignore errors
+        @type bool
+        @return flag indicating success
+        @rtype bool
+        @exception OSError raised to indicate an issue with the device
+        """
+        if name:
+            command = """
+import os as __os_
+
+def remove_file(name, recursive=False, force=False):
+    try:
+        mode = __os_.stat(name)[0]
+        if mode & 0x4000 != 0:
+            if recursive:
+                for file in __os_.listdir(name):
+                    success = remove_file(name + '/' + file, recursive, force)
+                    if not success and not force:
+                        return False
+                __os_.rmdir(name)
+            else:
+                if not force:
+                    return False
+        else:
+            __os_.remove(name)
+    except:
+        if not force:
+            return False
+    return True
+
+print(remove_file('{0}', {1}, {2}))
+del __os_, remove_file
+""".format(
+                name, recursive, force
+            )
+            out, err = self._interface.execute(command)
+            if err:
+                raise OSError(self._shortError(err))
+            return ast.literal_eval(out.decode("utf-8"))
+
+        return False
+
+    def mkdir(self, dirname):
+        """
+        Public method to create a new directory.
+
+        @param dirname name of the directory to create
+        @type str
+        @exception OSError raised to indicate an issue with the device
+        """
+        if dirname:
+            command = """
+import os as __os_
+__os_.mkdir('{0}')
+del __os_
+""".format(
+                dirname
+            )
+            out, err = self._interface.execute(command)
+            if err:
+                raise OSError(self._shortError(err))
+
+    def rmdir(self, dirname):
+        """
+        Public method to remove a directory.
+
+        @param dirname name of the directory to be removed
+        @type str
+        @exception OSError raised to indicate an issue with the device
+        """
+        if dirname:
+            command = """
+import os as __os_
+__os_.rmdir('{0}')
+del __os_
+""".format(
+                dirname
+            )
+            out, err = self._interface.execute(command)
+            if err:
+                raise OSError(self._shortError(err))
+
+    def put(self, hostFileName, deviceFileName=None):
+        """
+        Public method to copy a local file to the connected device.
+
+        @param hostFileName name of the file to be copied
+        @type str
+        @param deviceFileName name of the file to copy to
+        @type str
+        @return flag indicating success
+        @rtype bool
+        @exception OSError raised to indicate an issue with the device
+        """
+        if not os.path.isfile(hostFileName):
+            raise OSError("No such file: {0}".format(hostFileName))
+
+        if not deviceFileName:
+            deviceFileName = os.path.basename(hostFileName)
+
+        with open(hostFileName, "rb") as hostFile:
+            content = hostFile.read()
+
+        return self.putData(deviceFileName, content)
+
+    def putData(self, deviceFileName, content):
+        """
+        Public method to write the given data to the connected device.
+
+        @param deviceFileName name of the file to write to
+        @type str
+        @param content data to write
+        @type bytes
+        @return flag indicating success
+        @rtype bool
+        @exception OSError raised to indicate an issue with the device
+        """
+        if not deviceFileName:
+            raise OSError("Missing device file name")
+
+        # convert eol '\r'
+        content = content.replace(b"\r\n", b"\r")
+        content = content.replace(b"\n", b"\r")
+
+        commands = [
+            "fd = open('{0}', 'wb')".format(deviceFileName),
+            "f = fd.write",
+        ]
+        while content:
+            chunk = content[:64]
+            commands.append("f(" + repr(chunk) + ")")
+            content = content[64:]
+        commands.extend(
+            [
+                "fd.close()",
+                "del f, fd",
+            ]
+        )
+        command = "\n".join(commands)
+
+        out, err = self._interface.execute(command)
+        if err:
+            raise OSError(self._shortError(err))
+        return True
+
+    def get(self, deviceFileName, hostFileName=None):
+        """
+        Public method to copy a file from the connected device.
+
+        @param deviceFileName name of the file to copy
+        @type str
+        @param hostFileName name of the file to copy to
+        @type str
+        @return flag indicating success
+        @rtype bool
+        @exception OSError raised to indicate an issue with the device
+        """
+        if not deviceFileName:
+            raise OSError("Missing device file name")
+
+        if not hostFileName:
+            hostFileName = deviceFileName
+
+        out = self.getData(deviceFileName)
+        with open(hostFileName, "wb") as hostFile:
+            hostFile.write(out)
+
+        return True
+
+    def getData(self, deviceFileName):
+        """
+        Public method to read data from the connected device.
+
+        @param deviceFileName name of the file to read from
+        @type str
+        @return data read from the device
+        @rtype bytes
+        @exception OSError raised to indicate an issue with the device
+        """
+        if not deviceFileName:
+            raise OSError("Missing device file name")
+
+        command = """
+def send_data():
+    try:
+        from microbit import uart as u
+    except ImportError:
+        try:
+            from sys import stdout as u
+        except ImportError:
+            try:
+                from machine import UART
+                u = UART(0, 115200)
+            except Exception:
+                raise Exception('Could not find UART module in device.')
+    f = open('{0}', 'rb')
+    r = f.read
+    result = True
+    while result:
+        result = r(32)
+        if result:
+            u.write(result)
+    f.close()
+
+send_data()
+del send_data
+""".format(
+            deviceFileName
+        )
+        out, err = self._interface.execute(command)
+        if err:
+            raise OSError(self._shortError(err))
+
+        # write the received bytes to the local file
+        # convert eol to "\n"
+        out = out.replace(b"\r\n", b"\n")
+        out = out.replace(b"\r", b"\n")
+
+        return out
+
+    def fileSystemInfo(self):
+        """
+        Public method to obtain information about the currently mounted file
+        systems.
+
+        @return tuple of tuples containing the file system name, the total
+            size, the used size and the free size
+        @rtype tuple of tuples of (str, int, int, int)
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+import os as __os_
+
+def fsinfo():
+    infolist = []
+    info = __os_.statvfs('/')
+    if info[0] == 0:
+        fsnames = __os_.listdir('/')
+        for fs in fsnames:
+            fs = '/' + fs
+            infolist.append((fs, __os_.statvfs(fs)))
+    else:
+        infolist.append(('/', info))
+    return infolist
+
+print(fsinfo())
+del __os_, fsinfo
+"""
+        out, err = self._interface.execute(command)
+        if err:
+            raise OSError(self._shortError(err))
+        infolist = ast.literal_eval(out.decode("utf-8"))
+        if infolist is None:
+            return None
+        else:
+            filesystemInfos = []
+            for fs, info in infolist:
+                totalSize = info[2] * info[1]
+                freeSize = info[4] * info[1]
+                usedSize = totalSize - freeSize
+                filesystemInfos.append((fs, totalSize, usedSize, freeSize))
+
+        return tuple(filesystemInfos)
+
+    ##################################################################
+    ## board information related methods below
+    ##################################################################
+
+    def __getDeviceData(self):
+        """
+        Private method to get some essential data for the connected board.
+
+        @return dictionary containing the determined data
+        @rtype dict
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+res = {}
+
+import os as __os_
+uname = __os_.uname()
+res['sysname'] = uname.sysname
+res['nodename'] = uname.nodename
+res['release'] = uname.release
+res['version'] = uname.version
+res['machine'] = uname.machine
+
+import sys as __sys_
+res['py_platform'] = __sys_.platform
+res['py_version'] = __sys_.version
+
+try:
+    res['mpy_name'] = __sys_.implementation.name
+except AttributeError:
+    res['mpy_name'] = 'unknown'
+
+try:
+    res['mpy_version'] = '.'.join((str(i) for i in __sys_.implementation.version))
+except AttributeError:
+    res['mpy_version'] = 'unknown'
+
+try:
+    import pimoroni as __pimoroni_
+    res['mpy_variant'] = 'Pimoroni'
+    del __pimoroni_
+except ImportError:
+    res['mpy_variant'] = ''
+
+print(res)
+del res, uname, __os_, __sys_
+"""
+        out, err = self._interface.execute(command)
+        if err:
+            raise OSError(self._shortError(err))
+        return ast.literal_eval(out.decode("utf-8"))
+
+    def getBoardInformation(self):
+        """
+        Public method to get some information data of the connected board.
+
+        @return dictionary containing the determined data
+        @rtype dict
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+res = {}
+
+import gc as __gc_
+__gc_.enable()
+__gc_.collect()
+mem_alloc = __gc_.mem_alloc()
+mem_free = __gc_.mem_free()
+mem_total = mem_alloc + mem_free
+res['mem_total_kb'] = mem_total / 1024.0
+res['mem_used_kb'] = mem_alloc / 1024.0
+res['mem_used_pc'] = mem_alloc / mem_total * 100.0
+res['mem_free_kb'] = mem_free / 1024.0
+res['mem_free_pc'] = mem_free / mem_total * 100.0
+del __gc_, mem_alloc, mem_free, mem_total
+
+import os as __os_
+uname = __os_.uname()
+res['sysname'] = uname.sysname
+res['nodename'] = uname.nodename
+res['release'] = uname.release
+res['version'] = uname.version
+res['machine'] = uname.machine
+
+import sys as __sys_
+res['py_platform'] = __sys_.platform
+res['py_version'] = __sys_.version
+
+try:
+    res['mpy_name'] = __sys_.implementation.name
+except AttributeError:
+    res['mpy_name'] = 'unknown'
+try:
+    res['mpy_version'] = '.'.join((str(i) for i in __sys_.implementation.version))
+except AttributeError:
+    res['mpy_version'] = 'unknown'
+try:
+    import pimoroni as __pimoroni_
+    res['mpy_variant'] = 'Pimoroni'
+    del __pimoroni_
+except ImportError:
+    res['mpy_variant'] = ''
+
+try:
+    stat_ = __os_.statvfs('/flash')
+    res['flash_info_available'] = True
+    res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0
+    res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0
+    res['flash_used_kb'] = res['flash_total_kb'] - res['flash_free_kb']
+    res['flash_free_pc'] = res['flash_free_kb'] / res['flash_total_kb'] * 100.0
+    res['flash_used_pc'] = res['flash_used_kb'] / res['flash_total_kb'] * 100.0
+    del stat_
+except AttributeError:
+    res['flash_info_available'] = False
+
+try:
+    import machine as __mc_
+    if isinstance(__mc_.freq(), tuple):
+        res['mc_frequency_mhz'] = __mc_.freq()[0] / 1000000.0
+    else:
+       res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0
+    res['mc_id'] = ':'.join(['{0:X}'.format(x) for x in __mc_.unique_id()])
+    del __mc_
+except ImportError:
+    try:
+        import microcontroller as __mc_
+        res['mc_frequency_mhz'] = __mc_.cpu.frequency / 1000000.0
+        res['mc_temp_c'] = __mc_.cpu.temperature
+        res['mc_id'] = ':'.join(['{0:X}'.format(x) for x in __mc_.cpu.uid])
+        del __mc_
+    except ImportError:
+        res['mc_frequency'] = None
+        res['mc_temp'] = None
+
+try:
+    import ulab as __ulab_
+    res['ulab'] = __ulab_.__version__
+    del __ulab_
+except ImportError:
+    res['ulab'] = None
+
+print(res)
+del res, __os_, __sys_
+"""
+        out, err = self._interface.execute(command)
+        if err:
+            raise OSError(self._shortError(err))
+        return ast.literal_eval(out.decode("utf-8"))
+
+    def getModules(self):
+        """
+        Public method to show a list of modules built into the firmware.
+
+        @return list of builtin modules
+        @rtype list of str
+        @exception OSError raised to indicate an issue with the device
+        """
+        commands = ["help('modules')"]
+        out, err = self._interface.execute(commands)
+        if err:
+            raise OSError(self._shortError(err))
+
+        modules = []
+        for line in out.decode("utf-8").splitlines()[:-1]:
+            modules.extend(line.split())
+        return modules
+
+    ##################################################################
+    ## time related methods below
+    ##################################################################
+
+    def getTime(self):
+        """
+        Public method to get the current time of the device.
+
+        @return time of the device
+        @rtype str
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+try:
+    import rtc as __rtc_
+    print(
+        '{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'
+        .format(*__rtc_.RTC().datetime[:6])
+    )
+    del __rtc_
+except:
+    import time as __time_
+    try:
+        print(__time_.strftime('%Y-%m-%d %H:%M:%S', __time_.localtime()))
+    except AttributeError:
+        tm = __time_.localtime()
+        print(
+            '{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'
+            .format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5])
+        )
+        del tm
+    del __time_
+"""
+        out, err = self._interface.execute(command)
+        if err:
+            if b"NotImplementedError" in err:
+                return "<unsupported> <unsupported>"
+            raise OSError(self._shortError(err))
+        return out.decode("utf-8").strip()
+
+    def _getSetTimeCode(self):
+        """
+        Protected method to get the device code to set the time.
+
+        Note: This method must be implemented in the various device specific
+        subclasses.
+
+        @return code to be executed on the connected device to set the time
+        @rtype str
+        """
+        # rtc_time[0] - year    4 digit
+        # rtc_time[1] - month   1..12
+        # rtc_time[2] - day     1..31
+        # rtc_time[3] - weekday 1..7 1=Monday
+        # rtc_time[4] - hour    0..23
+        # rtc_time[5] - minute  0..59
+        # rtc_time[6] - second  0..59
+        # rtc_time[7] - yearday 1..366
+        # rtc_time[8] - isdst   0, 1, or -1
+        if self.hasCircuitPython():
+            # CircuitPython is handled here in order to not duplicate the code in all
+            # specific boards able to be flashed with CircuitPython or MicroPython
+            return """
+def set_time(rtc_time):
+    import rtc
+    import time
+    clock = rtc.RTC()
+    clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3], rtc_time[7], rtc_time[8])
+    clock.datetime = time.struct_time(clock_time)
+"""
+        else:
+            return ""
+
+    def syncTime(self, deviceType, hasCPy=False):
+        """
+        Public method to set the time of the connected device to the local
+        computer's time.
+
+        @param deviceType type of board to sync time to
+        @type str
+        @param hasCPy flag indicating that the device has CircuitPython loadede
+            (defaults to False)
+        @type bool
+        @exception OSError raised to indicate an issue with the device
+        """
+        setTimeCode = self._getSetTimeCode()
+        if setTimeCode:
+            now = time.localtime(time.time())
+            command = """{0}
+set_time({1})
+del set_time
+""".format(
+                setTimeCode,
+                (
+                    now.tm_year,
+                    now.tm_mon,
+                    now.tm_mday,
+                    now.tm_wday + 1,
+                    now.tm_hour,
+                    now.tm_min,
+                    now.tm_sec,
+                    now.tm_yday,
+                    now.tm_isdst,
+                ),
+            )
+            out, err = self._interface.execute(command)
+            if err:
+                raise OSError(self._shortError(err))
+
+
+#
+# eflag: noqa = M613

eric ide

mercurial