src/eric7/MicroPython/Devices/EspDevices.py

branch
mpy_network
changeset 9795
11b4d39d7584
parent 9766
f0e22f3a5878
child 9797
3be7b2326e2c
--- a/src/eric7/MicroPython/Devices/EspDevices.py	Thu Feb 23 13:31:20 2023 +0100
+++ b/src/eric7/MicroPython/Devices/EspDevices.py	Thu Feb 23 13:31:55 2023 +0100
@@ -8,11 +8,17 @@
 boards.
 """
 
+import ast
+import binascii
+import json
+import os
+
 from PyQt6.QtCore import QProcess, QUrl, pyqtSlot
 from PyQt6.QtNetwork import QNetworkRequest
 from PyQt6.QtWidgets import QDialog, QMenu
 
 from eric7 import Globals, Preferences
+from eric7.EricGui.EricOverrideCursor import EricOverrideCursor
 from eric7.EricWidgets import EricMessageBox
 from eric7.EricWidgets.EricApplication import ericApp
 from eric7.EricWidgets.EricProcessDialog import EricProcessDialog
@@ -43,6 +49,27 @@
 
         self.__createEsp32Submenu()
 
+        self.__statusTranslations = {
+            200: self.tr("beacon timeout"),
+            201: self.tr("no matching access point found"),
+            202: self.tr("authentication failed"),
+            203: self.tr("association failed"),
+            204: self.tr("handshake timeout"),
+            1000: self.tr("idle"),
+            1001: self.tr("connecting"),
+            1010: self.tr("connected"),
+        }
+        self.__securityTranslations = {
+            0: self.tr("open", "open WiFi network"),
+            1: "WEP",
+            2: "WPA",
+            3: "WPA2",
+            4: "WPA/WPA2",
+            5: "WPA2 (CCMP)",
+            6: "WPA3",
+            7: "WPA2/WPA3",
+        }
+
     def setButtons(self):
         """
         Public method to enable the supported action buttons.
