617 else: |
644 else: |
618 clock_time = rtc_time[:7] + (0,) |
645 clock_time = rtc_time[:7] + (0,) |
619 rtc.init(clock_time) |
646 rtc.init(clock_time) |
620 """ |
647 """ |
621 |
648 |
|
649 ################################################################## |
|
650 ## Methods below implement WiFi related methods |
|
651 ################################################################## |
|
652 |
|
653 def hasWifi(self): |
|
654 """ |
|
655 Public method to check the availability of WiFi. |
|
656 |
|
657 @return tuple containing a flag indicating the availability of WiFi |
|
658 and the WiFi type (esp32) |
|
659 @rtype tuple of (bool, str) |
|
660 """ |
|
661 # TODO: check if ESP8266 is different |
|
662 return True, "esp32" |
|
663 |
|
664 def getWifiData(self): |
|
665 """ |
|
666 Public method to get data related to the current WiFi status. |
|
667 |
|
668 @return tuple of two dictionaries containing the WiFi status data |
|
669 for the WiFi client and access point |
|
670 @rtype tuple of (dict, dict) |
|
671 @exception OSError raised to indicate an issue with the device |
|
672 """ |
|
673 command = """ |
|
674 def wifi_status(): |
|
675 import ubinascii |
|
676 import ujson |
|
677 import network |
|
678 |
|
679 wifi = network.WLAN(network.STA_IF) |
|
680 station = { |
|
681 'active': wifi.active(), |
|
682 'connected': wifi.isconnected(), |
|
683 'status': wifi.status(), |
|
684 'ifconfig': wifi.ifconfig(), |
|
685 'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(), |
|
686 } |
|
687 if wifi.active(): |
|
688 station['txpower'] = wifi.config('txpower') |
|
689 else: |
|
690 station['txpower'] = 0 |
|
691 print(ujson.dumps(station)) |
|
692 |
|
693 wifi = network.WLAN(network.AP_IF) |
|
694 ap = { |
|
695 'active': wifi.active(), |
|
696 'connected': wifi.isconnected(), |
|
697 'status': wifi.status(), |
|
698 'ifconfig': wifi.ifconfig(), |
|
699 'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(), |
|
700 'channel': wifi.config('channel'), |
|
701 'essid': wifi.config('essid'), |
|
702 } |
|
703 if wifi.active(): |
|
704 ap['txpower'] = wifi.config('txpower') |
|
705 else: |
|
706 ap['txpower'] = 0 |
|
707 print(ujson.dumps(ap)) |
|
708 |
|
709 wifi_status() |
|
710 del wifi_status |
|
711 """ |
|
712 |
|
713 out, err = self._interface.execute(command) |
|
714 if err: |
|
715 raise OSError(self._shortError(err)) |
|
716 |
|
717 stationStr, apStr = out.decode("utf-8").splitlines() |
|
718 station = json.loads(stationStr) |
|
719 ap = json.loads(apStr) |
|
720 try: |
|
721 station["status"] = self.__statusTranslations[station["status"]] |
|
722 except KeyError: |
|
723 station["status"] = str(station["status"]) |
|
724 try: |
|
725 ap["status"] = self.__statusTranslations[ap["status"]] |
|
726 except KeyError: |
|
727 ap["status"] = str(ap["status"]) |
|
728 return station, ap |
|
729 |
|
730 def connectWifi(self, ssid, password): |
|
731 """ |
|
732 Public method to connect a device to a WiFi network. |
|
733 |
|
734 @param ssid name (SSID) of the WiFi network |
|
735 @type str |
|
736 @param password password needed to connect |
|
737 @type str |
|
738 @return tuple containing the connection status and an error string |
|
739 @rtype tuple of (bool, str) |
|
740 """ |
|
741 command = """ |
|
742 def connect_wifi(ssid, password): |
|
743 import network |
|
744 import ujson |
|
745 from time import sleep |
|
746 |
|
747 wifi = network.WLAN(network.STA_IF) |
|
748 wifi.active(False) |
|
749 wifi.active(True) |
|
750 wifi.connect(ssid, password) |
|
751 max_wait = 140 |
|
752 while max_wait and wifi.status() == network.STAT_CONNECTING: |
|
753 max_wait -= 1 |
|
754 sleep(0.1) |
|
755 status = wifi.status() |
|
756 print(ujson.dumps({{'connected': wifi.isconnected(), 'status': status}})) |
|
757 |
|
758 connect_wifi({0}, {1}) |
|
759 del connect_wifi |
|
760 """.format( |
|
761 repr(ssid), |
|
762 repr(password if password else ""), |
|
763 ) |
|
764 |
|
765 with EricOverrideCursor(): |
|
766 out, err = self._interface.execute(command, timeout=15000) |
|
767 if err: |
|
768 return False, err |
|
769 |
|
770 result = json.loads(out.decode("utf-8").strip()) |
|
771 if result["connected"]: |
|
772 error = "" |
|
773 else: |
|
774 try: |
|
775 error = self.__statusTranslations[result["status"]] |
|
776 except KeyError: |
|
777 error = str(result["status"]) |
|
778 |
|
779 return result["connected"], error |
|
780 |
|
781 def disconnectWifi(self): |
|
782 """ |
|
783 Public method to disconnect a device from the WiFi network. |
|
784 |
|
785 @return tuple containing a flag indicating success and an error string |
|
786 @rtype tuple of (bool, str) |
|
787 """ |
|
788 command = """ |
|
789 def disconnect_wifi(): |
|
790 import network |
|
791 from time import sleep |
|
792 |
|
793 wifi = network.WLAN(network.STA_IF) |
|
794 wifi.disconnect() |
|
795 wifi.active(False) |
|
796 sleep(0.1) |
|
797 print(not wifi.isconnected()) |
|
798 |
|
799 disconnect_wifi() |
|
800 del disconnect_wifi |
|
801 """ |
|
802 |
|
803 out, err = self._interface.execute(command) |
|
804 if err: |
|
805 return False, err |
|
806 |
|
807 return out.decode("utf-8").strip() == "True", "" |
|
808 |
|
809 def writeCredentials(self, ssid, password): |
|
810 """ |
|
811 Public method to write the given credentials to the connected device and modify |
|
812 the start script to connect automatically. |
|
813 |
|
814 @param ssid SSID of the network to connect to |
|
815 @type str |
|
816 @param password password needed to authenticate |
|
817 @type str |
|
818 @return tuple containing a flag indicating success and an error message |
|
819 @rtype tuple of (bool, str) |
|
820 """ |
|
821 nvsCommand = """ |
|
822 def save_wifi_creds(ssid, password): |
|
823 import esp32 |
|
824 |
|
825 nvs = esp32.NVS('wifi_creds') |
|
826 nvs.set_blob('ssid', ssid) |
|
827 nvs.set_blob('password', password) |
|
828 nvs.commit() |
|
829 |
|
830 save_wifi_creds({0}, {1}) |
|
831 del save_wifi_creds |
|
832 """.format(repr(ssid), repr(password) if password else "''") |
|
833 bootCommand = """ |
|
834 def modify_boot(): |
|
835 add = True |
|
836 try: |
|
837 with open('/boot.py', 'r') as f: |
|
838 for ln in f.readlines(): |
|
839 if 'wifi_connect' in ln: |
|
840 add = False |
|
841 break |
|
842 except: |
|
843 pass |
|
844 if add: |
|
845 with open('/boot.py', 'a') as f: |
|
846 f.write('\\nimport wifi_connect\\n') |
|
847 print(True) |
|
848 |
|
849 modify_boot() |
|
850 del modify_boot |
|
851 """ |
|
852 |
|
853 out, err = self._interface.execute(nvsCommand) |
|
854 if err: |
|
855 return False, self.tr("Error saving credentials: {0}").format(err) |
|
856 |
|
857 try: |
|
858 # copy auto-connect file |
|
859 self.put( |
|
860 os.path.join( |
|
861 os.path.dirname(__file__), "MCUScripts", "esp32WiFiConnect.py" |
|
862 ), |
|
863 "/wifi_connect.py", |
|
864 ) |
|
865 except OSError as err: |
|
866 return False, self.tr("Error saving auto-connect script: {0}").format(err) |
|
867 |
|
868 out, err = self._interface.execute(bootCommand) |
|
869 if err: |
|
870 return False, self.tr("Error modifying 'boot.py': {0}").format(err) |
|
871 |
|
872 return True, "" |
|
873 |
|
874 def removeCredentials(self): |
|
875 """ |
|
876 Public method to remove the saved credentials from the connected device. |
|
877 |
|
878 @return tuple containing a flag indicating success and an error message |
|
879 @rtype tuple of (bool, str) |
|
880 """ |
|
881 nvsCommand = """ |
|
882 def delete_wifi_creds(): |
|
883 import esp32 |
|
884 |
|
885 nvs = esp32.NVS('wifi_creds') |
|
886 try: |
|
887 nvs.erase_key('ssid') |
|
888 nvs.erase_key('password') |
|
889 nvs.commit() |
|
890 except OSError: |
|
891 pass |
|
892 |
|
893 delete_wifi_creds() |
|
894 del delete_wifi_creds |
|
895 """ |
|
896 |
|
897 out, err = self._interface.execute(nvsCommand) |
|
898 if err: |
|
899 return False, self.tr("Error deleting credentials: {0}").format(err) |
|
900 |
|
901 return True, "" |
|
902 |
|
903 def checkInternet(self): |
|
904 """ |
|
905 Public method to check, if the internet can be reached. |
|
906 |
|
907 @return tuple containing a flag indicating reachability and an error string |
|
908 @rtype tuple of (bool, str) |
|
909 """ |
|
910 command = """ |
|
911 def check_internet(): |
|
912 import network |
|
913 import socket |
|
914 |
|
915 wifi = network.WLAN(network.STA_IF) |
|
916 if wifi.isconnected(): |
|
917 s = socket.socket() |
|
918 try: |
|
919 s.connect(socket.getaddrinfo('google.com', 80)[0][-1]) |
|
920 s.close() |
|
921 print(True) |
|
922 except: |
|
923 print(False) |
|
924 else: |
|
925 print(False) |
|
926 |
|
927 check_internet() |
|
928 del check_internet |
|
929 """ |
|
930 |
|
931 out, err = self._interface.execute(command) |
|
932 if err: |
|
933 return False, err |
|
934 |
|
935 return out.decode("utf-8").strip() == "True", "" |
|
936 |
|
937 def scanNetworks(self): |
|
938 """ |
|
939 Public method to scan for available WiFi networks. |
|
940 |
|
941 @return tuple containing the list of available networks as a tuple of 'Name', |
|
942 'MAC-Address', 'channel', 'RSSI' and 'security' and an error string |
|
943 @rtype tuple of (list of tuple of (str, str, int, int, str), str) |
|
944 """ |
|
945 command = """ |
|
946 def scan_networks(): |
|
947 import network |
|
948 |
|
949 wifi = network.WLAN(network.STA_IF) |
|
950 active = wifi.active() |
|
951 if not active: |
|
952 wifi.active(True) |
|
953 network_list = wifi.scan() |
|
954 if not active: |
|
955 wifi.active(False) |
|
956 print(network_list) |
|
957 |
|
958 scan_networks() |
|
959 del scan_networks |
|
960 """ |
|
961 |
|
962 out, err = self._interface.execute(command, timeout=15000) |
|
963 if err: |
|
964 return [], err |
|
965 |
|
966 networksList = ast.literal_eval(out.decode("utf-8")) |
|
967 networks = [] |
|
968 for network in networksList: |
|
969 if network[0]: |
|
970 ssid = network[0].decode("utf-8") |
|
971 mac = binascii.hexlify(network[1], ":").decode("utf-8") |
|
972 channel = network[2] |
|
973 rssi = network[3] |
|
974 try: |
|
975 security = self.__securityTranslations[network[4]] |
|
976 except KeyError: |
|
977 security = self.tr("unknown ({0})").format(network[4]) |
|
978 networks.append((ssid, mac, channel, rssi, security)) |
|
979 |
|
980 return networks, "" |
|
981 |
|
982 def deactivateInterface(self, interface): |
|
983 """ |
|
984 Public method to deactivate a given WiFi interface of the connected device. |
|
985 |
|
986 @param interface designation of the interface to be deactivated (one of 'AP' |
|
987 or 'STA') |
|
988 @type str |
|
989 @return tuple containg a flag indicating success and an error message |
|
990 @rtype tuple of (bool, str) |
|
991 @exception ValueError raised to indicate a wrong value for the interface type |
|
992 """ |
|
993 if interface not in ("STA", "AP"): |
|
994 raise ValueError( |
|
995 "interface must be 'AP' or 'STA', got '{0}'".format(interface) |
|
996 ) |
|
997 |
|
998 command = """ |
|
999 def deactivate(): |
|
1000 import network |
|
1001 from time import sleep |
|
1002 |
|
1003 wifi = network.WLAN(network.{0}_IF) |
|
1004 wifi.active(False) |
|
1005 sleep(0.1) |
|
1006 print(not wifi.active()) |
|
1007 |
|
1008 deactivate() |
|
1009 del deactivate |
|
1010 """.format( |
|
1011 interface |
|
1012 ) |
|
1013 |
|
1014 out, err = self._interface.execute(command) |
|
1015 if err: |
|
1016 return False, err |
|
1017 else: |
|
1018 return out.decode("utf-8").strip() == "True", "" |
|
1019 |
|
1020 def startAccessPoint(self, ssid, security=None, password=None): |
|
1021 """ |
|
1022 Public method to start the access point interface. |
|
1023 |
|
1024 @param ssid SSID of the access point |
|
1025 @type str |
|
1026 @param security security method (defaults to None) |
|
1027 @type int (optional) |
|
1028 @param password password (defaults to None) |
|
1029 @type str (optional) |
|
1030 @return tuple containing a flag indicating success and an error message |
|
1031 @rtype tuple of (bool, str) |
|
1032 """ |
|
1033 if security is None or password is None: |
|
1034 security = 0 |
|
1035 password = "" |
|
1036 if security > 4: |
|
1037 security = 4 # security >4 cause an error thrown by the ESP32 |
|
1038 |
|
1039 command = """ |
|
1040 def start_ap(): |
|
1041 import network |
|
1042 |
|
1043 ap = network.WLAN(network.AP_IF) |
|
1044 ap.active(False) |
|
1045 ap.active(True) |
|
1046 try: |
|
1047 ap.config(ssid={0}, authmode={1}, password={2}) |
|
1048 except: |
|
1049 ap.config(essid={0}, authmode={1}, password={2}) |
|
1050 |
|
1051 start_ap() |
|
1052 del start_ap |
|
1053 """.format( |
|
1054 repr(ssid), security, repr(password) |
|
1055 ) |
|
1056 |
|
1057 out, err = self._interface.execute(command, timeout=15000) |
|
1058 if err: |
|
1059 return False, err |
|
1060 else: |
|
1061 return True, "" |
|
1062 |
|
1063 def stopAccessPoint(self): |
|
1064 """ |
|
1065 Public method to stop the access point interface. |
|
1066 |
|
1067 @return tuple containg a flag indicating success and an error message |
|
1068 @rtype tuple of (bool, str) |
|
1069 """ |
|
1070 return self.deactivateInterface("AP") |
|
1071 |
|
1072 def getConnectedClients(self): |
|
1073 """ |
|
1074 Public method to get a list of connected clients. |
|
1075 |
|
1076 @return a tuple containing a list of tuples containing the client MAC-Address |
|
1077 and the RSSI (if supported and available) and an error message |
|
1078 @rtype tuple of ([(bytes, int)], str) |
|
1079 """ |
|
1080 command = """ |
|
1081 def get_stations(): |
|
1082 import network |
|
1083 |
|
1084 ap = network.WLAN(network.AP_IF) |
|
1085 stations = ap.status('stations') |
|
1086 print(stations) |
|
1087 |
|
1088 get_stations() |
|
1089 del get_stations |
|
1090 """ |
|
1091 |
|
1092 out, err = self._interface.execute(command, timeout=10000) |
|
1093 if err: |
|
1094 return [], err |
|
1095 |
|
1096 clientsList = ast.literal_eval(out.decode("utf-8")) |
|
1097 return clientsList, "" |
|
1098 |
622 |
1099 |
623 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber): |
1100 def createDevice(microPythonWidget, deviceType, vid, pid, boardName, serialNumber): |
624 """ |
1101 """ |
625 Function to instantiate a MicroPython device object. |
1102 Function to instantiate a MicroPython device object. |
626 |
1103 |