633 import wifi |
656 import wifi |
634 |
657 |
635 r = wifi.radio |
658 r = wifi.radio |
636 |
659 |
637 station = { |
660 station = { |
638 'active': r.enabled and r.ipv4_address_ap is None, |
661 'active': r.enabled and r.ipv4_address is not None, |
639 'connected': r.ipv4_address is not None, |
662 'connected': r.ipv4_address is not None, |
640 'ifconfig': ( |
663 'ifconfig': ( |
641 str(r.ipv4_address) if r.ipv4_address else'0.0.0.0', |
664 str(r.ipv4_address) if r.ipv4_address else'0.0.0.0', |
642 str(r.ipv4_subnet) if r.ipv4_subnet else'0.0.0.0', |
665 str(r.ipv4_subnet) if r.ipv4_subnet else'0.0.0.0', |
643 str(r.ipv4_gateway) if r.ipv4_gateway else'0.0.0.0', |
666 str(r.ipv4_gateway) if r.ipv4_gateway else'0.0.0.0', |
644 str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0', |
667 str(r.ipv4_dns) if r.ipv4_dns else'0.0.0.0', |
645 ), |
668 ), |
646 'mac': binascii.hexlify(r.mac_address, ':').decode(), |
669 'mac': binascii.hexlify(r.mac_address, ':').decode(), |
647 'txpower': r.tx_power, |
|
648 'hostname': r.hostname, |
670 'hostname': r.hostname, |
649 } |
671 } |
|
672 try: |
|
673 station['txpower'] = r.tx_power |
|
674 except AttributeError: |
|
675 pass |
|
676 try: |
|
677 if r.ap_info is not None: |
|
678 station.update({ |
|
679 'ap_ssid': r.ap_info.ssid, |
|
680 'ap_bssid': binascii.hexlify(r.ap_info.bssid, ':'), |
|
681 'ap_rssi': r.ap_info.rssi, |
|
682 'ap_channel': r.ap_info.channel, |
|
683 'ap_country': r.ap_info.country, |
|
684 }) |
|
685 authmode = r.ap_info.authmode |
|
686 station['ap_security'] = ( |
|
687 '_'.join(str(x).split('.')[-1] for x in authmode) |
|
688 if isinstance(authmode, list) |
|
689 else authmode |
|
690 ) |
|
691 except (NotImplementedError, AttributeError): |
|
692 pass |
650 print(json.dumps(station)) |
693 print(json.dumps(station)) |
651 |
694 |
652 ap = { |
695 ap = { |
653 'active': r.enabled and r.ipv4_address_ap is not None, |
696 'active': r.enabled and r.ipv4_address_ap is not None, |
654 'connected': r.ipv4_address_ap is not None, |
697 'connected': r.ipv4_address_ap is not None, |
671 |
717 |
672 wifi_status() |
718 wifi_status() |
673 del wifi_status |
719 del wifi_status |
674 """ |
720 """ |
675 |
721 |
676 out, err = self._interface.execute(command, mode=self.submitMode) |
722 out, err = self._interface.execute(command, mode=self._submitMode) |
677 if err: |
723 if err: |
678 raise OSError(self._shortError(err)) |
724 raise OSError(self._shortError(err)) |
679 |
725 |
680 stationStr, apStr, overallStr = out.decode("utf-8").splitlines() |
726 stationStr, apStr, overallStr = out.decode("utf-8").splitlines() |
681 station = json.loads(stationStr) |
727 station = json.loads(stationStr) |
682 ap = json.loads(apStr) |
728 ap = json.loads(apStr) |
683 overall = json.loads(overallStr) |
729 overall = json.loads(overallStr) |
|
730 if "ap_security" in station: |
|
731 try: |
|
732 station["ap_security"] = self.__securityTranslations[ |
|
733 station["ap_security"] |
|
734 ] |
|
735 except KeyError: |
|
736 station["ap_security"] = self.tr("unknown ({0})").format( |
|
737 station["ap_security"] |
|
738 ) |
|
739 |
684 return station, ap, overall |
740 return station, ap, overall |
|
741 |
|
def connectWifi(self, ssid, password):
    """
    Public method to establish a WiFi connection on the connected device.

    A short script is submitted to the device. It starts the station
    interface, attempts to connect and prints the outcome as a JSON object
    with the keys 'connected' and 'status'.

    @param ssid name (SSID) of the WiFi network
    @type str
    @param password password needed to connect (a missing or empty password
        is sent to the device as an empty string)
    @type str
    @return tuple containing the connection status and an error string
    @rtype tuple of (bool, str)
    """
    script = """
def connect_wifi(ssid, password):
    import json
    import wifi

    r = wifi.radio
    try:
        r.start_station()
        r.connect(ssid, password)
        status = 'connected'
    except Exception as exc:
        status = str(exc)

    print(json.dumps({{'connected': r.ipv4_address is not None, 'status': status}}))

connect_wifi({0}, {1})
del connect_wifi
"""
    # repr() turns both values into Python literals the device can evaluate
    command = script.format(repr(ssid), repr(password or ""))

    # show the override cursor while the device executes the script
    with EricOverrideCursor():
        out, err = self._interface.execute(
            command, mode=self._submitMode, timeout=15000
        )
    if err:
        return False, err

    reply = json.loads(out.decode("utf-8").strip())
    connected = reply["connected"]
    # the device side status text is only of interest if connecting failed
    return connected, ("" if connected else reply["status"])
|
786 |
|
def disconnectWifi(self):
    """
    Public method to disconnect the device from the WiFi network.

    The submitted script stops the station interface and prints a JSON object
    with the keys 'success' and 'status'; 'status' carries the text of an
    exception raised on the device or is empty.

    @return tuple containing a flag indicating success and an error string
    @rtype tuple of (bool, str)
    """
    command = """
def disconnect_wifi():
    import json
    import wifi

    r = wifi.radio
    try:
        r.stop_station()
        status = ''
    except Exception as exc:
        status = str(exc)

    print(json.dumps({'success': status == '', 'status': status}))

disconnect_wifi()
del disconnect_wifi
"""

    reply, error = self._interface.execute(command, mode=self._submitMode)
    if not error:
        outcome = json.loads(reply.decode("utf-8").strip())
        return outcome["success"], outcome["status"]

    # the script could not be executed; hand the raw error to the caller
    return False, error
|
818 |
|
def writeCredentials(self, ssid, password):
    """
    Public method to write the given credentials to the connected device and
    prepare the device to connect automatically.

    For CircuitPython >= 8.0.0 a 'settings.toml' file is generated. For older
    releases a 'secrets.py' file is generated and the 'wifi_connect.py' helper
    script is copied to the device volume. An already existing credentials
    file is only replaced after the user confirmed this.

    @param ssid SSID of the network to connect to
    @type str
    @param password password needed to authenticate
    @type str
    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    if not self.__deviceVolumeMounted():
        return False, self.tr("The device volume is not available.")

    workspace = self.getWorkspace()

    # json.dumps() yields a double-quoted literal with quotes, backslashes and
    # control characters escaped. This form is accepted as a TOML basic string
    # and as a Python string literal, so credentials containing such characters
    # can neither corrupt the generated file nor inject code into it.
    quotedSsid = json.dumps(ssid, ensure_ascii=False)
    # a missing password is written as an empty string (like in connectWifi())
    quotedPassword = json.dumps(password if password else "", ensure_ascii=False)

    useSettingsToml = Globals.versionToTuple(self._deviceData["release"]) >= (
        8,
        0,
        0,
    )
    if useSettingsToml:
        # CircuitPython >= 8.0.0: generate 'settings.toml' file
        name = "settings.toml"
        contents = "CIRCUITPY_WIFI_SSID = {0}\nCIRCUITPY_WIFI_PASSWORD = {1}\n".format(
            quotedSsid, quotedPassword
        )
    else:
        # CircuitPython < 8.0.0: generate a secrets.py script
        name = "secrets.py"
        contents = 'secrets = {{\n    "ssid": {0},\n    "password": {1},\n}}\n'.format(
            quotedSsid, quotedPassword
        )

    filename = os.path.join(workspace, name)
    if os.path.exists(filename):
        ok = EricMessageBox.yesNo(
            None,
            self.tr("Write WiFi Credentials"),
            self.tr(
                """<p>The file <b>{0}</b> exists already. Shall it be"""
                """ replaced?</p>"""
            ).format(filename),
            icon=EricMessageBox.Warning,
        )
        if not ok:
            return False, self.tr("Aborted")

    # step 1: write the credentials file (UTF-8 because non-ASCII characters
    # are written unescaped)
    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(contents)
    except OSError as err:
        return False, str(err)

    if not useSettingsToml:
        # step 2: create the auto-connect script (wifi_connect.py)
        scriptFile = os.path.join(
            os.path.dirname(__file__), "MCUScripts", "circuitPy7WiFiConnect.py"
        )
        targetFile = os.path.join(workspace, "wifi_connect.py")
        try:
            shutil.copy2(scriptFile, targetFile)
        except OSError as err:
            return False, str(err)
        # Note: code.py will not be modified because the connection will be
        #       reset anyway

    return True, ""
|
901 |
|
def removeCredentials(self):
    """
    Public method to remove the saved credentials from the connected device.

    Both possible credentials files ('settings.toml' and 'secrets.py') are
    deleted from the device volume. A file that does not exist is not
    treated as an error.

    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    if not self.__deviceVolumeMounted():
        return False, self.tr("The device volume is not available.")

    workspace = self.getWorkspace()
    for name in ("settings.toml", "secrets.py"):
        filename = os.path.join(workspace, name)
        try:
            os.remove(filename)
        except FileNotFoundError:
            # nothing to remove for this variant
            continue
        except OSError as err:
            # report the failure through the documented return value instead
            # of letting the exception escape to the caller
            return False, str(err)

    return True, ""
|
919 |
|
def checkInternet(self):
    """
    Public method to check, if the internet can be reached.

    The submitted script pings the address 8.8.8.8 if the device has an IPv4
    address and prints 'True' if the ping delivered a result; otherwise it
    prints 'False'.

    @return tuple containing a flag indicating reachability and an error string
    @rtype tuple of (bool, str)
    """
    command = """
def check_internet():
    import ipaddress
    import wifi

    r = wifi.radio
    if r.ipv4_address is not None:
        ping = r.ping(ipaddress.IPv4Address("8.8.8.8"))
        print(ping is not None)
    else:
        print(False)

check_internet()
del check_internet
"""

    reply, error = self._interface.execute(command, mode=self._submitMode)
    if not error:
        # the script prints the textual form of a bool
        reachable = reply.decode("utf-8").strip() == "True"
        return reachable, ""

    return False, error
|
948 |
|
def scanNetworks(self):
    """
    Public method to scan for available WiFi networks.

    The submitted script enables the radio if necessary, collects the scan
    results, restores the previous 'enabled' state and prints the results as
    a Python list literal. Entries with an empty SSID are skipped and each
    combination of SSID, MAC address and channel is reported only once (the
    first occurrence wins).

    @return tuple containing the list of available networks as a tuple of 'Name',
        'MAC-Address', 'channel', 'RSSI' and 'security' and an error string
    @rtype tuple of (list of tuple of (str, str, int, int, str), str)
    """
    command = """
def scan_networks():
    import wifi

    r = wifi.radio
    network_list = []
    enabled = r.enabled
    if not enabled:
        r.enabled = True
    for net in r.start_scanning_networks():
        network_list.append(
            (net.ssid, net.bssid, net.channel, net.rssi,
             '_'.join(str(x).split('.')[-1] for x in net.authmode))
        )
    r.stop_scanning_networks()
    if not enabled:
        r.enabled = False
    print(network_list)

scan_networks()
del scan_networks
"""

    out, err = self._interface.execute(
        command, mode=self._submitMode, timeout=15000
    )
    if err:
        return [], err

    # the device printed a list literal; literal_eval() parses it without
    # executing any code
    networksList = ast.literal_eval(out.decode("utf-8"))
    networks = []
    # a set gives constant time duplicate detection
    seenNetworks = set()
    for ssid, bssid, channel, rssi, authmode in networksList:
        if not ssid:
            continue

        mac = binascii.hexlify(bssid, ":").decode("utf-8")
        key = (ssid, mac, channel)
        if key in seenNetworks:
            continue
        seenNetworks.add(key)

        try:
            security = self.__securityTranslations[authmode]
        except KeyError:
            security = self.tr("unknown ({0})").format(authmode)
        networks.append((ssid, mac, channel, rssi, security))

    return networks, ""
|
1004 |
|
def deactivateInterface(self, interface):
    """
    Public method to deactivate a given WiFi interface of the connected device.

    Note: With CircuitPython it is not possible to deactivate the station and
    access point interfaces separately. The submitted script therefore disables
    the radio as a whole, whichever interface was given.

    @param interface designation of the interface to be deactivated (one of 'AP'
        or 'STA')
    @type str
    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    @exception ValueError raised to indicate a wrong value for the interface type
    """
    isKnownInterface = interface in ("STA", "AP")
    if not isKnownInterface:
        raise ValueError(
            "interface must be 'AP' or 'STA', got '{0}'".format(interface)
        )

    command = """
def deactivate():
    import wifi

    wifi.radio.enabled = False
    print(not wifi.radio.enabled)

deactivate()
del deactivate
"""

    reply, error = self._interface.execute(command, mode=self._submitMode)
    if error:
        return False, error

    # the script prints 'True' if the radio reports being disabled
    return reply.decode("utf-8").strip() == "True", ""
|
1040 |
|
def startAccessPoint(self, ssid, security=None, password=None, ifconfig=None):
    """
    Public method to start the access point interface.

    If either the security method or the password is missing, the access
    point is started with security code 0 and an empty password. A given
    IPv4 configuration is rejected with an error message.

    @param ssid SSID of the access point
    @type str
    @param security security method (defaults to None)
    @type int (optional)
    @param password password (defaults to None)
    @type str (optional)
    @param ifconfig IPv4 configuration for the access point if not default
        (IPv4 address, netmask, gateway address, DNS server address)
    @type tuple of (str, str, str, str)
    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    if security is None or password is None:
        security, password = 0, ""
    # translate the security code into the expression inserted into the script
    authmode = self.__securityCode2AuthModeString[security]

    if ifconfig:
        message = self.tr(
            "CircuitPython does not support setting the IPv4 parameters of the"
            " WiFi access point."
        )
        return False, message

    script = """
def start_ap(ssid, password):
    import wifi

    r = wifi.radio
    try:
        r.start_ap(ssid, password, authmode={2})
    except ValueError as exc:
        print('Error:', str(exc))

start_ap({0}, {1})
del start_ap
"""
    command = script.format(repr(ssid), repr(password), authmode)

    out, err = self._interface.execute(
        command, mode=self._submitMode, timeout=15000
    )
    if err:
        return False, err

    if out and out.startswith(b"Error:"):
        # strip the leading 'Error:' marker printed by the script
        return False, out.decode("utf-8").split(None, 1)[-1]

    return True, ""
|
1096 |
|
def stopAccessPoint(self):
    """
    Public method to stop the access point interface.

    The submitted script prints a line starting with 'Error:' if stopping the
    access point raised a NotImplementedError on the device.

    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    command = """
def stop_ap():
    import wifi

    r = wifi.radio
    try:
        r.stop_ap()
    except NotImplementedError as exc:
        print('Error:', str(exc))

stop_ap()
del stop_ap
"""

    out, err = self._interface.execute(command, mode=self._submitMode)
    if err:
        return False, err

    if out and out.startswith(b"Error:"):
        # strip the leading 'Error:' marker printed by the script
        return False, out.decode("utf-8").split(None, 1)[-1]

    return True, ""
|
1125 |
|
def getConnectedClients(self):
    """
    Public method to get a list of connected clients.

    This implementation always reports an empty client list together with a
    message stating that the feature is not supported.

    @return a tuple containing a list of tuples containing the client MAC-Address
        and the RSSI (if supported and available) and an error message
    @rtype tuple of ([(bytes, int)], str)
    """
    message = self.tr(
        "CircuitPython does not support reporting of connected clients"
    )
    return [], message
685 |
1138 |
686 |
1139 |
687 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber): |
1140 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber): |
688 """ |
1141 """ |
689 Function to instantiate a MicroPython device object. |
1142 Function to instantiate a MicroPython device object. |