Tue, 28 Feb 2023 17:54:33 +0100
MicroPython
- added the WiFi functions for CircuitPython based controllers
--- a/eric7.epj Tue Feb 28 10:14:12 2023 +0100 +++ b/eric7.epj Tue Feb 28 17:54:33 2023 +0100 @@ -1305,6 +1305,7 @@ "src/eric7/MicroPython/Devices/EspDialogs/__init__.py", "src/eric7/MicroPython/Devices/GenericMicroPythonDevices.py", "src/eric7/MicroPython/Devices/MCUScripts/__init__.py", + "src/eric7/MicroPython/Devices/MCUScripts/circuitPy7WiFiConnect.py", "src/eric7/MicroPython/Devices/MCUScripts/esp32WiFiConnect.py", "src/eric7/MicroPython/Devices/MCUScripts/picowWiFiConnect.py", "src/eric7/MicroPython/Devices/MicrobitDevices.py",
--- a/src/eric7/MicroPython/Devices/CircuitPythonDevices.py Tue Feb 28 10:14:12 2023 +0100 +++ b/src/eric7/MicroPython/Devices/CircuitPythonDevices.py Tue Feb 28 17:54:33 2023 +0100 @@ -8,6 +8,7 @@ """ import ast +import binascii import json import os import shutil @@ -17,6 +18,7 @@ from PyQt6.QtWidgets import QMenu from eric7 import Globals, Preferences +from eric7.EricGui.EricOverrideCursor import EricOverrideCursor from eric7.EricWidgets import EricFileDialog, EricMessageBox from eric7.EricWidgets.EricApplication import ericApp from eric7.SystemUtilities import FileSystemUtilities @@ -52,7 +54,7 @@ """ super().__init__(microPythonWidget, deviceType, parent) - self.submitMode = "paste" # use 'paste' mode to avoid loosing state + self._submitMode = "paste" # use 'paste' mode to avoid loosing state self.__boardName = boardName self.__workspace = self.__findWorkspace() @@ -61,6 +63,27 @@ self.__createCPyMenu() + self.__securityTranslations = { + "OPEN": self.tr("open", "open WiFi network"), + "WEP": "WEP", + "WPA_PSK": "WPA", + "WPA2_PSK": "WPA2", + "WPA_WPA2_PSK": "WPA/WPA2", + "WPA2_ENTERPRISE": "WPA2 (CCMP)", + "WPA3_PSK": "WPA3", + "WPA2_WPA3_PSK": "WPA2/WPA3", + } + self.__securityCode2AuthModeString = { + 0: "[wifi.AuthMode.OPEN]", + 1: "[wifi.AuthMode.WEP]", + 2: "[wifi.AuthMode.WPA, wifi.AuthMode.PSK]", + 3: "[wifi.AuthMode.WPA2, wifi.AuthMode.PSK]", + 4: "[wifi.AuthMode.WPA, wifi.AuthMode.WPA2, wifi.AuthMode.PSK]", + 5: "[wifi.AuthMode.WPA2, wifi.AuthMode.ENTERPRISE]", + 6: "[wifi.AuthMode.WPA3, wifi.AuthMode.PSK]", + 7: "[wifi.AuthMode.WPA2, wifi.AuthMode.WPA3, wifi.AuthMode.PSK]", + } + def setConnected(self, connected): """ Public method to set the connection state. @@ -204,8 +227,8 @@ """ Private method to find the workspace directory. - @param silent flag indicating silent operations - @type bool + @param silent flag indicating silent operations (defaults to False) + @type bool (optional) @return workspace directory used for saving files @rtype str """ @@ -612,7 +635,7 @@ print(has_wifi()) del has_wifi """ - out, err = self._interface.execute(command, mode=self.submitMode) + out, err = self._interface.execute(command, mode=self._submitMode) if err: raise OSError(self._shortError(err)) return ast.literal_eval(out.decode("utf-8")) @@ -635,7 +658,7 @@ r = wifi.radio station = { - 'active': r.enabled and r.ipv4_address_ap is None, + 'active': r.enabled and r.ipv4_address is not None, 'connected': r.ipv4_address is not None, 'ifconfig': ( str(r.ipv4_address) if r.ipv4_address else'0.0.0.0', @@ -644,9 +667,29 @@ str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0', ), 'mac': binascii.hexlify(r.mac_address, ':').decode(), - 'txpower': r.tx_power, 'hostname': r.hostname, } + try: + station['txpower'] = r.tx_power + except AttributeError: + pass + try: + if r.ap_info is not None: + station.update({ + 'ap_ssid': r.ap_info.ssid, + 'ap_bssid': binascii.hexlify(r.ap_info.bssid, ':'), + 'ap_rssi': r.ap_info.rssi, + 'ap_channel': r.ap_info.channel, + 'ap_country': r.ap_info.country, + }) + authmode = r.ap_info.authmode + station['ap_security'] = ( + '_'.join(str(x).split('.')[-1] for x in authmode) + if isinstance(authmode, list) + else authmode + ) + except (NotImplementedError, AttributeError): + pass print(json.dumps(station)) ap = { @@ -659,9 +702,12 @@ str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0', ), 'mac': binascii.hexlify(r.mac_address_ap, ':').decode(), - 'txpower': r.tx_power, 'hostname': r.hostname, } + try: + ap['txpower'] = r.tx_power + except AttributeError: + pass print(json.dumps(ap)) overall = { @@ -673,7 +719,7 @@ del wifi_status """ - out, err = self._interface.execute(command, mode=self.submitMode) + out, err = self._interface.execute(command, mode=self._submitMode) if err: raise OSError(self._shortError(err)) @@ -681,8 +727,415 @@ station = json.loads(stationStr) ap = json.loads(apStr) overall = json.loads(overallStr) + if "ap_security" in station: + try: + station["ap_security"] = self.__securityTranslations[ + station["ap_security"] + ] + except KeyError: + station["ap_security"] = self.tr("unknown ({0})").format( + station["ap_security"] + ) + return station, ap, overall + def connectWifi(self, ssid, password): + """ + Public method to connect a device to a WiFi network. + + @param ssid name (SSID) of the WiFi network + @type str + @param password password needed to connect + @type str + @return tuple containing the connection status and an error string + @rtype tuple of (bool, str) + """ + command = """ +def connect_wifi(ssid, password): + import json + import wifi + + r = wifi.radio + try: + r.start_station() + r.connect(ssid, password) + status = 'connected' + except Exception as exc: + status = str(exc) + + print(json.dumps({{'connected': r.ipv4_address is not None, 'status': status}})) + +connect_wifi({0}, {1}) +del connect_wifi +""".format( + repr(ssid), + repr(password if password else ""), + ) + + with EricOverrideCursor(): + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) + if err: + return False, err + + result = json.loads(out.decode("utf-8").strip()) + error = "" if result["connected"] else result["status"] + + return result["connected"], error + + def disconnectWifi(self): + """ + Public method to disconnect a device from the WiFi network. + + @return tuple containing a flag indicating success and an error string + @rtype tuple of (bool, str) + """ + command = """ +def disconnect_wifi(): + import json + import wifi + + r = wifi.radio + try: + r.stop_station() + status = '' + except Exception as exc: + status = str(exc) + + print(json.dumps({'success': status == '', 'status': status})) + +disconnect_wifi() +del disconnect_wifi +""" + + out, err = self._interface.execute(command, mode=self._submitMode) + if err: + return False, err + + result = json.loads(out.decode("utf-8").strip()) + return result["success"], result["status"] + + def writeCredentials(self, ssid, password): + """ + Public method to write the given credentials to the connected device and modify + the start script to connect automatically. + + @param ssid SSID of the network to connect to + @type str + @param password password needed to authenticate + @type str + @return tuple containing a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + if not self.__deviceVolumeMounted(): + return False, self.tr("The device volume is not available.") + + workspace = self.getWorkspace() + + if Globals.versionToTuple(self._deviceData["release"]) >= (8, 0, 0): + # CircuitPython >= 8.0.0: generate 'settings.toml' file + contents = ( + 'CIRCUITPY_WIFI_SSID = "{0}"\nCIRCUITPY_WIFI_PASSWORD = "{1}"\n'.format( + ssid, password + ) + ) + filename = os.path.join(workspace, "settings.toml") + if os.path.exists(filename): + ok = EricMessageBox.yesNo( + None, + self.tr("Write WiFi Credentials"), + self.tr( + """<p>The file <b>{0}</b> exists already. Shall it be""" + """ replaced?</p>""" + ).format(filename), + icon=EricMessageBox.Warning, + ) + if not ok: + return False, self.tr("Aborted") + try: + with open(filename, "w") as f: + f.write(contents) + return True, "" + except OSError as err: + return False, str(err) + + else: + # CircuitPython < 8.0.0: generate a secrets.py script + # step 1: generate the secrets.py file + contents = ( + 'secrets = {{\n "ssid": "{0}",\n "password": "{1}",\n}}\n'.format( + ssid, password + ) + ) + filename = os.path.join(workspace, "secrets.py") + if os.path.exists(filename): + ok = EricMessageBox.yesNo( + None, + self.tr("Write WiFi Credentials"), + self.tr( + """<p>The file <b>{0}</b> exists already. Shall it be""" + """ replaced?</p>""" + ).format(filename), + icon=EricMessageBox.Warning, + ) + if not ok: + return False, self.tr("Aborted") + # step 2: create the auto-connect script (wifi_connect.py) + try: + with open(filename, "w") as f: + f.write(contents) + except OSError as err: + return False, str(err) + scriptFile = os.path.join( + os.path.dirname(__file__), "MCUScripts", "circuitPy7WiFiConnect.py" + ) + targetFile = os.path.join(workspace, "wifi_connect.py") + try: + shutil.copy2(scriptFile, targetFile) + except OSError as err: + return False, str(err) + # Note: code.py will not be modified because the connection will be + # reset anyway + return True, "" + + def removeCredentials(self): + """ + Public method to remove the saved credentials from the connected device. + + @return tuple containing a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + if not self.__deviceVolumeMounted(): + return False, self.tr("The device volume is not available.") + + workspace = self.getWorkspace() + for name in ("settings.toml", "secrets.py"): + filename = os.path.join(workspace, name) + if os.path.exists(filename): + os.remove(filename) + + return True, "" + + def checkInternet(self): + """ + Public method to check, if the internet can be reached. + + @return tuple containing a flag indicating reachability and an error string + @rtype tuple of (bool, str) + """ + command = """ +def check_internet(): + import ipaddress + import wifi + + r = wifi.radio + if r.ipv4_address is not None: + ping = r.ping(ipaddress.IPv4Address("8.8.8.8")) + print(ping is not None) + else: + print(False) + +check_internet() +del check_internet +""" + + out, err = self._interface.execute(command, mode=self._submitMode) + if err: + return False, err + + return out.decode("utf-8").strip() == "True", "" + + def scanNetworks(self): + """ + Public method to scan for available WiFi networks. + + @return tuple containing the list of available networks as a tuple of 'Name', + 'MAC-Address', 'channel', 'RSSI' and 'security' and an error string + @rtype tuple of (list of tuple of (str, str, int, int, str), str) + """ + command = """ +def scan_networks(): + import wifi + + r = wifi.radio + network_list = [] + enabled = r.enabled + if not enabled: + r.enabled = True + for net in r.start_scanning_networks(): + network_list.append( + (net.ssid, net.bssid, net.channel, net.rssi, + '_'.join(str(x).split('.')[-1] for x in net.authmode)) + ) + r.stop_scanning_networks() + if not enabled: + r.enabled = False + print(network_list) + +scan_networks() +del scan_networks +""" + + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) + if err: + return [], err + + networksList = ast.literal_eval(out.decode("utf-8")) + networks = [] + seenNetworks = [] + for network in networksList: + if network[0]: + ssid = network[0] + mac = binascii.hexlify(network[1], ":").decode("utf-8") + channel = network[2] + rssi = network[3] + try: + security = self.__securityTranslations[network[4]] + except KeyError: + security = self.tr("unknown ({0})").format(network[4]) + if (ssid, mac, channel) not in seenNetworks: + seenNetworks.append((ssid, mac, channel)) + networks.append((ssid, mac, channel, rssi, security)) + + return networks, "" + + def deactivateInterface(self, interface): + """ + Public method to deactivate a given WiFi interface of the connected device. + + Note: With CircuitPython it is not possible to deactivate the station and + access point interfaces separately. + + @param interface designation of the interface to be deactivated (one of 'AP' + or 'STA') + @type str + @return tuple containg a flag indicating success and an error message + @rtype tuple of (bool, str) + @exception ValueError raised to indicate a wrong value for the interface type + """ + if interface not in ("STA", "AP"): + raise ValueError( + "interface must be 'AP' or 'STA', got '{0}'".format(interface) + ) + + command = """ +def deactivate(): + import wifi + + wifi.radio.enabled = False + print(not wifi.radio.enabled) + +deactivate() +del deactivate +""" + + out, err = self._interface.execute(command, mode=self._submitMode) + if err: + return False, err + else: + return out.decode("utf-8").strip() == "True", "" + + def startAccessPoint(self, ssid, security=None, password=None, ifconfig=None): + """ + Public method to start the access point interface. + + @param ssid SSID of the access point + @type str + @param security security method (defaults to None) + @type int (optional) + @param password password (defaults to None) + @type str (optional) + @param ifconfig IPv4 configuration for the access point if not default + (IPv4 address, netmask, gateway address, DNS server address) + @type tuple of (str, str, str, str) + @return tuple containing a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + if security is None or password is None: + security = 0 + password = "" + authmode = self.__securityCode2AuthModeString[security] + + if ifconfig: + return ( + False, + self.tr( + "CircuitPython does not support setting the IPv4 parameters of the" + " WiFi access point." + ), + ) + + command = """ +def start_ap(ssid, password): + import wifi + + r = wifi.radio + try: + r.start_ap(ssid, password, authmode={2}) + except ValueError as exc: + print('Error:', str(exc)) + +start_ap({0}, {1}) +del start_ap +""".format( + repr(ssid), repr(password), authmode + ) + + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) + if err: + return False, err + elif out and out.startswith(b"Error:"): + return False, out.decode("utf-8").split(None, 1)[-1] + else: + return True, "" + + def stopAccessPoint(self): + """ + Public method to stop the access point interface. + + @return tuple containg a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + command = """ +def stop_ap(): + import wifi + + r = wifi.radio + try: + r.stop_ap() + except NotImplementedError as exc: + print('Error:', str(exc)) + +stop_ap() +del stop_ap +""" + + out, err = self._interface.execute(command, mode=self._submitMode) + if err: + return False, err + elif out and out.startswith(b"Error:"): + return False, out.decode("utf-8").split(None, 1)[-1] + else: + return True, "" + + def getConnectedClients(self): + """ + Public method to get a list of connected clients. + + @return a tuple containing a list of tuples containing the client MAC-Address + and the RSSI (if supported and available) and an error message + @rtype tuple of ([(bytes, int)], str) + """ + return ( + [], + self.tr("CircuitPython does not support reporting of connected clients"), + ) + def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber): """
--- a/src/eric7/MicroPython/Devices/EspDevices.py Tue Feb 28 10:14:12 2023 +0100 +++ b/src/eric7/MicroPython/Devices/EspDevices.py Tue Feb 28 17:54:33 2023 +0100 @@ -890,7 +890,9 @@ del scan_networks """ - out, err = self._interface.execute(command, mode=self._submitMode, timeout=15000) + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) if err: return [], err @@ -990,7 +992,9 @@ repr(ssid), security, repr(password), ifconfig ) - out, err = self._interface.execute(command, mode=self._submitMode, timeout=15000) + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) if err: return False, err else: @@ -1025,7 +1029,9 @@ del get_stations """ - out, err = self._interface.execute(command, mode=self._submitMode, timeout=10000) + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=10000 + ) if err: return [], err
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/eric7/MicroPython/Devices/MCUScripts/circuitPy7WiFiConnect.py Tue Feb 28 17:54:33 2023 +0100 @@ -0,0 +1,22 @@ +try: + from secrets import secrets + + def connectWiFi(): + import wifi + + wifi.radio.start_station() + try: + wifi.radio.connect( + secrets["ssid"], + "" if secrets["password"] is None else secrets["password"] + ) + except Exception as exc: + print("WiFi connection failed:", str(exc)) + if wifi.radio.ipv4_address is None: + print("WiFi connection failed") + else: + print("WiFi connected:", wifi.radio.ipv4_address) + + connectWiFi() +except ImportError: + print("WiFi secrets are kept in 'secrets.py', please add them there!")
--- a/src/eric7/MicroPython/Devices/MCUScripts/esp32WiFiConnect.py Tue Feb 28 10:14:12 2023 +0100 +++ b/src/eric7/MicroPython/Devices/MCUScripts/esp32WiFiConnect.py Tue Feb 28 17:54:33 2023 +0100 @@ -22,6 +22,6 @@ sleep(0.1) print("Connection status:", wifi.isconnected()) except: - pass + print("WiFi secrets are kept in NVM. Please store them there!") connectWiFi()
--- a/src/eric7/MicroPython/Devices/MCUScripts/picowWiFiConnect.py Tue Feb 28 10:14:12 2023 +0100 +++ b/src/eric7/MicroPython/Devices/MCUScripts/picowWiFiConnect.py Tue Feb 28 17:54:33 2023 +0100 @@ -27,4 +27,4 @@ connectWiFi() except ImportError: - pass + print("WiFi secrets are kept in 'secrets.py', please add them there!")
--- a/src/eric7/MicroPython/Devices/RP2040Devices.py Tue Feb 28 10:14:12 2023 +0100 +++ b/src/eric7/MicroPython/Devices/RP2040Devices.py Tue Feb 28 17:54:33 2023 +0100 @@ -416,7 +416,9 @@ print(has_wifi()) del has_wifi """ - out, err = self._interface.execute(command, mode=self._submitMode, timeout=10000) + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=10000 + ) if err: raise OSError(self._shortError(err)) return ast.literal_eval(out.decode("utf-8")) @@ -759,7 +761,9 @@ else: return super().scanNetworks() - out, err = self._interface.execute(command, mode=self._submitMode, timeout=15000) + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) if err: return [], err @@ -878,7 +882,9 @@ else: return super().startAccessPoint(ssid, security=security, password=password) - out, err = self._interface.execute(command, mode=self._submitMode, timeout=15000) + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) if err: return False, err else: @@ -925,7 +931,9 @@ else: return super().checkInternet() - out, err = self._interface.execute(command, mode=self._submitMode, timeout=10000) + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=10000 + ) if err: return [], err
--- a/src/eric7/MicroPython/WifiDialogs/WifiNetworksWindow.py Tue Feb 28 10:14:12 2023 +0100 +++ b/src/eric7/MicroPython/WifiDialogs/WifiNetworksWindow.py Tue Feb 28 17:54:33 2023 +0100 @@ -68,7 +68,7 @@ self.tr( """<p>The scan for available WiFi networks failed.</p>""" """<p>Reason: {0}</p>""" - ), + ).format(error), ) if self.periodicCheckBox.isChecked(): self.periodicCheckBox.setChecked(False) @@ -91,6 +91,7 @@ itm.setTextAlignment(1, Qt.AlignmentFlag.AlignHCenter) itm.setTextAlignment(2, Qt.AlignmentFlag.AlignHCenter) itm.setTextAlignment(3, Qt.AlignmentFlag.AlignHCenter) + itm.setTextAlignment(4, Qt.AlignmentFlag.AlignHCenter) self.__resizeColumns() self.__resort()
--- a/src/eric7/MicroPython/WifiDialogs/WifiStatusDialog.py Tue Feb 28 10:14:12 2023 +0100 +++ b/src/eric7/MicroPython/WifiDialogs/WifiStatusDialog.py Tue Feb 28 17:54:33 2023 +0100 @@ -100,6 +100,29 @@ ], ) + if "ap_ssid" in clientStatus: + apHeader = self.__createSubheader( + header, self.tr("Connected Access Point") + ) + QTreeWidgetItem( + apHeader, [self.tr("Name"), clientStatus["ap_ssid"]] + ) + QTreeWidgetItem( + apHeader, [self.tr("Channel"), str(clientStatus["ap_channel"])] + ) + QTreeWidgetItem( + apHeader, [self.tr("MAC-Address"), clientStatus["ap_bssid"]] + ) + QTreeWidgetItem( + apHeader, [self.tr("RSSI [dBm]"), str(clientStatus["ap_rssi"])] + ) + QTreeWidgetItem( + apHeader, [self.tr("Security"), clientStatus["ap_security"]] + ) + QTreeWidgetItem( + apHeader, [self.tr("Country"), clientStatus["ap_country"]] + ) + # access point interface if apStatus: header = self.__createHeader(self.tr("Access Point")) @@ -168,3 +191,24 @@ headerItem.setFont(0, font) return headerItem + + def __createSubheader(self, parent, text): + """ + Private method to create a subheader item. + + @param parent reference to the parent item + @type QTreeWidgetItem + @param text text for the header item + @type str + @return reference to the created header item + @rtype QTreeWidgetItem + """ + headerItem = QTreeWidgetItem(parent, [text]) + headerItem.setExpanded(True) + headerItem.setFirstColumnSpanned(True) + + font = headerItem.font(0) + font.setUnderline(True) + headerItem.setFont(0, font) + + return headerItem