@@ -619,6 +646,456 @@
         rtc.init(clock_time)
 """
 
+    ##################################################################
+    ## Methods below implement WiFi related methods
+    ##################################################################
+
+    def hasWifi(self):
+        """
+        Public method to check the availability of WiFi.
+
+        @return tuple containing a flag indicating the availability of WiFi
+            and the WiFi type (esp32)
+        @rtype tuple of (bool, str)
+        """
+        # TODO: check if ESP8266 is different
+        return True, "esp32"
+
+    def getWifiData(self):
+        """
+        Public method to get data related to the current WiFi status.
+
+        @return tuple of two dictionaries containing the WiFi status data
+            for the WiFi client and access point
+        @rtype tuple of (dict, dict)
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+def wifi_status():
+    import ubinascii
+    import ujson
+    import network
+
+    wifi = network.WLAN(network.STA_IF)
+    station = {
+        'active': wifi.active(),
+        'connected': wifi.isconnected(),
+        'status': wifi.status(),
+        'ifconfig': wifi.ifconfig(),
+        'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
+    }
+    if wifi.active():
+        station['txpower'] = wifi.config('txpower')
+    else:
+        station['txpower'] = 0
+    print(ujson.dumps(station))
+
+    wifi = network.WLAN(network.AP_IF)
+    ap = {
+        'active': wifi.active(),
+        'connected': wifi.isconnected(),
+        'status': wifi.status(),
+        'ifconfig': wifi.ifconfig(),
+        'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
+        'channel': wifi.config('channel'),
+        'essid': wifi.config('essid'),
+    }
+    if wifi.active():
+        ap['txpower'] = wifi.config('txpower')
+    else:
+        ap['txpower'] = 0
+    print(ujson.dumps(ap))
+
+wifi_status()
+del wifi_status
+"""
+
+        out, err = self._interface.execute(command)
+        if err:
+            raise OSError(self._shortError(err))
+
+        stationStr, apStr = out.decode("utf-8").splitlines()
+        station = json.loads(stationStr)
+        ap = json.loads(apStr)
+        try:
+            station["status"] = self.__statusTranslations[station["status"]]
+        except KeyError:
+            station["status"] = str(station["status"])
+        try:
+            ap["status"] = self.__statusTranslations[ap["status"]]
+        except KeyError:
+            ap["status"] = str(ap["status"])
+        return station, ap
+
+    def connectWifi(self, ssid, password):
+        """
+        Public method to connect a device to a WiFi network.
+
+        @param ssid name (SSID) of the WiFi network
+        @type str
+        @param password password needed to connect
+        @type str
+        @return tuple containing the connection status and an error string
+        @rtype tuple of (bool, str)
+        """
+        command = """
+def connect_wifi(ssid, password):
+    import network
+    import ujson
+    from time import sleep
+
+    wifi = network.WLAN(network.STA_IF)
+    wifi.active(False)
+    wifi.active(True)
+    wifi.connect(ssid, password)
+    max_wait = 140
+    while max_wait and wifi.status() == network.STAT_CONNECTING:
+        max_wait -= 1
+        sleep(0.1)
+    status = wifi.status()
+    print(ujson.dumps({{'connected': wifi.isconnected(), 'status': status}}))
+
+connect_wifi({0}, {1})
+del connect_wifi
+""".format(
+                repr(ssid),
+                repr(password if password else ""),
+            )
+
+        with EricOverrideCursor():
+            out, err = self._interface.execute(command, timeout=15000)
+        if err:
+            return False, err
+
+        result = json.loads(out.decode("utf-8").strip())
+        if result["connected"]:
+            error = ""
+        else:
+            try:
+                error = self.__statusTranslations[result["status"]]
+            except KeyError:
+                error = str(result["status"])
+
+        return result["connected"], error
+
+    def disconnectWifi(self):
+        """
+        Public method to disconnect a device from the WiFi network.
+
+        @return tuple containing a flag indicating success and an error string
+        @rtype tuple of (bool, str)
+        """
+        command = """
+def disconnect_wifi():
+    import network
+    from time import sleep
+
+    wifi = network.WLAN(network.STA_IF)
+    wifi.disconnect()
+    wifi.active(False)
+    sleep(0.1)
+    print(not wifi.isconnected())
+
+disconnect_wifi()
+del disconnect_wifi
+"""
+
+        out, err = self._interface.execute(command)
+        if err:
+            return False, err
+
+        return out.decode("utf-8").strip() == "True", ""
+
+    def writeCredentials(self, ssid, password):
+        """
+        Public method to write the given credentials to the connected device and modify
+        the start script to connect automatically.
+
+        @param ssid SSID of the network to connect to
+        @type str
+        @param password password needed to authenticate
+        @type str
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        nvsCommand = """
+def save_wifi_creds(ssid, password):
+    import esp32
+
+    nvs = esp32.NVS('wifi_creds')
+    nvs.set_blob('ssid', ssid)
+    nvs.set_blob('password', password)
+    nvs.commit()
+
+save_wifi_creds({0}, {1})
+del save_wifi_creds
+""".format(repr(ssid), repr(password) if password else "''")
+        bootCommand = """
+def modify_boot():
+    add = True
+    try:
+        with open('/boot.py', 'r') as f:
+            for ln in f.readlines():
+                if 'wifi_connect' in ln:
+                    add = False
+                    break
+    except:
+        pass
+    if add:
+        with open('/boot.py', 'a') as f:
+            f.write('\\nimport wifi_connect\\n')
+    print(True)
+
+modify_boot()
+del modify_boot
+"""
+
+        out, err = self._interface.execute(nvsCommand)
+        if err:
+            return False, self.tr("Error saving credentials: {0}").format(err)
+
+        try:
+            # copy auto-connect file
+            self.put(
+                os.path.join(
+                    os.path.dirname(__file__), "MCUScripts", "esp32WiFiConnect.py"
+                ),
+                "/wifi_connect.py",
+            )
+        except OSError as err:
+            return False, self.tr("Error saving auto-connect script: {0}").format(err)
+
+        out, err = self._interface.execute(bootCommand)
+        if err:
+            return False, self.tr("Error modifying 'boot.py': {0}").format(err)
+
+        return True, ""
+
+    def removeCredentials(self):
+        """
+        Public method to remove the saved credentials from the connected device.
+
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        nvsCommand = """
+def delete_wifi_creds():
+    import esp32
+
+    nvs = esp32.NVS('wifi_creds')
+    try:
+        nvs.erase_key('ssid')
+        nvs.erase_key('password')
+        nvs.commit()
+    except OSError:
+        pass
+
+delete_wifi_creds()
+del delete_wifi_creds
+"""
+
+        out, err = self._interface.execute(nvsCommand)
+        if err:
+            return False, self.tr("Error deleting credentials: {0}").format(err)
+
+        return True, ""
+
+    def checkInternet(self):
+        """
+        Public method to check, if the internet can be reached.
+
+        @return tuple containing a flag indicating reachability and an error string
+        @rtype tuple of (bool, str)
+        """
+        command = """
+def check_internet():
+    import network
+    import socket
+
+    wifi = network.WLAN(network.STA_IF)
+    if wifi.isconnected():
+        s = socket.socket()
+        try:
+            s.connect(socket.getaddrinfo('google.com', 80)[0][-1])
+            s.close()
+            print(True)
+        except:
+            print(False)
+    else:
+        print(False)
+
+check_internet()
+del check_internet
+"""
+
+        out, err = self._interface.execute(command)
+        if err:
+            return False, err
+
+        return out.decode("utf-8").strip() == "True", ""
+
+    def scanNetworks(self):
+        """
+        Public method to scan for available WiFi networks.
+
+        @return tuple containing the list of available networks as a tuple of 'Name',
+            'MAC-Address', 'channel', 'RSSI' and 'security' and an error string
+        @rtype tuple of (list of tuple of (str, str, int, int, str), str)
+        """
+        command = """
+def scan_networks():
+    import network
+
+    wifi = network.WLAN(network.STA_IF)
+    active = wifi.active()
+    if not active:
+        wifi.active(True)
+    network_list = wifi.scan()
+    if not active:
+        wifi.active(False)
+    print(network_list)
+
+scan_networks()
+del scan_networks
+"""
+
+        out, err = self._interface.execute(command, timeout=15000)
+        if err:
+            return [], err
+
+        networksList = ast.literal_eval(out.decode("utf-8"))
+        networks = []
+        for network in networksList:
+            if network[0]:
+                ssid = network[0].decode("utf-8")
+                mac = binascii.hexlify(network[1], ":").decode("utf-8")
+                channel = network[2]
+                rssi = network[3]
+                try:
+                    security = self.__securityTranslations[network[4]]
+                except KeyError:
+                    security = self.tr("unknown ({0})").format(network[4])
+                networks.append((ssid, mac, channel, rssi, security))
+
+        return networks, ""
+
+    def deactivateInterface(self, interface):
+        """
+        Public method to deactivate a given WiFi interface of the connected device.
+
+        @param interface designation of the interface to be deactivated (one of 'AP'
+            or 'STA')
+        @type str
+        @return tuple containg a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        @exception ValueError raised to indicate a wrong value for the interface type
+        """
+        if interface not in ("STA", "AP"):
+            raise ValueError(
+                "interface must be 'AP' or 'STA', got '{0}'".format(interface)
+            )
+
+        command = """
+def deactivate():
+    import network
+    from time import sleep
+
+    wifi = network.WLAN(network.{0}_IF)
+    wifi.active(False)
+    sleep(0.1)
+    print(not wifi.active())
+
+deactivate()
+del deactivate
+""".format(
+                interface
+            )
+
+        out, err = self._interface.execute(command)
+        if err:
+            return False, err
+        else:
+            return out.decode("utf-8").strip() == "True", ""
+
+    def startAccessPoint(self, ssid, security=None, password=None):
+        """
+        Public method to start the access point interface.
+
+        @param ssid SSID of the access point
+        @type str
+        @param security security method (defaults to None)
+        @type int (optional)
+        @param password password (defaults to None)
+        @type str (optional)
+        @return tuple containing a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        if security is None or password is None:
+            security = 0
+            password = ""
+        if security > 4:
+            security = 4  # security >4 cause an error thrown by the ESP32
+
+        command = """
+def start_ap():
+    import network
+
+    ap = network.WLAN(network.AP_IF)
+    ap.active(False)
+    ap.active(True)
+    try:
+        ap.config(ssid={0}, authmode={1}, password={2})
+    except:
+        ap.config(essid={0}, authmode={1}, password={2})
+
+start_ap()
+del start_ap
+""".format(
+                repr(ssid), security, repr(password)
+            )
+
+        out, err = self._interface.execute(command, timeout=15000)
+        if err:
+            return False, err
+        else:
+            return True, ""
+
+    def stopAccessPoint(self):
+        """
+        Public method to stop the access point interface.
+
+        @return tuple containg a flag indicating success and an error message
+        @rtype tuple of (bool, str)
+        """
+        return self.deactivateInterface("AP")
+
+    def getConnectedClients(self):
+        """
+        Public method to get a list of connected clients.
+
+        @return a tuple containing a list of tuples containing the client MAC-Address
+            and the RSSI (if supported and available) and an error message
+        @rtype tuple of ([(bytes, int)], str)
+        """
+        command = """
+def get_stations():
+    import network
+
+    ap = network.WLAN(network.AP_IF)
+    stations = ap.status('stations')
+    print(stations)
+
+get_stations()
+del get_stations
+"""
+
+        out, err = self._interface.execute(command, timeout=10000)
+        if err:
+            return [], err
+
+        clientsList = ast.literal_eval(out.decode("utf-8"))
+        return clientsList, ""
+
 
 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber):
     """

eric ide

mercurial