src/eric7/MicroPython/Devices/EspDevices.py

branch
mpy_network
changeset 9795
11b4d39d7584
parent 9766
f0e22f3a5878
child 9797
3be7b2326e2c
equal deleted inserted replaced
9794:21fd7c63b487 9795:11b4d39d7584
6 """ 6 """
7 Module implementing the device interface class for ESP32 and ESP8266 based 7 Module implementing the device interface class for ESP32 and ESP8266 based
8 boards. 8 boards.
9 """ 9 """
10 10
11 import ast
12 import binascii
13 import json
14 import os
15
11 from PyQt6.QtCore import QProcess, QUrl, pyqtSlot 16 from PyQt6.QtCore import QProcess, QUrl, pyqtSlot
12 from PyQt6.QtNetwork import QNetworkRequest 17 from PyQt6.QtNetwork import QNetworkRequest
13 from PyQt6.QtWidgets import QDialog, QMenu 18 from PyQt6.QtWidgets import QDialog, QMenu
14 19
15 from eric7 import Globals, Preferences 20 from eric7 import Globals, Preferences
21 from eric7.EricGui.EricOverrideCursor import EricOverrideCursor
16 from eric7.EricWidgets import EricMessageBox 22 from eric7.EricWidgets import EricMessageBox
17 from eric7.EricWidgets.EricApplication import ericApp 23 from eric7.EricWidgets.EricApplication import ericApp
18 from eric7.EricWidgets.EricProcessDialog import EricProcessDialog 24 from eric7.EricWidgets.EricProcessDialog import EricProcessDialog
19 from eric7.SystemUtilities import PythonUtilities 25 from eric7.SystemUtilities import PythonUtilities
20 26
40 @type QObject 46 @type QObject
41 """ 47 """
42 super().__init__(microPythonWidget, deviceType, parent) 48 super().__init__(microPythonWidget, deviceType, parent)
43 49
44 self.__createEsp32Submenu() 50 self.__createEsp32Submenu()
51
52 self.__statusTranslations = {
53 200: self.tr("beacon timeout"),
54 201: self.tr("no matching access point found"),
55 202: self.tr("authentication failed"),
56 203: self.tr("association failed"),
57 204: self.tr("handshake timeout"),
58 1000: self.tr("idle"),
59 1001: self.tr("connecting"),
60 1010: self.tr("connected"),
61 }
62 self.__securityTranslations = {
63 0: self.tr("open", "open WiFi network"),
64 1: "WEP",
65 2: "WPA",
66 3: "WPA2",
67 4: "WPA/WPA2",
68 5: "WPA2 (CCMP)",
69 6: "WPA3",
70 7: "WPA2/WPA3",
71 }
45 72
46 def setButtons(self): 73 def setButtons(self):
47 """ 74 """
48 Public method to enable the supported action buttons. 75 Public method to enable the supported action buttons.
49 """ 76 """
617 else: 644 else:
618 clock_time = rtc_time[:7] + (0,) 645 clock_time = rtc_time[:7] + (0,)
619 rtc.init(clock_time) 646 rtc.init(clock_time)
620 """ 647 """
621 648
649 ##################################################################
650 ## Methods below implement WiFi related methods
651 ##################################################################
652
653 def hasWifi(self):
654 """
655 Public method to check the availability of WiFi.
656
657 @return tuple containing a flag indicating the availability of WiFi
658 and the WiFi type (esp32)
659 @rtype tuple of (bool, str)
660 """
661 # TODO: check if ESP8266 is different
662 return True, "esp32"
663
664 def getWifiData(self):
665 """
666 Public method to get data related to the current WiFi status.
667
668 @return tuple of two dictionaries containing the WiFi status data
669 for the WiFi client and access point
670 @rtype tuple of (dict, dict)
671 @exception OSError raised to indicate an issue with the device
672 """
673 command = """
674 def wifi_status():
675 import ubinascii
676 import ujson
677 import network
678
679 wifi = network.WLAN(network.STA_IF)
680 station = {
681 'active': wifi.active(),
682 'connected': wifi.isconnected(),
683 'status': wifi.status(),
684 'ifconfig': wifi.ifconfig(),
685 'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
686 }
687 if wifi.active():
688 station['txpower'] = wifi.config('txpower')
689 else:
690 station['txpower'] = 0
691 print(ujson.dumps(station))
692
693 wifi = network.WLAN(network.AP_IF)
694 ap = {
695 'active': wifi.active(),
696 'connected': wifi.isconnected(),
697 'status': wifi.status(),
698 'ifconfig': wifi.ifconfig(),
699 'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
700 'channel': wifi.config('channel'),
701 'essid': wifi.config('essid'),
702 }
703 if wifi.active():
704 ap['txpower'] = wifi.config('txpower')
705 else:
706 ap['txpower'] = 0
707 print(ujson.dumps(ap))
708
709 wifi_status()
710 del wifi_status
711 """
712
713 out, err = self._interface.execute(command)
714 if err:
715 raise OSError(self._shortError(err))
716
717 stationStr, apStr = out.decode("utf-8").splitlines()
718 station = json.loads(stationStr)
719 ap = json.loads(apStr)
720 try:
721 station["status"] = self.__statusTranslations[station["status"]]
722 except KeyError:
723 station["status"] = str(station["status"])
724 try:
725 ap["status"] = self.__statusTranslations[ap["status"]]
726 except KeyError:
727 ap["status"] = str(ap["status"])
728 return station, ap
729
730 def connectWifi(self, ssid, password):
731 """
732 Public method to connect a device to a WiFi network.
733
734 @param ssid name (SSID) of the WiFi network
735 @type str
736 @param password password needed to connect
737 @type str
738 @return tuple containing the connection status and an error string
739 @rtype tuple of (bool, str)
740 """
741 command = """
742 def connect_wifi(ssid, password):
743 import network
744 import ujson
745 from time import sleep
746
747 wifi = network.WLAN(network.STA_IF)
748 wifi.active(False)
749 wifi.active(True)
750 wifi.connect(ssid, password)
751 max_wait = 140
752 while max_wait and wifi.status() == network.STAT_CONNECTING:
753 max_wait -= 1
754 sleep(0.1)
755 status = wifi.status()
756 print(ujson.dumps({{'connected': wifi.isconnected(), 'status': status}}))
757
758 connect_wifi({0}, {1})
759 del connect_wifi
760 """.format(
761 repr(ssid),
762 repr(password if password else ""),
763 )
764
765 with EricOverrideCursor():
766 out, err = self._interface.execute(command, timeout=15000)
767 if err:
768 return False, err
769
770 result = json.loads(out.decode("utf-8").strip())
771 if result["connected"]:
772 error = ""
773 else:
774 try:
775 error = self.__statusTranslations[result["status"]]
776 except KeyError:
777 error = str(result["status"])
778
779 return result["connected"], error
780
781 def disconnectWifi(self):
782 """
783 Public method to disconnect a device from the WiFi network.
784
785 @return tuple containing a flag indicating success and an error string
786 @rtype tuple of (bool, str)
787 """
788 command = """
789 def disconnect_wifi():
790 import network
791 from time import sleep
792
793 wifi = network.WLAN(network.STA_IF)
794 wifi.disconnect()
795 wifi.active(False)
796 sleep(0.1)
797 print(not wifi.isconnected())
798
799 disconnect_wifi()
800 del disconnect_wifi
801 """
802
803 out, err = self._interface.execute(command)
804 if err:
805 return False, err
806
807 return out.decode("utf-8").strip() == "True", ""
808
809 def writeCredentials(self, ssid, password):
810 """
811 Public method to write the given credentials to the connected device and modify
812 the start script to connect automatically.
813
814 @param ssid SSID of the network to connect to
815 @type str
816 @param password password needed to authenticate
817 @type str
818 @return tuple containing a flag indicating success and an error message
819 @rtype tuple of (bool, str)
820 """
821 nvsCommand = """
822 def save_wifi_creds(ssid, password):
823 import esp32
824
825 nvs = esp32.NVS('wifi_creds')
826 nvs.set_blob('ssid', ssid)
827 nvs.set_blob('password', password)
828 nvs.commit()
829
830 save_wifi_creds({0}, {1})
831 del save_wifi_creds
832 """.format(repr(ssid), repr(password) if password else "''")
833 bootCommand = """
834 def modify_boot():
835 add = True
836 try:
837 with open('/boot.py', 'r') as f:
838 for ln in f.readlines():
839 if 'wifi_connect' in ln:
840 add = False
841 break
842 except:
843 pass
844 if add:
845 with open('/boot.py', 'a') as f:
846 f.write('\\nimport wifi_connect\\n')
847 print(True)
848
849 modify_boot()
850 del modify_boot
851 """
852
853 out, err = self._interface.execute(nvsCommand)
854 if err:
855 return False, self.tr("Error saving credentials: {0}").format(err)
856
857 try:
858 # copy auto-connect file
859 self.put(
860 os.path.join(
861 os.path.dirname(__file__), "MCUScripts", "esp32WiFiConnect.py"
862 ),
863 "/wifi_connect.py",
864 )
865 except OSError as err:
866 return False, self.tr("Error saving auto-connect script: {0}").format(err)
867
868 out, err = self._interface.execute(bootCommand)
869 if err:
870 return False, self.tr("Error modifying 'boot.py': {0}").format(err)
871
872 return True, ""
873
874 def removeCredentials(self):
875 """
876 Public method to remove the saved credentials from the connected device.
877
878 @return tuple containing a flag indicating success and an error message
879 @rtype tuple of (bool, str)
880 """
881 nvsCommand = """
882 def delete_wifi_creds():
883 import esp32
884
885 nvs = esp32.NVS('wifi_creds')
886 try:
887 nvs.erase_key('ssid')
888 nvs.erase_key('password')
889 nvs.commit()
890 except OSError:
891 pass
892
893 delete_wifi_creds()
894 del delete_wifi_creds
895 """
896
897 out, err = self._interface.execute(nvsCommand)
898 if err:
899 return False, self.tr("Error deleting credentials: {0}").format(err)
900
901 return True, ""
902
903 def checkInternet(self):
904 """
905 Public method to check, if the internet can be reached.
906
907 @return tuple containing a flag indicating reachability and an error string
908 @rtype tuple of (bool, str)
909 """
910 command = """
911 def check_internet():
912 import network
913 import socket
914
915 wifi = network.WLAN(network.STA_IF)
916 if wifi.isconnected():
917 s = socket.socket()
918 try:
919 s.connect(socket.getaddrinfo('google.com', 80)[0][-1])
920 s.close()
921 print(True)
922 except:
923 print(False)
924 else:
925 print(False)
926
927 check_internet()
928 del check_internet
929 """
930
931 out, err = self._interface.execute(command)
932 if err:
933 return False, err
934
935 return out.decode("utf-8").strip() == "True", ""
936
937 def scanNetworks(self):
938 """
939 Public method to scan for available WiFi networks.
940
941 @return tuple containing the list of available networks as a tuple of 'Name',
942 'MAC-Address', 'channel', 'RSSI' and 'security' and an error string
943 @rtype tuple of (list of tuple of (str, str, int, int, str), str)
944 """
945 command = """
946 def scan_networks():
947 import network
948
949 wifi = network.WLAN(network.STA_IF)
950 active = wifi.active()
951 if not active:
952 wifi.active(True)
953 network_list = wifi.scan()
954 if not active:
955 wifi.active(False)
956 print(network_list)
957
958 scan_networks()
959 del scan_networks
960 """
961
962 out, err = self._interface.execute(command, timeout=15000)
963 if err:
964 return [], err
965
966 networksList = ast.literal_eval(out.decode("utf-8"))
967 networks = []
968 for network in networksList:
969 if network[0]:
970 ssid = network[0].decode("utf-8")
971 mac = binascii.hexlify(network[1], ":").decode("utf-8")
972 channel = network[2]
973 rssi = network[3]
974 try:
975 security = self.__securityTranslations[network[4]]
976 except KeyError:
977 security = self.tr("unknown ({0})").format(network[4])
978 networks.append((ssid, mac, channel, rssi, security))
979
980 return networks, ""
981
982 def deactivateInterface(self, interface):
983 """
984 Public method to deactivate a given WiFi interface of the connected device.
985
986 @param interface designation of the interface to be deactivated (one of 'AP'
987 or 'STA')
988 @type str
989 @return tuple containg a flag indicating success and an error message
990 @rtype tuple of (bool, str)
991 @exception ValueError raised to indicate a wrong value for the interface type
992 """
993 if interface not in ("STA", "AP"):
994 raise ValueError(
995 "interface must be 'AP' or 'STA', got '{0}'".format(interface)
996 )
997
998 command = """
999 def deactivate():
1000 import network
1001 from time import sleep
1002
1003 wifi = network.WLAN(network.{0}_IF)
1004 wifi.active(False)
1005 sleep(0.1)
1006 print(not wifi.active())
1007
1008 deactivate()
1009 del deactivate
1010 """.format(
1011 interface
1012 )
1013
1014 out, err = self._interface.execute(command)
1015 if err:
1016 return False, err
1017 else:
1018 return out.decode("utf-8").strip() == "True", ""
1019
1020 def startAccessPoint(self, ssid, security=None, password=None):
1021 """
1022 Public method to start the access point interface.
1023
1024 @param ssid SSID of the access point
1025 @type str
1026 @param security security method (defaults to None)
1027 @type int (optional)
1028 @param password password (defaults to None)
1029 @type str (optional)
1030 @return tuple containing a flag indicating success and an error message
1031 @rtype tuple of (bool, str)
1032 """
1033 if security is None or password is None:
1034 security = 0
1035 password = ""
1036 if security > 4:
1037 security = 4 # security >4 cause an error thrown by the ESP32
1038
1039 command = """
1040 def start_ap():
1041 import network
1042
1043 ap = network.WLAN(network.AP_IF)
1044 ap.active(False)
1045 ap.active(True)
1046 try:
1047 ap.config(ssid={0}, authmode={1}, password={2})
1048 except:
1049 ap.config(essid={0}, authmode={1}, password={2})
1050
1051 start_ap()
1052 del start_ap
1053 """.format(
1054 repr(ssid), security, repr(password)
1055 )
1056
1057 out, err = self._interface.execute(command, timeout=15000)
1058 if err:
1059 return False, err
1060 else:
1061 return True, ""
1062
1063 def stopAccessPoint(self):
1064 """
1065 Public method to stop the access point interface.
1066
1067 @return tuple containg a flag indicating success and an error message
1068 @rtype tuple of (bool, str)
1069 """
1070 return self.deactivateInterface("AP")
1071
1072 def getConnectedClients(self):
1073 """
1074 Public method to get a list of connected clients.
1075
1076 @return a tuple containing a list of tuples containing the client MAC-Address
1077 and the RSSI (if supported and available) and an error message
1078 @rtype tuple of ([(bytes, int)], str)
1079 """
1080 command = """
1081 def get_stations():
1082 import network
1083
1084 ap = network.WLAN(network.AP_IF)
1085 stations = ap.status('stations')
1086 print(stations)
1087
1088 get_stations()
1089 del get_stations
1090 """
1091
1092 out, err = self._interface.execute(command, timeout=10000)
1093 if err:
1094 return [], err
1095
1096 clientsList = ast.literal_eval(out.decode("utf-8"))
1097 return clientsList, ""
1098
622 1099
623 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber): 1100 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber):
624 """ 1101 """
625 Function to instantiate a MicroPython device object. 1102 Function to instantiate a MicroPython device object.
626 1103

eric ide

mercurial