src/eric7/MicroPython/Devices/CircuitPythonDevices.py

branch
mpy_network
changeset 9828
32c8a5b57332
parent 9820
67597e003373
child 9834
1fdaebde6316
--- a/src/eric7/MicroPython/Devices/CircuitPythonDevices.py	Tue Feb 28 10:14:12 2023 +0100
+++ b/src/eric7/MicroPython/Devices/CircuitPythonDevices.py	Tue Feb 28 17:54:33 2023 +0100
@@ -8,6 +8,7 @@
 """
 
 import ast
+import binascii
 import json
 import os
 import shutil
@@ -17,6 +18,7 @@
 from PyQt6.QtWidgets import QMenu
 
 from eric7 import Globals, Preferences
+from eric7.EricGui.EricOverrideCursor import EricOverrideCursor
 from eric7.EricWidgets import EricFileDialog, EricMessageBox
 from eric7.EricWidgets.EricApplication import ericApp
 from eric7.SystemUtilities import FileSystemUtilities
@@ -52,7 +54,7 @@
         """
         super().__init__(microPythonWidget, deviceType, parent)
 
-        self.submitMode = "paste"  # use 'paste' mode to avoid loosing state
+        self._submitMode = "paste"  # use 'paste' mode to avoid loosing state
 
         self.__boardName = boardName
         self.__workspace = self.__findWorkspace()
@@ -61,6 +63,27 @@
 
         self.__createCPyMenu()
 
+        self.__securityTranslations = {
+            "OPEN": self.tr("open", "open WiFi network"),
+            "WEP": "WEP",
+            "WPA_PSK": "WPA",
+            "WPA2_PSK": "WPA2",
+            "WPA_WPA2_PSK": "WPA/WPA2",
+            "WPA2_ENTERPRISE": "WPA2 (CCMP)",
+            "WPA3_PSK": "WPA3",
+            "WPA2_WPA3_PSK": "WPA2/WPA3",
+        }
+        self.__securityCode2AuthModeString = {
+            0: "[wifi.AuthMode.OPEN]",
+            1: "[wifi.AuthMode.WEP]",
+            2: "[wifi.AuthMode.WPA, wifi.AuthMode.PSK]",
+            3: "[wifi.AuthMode.WPA2, wifi.AuthMode.PSK]",
+            4: "[wifi.AuthMode.WPA, wifi.AuthMode.WPA2, wifi.AuthMode.PSK]",
+            5: "[wifi.AuthMode.WPA2, wifi.AuthMode.ENTERPRISE]",
+            6: "[wifi.AuthMode.WPA3, wifi.AuthMode.PSK]",
+            7: "[wifi.AuthMode.WPA2, wifi.AuthMode.WPA3, wifi.AuthMode.PSK]",
+        }
+
     def setConnected(self, connected):
         """
         Public method to set the connection state.
@@ -204,8 +227,8 @@
         """
         Private method to find the workspace directory.
 
-        @param silent flag indicating silent operations
-        @type bool
+        @param silent flag indicating silent operations (defaults to False)
+        @type bool (optional)
         @return workspace directory used for saving files
         @rtype str
         """
@@ -612,7 +635,7 @@
 print(has_wifi())
 del has_wifi
 """
-        out, err = self._interface.execute(command, mode=self.submitMode)
+        out, err = self._interface.execute(command, mode=self._submitMode)
         if err:
             raise OSError(self._shortError(err))
         return ast.literal_eval(out.decode("utf-8"))
@@ -635,7 +658,7 @@
     r = wifi.radio
 
     station = {
-        'active': r.enabled and r.ipv4_address_ap is None,
+        'active': r.enabled and r.ipv4_address is not None,
         'connected': r.ipv4_address is not None,
         'ifconfig': (
             str(r.ipv4_address) if r.ipv4_address else'0.0.0.0',
@@ -644,9 +667,29 @@
             str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0',
         ),
         'mac': binascii.hexlify(r.mac_address, ':').decode(),
-        'txpower': r.tx_power,
         'hostname': r.hostname,
     }
