Mon, 20 Feb 2023 11:42:45 +0100
Continued implementing WiFi functionality for RP2040 based devices (internet connection, network scan).
# -*- coding: utf-8 -*-

# Copyright (c) 2021 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the device interface class for RP2040 based boards
(e.g. Raspberry Pi Pico).
"""

import ast
import binascii
import json

from PyQt6.QtCore import QUrl, pyqtSlot
from PyQt6.QtNetwork import QNetworkRequest
from PyQt6.QtWidgets import QDialog, QMenu

from eric7 import Globals, Preferences
from eric7.EricGui.EricOverrideCursor import EricOverrideCursor
from eric7.EricWidgets import EricMessageBox
from eric7.EricWidgets.EricApplication import ericApp

from ..MicroPythonWidget import HAS_QTCHART
from . import FirmwareGithubUrls
from .DeviceBase import BaseDevice


class RP2040Device(BaseDevice):
    """
    Class implementing the device for RP2040 based boards.
    """

    def __init__(self, microPythonWidget, deviceType, parent=None):
        """
        Constructor

        @param microPythonWidget reference to the main MicroPython widget
        @type MicroPythonWidget
        @param deviceType device type assigned to this device interface
        @type str
        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(microPythonWidget, deviceType, parent)

        self.__createRP2040Menu()

        # translations of the numeric WLAN status codes reported by the
        # device, keyed by WiFi type
        self.__statusTranslations = {
            "picow": {
                -3: self.tr('authentication failed'),
                -2: self.tr('no matching access point found'),
                -1: self.tr('connection failed'),
                0: self.tr('idle'),
                1: self.tr('connecting'),
                2: self.tr('connected, waiting for IP address'),
                3: self.tr('connected'),
            },
            "pimoroni": {
                # TODO: not yet implemented
            },
        }
        # translations of the numeric security codes contained in the
        # network scan result
        self.__securityTranslations = {
            0: self.tr("open", "open WiFi network"),
            1: "WEP",
            2: "WPA",
            3: "WPA2",
            4: "WPA/WPA2",
            5: "WPA2 (CCMP)",
            6: "WPA3",
            7: "WPA2/WPA3",
        }

    def setButtons(self):
        """
        Public method to enable the supported action buttons.
        """
        super().setButtons()

        self.microPython.setActionButtons(
            run=True, repl=True, files=True, chart=HAS_QTCHART
        )

    def forceInterrupt(self):
        """
        Public method to determine the need for an interrupt when opening the
        serial connection.

        @return flag indicating an interrupt is needed
        @rtype bool
        """
        return False

    def deviceName(self):
        """
        Public method to get the name of the device.

        @return name of the device
        @rtype str
        """
        return self.tr("RP2040")

    def canStartRepl(self):
        """
        Public method to determine, if a REPL can be started.

        @return tuple containing a flag indicating it is safe to start a REPL
            and a reason why it cannot.
        @rtype tuple of (bool, str)
        """
        return True, ""

    def canStartPlotter(self):
        """
        Public method to determine, if a Plotter can be started.

        @return tuple containing a flag indicating it is safe to start a
            Plotter and a reason why it cannot.
        @rtype tuple of (bool, str)
        """
        return True, ""

    def canRunScript(self):
        """
        Public method to determine, if a script can be executed.

        @return tuple containing a flag indicating it is safe to run a
            script and a reason why it cannot.
        @rtype tuple of (bool, str)
        """
        return True, ""

    def runScript(self, script):
        """
        Public method to run the given Python script.

        @param script script to be executed
        @type str
        """
        pythonScript = script.split("\n")
        self.sendCommands(pythonScript)

    def canStartFileManager(self):
        """
        Public method to determine, if a File Manager can be started.

        @return tuple containing a flag indicating it is safe to start a
            File Manager and a reason why it cannot.
        @rtype tuple of (bool, str)
        """
        return True, ""

    def __createRP2040Menu(self):
        """
        Private method to create the RP2040 submenu.
        """
        self.__rp2040Menu = QMenu(self.tr("RP2040 Functions"))

        self.__showMpyAct = self.__rp2040Menu.addAction(
            self.tr("Show MicroPython Versions"), self.__showFirmwareVersions
        )
        self.__rp2040Menu.addSeparator()
        self.__bootloaderAct = self.__rp2040Menu.addAction(
            self.tr("Activate Bootloader"), self.__activateBootloader
        )
        self.__flashMpyAct = self.__rp2040Menu.addAction(
            self.tr("Flash MicroPython Firmware"), self.__flashPython
        )

    def addDeviceMenuEntries(self, menu):
        """
        Public method to add device specific entries to the given menu.

        @param menu reference to the context menu
        @type QMenu
        """
        connected = self.microPython.isConnected()
        linkConnected = self.microPython.isLinkConnected()

        self.__showMpyAct.setEnabled(connected)
        self.__bootloaderAct.setEnabled(connected)
        self.__flashMpyAct.setEnabled(not linkConnected)

        menu.addMenu(self.__rp2040Menu)

    def hasFlashMenuEntry(self):
        """
        Public method to check, if the device has its own flash menu entry.

        @return flag indicating a specific flash menu entry
        @rtype bool
        """
        return True

    @pyqtSlot()
    def __flashPython(self):
        """
        Private slot to flash a MicroPython firmware to the device.
        """
        from ..UF2FlashDialog import UF2FlashDialog

        dlg = UF2FlashDialog(boardType="rp2040")
        dlg.exec()

    @pyqtSlot()
    def __activateBootloader(self):
        """
        Private slot to switch the board into 'bootloader' mode.
        """
        if self.microPython.isConnected():
            self.microPython.deviceInterface().execute(
                [
                    "import machine",
                    "machine.bootloader()",
                ]
            )
            # simulate pressing the disconnect button
            self.microPython.on_connectButton_clicked()

    @pyqtSlot()
    def __showFirmwareVersions(self):
        """
        Private slot to show the firmware version of the connected device and
        the available firmware version.
        """
        if self.microPython.isConnected():
            if self._deviceData["mpy_name"] != "micropython":
                EricMessageBox.critical(
                    None,
                    self.tr("Show MicroPython Versions"),
                    self.tr(
                        """The firmware of the connected device cannot be"""
                        """ determined or the board does not run MicroPython."""
                        """ Aborting..."""
                    ),
                )
            else:
                if self._deviceData["mpy_variant"] == "Pimoroni Pico":
                    # MicroPython with Pimoroni add-on libraries
                    url = QUrl(FirmwareGithubUrls["pimoroni_pico"])
                else:
                    url = QUrl(FirmwareGithubUrls["micropython"])
                ui = ericApp().getObject("UserInterface")
                request = QNetworkRequest(url)
                reply = ui.networkAccessManager().head(request)
                reply.finished.connect(lambda: self.__firmwareVersionResponse(reply))

    def __firmwareVersionResponse(self, reply):
        """
        Private method handling the response of the latest version request.

        @param reply reference to the reply object
        @type QNetworkReply
        """
        # the version tag is the last path element of the reply URL
        latestUrl = reply.url().toString()
        tag = latestUrl.rsplit("/", 1)[-1]
        while tag and not tag[0].isdecimal():
            # get rid of leading non-decimal characters
            tag = tag[1:]
        latestVersion = Globals.versionToTuple(tag)

        if self._deviceData["mpy_version"] == "unknown":
            currentVersionStr = self.tr("unknown")
            currentVersion = (0, 0, 0)
        else:
            # prefer the variant version, if the firmware reports one
            currentVersionStr = (
                self._deviceData["mpy_variant_version"]
                if bool(self._deviceData["mpy_variant_version"])
                else self._deviceData["mpy_version"]
            )
            currentVersion = Globals.versionToTuple(currentVersionStr)

        msg = self.tr(
            "<h4>MicroPython Version Information</h4>"
            "<table>"
            "<tr><td>Installed:</td><td>{0}</td></tr>"
            "<tr><td>Available:</td><td>{1}</td></tr>"
            "{2}"
            "</table>"
        ).format(
            currentVersionStr,
            tag,
            self.tr("<tr><td>Variant:</td><td>{0}</td></tr>").format(
                self._deviceData["mpy_variant"]
            )
            if self._deviceData["mpy_variant"]
            else "",
        )
        if (
            self._deviceData["mpy_variant"] in ["Pimoroni Pico"]
            and not bool(self._deviceData["mpy_variant_version"])
        ):
            # cannot derive update info
            msg += self.tr("<p>Update may be available.</p>")
        elif currentVersion < latestVersion:
            msg += self.tr("<p><b>Update available!</b></p>")

        EricMessageBox.information(
            None,
            self.tr("MicroPython Version"),
            msg,
        )

    def getDocumentationUrl(self):
        """
        Public method to get the device documentation URL.

        @return documentation URL of the device
        @rtype str
        """
        return Preferences.getMicroPython("MicroPythonDocuUrl")

    def getDownloadMenuEntries(self):
        """
        Public method to retrieve the entries for the downloads menu.

        @return list of tuples with menu text and URL to be opened for each
            entry
        @rtype list of tuple of (str, str)
        """
        return [
            (
                self.tr("MicroPython Firmware"),
                Preferences.getMicroPython("MicroPythonFirmwareUrl"),
            ),
            ("<separator>", ""),
            (self.tr("Pimoroni Pico Firmware"), FirmwareGithubUrls["pimoroni_pico"]),
            ("<separator>", ""),
            (
                self.tr("CircuitPython Firmware"),
                Preferences.getMicroPython("CircuitPythonFirmwareUrl"),
            ),
            (
                self.tr("CircuitPython Libraries"),
                Preferences.getMicroPython("CircuitPythonLibrariesUrl"),
            ),
        ]

    ##################################################################
    ## time related methods below
    ##################################################################

    def _getSetTimeCode(self):
        """
        Protected method to get the device code to set the time.

        Note: This method must be implemented in the various device specific
        subclasses.

        @return code to be executed on the connected device to set the time
        @rtype str
        """
        # rtc_time[0] - year    4 digit
        # rtc_time[1] - month   1..12
        # rtc_time[2] - day     1..31
        # rtc_time[3] - weekday 1..7 1=Monday
        # rtc_time[4] - hour    0..23
        # rtc_time[5] - minute  0..59
        # rtc_time[6] - second  0..59
        # rtc_time[7] - yearday 1..366
        # rtc_time[8] - isdst   0, 1, or -1

        # The machine.rtc.datetime() function takes the arguments in the order:
        # (year, month, day, weekday, hour, minute, second, subseconds)
        # __IGNORE_WARNING_M891__
        # https://docs.micropython.org/en/latest/library/machine.RTC.html#machine-rtc
        return """
def set_time(rtc_time):
    import machine
    rtc = machine.RTC()
    rtc.datetime(rtc_time[:7] + (0,))
"""

    ##################################################################
    ## Methods below implement WiFi related methods
    ##################################################################

    def addDeviceWifiEntries(self, menu):
        """
        Public method to add device specific entries to the given menu.

        @param menu reference to the context menu
        @type QMenu
        """
        menu.addSeparator()
        # the country can only be set for the 'picow' WiFi type
        menu.addAction(self.tr("Set Country"), self.__setCountry).setEnabled(
            self._deviceData["wifi_type"] == "picow"
        )

    def hasWifi(self):
        """
        Public method to check the availability of WiFi.

        @return tuple containing a flag indicating the availability of WiFi
            and the WiFi type (picow or pimoroni)
        @rtype tuple of (bool, str)
        @exception OSError raised to indicate an issue with the device
        """
        command = """
def has_wifi():
    try:
        import network
        if hasattr(network, 'WLAN'):
            return True, 'picow'
    except ImportError:
        try:
            import picowireless
            if picowireless.get_fw_version() != '':
                return True, 'pimoroni'
        except ImportError:
            pass

    return False, ''

print(has_wifi())
del has_wifi
"""
        out, err = self._interface.execute(command, timeout=10000)
        if err:
            raise OSError(self._shortError(err))
        # the device prints the Python literal of the result tuple
        return ast.literal_eval(out.decode("utf-8"))

    def getWifiData(self):
        """
        Public method to get data related to the current WiFi status.

        @return tuple of two dictionaries containing the WiFi status data
            for the WiFi client and access point
        @rtype tuple of (dict, dict)
        @exception OSError raised to indicate an issue with the device
        """
        if self._deviceData["wifi_type"] == "picow":
            command = """
def wifi_status():
    import ubinascii
    import ujson
    import network
    import rp2

    wifi = network.WLAN(network.STA_IF)
    station = {
        'active': wifi.active(),
        'connected': wifi.isconnected(),
        'status': wifi.status(),
        'ifconfig': wifi.ifconfig(),
        'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
        'channel': wifi.config('channel'),
        'txpower': wifi.config('txpower'),
        'country': rp2.country(),
    }
    print(ujson.dumps(station))

    wifi = network.WLAN(network.AP_IF)
    ap = {
        'active': wifi.active(),
        'connected': wifi.isconnected(),
        'status': wifi.status(),
        'ifconfig': wifi.ifconfig(),
        'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
        'channel': wifi.config('channel'),
        'txpower': wifi.config('txpower'),
        'essid': wifi.config('essid'),
        'country': rp2.country(),
    }
    print(ujson.dumps(ap))

wifi_status()
del wifi_status
"""
        elif self._deviceData["wifi_type"] == "pimoroni":
            # TODO: not yet implemented
            # Delegate to the base class because no device command is
            # available yet (avoids executing an undefined command).
            return super().getWifiData()
        else:
            return super().getWifiData()

        out, err = self._interface.execute(command)
        if err:
            raise OSError(self._shortError(err))

        # the device prints one JSON document per line: station, then AP
        stationStr, apStr = out.decode("utf-8").splitlines()
        station = json.loads(stationStr)
        ap = json.loads(apStr)
        # replace the numeric status codes by their translated text; unknown
        # codes are shown as plain numbers
        try:
            station["status"] = self.__statusTranslations[
                self._deviceData["wifi_type"]
            ][station["status"]]
        except KeyError:
            station["status"] = str(station["status"])
        try:
            ap["status"] = self.__statusTranslations[
                self._deviceData["wifi_type"]
            ][ap["status"]]
        except KeyError:
            ap["status"] = str(ap["status"])
        return station, ap

    def connectWifi(self, ssid, password):
        """
        Public method to connect a device to a WiFi network.

        @param ssid name (SSID) of the WiFi network
        @type str
        @param password password needed to connect
        @type str
        @return tuple containing the connection status and an error string
        @rtype tuple of (bool, str)
        """
        if self._deviceData["wifi_type"] == "picow":
            country = Preferences.getMicroPython("WifiCountry").upper()
            # The device polls the status up to 140 times with 0.1 s sleeps
            # (about 14 s), which stays below the 15 s execute timeout used
            # below.
            command = """
def connect_wifi(ssid, password):
    import network
    import rp2
    import ujson
    from time import sleep

    rp2.country({2})

    wifi = network.WLAN(network.STA_IF)
    wifi.active(False)
    wifi.active(True)
    wifi.connect(ssid, password)
    max_wait = 140
    while max_wait:
        if wifi.status() < 0 or wifi.status() >= 3:
            break
        max_wait -= 1
        sleep(0.1)
    status = wifi.status()
    print(ujson.dumps({{'connected': wifi.isconnected(), 'status': status}}))

connect_wifi({0}, {1})
del connect_wifi
""".format(
                repr(ssid),
                repr(password if password else ""),
                repr(country if country else "XX"),
            )
        elif self._deviceData["wifi_type"] == "pimoroni":
            # TODO: not yet implemented
            # Delegate to the base class because no device command is
            # available yet (avoids executing an undefined command).
            return super().connectWifi(ssid, password)
        else:
            return super().connectWifi(ssid, password)

        with EricOverrideCursor():
            out, err = self._interface.execute(command, timeout=15000)
        if err:
            return False, err

        result = json.loads(out.decode("utf-8").strip())
        if result["connected"]:
            error = ""
        else:
            # report the translated status as the reason for the failure
            try:
                error = self.__statusTranslations[
                    self._deviceData["wifi_type"]
                ][result["status"]]
            except KeyError:
                error = str(result["status"])

        return result["connected"], error

    def disconnectWifi(self):
        """
        Public method to disconnect a device from the WiFi network.

        @return tuple containing a flag indicating success and an error string
        @rtype tuple of (bool, str)
        """
        if self._deviceData["wifi_type"] == "picow":
            command = """
def disconnect_wifi():
    import network
    from time import sleep

    wifi = network.WLAN(network.STA_IF)
    wifi.disconnect()
    wifi.active(False)
    sleep(0.1)
    print(not wifi.isconnected())

disconnect_wifi()
del disconnect_wifi
"""
        elif self._deviceData["wifi_type"] == "pimoroni":
            # TODO: not yet implemented
            # Delegate to the base class because no device command is
            # available yet (avoids executing an undefined command).
            return super().disconnectWifi()
        else:
            return super().disconnectWifi()

        out, err = self._interface.execute(command)
        if err:
            return False, err

        return out.decode("utf-8").strip() == "True", ""

    def checkInternet(self):
        """
        Public method to check, if the internet can be reached.

        @return tuple containing a flag indicating reachability and an error
            string
        @rtype tuple of (bool, str)
        """
        if self._deviceData["wifi_type"] == "picow":
            command = """
def check_internet():
    import network
    import socket

    wifi = network.WLAN(network.STA_IF)
    if wifi.isconnected():
        s = socket.socket()
        try:
            s.connect(socket.getaddrinfo('google.com', 80)[0][-1])
            s.close()
            print(True)
        except:
            print(False)
    else:
        print(False)

check_internet()
del check_internet
"""
        elif self._deviceData["wifi_type"] == "pimoroni":
            # TODO: not yet implemented
            # Delegate to the base class because no device command is
            # available yet (avoids executing an undefined command).
            return super().checkInternet()
        else:
            return super().checkInternet()

        out, err = self._interface.execute(command)
        if err:
            return False, err

        return out.decode("utf-8").strip() == "True", ""

    def scanNetworks(self):
        """
        Public method to scan for available WiFi networks.

        @return tuple containing the list of available networks as a tuple of
            'Name', 'MAC-Address', 'channel', 'RSSI' and 'security' and an
            error string
        @rtype tuple of (list of tuple of (str, str, int, int, str), str)
        """
        if self._deviceData["wifi_type"] == "picow":
            # the interface is activated for the scan only, if it was inactive
            command = """
def scan_networks():
    import network

    wifi = network.WLAN(network.STA_IF)
    active = wifi.active()
    if not active:
        wifi.active(True)
    network_list = wifi.scan()
    if not active:
        wifi.active(False)
    print(network_list)

scan_networks()
del scan_networks
"""
        elif self._deviceData["wifi_type"] == "pimoroni":
            # TODO: not yet implemented
            # NOTE(review): assumes BaseDevice implements scanNetworks() like
            # the other WiFi methods - confirm.
            return super().scanNetworks()
        else:
            # Delegate to the matching base class method (the previous call
            # of checkInternet() returned a tuple of the wrong shape).
            return super().scanNetworks()

        out, err = self._interface.execute(command, timeout=15000)
        if err:
            return [], err

        # the device prints the Python literal of the list of scan results
        networksList = ast.literal_eval(out.decode("utf-8"))
        networks = []
        for network in networksList:
            # skip entries with an empty SSID
            if network[0]:
                ssid = network[0].decode("utf-8")
                mac = binascii.hexlify(network[1], ":").decode("utf-8")
                channel = network[2]
                rssi = network[3]
                try:
                    security = self.__securityTranslations[network[4]]
                except KeyError:
                    security = self.tr("unknown ({0})").format(network[4])
                networks.append((ssid, mac, channel, rssi, security))

        return networks, ""

    ############################################################################
    ## RP2 only methods below
    ############################################################################

    @pyqtSlot()
    def __setCountry(self):
        """
        Private slot to configure the country of the connected RP2040 device.

        The country is the two letter country code.
        """
        from ..WifiDialogs.WifiCountryDialog import WifiCountryDialog

        dlg = WifiCountryDialog()
        if dlg.exec() == QDialog.DialogCode.Accepted:
            country, remember = dlg.getCountry()
            if remember:
                Preferences.setMicroPython("WifiCountry", country)

            command = """
import rp2
rp2.country({0})
""".format(repr(country))

            out, err = self._interface.execute(command)
            if err:
                self._showError("rp2.country()", err)


def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber):
    """
    Function to instantiate a MicroPython device object.

    @param microPythonWidget reference to the main MicroPython widget
    @type MicroPythonWidget
    @param deviceType device type assigned to this device interface
    @type str
    @param vid vendor ID
    @type int
    @param pid product ID
    @type int
    @param boardName name of the board
    @type str
    @param serialNumber serial number of the board
    @type str
    @return reference to the instantiated device object
    @rtype RP2040Device
    """
    return RP2040Device(microPythonWidget, deviceType)