src/eric7/MicroPython/Devices/CircuitPythonDevices.py

branch
mpy_network
changeset 9828
32c8a5b57332
parent 9820
67597e003373
child 9834
1fdaebde6316
equal deleted inserted replaced
9827:21803aa6c3e2 9828:32c8a5b57332
6 """ 6 """
7 Module implementing the device interface class for CircuitPython boards. 7 Module implementing the device interface class for CircuitPython boards.
8 """ 8 """
9 9
10 import ast 10 import ast
11 import binascii
11 import json 12 import json
12 import os 13 import os
13 import shutil 14 import shutil
14 15
15 from PyQt6.QtCore import QUrl, pyqtSlot 16 from PyQt6.QtCore import QUrl, pyqtSlot
16 from PyQt6.QtNetwork import QNetworkReply, QNetworkRequest 17 from PyQt6.QtNetwork import QNetworkReply, QNetworkRequest
17 from PyQt6.QtWidgets import QMenu 18 from PyQt6.QtWidgets import QMenu
18 19
19 from eric7 import Globals, Preferences 20 from eric7 import Globals, Preferences
21 from eric7.EricGui.EricOverrideCursor import EricOverrideCursor
20 from eric7.EricWidgets import EricFileDialog, EricMessageBox 22 from eric7.EricWidgets import EricFileDialog, EricMessageBox
21 from eric7.EricWidgets.EricApplication import ericApp 23 from eric7.EricWidgets.EricApplication import ericApp
22 from eric7.SystemUtilities import FileSystemUtilities 24 from eric7.SystemUtilities import FileSystemUtilities
23 25
24 from ..MicroPythonWidget import HAS_QTCHART 26 from ..MicroPythonWidget import HAS_QTCHART
50 @param parent reference to the parent object 52 @param parent reference to the parent object
51 @type QObject 53 @type QObject
52 """ 54 """
53 super().__init__(microPythonWidget, deviceType, parent) 55 super().__init__(microPythonWidget, deviceType, parent)
54 56
55 self.submitMode = "paste" # use 'paste' mode to avoid loosing state 57 self._submitMode = "paste" # use 'paste' mode to avoid loosing state
56 58
57 self.__boardName = boardName 59 self.__boardName = boardName
58 self.__workspace = self.__findWorkspace() 60 self.__workspace = self.__findWorkspace()
59 61
60 self.__updater = CircuitPythonUpdaterInterface(self) 62 self.__updater = CircuitPythonUpdaterInterface(self)
61 63
62 self.__createCPyMenu() 64 self.__createCPyMenu()
65
66 self.__securityTranslations = {
67 "OPEN": self.tr("open", "open WiFi network"),
68 "WEP": "WEP",
69 "WPA_PSK": "WPA",
70 "WPA2_PSK": "WPA2",
71 "WPA_WPA2_PSK": "WPA/WPA2",
72 "WPA2_ENTERPRISE": "WPA2 (CCMP)",
73 "WPA3_PSK": "WPA3",
74 "WPA2_WPA3_PSK": "WPA2/WPA3",
75 }
76 self.__securityCode2AuthModeString = {
77 0: "[wifi.AuthMode.OPEN]",
78 1: "[wifi.AuthMode.WEP]",
79 2: "[wifi.AuthMode.WPA, wifi.AuthMode.PSK]",
80 3: "[wifi.AuthMode.WPA2, wifi.AuthMode.PSK]",
81 4: "[wifi.AuthMode.WPA, wifi.AuthMode.WPA2, wifi.AuthMode.PSK]",
82 5: "[wifi.AuthMode.WPA2, wifi.AuthMode.ENTERPRISE]",
83 6: "[wifi.AuthMode.WPA3, wifi.AuthMode.PSK]",
84 7: "[wifi.AuthMode.WPA2, wifi.AuthMode.WPA3, wifi.AuthMode.PSK]",
85 }
63 86
64 def setConnected(self, connected): 87 def setConnected(self, connected):
65 """ 88 """
66 Public method to set the connection state. 89 Public method to set the connection state.
67 90
202 225
203 def __findWorkspace(self, silent=False): 226 def __findWorkspace(self, silent=False):
204 """ 227 """
205 Private method to find the workspace directory. 228 Private method to find the workspace directory.
206 229
207 @param silent flag indicating silent operations 230 @param silent flag indicating silent operations (defaults to False)
208 @type bool 231 @type bool (optional)
209 @return workspace directory used for saving files 232 @return workspace directory used for saving files
210 @rtype str 233 @rtype str
211 """ 234 """
212 # Attempts to find the paths on the filesystem that represents the 235 # Attempts to find the paths on the filesystem that represents the
213 # plugged in CIRCUITPY boards. 236 # plugged in CIRCUITPY boards.
610 return False, '' 633 return False, ''
611 634
612 print(has_wifi()) 635 print(has_wifi())
613 del has_wifi 636 del has_wifi
614 """ 637 """
615 out, err = self._interface.execute(command, mode=self.submitMode) 638 out, err = self._interface.execute(command, mode=self._submitMode)
616 if err: 639 if err:
617 raise OSError(self._shortError(err)) 640 raise OSError(self._shortError(err))
618 return ast.literal_eval(out.decode("utf-8")) 641 return ast.literal_eval(out.decode("utf-8"))
619 642
620 def getWifiData(self): 643 def getWifiData(self):
633 import wifi 656 import wifi
634 657
635 r = wifi.radio 658 r = wifi.radio
636 659
637 station = { 660 station = {
638 'active': r.enabled and r.ipv4_address_ap is None, 661 'active': r.enabled and r.ipv4_address is not None,
639 'connected': r.ipv4_address is not None, 662 'connected': r.ipv4_address is not None,
640 'ifconfig': ( 663 'ifconfig': (
641 str(r.ipv4_address) if r.ipv4_address else'0.0.0.0', 664 str(r.ipv4_address) if r.ipv4_address else'0.0.0.0',
642 str(r.ipv4_subnet) if r.ipv4_subnet else'0.0.0.0', 665 str(r.ipv4_subnet) if r.ipv4_subnet else'0.0.0.0',
643 str(r.ipv4_gateway) if r.ipv4_gateway else'0.0.0.0', 666 str(r.ipv4_gateway) if r.ipv4_gateway else'0.0.0.0',
644 str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0', 667 str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0',
645 ), 668 ),
646 'mac': binascii.hexlify(r.mac_address, ':').decode(), 669 'mac': binascii.hexlify(r.mac_address, ':').decode(),
647 'txpower': r.tx_power,
648 'hostname': r.hostname, 670 'hostname': r.hostname,
649 } 671 }
672 try:
673 station['txpower'] = r.tx_power
674 except AttributeError:
675 pass
676 try:
677 if r.ap_info is not None:
678 station.update({
679 'ap_ssid': r.ap_info.ssid,
680 'ap_bssid': binascii.hexlify(r.ap_info.bssid, ':'),
681 'ap_rssi': r.ap_info.rssi,
682 'ap_channel': r.ap_info.channel,
683 'ap_country': r.ap_info.country,
684 })
685 authmode = r.ap_info.authmode
686 station['ap_security'] = (
687 '_'.join(str(x).split('.')[-1] for x in authmode)
688 if isinstance(authmode, list)
689 else authmode
690 )
691 except (NotImplementedError, AttributeError):
692 pass
650 print(json.dumps(station)) 693 print(json.dumps(station))
651 694
652 ap = { 695 ap = {
653 'active': r.enabled and r.ipv4_address_ap is not None, 696 'active': r.enabled and r.ipv4_address_ap is not None,
654 'connected': r.ipv4_address_ap is not None, 697 'connected': r.ipv4_address_ap is not None,
657 str(r.ipv4_subnet_ap) if r.ipv4_subnet_ap else'0.0.0.0', 700 str(r.ipv4_subnet_ap) if r.ipv4_subnet_ap else'0.0.0.0',
658 str(r.ipv4_gateway_ap) if r.ipv4_gateway_ap else'0.0.0.0', 701 str(r.ipv4_gateway_ap) if r.ipv4_gateway_ap else'0.0.0.0',
659 str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0', 702 str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0',
660 ), 703 ),
661 'mac': binascii.hexlify(r.mac_address_ap, ':').decode(), 704 'mac': binascii.hexlify(r.mac_address_ap, ':').decode(),
662 'txpower': r.tx_power,
663 'hostname': r.hostname, 705 'hostname': r.hostname,
664 } 706 }
707 try:
708 ap['txpower'] = r.tx_power
709 except AttributeError:
710 pass
665 print(json.dumps(ap)) 711 print(json.dumps(ap))
666 712
667 overall = { 713 overall = {
668 'active': r.enabled 714 'active': r.enabled
669 } 715 }
671 717
672 wifi_status() 718 wifi_status()
673 del wifi_status 719 del wifi_status
674 """ 720 """
675 721
676 out, err = self._interface.execute(command, mode=self.submitMode) 722 out, err = self._interface.execute(command, mode=self._submitMode)
677 if err: 723 if err:
678 raise OSError(self._shortError(err)) 724 raise OSError(self._shortError(err))
679 725
680 stationStr, apStr, overallStr = out.decode("utf-8").splitlines() 726 stationStr, apStr, overallStr = out.decode("utf-8").splitlines()
681 station = json.loads(stationStr) 727 station = json.loads(stationStr)
682 ap = json.loads(apStr) 728 ap = json.loads(apStr)
683 overall = json.loads(overallStr) 729 overall = json.loads(overallStr)
730 if "ap_security" in station:
731 try:
732 station["ap_security"] = self.__securityTranslations[
733 station["ap_security"]
734 ]
735 except KeyError:
736 station["ap_security"] = self.tr("unknown ({0})").format(
737 station["ap_security"]
738 )
739
684 return station, ap, overall 740 return station, ap, overall
741
742 def connectWifi(self, ssid, password):
743 """
744 Public method to connect a device to a WiFi network.
745
746 @param ssid name (SSID) of the WiFi network
747 @type str
748 @param password password needed to connect
749 @type str
750 @return tuple containing the connection status and an error string
751 @rtype tuple of (bool, str)
752 """
753 command = """
754 def connect_wifi(ssid, password):
755 import json
756 import wifi
757
758 r = wifi.radio
759 try:
760 r.start_station()
761 r.connect(ssid, password)
762 status = 'connected'
763 except Exception as exc:
764 status = str(exc)
765
766 print(json.dumps({{'connected': r.ipv4_address is not None, 'status': status}}))
767
768 connect_wifi({0}, {1})
769 del connect_wifi
770 """.format(
771 repr(ssid),
772 repr(password if password else ""),
773 )
774
775 with EricOverrideCursor():
776 out, err = self._interface.execute(
777 command, mode=self._submitMode, timeout=15000
778 )
779 if err:
780 return False, err
781
782 result = json.loads(out.decode("utf-8").strip())
783 error = "" if result["connected"] else result["status"]
784
785 return result["connected"], error
786
787 def disconnectWifi(self):
788 """
789 Public method to disconnect a device from the WiFi network.
790
791 @return tuple containing a flag indicating success and an error string
792 @rtype tuple of (bool, str)
793 """
794 command = """
795 def disconnect_wifi():
796 import json
797 import wifi
798
799 r = wifi.radio
800 try:
801 r.stop_station()
802 status = ''
803 except Exception as exc:
804 status = str(exc)
805
806 print(json.dumps({'success': status == '', 'status': status}))
807
808 disconnect_wifi()
809 del disconnect_wifi
810 """
811
812 out, err = self._interface.execute(command, mode=self._submitMode)
813 if err:
814 return False, err
815
816 result = json.loads(out.decode("utf-8").strip())
817 return result["success"], result["status"]
818
819 def writeCredentials(self, ssid, password):
820 """
821 Public method to write the given credentials to the connected device and modify
822 the start script to connect automatically.
823
824 @param ssid SSID of the network to connect to
825 @type str
826 @param password password needed to authenticate
827 @type str
828 @return tuple containing a flag indicating success and an error message
829 @rtype tuple of (bool, str)
830 """
831 if not self.__deviceVolumeMounted():
832 return False, self.tr("The device volume is not available.")
833
834 workspace = self.getWorkspace()
835
836 if Globals.versionToTuple(self._deviceData["release"]) >= (8, 0, 0):
837 # CircuitPython >= 8.0.0: generate 'settings.toml' file
838 contents = (
839 'CIRCUITPY_WIFI_SSID = "{0}"\nCIRCUITPY_WIFI_PASSWORD = "{1}"\n'.format(
840 ssid, password
841 )
842 )
843 filename = os.path.join(workspace, "settings.toml")
844 if os.path.exists(filename):
845 ok = EricMessageBox.yesNo(
846 None,
847 self.tr("Write WiFi Credentials"),
848 self.tr(
849 """<p>The file <b>{0}</b> exists already. Shall it be"""
850 """ replaced?</p>"""
851 ).format(filename),
852 icon=EricMessageBox.Warning,
853 )
854 if not ok:
855 return False, self.tr("Aborted")
856 try:
857 with open(filename, "w") as f:
858 f.write(contents)
859 return True, ""
860 except OSError as err:
861 return False, str(err)
862
863 else:
864 # CircuitPython < 8.0.0: generate a secrets.py script
865 # step 1: generate the secrets.py file
866 contents = (
867 'secrets = {{\n "ssid": "{0}",\n "password": "{1}",\n}}\n'.format(
868 ssid, password
869 )
870 )
871 filename = os.path.join(workspace, "secrets.py")
872 if os.path.exists(filename):
873 ok = EricMessageBox.yesNo(
874 None,
875 self.tr("Write WiFi Credentials"),
876 self.tr(
877 """<p>The file <b>{0}</b> exists already. Shall it be"""
878 """ replaced?</p>"""
879 ).format(filename),
880 icon=EricMessageBox.Warning,
881 )
882 if not ok:
883 return False, self.tr("Aborted")
884 # step 2: create the auto-connect script (wifi_connect.py)
885 try:
886 with open(filename, "w") as f:
887 f.write(contents)
888 except OSError as err:
889 return False, str(err)
890 scriptFile = os.path.join(
891 os.path.dirname(__file__), "MCUScripts", "circuitPy7WiFiConnect.py"
892 )
893 targetFile = os.path.join(workspace, "wifi_connect.py")
894 try:
895 shutil.copy2(scriptFile, targetFile)
896 except OSError as err:
897 return False, str(err)
898 # Note: code.py will not be modified because the connection will be
899 # reset anyway
900 return True, ""
901
902 def removeCredentials(self):
903 """
904 Public method to remove the saved credentials from the connected device.
905
906 @return tuple containing a flag indicating success and an error message
907 @rtype tuple of (bool, str)
908 """
909 if not self.__deviceVolumeMounted():
910 return False, self.tr("The device volume is not available.")
911
912 workspace = self.getWorkspace()
913 for name in ("settings.toml", "secrets.py"):
914 filename = os.path.join(workspace, name)
915 if os.path.exists(filename):
916 os.remove(filename)
917
918 return True, ""
919
920 def checkInternet(self):
921 """
922 Public method to check, if the internet can be reached.
923
924 @return tuple containing a flag indicating reachability and an error string
925 @rtype tuple of (bool, str)
926 """
927 command = """
928 def check_internet():
929 import ipaddress
930 import wifi
931
932 r = wifi.radio
933 if r.ipv4_address is not None:
934 ping = r.ping(ipaddress.IPv4Address("8.8.8.8"))
935 print(ping is not None)
936 else:
937 print(False)
938
939 check_internet()
940 del check_internet
941 """
942
943 out, err = self._interface.execute(command, mode=self._submitMode)
944 if err:
945 return False, err
946
947 return out.decode("utf-8").strip() == "True", ""
948
949 def scanNetworks(self):
950 """
951 Public method to scan for available WiFi networks.
952
953 @return tuple containing the list of available networks as a tuple of 'Name',
954 'MAC-Address', 'channel', 'RSSI' and 'security' and an error string
955 @rtype tuple of (list of tuple of (str, str, int, int, str), str)
956 """
957 command = """
958 def scan_networks():
959 import wifi
960
961 r = wifi.radio
962 network_list = []
963 enabled = r.enabled
964 if not enabled:
965 r.enabled = True
966 for net in r.start_scanning_networks():
967 network_list.append(
968 (net.ssid, net.bssid, net.channel, net.rssi,
969 '_'.join(str(x).split('.')[-1] for x in net.authmode))
970 )
971 r.stop_scanning_networks()
972 if not enabled:
973 r.enabled = False
974 print(network_list)
975
976 scan_networks()
977 del scan_networks
978 """
979
980 out, err = self._interface.execute(
981 command, mode=self._submitMode, timeout=15000
982 )
983 if err:
984 return [], err
985
986 networksList = ast.literal_eval(out.decode("utf-8"))
987 networks = []
988 seenNetworks = []
989 for network in networksList:
990 if network[0]:
991 ssid = network[0]
992 mac = binascii.hexlify(network[1], ":").decode("utf-8")
993 channel = network[2]
994 rssi = network[3]
995 try:
996 security = self.__securityTranslations[network[4]]
997 except KeyError:
998 security = self.tr("unknown ({0})").format(network[4])
999 if (ssid, mac, channel) not in seenNetworks:
1000 seenNetworks.append((ssid, mac, channel))
1001 networks.append((ssid, mac, channel, rssi, security))
1002
1003 return networks, ""
1004
1005 def deactivateInterface(self, interface):
1006 """
1007 Public method to deactivate a given WiFi interface of the connected device.
1008
1009 Note: With CircuitPython it is not possible to deactivate the station and
1010 access point interfaces separately.
1011
1012 @param interface designation of the interface to be deactivated (one of 'AP'
1013 or 'STA')
1014 @type str
1015 @return tuple containg a flag indicating success and an error message
1016 @rtype tuple of (bool, str)
1017 @exception ValueError raised to indicate a wrong value for the interface type
1018 """
1019 if interface not in ("STA", "AP"):
1020 raise ValueError(
1021 "interface must be 'AP' or 'STA', got '{0}'".format(interface)
1022 )
1023
1024 command = """
1025 def deactivate():
1026 import wifi
1027
1028 wifi.radio.enabled = False
1029 print(not wifi.radio.enabled)
1030
1031 deactivate()
1032 del deactivate
1033 """
1034
1035 out, err = self._interface.execute(command, mode=self._submitMode)
1036 if err:
1037 return False, err
1038 else:
1039 return out.decode("utf-8").strip() == "True", ""
1040
1041 def startAccessPoint(self, ssid, security=None, password=None, ifconfig=None):
1042 """
1043 Public method to start the access point interface.
1044
1045 @param ssid SSID of the access point
1046 @type str
1047 @param security security method (defaults to None)
1048 @type int (optional)
1049 @param password password (defaults to None)
1050 @type str (optional)
1051 @param ifconfig IPv4 configuration for the access point if not default
1052 (IPv4 address, netmask, gateway address, DNS server address)
1053 @type tuple of (str, str, str, str)
1054 @return tuple containing a flag indicating success and an error message
1055 @rtype tuple of (bool, str)
1056 """
1057 if security is None or password is None:
1058 security = 0
1059 password = ""
1060 authmode = self.__securityCode2AuthModeString[security]
1061
1062 if ifconfig:
1063 return (
1064 False,
1065 self.tr(
1066 "CircuitPython does not support setting the IPv4 parameters of the"
1067 " WiFi access point."
1068 ),
1069 )
1070
1071 command = """
1072 def start_ap(ssid, password):
1073 import wifi
1074
1075 r = wifi.radio
1076 try:
1077 r.start_ap(ssid, password, authmode={2})
1078 except ValueError as exc:
1079 print('Error:', str(exc))
1080
1081 start_ap({0}, {1})
1082 del start_ap
1083 """.format(
1084 repr(ssid), repr(password), authmode
1085 )
1086
1087 out, err = self._interface.execute(
1088 command, mode=self._submitMode, timeout=15000
1089 )
1090 if err:
1091 return False, err
1092 elif out and out.startswith(b"Error:"):
1093 return False, out.decode("utf-8").split(None, 1)[-1]
1094 else:
1095 return True, ""
1096
1097 def stopAccessPoint(self):
1098 """
1099 Public method to stop the access point interface.
1100
1101 @return tuple containg a flag indicating success and an error message
1102 @rtype tuple of (bool, str)
1103 """
1104 command = """
1105 def stop_ap():
1106 import wifi
1107
1108 r = wifi.radio
1109 try:
1110 r.stop_ap()
1111 except NotImplementedError as exc:
1112 print('Error:', str(exc))
1113
1114 stop_ap()
1115 del stop_ap
1116 """
1117
1118 out, err = self._interface.execute(command, mode=self._submitMode)
1119 if err:
1120 return False, err
1121 elif out and out.startswith(b"Error:"):
1122 return False, out.decode("utf-8").split(None, 1)[-1]
1123 else:
1124 return True, ""
1125
1126 def getConnectedClients(self):
1127 """
1128 Public method to get a list of connected clients.
1129
1130 @return a tuple containing a list of tuples containing the client MAC-Address
1131 and the RSSI (if supported and available) and an error message
1132 @rtype tuple of ([(bytes, int)], str)
1133 """
1134 return (
1135 [],
1136 self.tr("CircuitPython does not support reporting of connected clients"),
1137 )
685 1138
686 1139
687 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber): 1140 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber):
688 """ 1141 """
689 Function to instantiate a MicroPython device object. 1142 Function to instantiate a MicroPython device object.

eric ide

mercurial