+    try:
+        station['txpower'] = r.tx_power
+    except AttributeError:
+        pass
+    try:
+        if r.ap_info is not None:
+            station.update({
+                'ap_ssid': r.ap_info.ssid,
+                'ap_bssid': binascii.hexlify(r.ap_info.bssid, ':'),
+                'ap_rssi': r.ap_info.rssi,
+                'ap_channel': r.ap_info.channel,
+                'ap_country': r.ap_info.country,
+            })
+            authmode = r.ap_info.authmode
+            station['ap_security'] = (
+                '_'.join(str(x).split('.')[-1] for x in authmode)
+                if isinstance(authmode, list)
+                else authmode
+            )
+    except (NotImplementedError, AttributeError):
+        pass
     print(json.dumps(station))
 
     ap = {
@@ -659,9 +702,12 @@
             str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0',
         ),
         'mac': binascii.hexlify(r.mac_address_ap, ':').decode(),
-        'txpower': r.tx_power,
         'hostname': r.hostname,
     }
+    try:
+        ap['txpower'] = r.tx_power
+    except AttributeError:
+        pass
     print(json.dumps(ap))
 
     overall = {
@@ -673,7 +719,7 @@
 del wifi_status
 """
 
-        out, err = self._interface.execute(command, mode=self.submitMode)
+        out, err = self._interface.execute(command, mode=self._submitMode)
         if err:
             raise OSError(self._shortError(err))
 
@@ -681,8 +727,415 @@
         station = json.loads(stationStr)
         ap = json.loads(apStr)
         overall = json.loads(overallStr)
+        if "ap_security" in station:
+            try:
+                station["ap_security"] = self.__securityTranslations[
+                    station["ap_security"]
+                ]
+            except KeyError:
+                station["ap_security"] = self.tr("unknown ({0})").format(
+                    station["ap_security"]
+                )
+
         return station, ap, overall
 
+    def connectWifi(self, ssid, password):
+        """
+        Public method to connect a device to a WiFi network.
+
+        @param ssid name (SSID) of the WiFi network
+        @type str
+        @param password password needed to connect
+        @type str
+        @return tuple containing the connection status and an error string
+        @rtype tuple of (bool, str)
+        """
+        command = """
+def connect_wifi(ssid, password):
+    import json
+    import wifi
+
+    r = wifi.radio
+    try:
+        r.start_station()
+        r.connect(ssid, password)
+        status = 'connected'
+    except Exception as exc:
+        status = str(exc)
+
+    print(json.dumps({{'connected': r.ipv4_address is not None, 'status': status}}))
+
+connect_wifi({0}, {1})
+del connect_wifi
+""".format(
+            repr(ssid),
+            repr(password if password else ""),
+        )
+
+        with EricOverrideCursor():
+            out, err = self._interface.execute(
+                command, mode=self._submitMode, timeout=15000
+            )
+        if err:
+            return False, err
+
+        result = json.loads(out.decode("utf-8").strip())
+        error = "" if result["connected"] else result["status"]
+
+        return result["connected"], error
+
+    def disconnectWifi(self):
+        """
+        Public method to disconnect a device from the WiFi network.
+
+        @return tuple containing a flag indicating success and an error string
+        @rtype tuple of (bool, str)
+        """
+        command = """
+def disconnect_wifi():
+    import json
+    import wifi
+
+    r = wifi.radio
+    try:
+        r.stop_station()
+        status = ''
+    except Exception as exc:
+        status = str(exc)
+
+    print(json.dumps({'success': status == '', 'status': status}))
+
+disconnect_wifi()
+del disconnect_wifi
+"""
+
+        out, err = self._interface.execute(command, mode=self._submitMode)
+        if err:
+            return False, err
+
+        result = json.loads(out.decode("utf-8").strip())
+        return result["success"], result["status"]
+
+    def writeCredentials(self, ssid, password):
+        """
+        Public method to write the given credentials to the connected device and modify
+        the start script to connect automatically.
+
+        @param ssid SSID of the network to connect to
+        @type str
+        @param password password needed to authenticate
+        @type str
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        if not self.__deviceVolumeMounted():
+            return False, self.tr("The device volume is not available.")
+
+        workspace = self.getWorkspace()
+
+        if Globals.versionToTuple(self._deviceData["release"]) >= (8, 0, 0):
+            # CircuitPython >= 8.0.0: generate 'settings.toml' file
+            contents = (
+                'CIRCUITPY_WIFI_SSID = "{0}"\nCIRCUITPY_WIFI_PASSWORD = "{1}"\n'.format(
+                    ssid, password
+                )
+            )
+            filename = os.path.join(workspace, "settings.toml")
+            if os.path.exists(filename):
+                ok = EricMessageBox.yesNo(
+                    None,
+                    self.tr("Write WiFi Credentials"),
+                    self.tr(
+                        """<p>The file <b>{0}</b> exists already. Shall it be"""
+                        """ replaced?</p>"""
+                    ).format(filename),
+                    icon=EricMessageBox.Warning,
+                )
+                if not ok:
+                    return False, self.tr("Aborted")
+            try:
+                with open(filename, "w") as f:
+                    f.write(contents)
+                    return True, ""
+            except OSError as err:
+                return False, str(err)
+
+        else:
+            # CircuitPython < 8.0.0: generate a secrets.py script
+            # step 1: generate the secrets.py file
+            contents = (
+                'secrets = {{\n    "ssid": "{0}",\n    "password": "{1}",\n}}\n'.format(
+                    ssid, password
+                )
+            )
+            filename = os.path.join(workspace, "secrets.py")
+            if os.path.exists(filename):
+                ok = EricMessageBox.yesNo(
+                    None,
+                    self.tr("Write WiFi Credentials"),
+                    self.tr(
+                        """<p>The file <b>{0}</b> exists already. Shall it be"""
+                        """ replaced?</p>"""
+                    ).format(filename),
+                    icon=EricMessageBox.Warning,
+                )
+                if not ok:
+                    return False, self.tr("Aborted")
+            # step 2: create the auto-connect script (wifi_connect.py)
+            try:
+                with open(filename, "w") as f:
+                    f.write(contents)
+            except OSError as err:
+                return False, str(err)
+            scriptFile = os.path.join(
+                os.path.dirname(__file__), "MCUScripts", "circuitPy7WiFiConnect.py"
+            )
+            targetFile = os.path.join(workspace, "wifi_connect.py")
+            try:
+                shutil.copy2(scriptFile, targetFile)
+            except OSError as err:
+                return False, str(err)
+            # Note: code.py will not be modified because the connection will be
+            #       reset anyway
+            return True, ""
+
+    def removeCredentials(self):
+        """
+        Public method to remove the saved credentials from the connected device.
+
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        if not self.__deviceVolumeMounted():
+            return False, self.tr("The device volume is not available.")
+
+        workspace = self.getWorkspace()
+        for name in ("settings.toml", "secrets.py"):
+            filename = os.path.join(workspace, name)
+            if os.path.exists(filename):
+                os.remove(filename)
+
+        return True, ""
+
+    def checkInternet(self):
+        """
+        Public method to check, if the internet can be reached.
+
+        @return tuple containing a flag indicating reachability and an error string
+        @rtype tuple of (bool, str)
+        """
+        command = """
+def check_internet():
+    import ipaddress
+    import wifi
+
+    r = wifi.radio
+    if r.ipv4_address is not None:
+        ping = r.ping(ipaddress.IPv4Address("8.8.8.8"))
+        print(ping is not None)
+    else:
+        print(False)
+
+check_internet()
+del check_internet
+"""
+
+        out, err = self._interface.execute(command, mode=self._submitMode)
+        if err:
+            return False, err
+
+        return out.decode("utf-8").strip() == "True", ""
+
+    def scanNetworks(self):
+        """
+        Public method to scan for available WiFi networks.
+
+        @return tuple containing the list of available networks as a tuple of 'Name',
+            'MAC-Address', 'channel', 'RSSI' and 'security' and an error string
+        @rtype tuple of (list of tuple of (str, str, int, int, str), str)
+        """
+        command = """
+def scan_networks():
+    import wifi
+
+    r = wifi.radio
+    network_list = []
+    enabled = r.enabled
+    if not enabled:
+        r.enabled = True
+    for net in r.start_scanning_networks():
+        network_list.append(
+            (net.ssid, net.bssid, net.channel, net.rssi,
+             '_'.join(str(x).split('.')[-1] for x in net.authmode))
+        )
+    r.stop_scanning_networks()
+    if not enabled:
+        r.enabled = False
+    print(network_list)
+
+scan_networks()
+del scan_networks
+"""
+
+        out, err = self._interface.execute(
+            command, mode=self._submitMode, timeout=15000
+        )
+        if err:
+            return [], err
+
+        networksList = ast.literal_eval(out.decode("utf-8"))
+        networks = []
+        seenNetworks = []
+        for network in networksList:
+            if network[0]:
+                ssid = network[0]
+                mac = binascii.hexlify(network[1], ":").decode("utf-8")
+                channel = network[2]
+                rssi = network[3]
+                try:
+                    security = self.__securityTranslations[network[4]]
+                except KeyError:
+                    security = self.tr("unknown ({0})").format(network[4])
+                if (ssid, mac, channel) not in seenNetworks:
+                    seenNetworks.append((ssid, mac, channel))
+                    networks.append((ssid, mac, channel, rssi, security))
+
+        return networks, ""
+
+    def deactivateInterface(self, interface):
+        """
+        Public method to deactivate a given WiFi interface of the connected device.
+
+        Note: With CircuitPython it is not possible to deactivate the station and
+        access point interfaces separately.
+
+        @param interface designation of the interface to be deactivated (one of 'AP'
+            or 'STA')
+        @type str
+        @return tuple containg a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        @exception ValueError raised to indicate a wrong value for the interface type
+        """
+        if interface not in ("STA", "AP"):
+            raise ValueError(
+                "interface must be 'AP' or 'STA', got '{0}'".format(interface)
+            )
+
+        command = """
+def deactivate():
+    import wifi
+
+    wifi.radio.enabled = False
+    print(not wifi.radio.enabled)
+
+deactivate()
+del deactivate
+"""
+
+        out, err = self._interface.execute(command, mode=self._submitMode)
+        if err:
+            return False, err
+        else:
+            return out.decode("utf-8").strip() == "True", ""
+
+    def startAccessPoint(self, ssid, security=None, password=None, ifconfig=None):
+        """
+        Public method to start the access point interface.
+
+        @param ssid SSID of the access point
+        @type str
+        @param security security method (defaults to None)
+        @type int (optional)
+        @param password password (defaults to None)
+        @type str (optional)
+        @param ifconfig IPv4 configuration for the access point if not default
+            (IPv4 address, netmask, gateway address, DNS server address)
+        @type tuple of (str, str, str, str)
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        if security is None or password is None:
+            security = 0
+            password = ""
+        authmode = self.__securityCode2AuthModeString[security]
+
+        if ifconfig:
+            return (
+                False,
+                self.tr(
+                    "CircuitPython does not support setting the IPv4 parameters of the"
+                    " WiFi access point."
+                ),
+            )
+
+        command = """
+def start_ap(ssid, password):
+    import wifi
+
+    r = wifi.radio
+    try:
+        r.start_ap(ssid, password, authmode={2})
+    except ValueError as exc:
+        print('Error:', str(exc))
+
+start_ap({0}, {1})
+del start_ap
+""".format(
+            repr(ssid), repr(password), authmode
+        )
+
+        out, err = self._interface.execute(
+            command, mode=self._submitMode, timeout=15000
+        )
+        if err:
+            return False, err
+        elif out and out.startswith(b"Error:"):
+            return False, out.decode("utf-8").split(None, 1)[-1]
+        else:
+            return True, ""
+
+    def stopAccessPoint(self):
+        """
+        Public method to stop the access point interface.
+
+        @return tuple containg a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        command = """
+def stop_ap():
+    import wifi
+
+    r = wifi.radio
+    try:
+        r.stop_ap()
+    except NotImplementedError as exc:
+        print('Error:', str(exc))
+
+stop_ap()
+del stop_ap
+"""
+
+        out, err = self._interface.execute(command, mode=self._submitMode)
+        if err:
+            return False, err
+        elif out and out.startswith(b"Error:"):
+            return False, out.decode("utf-8").split(None, 1)[-1]
+        else:
+            return True, ""
+
+    def getConnectedClients(self):
+        """
+        Public method to get a list of connected clients.
+
+        @return a tuple containing a list of tuples containing the client MAC-Address
+            and the RSSI (if supported and available) and an error message
+        @rtype tuple of ([(bytes, int)], str)
+        """
+        return (
+            [],
+            self.tr("CircuitPython does not support reporting of connected clients"),
+        )
+
 
 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber):
     """

eric ide

mercurial