--- a/src/eric7/MicroPython/Devices/EspDevices.py Thu Feb 23 13:31:20 2023 +0100 +++ b/src/eric7/MicroPython/Devices/EspDevices.py Thu Feb 23 13:31:55 2023 +0100 @@ -8,11 +8,17 @@ boards. """ +import ast +import binascii +import json +import os + from PyQt6.QtCore import QProcess, QUrl, pyqtSlot from PyQt6.QtNetwork import QNetworkRequest from PyQt6.QtWidgets import QDialog, QMenu from eric7 import Globals, Preferences +from eric7.EricGui.EricOverrideCursor import EricOverrideCursor from eric7.EricWidgets import EricMessageBox from eric7.EricWidgets.EricApplication import ericApp from eric7.EricWidgets.EricProcessDialog import EricProcessDialog @@ -43,6 +49,27 @@ self.__createEsp32Submenu() + self.__statusTranslations = { + 200: self.tr("beacon timeout"), + 201: self.tr("no matching access point found"), + 202: self.tr("authentication failed"), + 203: self.tr("association failed"), + 204: self.tr("handshake timeout"), + 1000: self.tr("idle"), + 1001: self.tr("connecting"), + 1010: self.tr("connected"), + } + self.__securityTranslations = { + 0: self.tr("open", "open WiFi network"), + 1: "WEP", + 2: "WPA", + 3: "WPA2", + 4: "WPA/WPA2", + 5: "WPA2 (CCMP)", + 6: "WPA3", + 7: "WPA2/WPA3", + } + def setButtons(self): """ Public method to enable the supported action buttons. @@ -619,6 +646,456 @@ rtc.init(clock_time) """ + ################################################################## + ## Methods below implement WiFi related methods + ################################################################## + + def hasWifi(self): + """ + Public method to check the availability of WiFi. + + @return tuple containing a flag indicating the availability of WiFi + and the WiFi type (esp32) + @rtype tuple of (bool, str) + """ + # TODO: check if ESP8266 is different + return True, "esp32" + + def getWifiData(self): + """ + Public method to get data related to the current WiFi status. + + @return tuple of two dictionaries containing the WiFi status data + for the WiFi client and access point + @rtype tuple of (dict, dict) + @exception OSError raised to indicate an issue with the device + """ + command = """ +def wifi_status(): + import ubinascii + import ujson + import network + + wifi = network.WLAN(network.STA_IF) + station = { + 'active': wifi.active(), + 'connected': wifi.isconnected(), + 'status': wifi.status(), + 'ifconfig': wifi.ifconfig(), + 'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(), + } + if wifi.active(): + station['txpower'] = wifi.config('txpower') + else: + station['txpower'] = 0 + print(ujson.dumps(station)) + + wifi = network.WLAN(network.AP_IF) + ap = { + 'active': wifi.active(), + 'connected': wifi.isconnected(), + 'status': wifi.status(), + 'ifconfig': wifi.ifconfig(), + 'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(), + 'channel': wifi.config('channel'), + 'essid': wifi.config('essid'), + } + if wifi.active(): + ap['txpower'] = wifi.config('txpower') + else: + ap['txpower'] = 0 + print(ujson.dumps(ap)) + +wifi_status() +del wifi_status +""" + + out, err = self._interface.execute(command) + if err: + raise OSError(self._shortError(err)) + + stationStr, apStr = out.decode("utf-8").splitlines() + station = json.loads(stationStr) + ap = json.loads(apStr) + try: + station["status"] = self.__statusTranslations[station["status"]] + except KeyError: + station["status"] = str(station["status"]) + try: + ap["status"] = self.__statusTranslations[ap["status"]] + except KeyError: + ap["status"] = str(ap["status"]) + return station, ap + + def connectWifi(self, ssid, password): + """ + Public method to connect a device to a WiFi network. + + @param ssid name (SSID) of the WiFi network + @type str + @param password password needed to connect + @type str + @return tuple containing the connection status and an error string + @rtype tuple of (bool, str) + """ + command = """ +def connect_wifi(ssid, password): + import network + import ujson + from time import sleep + + wifi = network.WLAN(network.STA_IF) + wifi.active(False) + wifi.active(True) + wifi.connect(ssid, password) + max_wait = 140 + while max_wait and wifi.status() == network.STAT_CONNECTING: + max_wait -= 1 + sleep(0.1) + status = wifi.status() + print(ujson.dumps({{'connected': wifi.isconnected(), 'status': status}})) + +connect_wifi({0}, {1}) +del connect_wifi +""".format( + repr(ssid), + repr(password if password else ""), + ) + + with EricOverrideCursor(): + out, err = self._interface.execute(command, timeout=15000) + if err: + return False, err + + result = json.loads(out.decode("utf-8").strip()) + if result["connected"]: + error = "" + else: + try: + error = self.__statusTranslations[result["status"]] + except KeyError: + error = str(result["status"]) + + return result["connected"], error + + def disconnectWifi(self): + """ + Public method to disconnect a device from the WiFi network. + + @return tuple containing a flag indicating success and an error string + @rtype tuple of (bool, str) + """ + command = """ +def disconnect_wifi(): + import network + from time import sleep + + wifi = network.WLAN(network.STA_IF) + wifi.disconnect() + wifi.active(False) + sleep(0.1) + print(not wifi.isconnected()) + +disconnect_wifi() +del disconnect_wifi +""" + + out, err = self._interface.execute(command) + if err: + return False, err + + return out.decode("utf-8").strip() == "True", "" + + def writeCredentials(self, ssid, password): + """ + Public method to write the given credentials to the connected device and modify + the start script to connect automatically. + + @param ssid SSID of the network to connect to + @type str + @param password password needed to authenticate + @type str + @return tuple containing a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + nvsCommand = """ +def save_wifi_creds(ssid, password): + import esp32 + + nvs = esp32.NVS('wifi_creds') + nvs.set_blob('ssid', ssid) + nvs.set_blob('password', password) + nvs.commit() + +save_wifi_creds({0}, {1}) +del save_wifi_creds +""".format(repr(ssid), repr(password) if password else "''") + bootCommand = """ +def modify_boot(): + add = True + try: + with open('/boot.py', 'r') as f: + for ln in f.readlines(): + if 'wifi_connect' in ln: + add = False + break + except: + pass + if add: + with open('/boot.py', 'a') as f: + f.write('\\nimport wifi_connect\\n') + print(True) + +modify_boot() +del modify_boot +""" + + out, err = self._interface.execute(nvsCommand) + if err: + return False, self.tr("Error saving credentials: {0}").format(err) + + try: + # copy auto-connect file + self.put( + os.path.join( + os.path.dirname(__file__), "MCUScripts", "esp32WiFiConnect.py" + ), + "/wifi_connect.py", + ) + except OSError as err: + return False, self.tr("Error saving auto-connect script: {0}").format(err) + + out, err = self._interface.execute(bootCommand) + if err: + return False, self.tr("Error modifying 'boot.py': {0}").format(err) + + return True, "" + + def removeCredentials(self): + """ + Public method to remove the saved credentials from the connected device. + + @return tuple containing a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + nvsCommand = """ +def delete_wifi_creds(): + import esp32 + + nvs = esp32.NVS('wifi_creds') + try: + nvs.erase_key('ssid') + nvs.erase_key('password') + nvs.commit() + except OSError: + pass + +delete_wifi_creds() +del delete_wifi_creds +""" + + out, err = self._interface.execute(nvsCommand) + if err: + return False, self.tr("Error deleting credentials: {0}").format(err) + + return True, "" + + def checkInternet(self): + """ + Public method to check, if the internet can be reached. + + @return tuple containing a flag indicating reachability and an error string + @rtype tuple of (bool, str) + """ + command = """ +def check_internet(): + import network + import socket + + wifi = network.WLAN(network.STA_IF) + if wifi.isconnected(): + s = socket.socket() + try: + s.connect(socket.getaddrinfo('google.com', 80)[0][-1]) + s.close() + print(True) + except: + print(False) + else: + print(False) + +check_internet() +del check_internet +""" + + out, err = self._interface.execute(command) + if err: + return False, err + + return out.decode("utf-8").strip() == "True", "" + + def scanNetworks(self): + """ + Public method to scan for available WiFi networks. + + @return tuple containing the list of available networks as a tuple of 'Name', + 'MAC-Address', 'channel', 'RSSI' and 'security' and an error string + @rtype tuple of (list of tuple of (str, str, int, int, str), str) + """ + command = """ +def scan_networks(): + import network + + wifi = network.WLAN(network.STA_IF) + active = wifi.active() + if not active: + wifi.active(True) + network_list = wifi.scan() + if not active: + wifi.active(False) + print(network_list) + +scan_networks() +del scan_networks +""" + + out, err = self._interface.execute(command, timeout=15000) + if err: + return [], err + + networksList = ast.literal_eval(out.decode("utf-8")) + networks = [] + for network in networksList: + if network[0]: + ssid = network[0].decode("utf-8") + mac = binascii.hexlify(network[1], ":").decode("utf-8") + channel = network[2] + rssi = network[3] + try: + security = self.__securityTranslations[network[4]] + except KeyError: + security = self.tr("unknown ({0})").format(network[4]) + networks.append((ssid, mac, channel, rssi, security)) + + return networks, "" + + def deactivateInterface(self, interface): + """ + Public method to deactivate a given WiFi interface of the connected device. + + @param interface designation of the interface to be deactivated (one of 'AP' + or 'STA') + @type str + @return tuple containg a flag indicating success and an error message + @rtype tuple of (bool, str) + @exception ValueError raised to indicate a wrong value for the interface type + """ + if interface not in ("STA", "AP"): + raise ValueError( + "interface must be 'AP' or 'STA', got '{0}'".format(interface) + ) + + command = """ +def deactivate(): + import network + from time import sleep + + wifi = network.WLAN(network.{0}_IF) + wifi.active(False) + sleep(0.1) + print(not wifi.active()) + +deactivate() +del deactivate +""".format( + interface + ) + + out, err = self._interface.execute(command) + if err: + return False, err + else: + return out.decode("utf-8").strip() == "True", "" + + def startAccessPoint(self, ssid, security=None, password=None): + """ + Public method to start the access point interface. + + @param ssid SSID of the access point + @type str + @param security security method (defaults to None) + @type int (optional) + @param password password (defaults to None) + @type str (optional) + @return tuple containing a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + if security is None or password is None: + security = 0 + password = "" + if security > 4: + security = 4 # security >4 cause an error thrown by the ESP32 + + command = """ +def start_ap(): + import network + + ap = network.WLAN(network.AP_IF) + ap.active(False) + ap.active(True) + try: + ap.config(ssid={0}, authmode={1}, password={2}) + except: + ap.config(essid={0}, authmode={1}, password={2}) + +start_ap() +del start_ap +""".format( + repr(ssid), security, repr(password) + ) + + out, err = self._interface.execute(command, timeout=15000) + if err: + return False, err + else: + return True, "" + + def stopAccessPoint(self): + """ + Public method to stop the access point interface. + + @return tuple containg a flag indicating success and an error message + @rtype tuple of (bool, str) + """ + return self.deactivateInterface("AP") + + def getConnectedClients(self): + """ + Public method to get a list of connected clients. + + @return a tuple containing a list of tuples containing the client MAC-Address + and the RSSI (if supported and available) and an error message + @rtype tuple of ([(bytes, int)], str) + """ + command = """ +def get_stations(): + import network + + ap = network.WLAN(network.AP_IF) + stations = ap.status('stations') + print(stations) + +get_stations() +del get_stations +""" + + out, err = self._interface.execute(command, timeout=10000) + if err: + return [], err + + clientsList = ast.literal_eval(out.decode("utf-8")) + return clientsList, "" + def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber): """