src/eric7/MicroPython/Devices/CircuitPythonDevices.py

branch
mpy_network
changeset 9885
05cbf70e8f10
parent 9881
5ce653f9dac8
child 9898
5bfb3c70e30b
diff -r 7e073ff57760 -r 05cbf70e8f10 src/eric7/MicroPython/Devices/CircuitPythonDevices.py
--- a/src/eric7/MicroPython/Devices/CircuitPythonDevices.py	Sun Mar 12 17:01:54 2023 +0100
+++ b/src/eric7/MicroPython/Devices/CircuitPythonDevices.py	Tue Mar 14 13:16:06 2023 +0100
@@ -18,11 +18,12 @@
 from PyQt6.QtWidgets import QMenu
 
 from eric7 import Globals, Preferences
-from eric7.EricGui.EricOverrideCursor import EricOverrideCursor
+from eric7.EricGui.EricOverrideCursor import EricOverrideCursor, EricOverridenCursor
 from eric7.EricWidgets import EricFileDialog, EricMessageBox
 from eric7.EricWidgets.EricApplication import ericApp
 from eric7.SystemUtilities import FileSystemUtilities
 
+from ..EthernetDialogs import WiznetUtilities
 from ..MicroPythonWidget import HAS_QTCHART
 from . import FirmwareGithubUrls
 from .CircuitPythonUpdater.CircuitPythonUpdaterInterface import (
@@ -40,7 +41,14 @@
     DeviceVolumeName = "CIRCUITPY"
 
     def __init__(
-        self, microPythonWidget, deviceType, boardName, hasWorkspace=True, parent=None
+        self,
+        microPythonWidget,
+        deviceType,
+        boardName,
+        vid=0,
+        pid=0,
+        hasWorkspace=True,
+        parent=None,
     ):
         """
         Constructor
@@ -51,6 +59,10 @@
         @type str
         @param boardName name of the board
         @type str
+        @param vid vendor ID (defaults to 0)
+        @type int (optional)
+        @param pid product ID (defaults to 0)
+        @type int (optional)
         @param hasWorkspace flag indicating that the devices supports access via
             a mounted volume (defaults to True)
         @type bool (optional)
@@ -62,6 +74,7 @@
         self._submitMode = "paste"  # use 'paste' mode to avoid loosing state
 
         self.__boardName = boardName
+        self.__vidpid = (vid, pid)
 
         self.__workspace = self.__findWorkspace() if hasWorkspace else None
 
@@ -69,6 +82,11 @@
 
         self.__createCPyMenu()
 
+        self.__wiznetVidPid = (
+            (0x2E8A, 0x1027),  # WIZnet W5100S-EVB-Pico
+            (0x2E8A, 0x1029),  # WIZnet W5500-EVB-Pico
+        )
+
         self.__securityTranslations = {
             "OPEN": self.tr("open", "open WiFi network"),
             "WEP": "WEP",
@@ -111,6 +129,22 @@
 
         super().setConnected(connected)
 
+        if (
+            connected
+            and not self._deviceData["ethernet"]
+            and self.__vidpid in self.__wiznetVidPid
+        ):
+            with EricOverridenCursor():
+                EricMessageBox.warning(
+                    None,
+                    self.tr("WIZnet 5x00 Ethernet"),
+                    self.tr(
+                        "<p>Support for <b>WIZnet 5x00</b> Ethernet boards could not be"
+                        " detected. Is the module <b>adafruit_wiznet5k</b> installed?"
+                        "</p>"
+                    ),
+                )
+
     def setButtons(self):
         """
         Public method to enable the supported action buttons.
@@ -894,12 +928,13 @@
                 )
                 if not ok:
                     return False, self.tr("Aborted")
-            # step 2: create the auto-connect script (wifi_connect.py)
             try:
                 with open(filename, "w") as f:
                     f.write(contents)
             except OSError as err:
                 return False, str(err)
+
+            # step 2: create the auto-connect script (wifi_connect.py)
             scriptFile = os.path.join(
                 os.path.dirname(__file__), "MCUScripts", "circuitPy7WiFiConnect.py"
             )
@@ -1150,6 +1185,336 @@
         )
 
     ##################################################################
