--- a/src/eric7/MicroPython/Devices/CircuitPythonDevices.py Tue Feb 28 10:14:12 2023 +0100 +++ b/src/eric7/MicroPython/Devices/CircuitPythonDevices.py Tue Feb 28 17:54:33 2023 +0100 @@ -8,6 +8,7 @@ """ import ast +import binascii import json import os import shutil @@ -17,6 +18,7 @@ from PyQt6.QtWidgets import QMenu from eric7 import Globals, Preferences +from eric7.EricGui.EricOverrideCursor import EricOverrideCursor from eric7.EricWidgets import EricFileDialog, EricMessageBox from eric7.EricWidgets.EricApplication import ericApp from eric7.SystemUtilities import FileSystemUtilities @@ -52,7 +54,7 @@ """ super().__init__(microPythonWidget, deviceType, parent) - self.submitMode = "paste" # use 'paste' mode to avoid loosing state + self._submitMode = "paste" # use 'paste' mode to avoid loosing state self.__boardName = boardName self.__workspace = self.__findWorkspace() @@ -61,6 +63,27 @@ self.__createCPyMenu() + self.__securityTranslations = { + "OPEN": self.tr("open", "open WiFi network"), + "WEP": "WEP", + "WPA_PSK": "WPA", + "WPA2_PSK": "WPA2", + "WPA_WPA2_PSK": "WPA/WPA2", + "WPA2_ENTERPRISE": "WPA2 (CCMP)", + "WPA3_PSK": "WPA3", + "WPA2_WPA3_PSK": "WPA2/WPA3", + } + self.__securityCode2AuthModeString = { + 0: "[wifi.AuthMode.OPEN]", + 1: "[wifi.AuthMode.WEP]", + 2: "[wifi.AuthMode.WPA, wifi.AuthMode.PSK]", + 3: "[wifi.AuthMode.WPA2, wifi.AuthMode.PSK]", + 4: "[wifi.AuthMode.WPA, wifi.AuthMode.WPA2, wifi.AuthMode.PSK]", + 5: "[wifi.AuthMode.WPA2, wifi.AuthMode.ENTERPRISE]", + 6: "[wifi.AuthMode.WPA3, wifi.AuthMode.PSK]", + 7: "[wifi.AuthMode.WPA2, wifi.AuthMode.WPA3, wifi.AuthMode.PSK]", + } + def setConnected(self, connected): """ Public method to set the connection state. @@ -204,8 +227,8 @@ """ Private method to find the workspace directory. - @param silent flag indicating silent operations - @type bool + @param silent flag indicating silent operations (defaults to False) + @type bool (optional) @return workspace directory used for saving files @rtype str """ @@ -612,7 +635,7 @@ print(has_wifi()) del has_wifi """ - out, err = self._interface.execute(command, mode=self.submitMode) + out, err = self._interface.execute(command, mode=self._submitMode) if err: raise OSError(self._shortError(err)) return ast.literal_eval(out.decode("utf-8")) @@ -635,7 +658,7 @@ r = wifi.radio station = { - 'active': r.enabled and r.ipv4_address_ap is None, + 'active': r.enabled and r.ipv4_address is not None, 'connected': r.ipv4_address is not None, 'ifconfig': ( str(r.ipv4_address) if r.ipv4_address else'0.0.0.0', @@ -644,9 +667,29 @@ str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0', ), 'mac': binascii.hexlify(r.mac_address, ':').decode(), - 'txpower': r.tx_power, 'hostname': r.hostname, } + try: + station['txpower'] = r.tx_power + except AttributeError: + pass + try: + if r.ap_info is not None: + station.update({ + 'ap_ssid': r.ap_info.ssid, + 'ap_bssid': binascii.hexlify(r.ap_info.bssid, ':'), + 'ap_rssi': r.ap_info.rssi, + 'ap_channel': r.ap_info.channel, + 'ap_country': r.ap_info.country, + }) + authmode = r.ap_info.authmode + station['ap_security'] = ( + '_'.join(str(x).split('.')[-1] for x in authmode) + if isinstance(authmode, list) + else authmode + ) + except (NotImplementedError, AttributeError): + pass print(json.dumps(station)) ap = { @@ -659,9 +702,12 @@ str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0', ), 'mac': binascii.hexlify(r.mac_address_ap, ':').decode(), - 'txpower': r.tx_power, 'hostname': r.hostname, } + try: + ap['txpower'] = r.tx_power + except AttributeError: + pass print(json.dumps(ap)) overall = { @@ -673,7 +719,7 @@ del wifi_status """ - out, err = self._interface.execute(command, mode=self.submitMode) + out, err = self._interface.execute(command, mode=self._submitMode) if err: raise OSError(self._shortError(err)) @@ -681,8 +727,415 @@ station = json.loads(stationStr) ap = json.loads(apStr) overall = json.loads(overallStr) + if "ap_security" in station: + try: + station["ap_security"] = self.__securityTranslations[ + station["ap_security"] + ] + except KeyError: + station["ap_security"] = self.tr("unknown ({0})").format( + station["ap_security"] + ) + return station, ap, overall + def connectWifi(self, ssid, password): + """ + Public method to connect a device to a WiFi network. + + @param ssid name (SSID) of the WiFi network + @type str + @param password password needed to connect + @type str + @return tuple containing the connection status and an error string + @rtype tuple of (bool, str) + """ + command = """ +def connect_wifi(ssid, password): + import json + import wifi + + r = wifi.radio + try: + r.start_station() + r.connect(ssid, password) + status = 'connected' + except Exception as exc: + status = str(exc) + + print(json.dumps({{'connected': r.ipv4_address is not None, 'status': status}})) + +connect_wifi({0}, {1}) +del connect_wifi +""".format( + repr(ssid), + repr(password if password else ""), + ) + + with EricOverrideCursor(): + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) + if err: + return False, err + + result = json.loads(out.decode("utf-8").strip()) + error = "" if result["connected"] else result["status"] + + return result["connected"], error + + def disconnectWifi(self): + """ + Public method to disconnect a device from the WiFi network. + + @return tuple containing a flag indicating success and an error string + @rtype tuple of (bool, str) + """ + command = """ +def disconnect_wifi(): + import json + import wifi + + r = wifi.radio + try: + r.stop_station() + status = '' + except Exception as exc: + status = str(exc) + + print(json.dumps({'success': status == '', 'status': status})) + +disconnect_wifi() +del disconnect_wifi +""" + + out, err = self._interface.execute(command, mode=self._submitMode) + if err: + return False, err + + result = json.loads(out.decode("utf-8").strip()) + return result["success"], result["status"] + + def writeCredentials(self, ssid, password): + """ + Public method to write the given credentials to the connected device and modify + the start script to connect automatically. + + @param ssid SSID of the network to connect to + @type str + @param password password needed to authenticate + @type str + @return tuple containing a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + if not self.__deviceVolumeMounted(): + return False, self.tr("The device volume is not available.") + + workspace = self.getWorkspace() + + if Globals.versionToTuple(self._deviceData["release"]) >= (8, 0, 0): + # CircuitPython >= 8.0.0: generate 'settings.toml' file + contents = ( + 'CIRCUITPY_WIFI_SSID = "{0}"\nCIRCUITPY_WIFI_PASSWORD = "{1}"\n'.format( + ssid, password + ) + ) + filename = os.path.join(workspace, "settings.toml") + if os.path.exists(filename): + ok = EricMessageBox.yesNo( + None, + self.tr("Write WiFi Credentials"), + self.tr( + """<p>The file <b>{0}</b> exists already. Shall it be""" + """ replaced?</p>""" + ).format(filename), + icon=EricMessageBox.Warning, + ) + if not ok: + return False, self.tr("Aborted") + try: + with open(filename, "w") as f: + f.write(contents) + return True, "" + except OSError as err: + return False, str(err) + + else: + # CircuitPython < 8.0.0: generate a secrets.py script + # step 1: generate the secrets.py file + contents = ( + 'secrets = {{\n "ssid": "{0}",\n "password": "{1}",\n}}\n'.format( + ssid, password + ) + ) + filename = os.path.join(workspace, "secrets.py") + if os.path.exists(filename): + ok = EricMessageBox.yesNo( + None, + self.tr("Write WiFi Credentials"), + self.tr( + """<p>The file <b>{0}</b> exists already. Shall it be""" + """ replaced?</p>""" + ).format(filename), + icon=EricMessageBox.Warning, + ) + if not ok: + return False, self.tr("Aborted") + # step 2: create the auto-connect script (wifi_connect.py) + try: + with open(filename, "w") as f: + f.write(contents) + except OSError as err: + return False, str(err) + scriptFile = os.path.join( + os.path.dirname(__file__), "MCUScripts", "circuitPy7WiFiConnect.py" + ) + targetFile = os.path.join(workspace, "wifi_connect.py") + try: + shutil.copy2(scriptFile, targetFile) + except OSError as err: + return False, str(err) + # Note: code.py will not be modified because the connection will be + # reset anyway + return True, "" + + def removeCredentials(self): + """ + Public method to remove the saved credentials from the connected device. + + @return tuple containing a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + if not self.__deviceVolumeMounted(): + return False, self.tr("The device volume is not available.") + + workspace = self.getWorkspace() + for name in ("settings.toml", "secrets.py"): + filename = os.path.join(workspace, name) + if os.path.exists(filename): + os.remove(filename) + + return True, "" + + def checkInternet(self): + """ + Public method to check, if the internet can be reached. + + @return tuple containing a flag indicating reachability and an error string + @rtype tuple of (bool, str) + """ + command = """ +def check_internet(): + import ipaddress + import wifi + + r = wifi.radio + if r.ipv4_address is not None: + ping = r.ping(ipaddress.IPv4Address("8.8.8.8")) + print(ping is not None) + else: + print(False) + +check_internet() +del check_internet +""" + + out, err = self._interface.execute(command, mode=self._submitMode) + if err: + return False, err + + return out.decode("utf-8").strip() == "True", "" + + def scanNetworks(self): + """ + Public method to scan for available WiFi networks. + + @return tuple containing the list of available networks as a tuple of 'Name', + 'MAC-Address', 'channel', 'RSSI' and 'security' and an error string + @rtype tuple of (list of tuple of (str, str, int, int, str), str) + """ + command = """ +def scan_networks(): + import wifi + + r = wifi.radio + network_list = [] + enabled = r.enabled + if not enabled: + r.enabled = True + for net in r.start_scanning_networks(): + network_list.append( + (net.ssid, net.bssid, net.channel, net.rssi, + '_'.join(str(x).split('.')[-1] for x in net.authmode)) + ) + r.stop_scanning_networks() + if not enabled: + r.enabled = False + print(network_list) + +scan_networks() +del scan_networks +""" + + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) + if err: + return [], err + + networksList = ast.literal_eval(out.decode("utf-8")) + networks = [] + seenNetworks = [] + for network in networksList: + if network[0]: + ssid = network[0] + mac = binascii.hexlify(network[1], ":").decode("utf-8") + channel = network[2] + rssi = network[3] + try: + security = self.__securityTranslations[network[4]] + except KeyError: + security = self.tr("unknown ({0})").format(network[4]) + if (ssid, mac, channel) not in seenNetworks: + seenNetworks.append((ssid, mac, channel)) + networks.append((ssid, mac, channel, rssi, security)) + + return networks, "" + + def deactivateInterface(self, interface): + """ + Public method to deactivate a given WiFi interface of the connected device. + + Note: With CircuitPython it is not possible to deactivate the station and + access point interfaces separately. + + @param interface designation of the interface to be deactivated (one of 'AP' + or 'STA') + @type str + @return tuple containg a flag indicating success and an error message + @rtype tuple of (bool, str) + @exception ValueError raised to indicate a wrong value for the interface type + """ + if interface not in ("STA", "AP"): + raise ValueError( + "interface must be 'AP' or 'STA', got '{0}'".format(interface) + ) + + command = """ +def deactivate(): + import wifi + + wifi.radio.enabled = False + print(not wifi.radio.enabled) + +deactivate() +del deactivate +""" + + out, err = self._interface.execute(command, mode=self._submitMode) + if err: + return False, err + else: + return out.decode("utf-8").strip() == "True", "" + + def startAccessPoint(self, ssid, security=None, password=None, ifconfig=None): + """ + Public method to start the access point interface. + + @param ssid SSID of the access point + @type str + @param security security method (defaults to None) + @type int (optional) + @param password password (defaults to None) + @type str (optional) + @param ifconfig IPv4 configuration for the access point if not default + (IPv4 address, netmask, gateway address, DNS server address) + @type tuple of (str, str, str, str) + @return tuple containing a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + if security is None or password is None: + security = 0 + password = "" + authmode = self.__securityCode2AuthModeString[security] + + if ifconfig: + return ( + False, + self.tr( + "CircuitPython does not support setting the IPv4 parameters of the" + " WiFi access point." + ), + ) + + command = """ +def start_ap(ssid, password): + import wifi + + r = wifi.radio + try: + r.start_ap(ssid, password, authmode={2}) + except ValueError as exc: + print('Error:', str(exc)) + +start_ap({0}, {1}) +del start_ap +""".format( + repr(ssid), repr(password), authmode + ) + + out, err = self._interface.execute( + command, mode=self._submitMode, timeout=15000 + ) + if err: + return False, err + elif out and out.startswith(b"Error:"): + return False, out.decode("utf-8").split(None, 1)[-1] + else: + return True, "" + + def stopAccessPoint(self): + """ + Public method to stop the access point interface. + + @return tuple containg a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + command = """ +def stop_ap(): + import wifi + + r = wifi.radio + try: + r.stop_ap() + except NotImplementedError as exc: + print('Error:', str(exc)) + +stop_ap() +del stop_ap +""" + + out, err = self._interface.execute(command, mode=self._submitMode) + if err: + return False, err + elif out and out.startswith(b"Error:"): + return False, out.decode("utf-8").split(None, 1)[-1] + else: + return True, "" + + def getConnectedClients(self): + """ + Public method to get a list of connected clients. + + @return a tuple containing a list of tuples containing the client MAC-Address + and the RSSI (if supported and available) and an error message + @rtype tuple of ([(bytes, int)], str) + """ + return ( + [], + self.tr("CircuitPython does not support reporting of connected clients"), + ) + def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber): """