+    ## Methods below implement Ethernet related methods
+    ##################################################################
+
+    def hasEthernet(self):
+        """
+        Public method to check the availability of Ethernet.
+
+        @return tuple containing a flag indicating the availability of Ethernet
+            and the Ethernet type
+        @rtype tuple of (bool, str)
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+def has_eth():
+    try:
+        from adafruit_wiznet5k import adafruit_wiznet5k
+        if hasattr(adafruit_wiznet5k, 'WIZNET5K'):
+            return True, 'cpypicowiz'
+    except ImportError:
+        pass
+
+    return False, ''
+
+print(has_eth())
+del has_eth
+"""
+
+        out, err = self._interface.execute(
+            command, mode=self._submitMode, timeout=10000
+        )
+        if err:
+            raise OSError(self._shortError(err))
+
+        return ast.literal_eval(out.decode("utf-8"))
+
+    def getEthernetStatus(self):
+        """
+        Public method to get Ethernet status data of the connected board.
+
+        @return list of tuples containing the translated status data label and
+            the associated value
+        @rtype list of tuples of (str, str)
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """{0}
+def ethernet_status():
+    import binascii
+    import json
+
+    w5x00_init()
+
+    res = {{
+        'active': nic.link_status != 0,
+        'connected': nic.link_status == 1 and nic.ifconfig[0] != b'\x00\x00\x00\x00',
+        'ifconfig': (
+            nic.pretty_ip(nic.ifconfig[0]),
+            nic.pretty_ip(nic.ifconfig[1]),
+            nic.pretty_ip(nic.ifconfig[2]),
+            nic.pretty_ip(nic.ifconfig[3]),
+        ),
+        'mac': binascii.hexlify(nic.mac_address, ':').decode(),
+        'chip': nic.chip,
+        'max_sockets': nic.max_sockets,
+    }}
+    print(json.dumps(res))
+
+ethernet_status()
+del ethernet_status, w5x00_init
+""".format(
+            WiznetUtilities.cpyWiznetInit()
+        )
+
+        out, err = self._interface.execute(
+            command, mode=self._submitMode, timeout=10000
+        )
+        if err:
+            raise OSError(self._shortError(err))
+
+        status = []
+        ethStatus = json.loads(out.decode("utf-8"))
+        status.append((self.tr("Active"), self.bool2str(ethStatus["active"])))
+        status.append((self.tr("Connected"), self.bool2str(ethStatus["connected"])))
+        status.append((self.tr("IPv4 Address"), ethStatus["ifconfig"][0]))
+        status.append((self.tr("Netmask"), ethStatus["ifconfig"][1]))
+        status.append((self.tr("Gateway"), ethStatus["ifconfig"][2]))
+        status.append((self.tr("DNS"), ethStatus["ifconfig"][3]))
+        status.append((self.tr("MAC-Address"), ethStatus["mac"]))
+        status.append((self.tr("Chip Type"), ethStatus["chip"]))
+        status.append((self.tr("max. Sockets"), ethStatus["max_sockets"]))
+
+        return status
+
+    def connectToLan(self, config):
+        """
+        Public method to connect the connected device to the LAN.
+
+        Note: The MAC address of the interface is configured with the WIZ
+
+        @param config configuration for the connection (either the string 'dhcp'
+            for a dynamic address or a tuple of four strings with the IPv4 parameters.
+        @type str or tuple of (str, str, str, str)
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        command = """{0}
+def connect_lan(config):
+    from adafruit_wiznet5k import adafruit_wiznet5k
+
+    w5x00_init()
+
+    nic.mac_address = adafruit_wiznet5k._DEFAULT_MAC
+    if config == 'dhcp':
+        nic.set_dhcp(response_timeout=14)
+    else:
+        nic.ifconfig = (
+            nic.unpretty_ip(config[0]),
+            nic.unpretty_ip(config[1]),
+            nic.unpretty_ip(config[2]),
+            tuple(int(a) for a in config[3].split('.')),
+        )
+    print(nic.ifconfig[0] != b'\x00\x00\x00\x00')
+
+connect_lan({1})
+del connect_lan, w5x00_init
+""".format(
+            WiznetUtilities.cpyWiznetInit(), "'dhcp'" if config == "dhcp" else config
+        )
+
+        with EricOverrideCursor():
+            out, err = self._interface.execute(
+                command, mode=self._submitMode, timeout=15000
+            )
+        if err:
+            return False, err
+
+        return out.strip() == b"True", ""
+
+    def disconnectFromLan(self):
+        """
+        Public method  to disconnect from the LAN.
+
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        command = """{0}
+def disconnect_lan():
+    import time
+
+    w5x00_init()
+
+    nic.sw_reset()
+    time.sleep(1)
+    print(nic.ifconfig[0] == b'\x00\x00\x00\x00')
+
+disconnect_lan()
+del disconnect_lan, w5x00_init
+""".format(
+            WiznetUtilities.cpyWiznetInit(),
+        )
+
+        with EricOverrideCursor():
+            out, err = self._interface.execute(
+                command, mode=self._submitMode, timeout=15000
+            )
+        if err:
+            return False, err
+
+        return out.strip() == b"True", ""
+
+    def checkInternetViaLan(self):
+        """
+        Public method to check, if the internet can be reached (LAN variant).
+
+        @return tuple containing a flag indicating reachability and an error string
+        @rtype tuple of (bool, str)
+        """
+        command = """{0}
+def check_internet():
+    w5x00_init()
+
+    if nic.ifconfig[0] != b'\x00\x00\x00\x00':
+        sock = nic.get_socket()
+        try:
+            nic.socket_connect(sock, nic.get_host_by_name('quad9.net'), 80)
+            nic.socket_disconnect(sock)
+            print(True)
+        except:
+            print(False)
+        nic.socket_close(sock)
+    else:
+        print(False)
+
+check_internet()
+del check_internet, w5x00_init
+""".format(
+            WiznetUtilities.cpyWiznetInit(),
+        )
+
+        out, err = self._interface.execute(
+            command, mode=self._submitMode, timeout=15000
+        )
+        if err:
+            return False, err
+
+        return out.strip() == b"True", ""
+
+    def deactivateEthernet(self):
+        """
+        Public method to deactivate the Ethernet interface of the connected device.
+
+        @return tuple containg a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        # The WIZnet 5x00 interface cannot be switched off explicitly. That means,
+        # disconnect from the LAN is all we can do.
+
+        return self.disconnectFromLan()
+
+    def writeLanAutoConnect(self, config):
+        """
+        Public method to generate a script and associated configuration to connect the
+        device to the LAN during boot time.
+
+        @param config configuration for the connection (either the string 'dhcp'
+            for a dynamic address or a tuple of four strings with the IPv4 parameters.
+        @type str or tuple of (str, str, str, str)
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        if not self.__deviceVolumeMounted():
+            return False, self.tr("The device volume is not available.")
+
+        workspace = self.getWorkspace()
+
+        if Globals.versionToTuple(self._deviceData["release"]) >= (8, 0, 0):
+            # CircuitPython >= 8.0.0: generate 'settings.toml' file
+            newConfig = (
+                {
+                    "WIZNET_IFCONFIG_0": '"dhcp"',
+                    "WIZNET_IFCONFIG_1": "",
+                    "WIZNET_IFCONFIG_2": "",
+                    "WIZNET_IFCONFIG_3": "",
+                }
+                if config == "dhcp"
+                else {
+                    "WIZNET_IFCONFIG_0": '"{0}"'.format(config[0]),
+                    "WIZNET_IFCONFIG_1": '"{0}"'.format(config[1]),
+                    "WIZNET_IFCONFIG_2": '"{0}"'.format(config[2]),
+                    "WIZNET_IFCONFIG_3": '"{0}"'.format(config[3]),
+                }
+            )
+            ok, err = self.__modifySettings(newConfig)
+            if not ok:
+                return False, err
+
+            scriptFile = os.path.join(
+                os.path.dirname(__file__), "MCUScripts", "picoWiznetConnectCpy8.py"
+            )
+
+        else:
+            # step 1: generate the wiznet_config.py file
+            ifconfig = "ifconfig = {0}\n".format(
+                "'dhcp'" if config == "dhcp" else config
+            )
+            filename = os.path.join(workspace, "wiznet_config.py")
+            if os.path.exists(filename):
+                ok = EricMessageBox.yesNo(
+                    None,
+                    self.tr("Write Connect Script"),
+                    self.tr(
+                        """<p>The file <b>{0}</b> exists already. Shall it be"""
+                        """ replaced?</p>"""
+                    ).format(filename),
+                    icon=EricMessageBox.Warning,
+                )
+                if not ok:
+                    return False, self.tr("Aborted")
+            try:
+                with open(filename, "w") as f:
+                    f.write(ifconfig)
+            except OSError as err:
+                return False, str(err)
+
+            scriptFile = os.path.join(
+                os.path.dirname(__file__), "MCUScripts", "picoWiznetConnectCpy7.py"
+            )
+
+        # step 2: create the auto-connect script (wiznet_connect.py)
+        targetFile = os.path.join(workspace, "wiznet_connect.py")
+        try:
+            shutil.copy2(scriptFile, targetFile)
+        except OSError as err:
+            return False, str(err)
+        # Note: code.py will not be modified because the connection will be
+        #       reset anyway
+        return True, ""
+
+    def removeLanAutoConnect(self):
+        """
+        Public method to remove the saved IPv4 parameters from the connected device.
+
+        Note: This disables the LAN auto-connect feature.
+
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        if not self.__deviceVolumeMounted():
+            return False, self.tr("The device volume is not available.")
+
+        workspace = self.getWorkspace()
+
+        if Globals.versionToTuple(self._deviceData["release"]) >= (8, 0, 0):
+            # CircuitPython >= 8.0.0: generate 'settings.toml' file
+            newConfig = {
+                "WIZNET_IFCONFIG_0": "",
+                "WIZNET_IFCONFIG_1": "",
+                "WIZNET_IFCONFIG_2": "",
+                "WIZNET_IFCONFIG_3": "",
+            }
+            self.__modifySettings(newConfig)
+
+        for name in ("wiznet_config.py", "wiznet_connect.py"):
+            filename = os.path.join(workspace, name)
+            if os.path.exists(filename):
+                os.remove(filename)
+
+        return True, ""
+        # TODO: not implemented yet
+
+    ##################################################################
     ## Methods below implement Bluetooth related methods
     ##################################################################
 
@@ -1387,6 +1752,13 @@
     except ImportError:
         pass
 
+    try:
+        from adafruit_wiznet5k import adafruit_wiznet5k_ntp
+        if hasattr(adafruit_wiznet5k_ntp, 'NTP'):
+            return True
+    except ImportError:
+        pass
+
     return False
 
 print(has_ntp())
@@ -1413,7 +1785,43 @@
         @return tuple containing a flag indicating success and an error string
         @rtype tuple of (bool, str)
         """
-        command = """
+        if self.getDeviceData("ethernet"):
+            # WIZnet 5x00 Ethernet interface
+            # Note: The Adafruit NTP implementation does not close the socket after
+            #       calling get_time(). That causes follow-on calls to fail. We
+            #       close the socket in our code as a workaround.
+            command = """{0}
+def set_ntp_time(server, tz_offset):
+    import rtc
+
+    from adafruit_wiznet5k import adafruit_wiznet5k_ntp
+
+    w5x00_init()
+
+    server_ip = nic.pretty_ip(nic.get_host_by_name(server))
+    ntp = adafruit_wiznet5k_ntp.NTP(iface=nic, ntp_address=server_ip, utc=tz_offset)
+    rtc.RTC().datetime = ntp.get_time()
+    ntp._sock.close()
+    return True
+
+try:
+    print({{
+        'result': set_ntp_time({1}, {2}),
+        'error': '',
+    }})
+except Exception as err:
+    print({{
+        'result': False,
+        'error': str(err),
+    }})
+del set_ntp_time, w5x00_init
+""".format(
+                WiznetUtilities.cpyWiznetInit(), repr(server), tzOffset
+            )
+
+        elif self.getDeviceData("wifi"):
+            # WiFi enabled board
+            command = """
 def set_ntp_time(server, tz_offset, timeout):
     import rtc
     import socketpool
@@ -1445,8 +1853,9 @@
     }})
 del set_ntp_time
 """.format(
-            repr(server), tzOffset, timeout
-        )
+                repr(server), tzOffset, timeout
+            )
+
         out, err = self._interface.execute(
             command, mode=self._submitMode, timeout=(timeout + 2) * 1000
         )
@@ -1456,6 +1865,51 @@
             res = ast.literal_eval(out.decode("utf-8"))
             return res["result"], res["error"]
 
+    ##################################################################
+    ## Methods below implement some utility methods
+    ##################################################################
+
+    def __modifySettings(self, changedEntries):
+        """
+        Private method to modify the 'settings.toml' file as of CircuitPython 8.0.0.
+
+        @param changedEntries dictionary containing the TOML entries to be changed
+        @type dict of {str: str}
+        @return tuple containing a success flag and an error message
+        @rtype tuple of (bool, str)
+        """
+        workspace = self.getWorkspace()
+        filename = os.path.join(workspace, "settings.toml")
+        if os.path.exists(filename):
+            try:
+                with open(filename, "r") as f:
+                    lines = f.read().splitlines()
+            except OSError as err:
+                return False, str(err)
+        else:
+            lines = []
+
+        for key, value in changedEntries.items():
+            newLine = "{0} = {1}".format(key, value)
+            for row in range(len(lines)):
+                if lines[row].split("=")[0].strip() == key:
+                    if value == "":
+                        del lines[row]
+                    else:
+                        lines[row] = newLine
+                    break
+            else:
+                if value != "":
+                    lines.append(newLine)
+
+        try:
+            with open(filename, "w") as f:
+                f.write("\n".join(lines))
+        except OSError as err:
+            return False, str(err)
+
+        return True, ""
+
 
 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber):
     """
@@ -1476,4 +1930,6 @@
     @return reference to the instantiated device object
     @rtype CircuitPythonDevice
     """
-    return CircuitPythonDevice(microPythonWidget, deviceType, boardName)
+    return CircuitPythonDevice(
+        microPythonWidget, deviceType, boardName, vid=vid, pid=pid
+    )

eric ide

mercurial