src/eric7/MicroPython/Devices/RP2Devices.py

branch
eric7
changeset 10897
caba0e2456b6
parent 10806
2f6df822e3b9
child 11005
b918c6c2736b
equal deleted inserted replaced
10896:9cbbed624751 10897:caba0e2456b6
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2021 - 2024 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing the device interface class for RP2040/RP2350 based boards
8 (e.g. Raspberry Pi Pico / Pico 2).
9 """
10
11 import ast
12 import binascii
13 import json
14 import os
15
16 from PyQt6.QtCore import QUrl, pyqtSlot
17 from PyQt6.QtNetwork import QNetworkReply, QNetworkRequest
18 from PyQt6.QtWidgets import QDialog, QMenu
19
20 from eric7 import EricUtilities, Preferences
21 from eric7.EricGui.EricOverrideCursor import EricOverrideCursor
22 from eric7.EricWidgets import EricMessageBox
23 from eric7.EricWidgets.EricApplication import ericApp
24
25 from ..EthernetDialogs import WiznetUtilities
26 from ..MicroPythonWidget import HAS_QTCHART
27 from . import FirmwareGithubUrls
28 from .DeviceBase import BaseDevice
29
30
31 class RP2Device(BaseDevice):
32 """
33 Class implementing the device for RP2040/RP2350 based boards.
34 """
35
36 def __init__(self, microPythonWidget, deviceType, parent=None):
37 """
38 Constructor
39
40 @param microPythonWidget reference to the main MicroPython widget
41 @type MicroPythonWidget
42 @param deviceType device type assigned to this device interface
43 @type str
44 @param parent reference to the parent object
45 @type QObject
46 """
47 super().__init__(microPythonWidget, deviceType, parent)
48
49 self.__createRP2Menu()
50
51 self.__statusTranslations = {
52 "picow": {
53 -3: self.tr("authentication failed"),
54 -2: self.tr("no matching access point found"),
55 -1: self.tr("connection failed"),
56 0: self.tr("idle"),
57 1: self.tr("connecting"),
58 2: self.tr("connected, waiting for IP address"),
59 3: self.tr("connected"),
60 },
61 "picowireless": {
62 0: self.tr("idle"),
63 1: self.tr("no matching access point found"),
64 2: self.tr("network scan completed"),
65 3: self.tr("connected"),
66 4: self.tr("connection failed"),
67 5: self.tr("connection lost"),
68 6: self.tr("disconnected"),
69 7: self.tr("AP listening"),
70 8: self.tr("AP connected"),
71 9: self.tr("AP failed"),
72 },
73 "picowiz": {
74 0: self.tr("switched off"),
75 1: self.tr("switched on, inactive"),
76 2: self.tr("switched on, active"),
77 },
78 }
79
80 self.__securityTranslations = {
81 "picow": {
82 0: self.tr("open", "open WiFi network"),
83 1: "WEP",
84 2: "WPA",
85 3: "WPA2",
86 4: "WPA/WPA2",
87 5: "WPA2 (CCMP)",
88 6: "WPA3",
89 7: "WPA2/WPA3",
90 },
91 "picowireless": {
92 2: "WPA",
93 4: "WPA2 (CCMP)",
94 5: "WEP",
95 7: self.tr("open", "open WiFi network"),
96 8: self.tr("automatic"),
97 },
98 }
99
100 def setButtons(self):
101 """
102 Public method to enable the supported action buttons.
103 """
104 super().setButtons()
105
106 self.microPython.setActionButtons(
107 run=True, repl=True, files=True, chart=HAS_QTCHART
108 )
109
110 def forceInterrupt(self):
111 """
112 Public method to determine the need for an interrupt when opening the
113 serial connection.
114
115 @return flag indicating an interrupt is needed
116 @rtype bool
117 """
118 return False
119
120 def deviceName(self):
121 """
122 Public method to get the name of the device.
123
124 @return name of the device
125 @rtype str
126 """
127 return self.tr("RP2040/RP2350")
128
129 def canStartRepl(self):
130 """
131 Public method to determine, if a REPL can be started.
132
133 @return tuple containing a flag indicating it is safe to start a REPL
134 and a reason why it cannot.
135 @rtype tuple of (bool, str)
136 """
137 return True, ""
138
139 def canStartPlotter(self):
140 """
141 Public method to determine, if a Plotter can be started.
142
143 @return tuple containing a flag indicating it is safe to start a
144 Plotter and a reason why it cannot.
145 @rtype tuple of (bool, str)
146 """
147 return True, ""
148
149 def canRunScript(self):
150 """
151 Public method to determine, if a script can be executed.
152
153 @return tuple containing a flag indicating it is safe to start a
154 Plotter and a reason why it cannot.
155 @rtype tuple of (bool, str)
156 """
157 return True, ""
158
159 def runScript(self, script):
160 """
161 Public method to run the given Python script.
162
163 @param script script to be executed
164 @type str
165 """
166 pythonScript = script.split("\n")
167 self.sendCommands(pythonScript)
168
169 def canStartFileManager(self):
170 """
171 Public method to determine, if a File Manager can be started.
172
173 @return tuple containing a flag indicating it is safe to start a
174 File Manager and a reason why it cannot.
175 @rtype tuple of (bool, str)
176 """
177 return True, ""
178
179 def __createRP2Menu(self):
180 """
181 Private method to create the RP2 submenu.
182 """
183 self.__rp2Menu = QMenu(self.tr("RP2 Functions"))
184
185 self.__showMpyAct = self.__rp2Menu.addAction(
186 self.tr("Show MicroPython Versions"), self.__showFirmwareVersions
187 )
188 self.__rp2Menu.addSeparator()
189 self.__bootloaderAct = self.__rp2Menu.addAction(
190 self.tr("Activate Bootloader"), self.__activateBootloader
191 )
192 self.__flashMpyAct = self.__rp2Menu.addAction(
193 self.tr("Flash MicroPython Firmware"), self.__flashPython
194 )
195 self.__rp2Menu.addSeparator()
196 self.__resetAct = self.__rp2Menu.addAction(
197 self.tr("Reset Device"), self.__resetDevice
198 )
199
200 def addDeviceMenuEntries(self, menu):
201 """
202 Public method to add device specific entries to the given menu.
203
204 @param menu reference to the context menu
205 @type QMenu
206 """
207 connected = self.microPython.isConnected()
208 linkConnected = self.microPython.isLinkConnected()
209
210 self.__showMpyAct.setEnabled(connected)
211 self.__bootloaderAct.setEnabled(connected)
212 self.__flashMpyAct.setEnabled(not linkConnected)
213 self.__resetAct.setEnabled(connected)
214
215 menu.addMenu(self.__rp2Menu)
216
217 def hasFlashMenuEntry(self):
218 """
219 Public method to check, if the device has its own flash menu entry.
220
221 @return flag indicating a specific flash menu entry
222 @rtype bool
223 """
224 return True
225
226 @pyqtSlot()
227 def __flashPython(self):
228 """
229 Private slot to flash a MicroPython firmware to the device.
230 """
231 from ..UF2FlashDialog import UF2FlashDialog
232
233 dlg = UF2FlashDialog(boardType="rp2")
234 dlg.exec()
235
236 @pyqtSlot()
237 def __activateBootloader(self):
238 """
239 Private slot to switch the board into 'bootloader' mode.
240 """
241 if self.microPython.isConnected():
242 self.executeCommands(
243 [
244 "import machine",
245 "machine.bootloader()",
246 ],
247 mode=self._submitMode,
248 )
249 # simulate pressing the disconnect button
250 self.microPython.on_connectButton_clicked()
251
252 @pyqtSlot()
253 def __showFirmwareVersions(self):
254 """
255 Private slot to show the firmware version of the connected device and the
256 available firmware version.
257 """
258 if self.microPython.isConnected():
259 if self._deviceData["mpy_name"] != "micropython":
260 EricMessageBox.critical(
261 None,
262 self.tr("Show MicroPython Versions"),
263 self.tr(
264 """The firmware of the connected device cannot be"""
265 """ determined or the board does not run MicroPython."""
266 """ Aborting..."""
267 ),
268 )
269 else:
270 if self._deviceData["mpy_variant"] == "Pimoroni Pico":
271 # MicroPython with Pimoroni add-on libraries
272 url = QUrl(FirmwareGithubUrls["pimoroni_pico"])
273 else:
274 url = QUrl(FirmwareGithubUrls["micropython"])
275 ui = ericApp().getObject("UserInterface")
276 request = QNetworkRequest(url)
277 reply = ui.networkAccessManager().head(request)
278 reply.finished.connect(lambda: self.__firmwareVersionResponse(reply))
279
280 @pyqtSlot(QNetworkReply)
281 def __firmwareVersionResponse(self, reply):
282 """
283 Private slot handling the response of the latest version request.
284
285 @param reply reference to the reply object
286 @type QNetworkReply
287 """
288 latestUrl = reply.url().toString()
289 tag = latestUrl.rsplit("/", 1)[-1]
290 while tag and not tag[0].isdecimal():
291 # get rid of leading non-decimal characters
292 tag = tag[1:]
293 latestVersion = EricUtilities.versionToTuple(tag)
294
295 if self._deviceData["mpy_version"] == "unknown":
296 currentVersionStr = self.tr("unknown")
297 currentVersion = (0, 0, 0)
298 else:
299 currentVersionStr = (
300 self._deviceData["mpy_variant_version"]
301 if bool(self._deviceData["mpy_variant_version"])
302 else self._deviceData["mpy_version"]
303 )
304 currentVersion = EricUtilities.versionToTuple(currentVersionStr)
305
306 msg = self.tr(
307 "<h4>MicroPython Version Information</h4>"
308 "<table>"
309 "<tr><td>Installed:</td><td>{0}</td></tr>"
310 "<tr><td>Available:</td><td>{1}</td></tr>"
311 "{2}"
312 "</table>"
313 ).format(
314 currentVersionStr,
315 tag,
316 (
317 self.tr("<tr><td>Variant:</td><td>{0}</td></tr>").format(
318 self._deviceData["mpy_variant"]
319 )
320 if self._deviceData["mpy_variant"]
321 else ""
322 ),
323 )
324 if self._deviceData["mpy_variant"] in ["Pimoroni Pico"] and not bool(
325 self._deviceData["mpy_variant_version"]
326 ):
327 # cannot derive update info
328 msg += self.tr("<p>Update may be available.</p>")
329 elif currentVersion < latestVersion:
330 msg += self.tr("<p><b>Update available!</b></p>")
331
332 EricMessageBox.information(
333 None,
334 self.tr("MicroPython Version"),
335 msg,
336 )
337
338 @pyqtSlot()
339 def __resetDevice(self):
340 """
341 Private slot to reset the connected device.
342 """
343 if self.microPython.isConnected():
344 self.executeCommands(
345 "import machine\nmachine.reset()\n", mode=self._submitMode
346 )
347
348 def getDocumentationUrl(self):
349 """
350 Public method to get the device documentation URL.
351
352 @return documentation URL of the device
353 @rtype str
354 """
355 return Preferences.getMicroPython("MicroPythonDocuUrl")
356
357 def getDownloadMenuEntries(self):
358 """
359 Public method to retrieve the entries for the downloads menu.
360
361 @return list of tuples with menu text and URL to be opened for each
362 entry
363 @rtype list of tuple of (str, str)
364 """
365 return [
366 (
367 self.tr("MicroPython Firmware"),
368 Preferences.getMicroPython("MicroPythonFirmwareUrl"),
369 ),
370 ("<separator>", ""),
371 (self.tr("Pimoroni Pico Firmware"), FirmwareGithubUrls["pimoroni_pico"]),
372 ("<separator>", ""),
373 (
374 self.tr("CircuitPython Firmware"),
375 Preferences.getMicroPython("CircuitPythonFirmwareUrl"),
376 ),
377 (
378 self.tr("CircuitPython Libraries"),
379 Preferences.getMicroPython("CircuitPythonLibrariesUrl"),
380 ),
381 ]
382
383 ##################################################################
384 ## time related methods below
385 ##################################################################
386
387 def _getSetTimeCode(self):
388 """
389 Protected method to get the device code to set the time.
390
391 Note: This method must be implemented in the various device specific
392 subclasses.
393
394 @return code to be executed on the connected device to set the time
395 @rtype str
396 """
397 # rtc_time[0] - year 4 digit
398 # rtc_time[1] - month 1..12
399 # rtc_time[2] - day 1..31
400 # rtc_time[3] - weekday 1..7 1=Monday
401 # rtc_time[4] - hour 0..23
402 # rtc_time[5] - minute 0..59
403 # rtc_time[6] - second 0..59
404 # rtc_time[7] - yearday 1..366
405 # rtc_time[8] - isdst 0, 1, or -1
406
407 # The machine.rtc.datetime() function takes the arguments in the order:
408 # (year, month, day, weekday, hour, minute, second, subseconds)
409 # __IGNORE_WARNING_M891__
410 # https://docs.micropython.org/en/latest/library/machine.RTC.html#machine-rtc
411 return """
412 def set_time(rtc_time):
413 import machine
414 rtc = machine.RTC()
415 rtc.datetime(rtc_time[:7] + (0,))
416 """
417
418 ##################################################################
419 ## Methods below implement WiFi related methods
420 ##################################################################
421
422 def addDeviceWifiEntries(self, menu):
423 """
424 Public method to add device specific entries to the given menu.
425
426 @param menu reference to the context menu
427 @type QMenu
428 """
429 menu.addSeparator()
430 menu.addAction(self.tr("Set Country"), self.__setCountry).setEnabled(
431 self._deviceData["wifi_type"] == "picow"
432 )
433 menu.addAction(self.tr("Reset Country"), self.__resetCountry).setEnabled(
434 self._deviceData["wifi_type"] == "picow"
435 )
436
437 def hasWifi(self):
438 """
439 Public method to check the availability of WiFi.
440
441 @return tuple containing a flag indicating the availability of WiFi
442 and the WiFi type (picow or picowireless)
443 @rtype tuple of (bool, str)
444 @exception OSError raised to indicate an issue with the device
445 """
446 # picowireless:
447 # It seems to take up to 20 sec to detect, that no Pico Wireless Pack is
448 # attached. Therefore the command will timeout before.
449 command = """
450 def has_wifi():
451 try:
452 import network
453 if hasattr(network, 'WLAN'):
454 return True, 'picow'
455 except ImportError:
456 try:
457 import picowireless as pw
458 try:
459 if pw.get_fw_version() != '':
460 return True, 'picowireless'
461 except RuntimeError:
462 pw.init()
463 return True, 'picowireless'
464 except ImportError:
465 pass
466
467 return False, ''
468
469 print(has_wifi())
470 del has_wifi
471 """
472 out, err = self.executeCommands(command, mode=self._submitMode, timeout=20000)
473 if err:
474 if not err.startswith(b"Timeout "):
475 raise OSError(self._shortError(err))
476 else:
477 # pimoroni firmware loaded but no pico wireless present
478 return False, ""
479 return ast.literal_eval(out.decode("utf-8"))
480
481 def hasWifiCountry(self):
482 """
483 Public method to check, if the device has support to set the WiFi country.
484
485 @return flag indicating the support of WiFi country
486 @rtype bool
487 """
488 return self._deviceData["wifi_type"] == "picow"
489
490 def getWifiData(self):
491 """
492 Public method to get data related to the current WiFi status.
493
494 @return tuple of three dictionaries containing the WiFi status data
495 for the WiFi client, access point and overall data
496 @rtype tuple of (dict, dict, dict)
497 @exception OSError raised to indicate an issue with the device
498 """
499 if self._deviceData["wifi_type"] == "picow":
500 command = """
501 def wifi_status():
502 import ubinascii
503 import ujson
504 import network
505 import rp2
506
507 wifi = network.WLAN(network.STA_IF)
508 station = {
509 'active': wifi.active(),
510 'connected': wifi.isconnected(),
511 'status': wifi.status(),
512 'ifconfig': wifi.ifconfig(),
513 'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
514 'channel': wifi.config('channel'),
515 'txpower': wifi.config('txpower'),
516 }
517 print(ujson.dumps(station))
518
519 wifi = network.WLAN(network.AP_IF)
520 ap = {
521 'active': wifi.active(),
522 'connected': wifi.isconnected(),
523 'status': wifi.status(),
524 'ifconfig': wifi.ifconfig(),
525 'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
526 'channel': wifi.config('channel'),
527 'txpower': wifi.config('txpower'),
528 'essid': wifi.config('essid'),
529 }
530 print(ujson.dumps(ap))
531
532 overall = {
533 'active': station['active'] or ap['active']
534 }
535 try:
536 overall['country'] = network.country()
537 except AttributeError:
538 overall['country'] = rp2.country()
539 try:
540 overall['hostname'] = network.hostname()
541 except AttributeError:
542 pass
543 print(ujson.dumps(overall))
544
545 wifi_status()
546 del wifi_status
547 """
548 elif self._deviceData["wifi_type"] == "picowireless":
549 command = """
550 def wifi_status():
551 import picowireless as pw
552 import ubinascii
553 import ujson
554
555 def ip_str(ip):
556 return '.'.join(str(i) for i in ip)
557
558 station = {
559 'active': pw.get_connection_status() not in (0, 7, 8, 9),
560 'connected': pw.get_connection_status() == 3,
561 'status': pw.get_connection_status(),
562 'ifconfig': (
563 ip_str(pw.get_ip_address()),
564 ip_str(pw.get_subnet_mask()),
565 ip_str(pw.get_gateway_ip()),
566 '0.0.0.0'
567 ),
568 'mac': ubinascii.hexlify(pw.get_mac_address(), ':').decode(),
569 }
570 if station['connected']:
571 station.update({
572 'ap_ssid': pw.get_current_ssid(),
573 'ap_bssid': ubinascii.hexlify(pw.get_current_bssid(), ':'),
574 'ap_rssi': pw.get_current_rssi(),
575 'ap_security': pw.get_current_encryption_type(),
576 })
577 print(ujson.dumps(station))
578
579 ap = {
580 'active': pw.get_connection_status() in (7, 8, 9),
581 'connected': pw.get_connection_status() == 8,
582 'status': pw.get_connection_status(),
583 'mac': ubinascii.hexlify(pw.get_mac_address(), ':').decode(),
584 }
585 if ap['active']:
586 ap['essid'] = pw.get_current_ssid()
587 ap['ifconfig'] = (
588 ip_str(pw.get_ip_address()),
589 ip_str(pw.get_subnet_mask()),
590 ip_str(pw.get_gateway_ip()),
591 '0.0.0.0'
592 )
593 print(ujson.dumps(ap))
594
595 overall = {
596 'active': pw.get_connection_status() != 0
597 }
598 print(ujson.dumps(overall))
599
600 wifi_status()
601 del wifi_status
602 """
603 else:
604 return super().getWifiData()
605
606 out, err = self.executeCommands(command, mode=self._submitMode)
607 if err:
608 raise OSError(self._shortError(err))
609
610 stationStr, apStr, overallStr = out.decode("utf-8").splitlines()
611 station = json.loads(stationStr)
612 ap = json.loads(apStr)
613 overall = json.loads(overallStr)
614 if "status" in station:
615 # translate the numerical status to a string
616 try:
617 station["status"] = self.__statusTranslations[
618 self._deviceData["wifi_type"]
619 ][station["status"]]
620 except KeyError:
621 station["status"] = str(station["status"])
622 if "status" in ap:
623 # translate the numerical status to a string
624 try:
625 ap["status"] = self.__statusTranslations[self._deviceData["wifi_type"]][
626 ap["status"]
627 ]
628 except KeyError:
629 ap["status"] = str(ap["status"])
630 if "ap_security" in station:
631 # translate the numerical AP security to a string
632 try:
633 station["ap_security"] = self.__securityTranslations[
634 self._deviceData["wifi_type"]
635 ][station["ap_security"]]
636 except KeyError:
637 station["ap_security"] = self.tr("unknown ({0})").format(
638 station["ap_security"]
639 )
640 return station, ap, overall
641
642 def connectWifi(self, ssid, password, hostname):
643 """
644 Public method to connect a device to a WiFi network.
645
646 @param ssid name (SSID) of the WiFi network
647 @type str
648 @param password password needed to connect
649 @type str
650 @param hostname host name of the device
651 @type str
652 @return tuple containing the connection status and an error string
653 @rtype tuple of (bool, str)
654 """
655 if self._deviceData["wifi_type"] == "picow":
656 country = Preferences.getMicroPython("WifiCountry").upper()
657 command = """
658 def connect_wifi(ssid, password, hostname, country):
659 import network
660 import rp2
661 import ujson
662 from time import sleep
663
664 rp2.country(country)
665
666 if hostname:
667 try:
668 network.hostname(hostname)
669 except AttributeError:
670 pass
671
672 wifi = network.WLAN(network.STA_IF)
673 wifi.active(False)
674 wifi.active(True)
675 wifi.connect(ssid, password)
676 max_wait = 140
677 while max_wait:
678 if wifi.status() < 0 or wifi.status() >= 3:
679 break
680 max_wait -= 1
681 sleep(0.1)
682 status = wifi.status()
683 print(ujson.dumps({{'connected': wifi.isconnected(), 'status': status}}))
684
685 connect_wifi({0}, {1}, {2}, {3})
686 del connect_wifi
687 """.format(
688 repr(ssid),
689 repr(password if password else ""),
690 repr(hostname),
691 repr(country if country else "XX"),
692 )
693 elif self._deviceData["wifi_type"] == "picowireless":
694 command = """
695 def connect_wifi(ssid, password):
696 import picowireless as pw
697 import ujson
698 from time import sleep
699
700 pw.init()
701 if bool(password):
702 pw.wifi_set_passphrase(ssid, password)
703 else:
704 pw.wifi_set_network(ssid)
705
706 max_wait = 140
707 while max_wait:
708 if pw.get_connection_status() == 3:
709 break
710 max_wait -= 1
711 sleep(0.1)
712 status = pw.get_connection_status()
713 if status == 3:
714 pw.set_led(0, 64, 0)
715 else:
716 pw.set_led(64, 0, 0)
717 print(ujson.dumps({{'connected': status == 3, 'status': status}}))
718
719 connect_wifi({0}, {1})
720 del connect_wifi
721 """.format(
722 repr(ssid),
723 repr(password if password else ""),
724 )
725 else:
726 return super().connectWifi(ssid, password, hostname)
727
728 with EricOverrideCursor():
729 out, err = self.executeCommands(
730 command, mode=self._submitMode, timeout=15000
731 )
732 if err:
733 return False, err
734
735 result = json.loads(out.decode("utf-8").strip())
736 if result["connected"]:
737 error = ""
738 else:
739 try:
740 error = self.__statusTranslations[self._deviceData["wifi_type"]][
741 result["status"]
742 ]
743 except KeyError:
744 error = str(result["status"])
745
746 return result["connected"], error
747
748 def disconnectWifi(self):
749 """
750 Public method to disconnect a device from the WiFi network.
751
752 @return tuple containing a flag indicating success and an error string
753 @rtype tuple of (bool, str)
754 """
755 if self._deviceData["wifi_type"] == "picow":
756 command = """
757 def disconnect_wifi():
758 import network
759 from time import sleep
760
761 wifi = network.WLAN(network.STA_IF)
762 wifi.disconnect()
763 wifi.active(False)
764 sleep(0.1)
765 print(not wifi.isconnected())
766
767 disconnect_wifi()
768 del disconnect_wifi
769 """
770 elif self._deviceData["wifi_type"] == "picowireless":
771 command = """
772 def disconnect_wifi():
773 import picowireless as pw
774 from time import sleep
775
776 pw.disconnect()
777 sleep(0.1)
778 print(pw.get_connection_status() != 3)
779 pw.set_led(0, 0, 0)
780
781 disconnect_wifi()
782 del disconnect_wifi
783 """
784 else:
785 return super().disconnectWifi()
786
787 out, err = self.executeCommands(command, mode=self._submitMode)
788 if err:
789 return False, err
790
791 return out.decode("utf-8").strip() == "True", ""
792
793 def isWifiClientConnected(self):
794 """
795 Public method to check the WiFi connection status as client.
796
797 @return flag indicating the WiFi connection status
798 @rtype bool
799 """
800 if self._deviceData["wifi_type"] == "picow":
801 command = """
802 def wifi_connected():
803 import network
804
805 wifi = network.WLAN(network.STA_IF)
806 print(wifi.isconnected())
807
808 wifi_connected()
809 del wifi_connected
810 """
811 elif self._deviceData["wifi_type"] == "picowireless":
812 command = """
813 def wifi_connected():
814 import picowireless as pw
815
816 print(pw.get_connection_status() == 3)
817
818 wifi_connected()
819 del wifi_connected
820 """
821 else:
822 return super().isWifiClientConnected()
823
824 out, err = self.executeCommands(command, mode=self._submitMode)
825 if err:
826 return False
827
828 return out.strip() == b"True"
829
830 def isWifiApConnected(self):
831 """
832 Public method to check the WiFi connection status as access point.
833
834 @return flag indicating the WiFi connection status
835 @rtype bool
836 """
837 if self._deviceData["wifi_type"] == "picow":
838 command = """
839 def wifi_connected():
840 import network
841
842 wifi = network.WLAN(network.AP_IF)
843 print(wifi.isconnected())
844
845 wifi_connected()
846 del wifi_connected
847 """
848 elif self._deviceData["wifi_type"] == "picowireless":
849 command = """
850 def wifi_connected():
851 import picowireless as pw
852
853 print(pw.get_connection_status() == 8)
854
855 wifi_connected()
856 del wifi_connected
857 """
858 else:
859 return super().isWifiClientConnected()
860
861 out, err = self.executeCommands(command, mode=self._submitMode)
862 if err:
863 return False
864
865 return out.strip() == b"True"
866
867 def writeCredentials(self, ssid, password, hostname, country):
868 """
869 Public method to write the given credentials to the connected device and modify
870 the start script to connect automatically.
871
872 @param ssid SSID of the network to connect to
873 @type str
874 @param password password needed to authenticate
875 @type str
876 @param hostname host name of the device
877 @type str
878 @param country WiFi country code
879 @type str
880 @return tuple containing a flag indicating success and an error message
881 @rtype tuple of (bool, str)
882 """
883 command = """
884 def modify_boot():
885 add = True
886 try:
887 with open('/boot.py', 'r') as f:
888 for ln in f.readlines():
889 if 'wifi_connect' in ln:
890 add = False
891 break
892 except:
893 pass
894 if add:
895 with open('/boot.py', 'a') as f:
896 f.write('\\nimport wifi_connect\\n')
897 print(True)
898
899 modify_boot()
900 del modify_boot
901 """
902
903 if self._deviceData["wifi_type"] == "picow":
904 secrets = (
905 "WIFI_SSID = {0}\nWIFI_KEY = {1}\nWIFI_COUNTRY={2}\n"
906 "WIFI_HOSTNAME = {3}\n"
907 ).format(
908 repr(ssid),
909 repr(password) if password else '""',
910 repr(country.upper()) if country else '""',
911 repr(hostname) if hostname else '""',
912 )
913 wifiConnectFile = "picowWiFiConnect.py"
914 else:
915 secrets = "WIFI_SSID = {0}\nWIFI_KEY = {1}\n".format(
916 repr(ssid),
917 repr(password) if password else '""',
918 )
919 if self._deviceData["wifi_type"] == "picowireless":
920 wifiConnectFile = "pimoroniWiFiConnect.py"
921 else:
922 secrets += "WIFI_HOSTNAME = {0}\n".format(
923 repr(hostname if hostname else '""')
924 )
925 wifiConnectFile = "mpyWiFiConnect.py"
926 try:
927 # write secrets file
928 self.putData("/secrets.py", secrets.encode("utf-8"))
929 # copy auto-connect file
930 self.put(
931 os.path.join(os.path.dirname(__file__), "MCUScripts", wifiConnectFile),
932 "/wifi_connect.py",
933 )
934 except OSError as err:
935 return False, str(err)
936
937 # modify boot.py
938 out, err = self.executeCommands(command, mode=self._submitMode)
939 if err:
940 return False, err
941
942 return out.decode("utf-8").strip() == "True", ""
943
944 def removeCredentials(self):
945 """
946 Public method to remove the saved credentials from the connected device.
947
948 @return tuple containing a flag indicating success and an error message
949 @rtype tuple of (bool, str)
950 """
951 try:
952 self.rm("/secrets.py")
953 except OSError as err:
954 return False, str(err)
955
956 return True, ""
957
958 def checkInternet(self):
959 """
960 Public method to check, if the internet can be reached.
961
962 @return tuple containing a flag indicating reachability and an error string
963 @rtype tuple of (bool, str)
964 """
965 if self._deviceData["wifi_type"] == "picow":
966 command = """
967 def check_internet():
968 import network
969 import socket
970
971 wifi = network.WLAN(network.STA_IF)
972 if wifi.isconnected():
973 s = socket.socket()
974 try:
975 s.connect(socket.getaddrinfo('quad9.net', 443)[0][-1])
976 s.close()
977 print(True)
978 except:
979 print(False)
980 else:
981 print(False)
982
983 check_internet()
984 del check_internet
985 """
986 elif self._deviceData["wifi_type"] == "picowireless":
987 command = """
988 def check_internet():
989 import picowireless as pw
990
991 if pw.get_connection_status() == 3:
992 res = pw.ping((9, 9, 9, 9), 300)
993 print(res >= 0)
994 else:
995 print(False)
996
997 check_internet()
998 del check_internet
999 """
1000 else:
1001 return super().checkInternet()
1002
1003 out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
1004 if err:
1005 return False, err
1006
1007 return out.decode("utf-8").strip() == "True", ""
1008
1009 def scanNetworks(self):
1010 """
1011 Public method to scan for available WiFi networks.
1012
1013 @return tuple containing the list of available networks as a tuple of 'Name',
1014 'MAC-Address', 'channel', 'RSSI' and 'security' and an error string
1015 @rtype tuple of (list of tuple of (str, str, int, int, str), str)
1016 """
1017 if self._deviceData["wifi_type"] == "picow":
1018 country = Preferences.getMicroPython("WifiCountry").upper()
1019 command = """
1020 def scan_networks():
1021 import network
1022 import rp2
1023
1024 rp2.country({0})
1025
1026 wifi = network.WLAN(network.STA_IF)
1027 active = wifi.active()
1028 if not active:
1029 wifi.active(True)
1030 network_list = wifi.scan()
1031 if not active:
1032 wifi.active(False)
1033 print(network_list)
1034
1035 scan_networks()
1036 del scan_networks
1037 """.format(
1038 repr(country if country else "XX")
1039 )
1040
1041 elif self._deviceData["wifi_type"] == "picowireless":
1042 command = """
1043 def scan_networks():
1044 import picowireless as pw
1045
1046 network_list = []
1047 pw.init()
1048 pw.start_scan_networks()
1049 networks = pw.get_scan_networks()
1050 for n in range(networks):
1051 network_list.append((
1052 pw.get_ssid_networks(n),
1053 pw.get_bssid_networks(n),
1054 pw.get_channel_networks(n),
1055 pw.get_rssi_networks(n),
1056 pw.get_enc_type_networks(n),
1057 ))
1058 print(network_list)
1059
1060 scan_networks()
1061 del scan_networks
1062 """
1063 else:
1064 return super().scanNetworks()
1065
1066 out, err = self.executeCommands(command, mode=self._submitMode, timeout=15000)
1067 if err:
1068 return [], err
1069
1070 networksList = ast.literal_eval(out.decode("utf-8"))
1071 networks = []
1072 for network in networksList:
1073 if network[0]:
1074 ssid = (
1075 network[0].decode("utf-8")
1076 if isinstance(network[0], bytes)
1077 else network[0]
1078 )
1079 mac = (
1080 binascii.hexlify(network[1], ":").decode("utf-8")
1081 if network[1] is not None
1082 else ""
1083 )
1084 channel = network[2]
1085 rssi = network[3]
1086 try:
1087 security = self.__securityTranslations[
1088 self._deviceData["wifi_type"]
1089 ][network[4]]
1090 except KeyError:
1091 security = self.tr("unknown ({0})").format(network[4])
1092 networks.append((ssid, mac, channel, rssi, security))
1093
1094 return networks, ""
1095
1096 def deactivateInterface(self, interface):
1097 """
1098 Public method to deactivate a given WiFi interface of the connected device.
1099
1100 @param interface designation of the interface to be deactivated (one of 'AP'
1101 or 'STA')
1102 @type str
1103 @return tuple containg a flag indicating success and an error message
1104 @rtype tuple of (bool, str)
1105 @exception ValueError raised to indicate a wrong value for the interface type
1106 """
1107 if interface not in ("STA", "AP"):
1108 raise ValueError(
1109 "interface must be 'AP' or 'STA', got '{0}'".format(interface)
1110 )
1111
1112 if self._deviceData["wifi_type"] == "picow":
1113 command = """
1114 def deactivate():
1115 import network
1116 from time import sleep
1117
1118 wifi = network.WLAN(network.{0}_IF)
1119 wifi.active(False)
1120 sleep(0.1)
1121 print(not wifi.active())
1122
1123 deactivate()
1124 del deactivate
1125 """.format(
1126 interface
1127 )
1128 elif self._deviceData["wifi_type"] == "picowireless":
1129 command = """
1130 def deactivate():
1131 import picowireless as pw
1132
1133 pw.init()
1134 print(True)
1135
1136 deactivate()
1137 del deactivate
1138 """
1139 else:
1140 return super().deactivateInterface(interface)
1141
1142 out, err = self.executeCommands(command, mode=self._submitMode)
1143 if err:
1144 return False, err
1145 else:
1146 return out.decode("utf-8").strip() == "True", ""
1147
1148 def startAccessPoint(
1149 self,
1150 ssid,
1151 security=None,
1152 password=None,
1153 hostname=None,
1154 ifconfig=None,
1155 ):
1156 """
1157 Public method to start the access point interface.
1158
1159 @param ssid SSID of the access point
1160 @type str
1161 @param security security method (defaults to None)
1162 @type int (optional)
1163 @param password password (defaults to None)
1164 @type str (optional)
1165 @param hostname host name of the device (defaults to None)
1166 @type str (optional)
1167 @param ifconfig IPv4 configuration for the access point if not default
1168 (IPv4 address, netmask, gateway address, DNS server address)
1169 @type tuple of (str, str, str, str)
1170 @return tuple containing a flag indicating success and an error message
1171 @rtype tuple of (bool, str)
1172 """
1173 if security is None or password is None:
1174 security = 0
1175 password = "" # secok
1176
1177 if self._deviceData["wifi_type"] == "picow":
1178 country = Preferences.getMicroPython("WifiCountry").upper()
1179 if security:
1180 security = 4 # Pico W supports just WPA/WPA2
1181 command = """
1182 def start_ap(ssid, security, password, hostname, ifconfig, country):
1183 import network
1184 import rp2
1185 from time import sleep
1186
1187 rp2.country(country)
1188
1189 if hostname:
1190 try:
1191 network.hostname(hostname)
1192 except AttributeError:
1193 pass
1194
1195 ap = network.WLAN(network.AP_IF)
1196 ap.active(True)
1197 if ifconfig:
1198 ap.ifconfig(ifconfig)
1199 ap.config(ssid=ssid, security=security, password=password)
1200 sleep(0.1)
1201 print(ap.isconnected())
1202
1203 start_ap({0}, {1}, {2}, {3}, {4}, {5})
1204 del start_ap
1205 """.format(
1206 repr(ssid),
1207 security,
1208 repr(password),
1209 repr(hostname),
1210 ifconfig,
1211 repr(country if country else "XX"),
1212 )
1213 elif self._deviceData["wifi_type"] == "picowireless":
1214 if ifconfig:
1215 return (
1216 False,
1217 self.tr(
1218 "Pico Wireless does not support setting the IPv4 parameters of"
1219 " the WiFi access point."
1220 ),
1221 )
1222
1223 # AP is fixed at channel 6
1224 command = """
1225 def start_ap(ssid, password):
1226 import picowireless as pw
1227
1228 pw.init()
1229 if bool(password):
1230 res = pw.wifi_set_ap_passphrase(ssid, password, 6)
1231 else:
1232 res = pw.wifi_set_ap_network(ssid, 6)
1233 status = pw.get_connection_status()
1234 if status in (7, 8):
1235 pw.set_led(0, 64, 0)
1236 else:
1237 pw.set_led(64, 0, 0)
1238 print(res >= 0)
1239
1240 start_ap({0}, {1})
1241 del start_ap
1242 """.format(
1243 repr(ssid),
1244 repr(password if password else ""),
1245 )
1246 else:
1247 return super().startAccessPoint(
1248 ssid,
1249 security=security,
1250 password=password,
1251 hostname=hostname,
1252 ifconfig=ifconfig,
1253 )
1254
1255 out, err = self.executeCommands(command, mode=self._submitMode, timeout=15000)
1256 if err:
1257 return False, err
1258 else:
1259 return out.decode("utf-8").strip() == "True", ""
1260
1261 def stopAccessPoint(self):
1262 """
1263 Public method to stop the access point interface.
1264
1265 @return tuple containg a flag indicating success and an error message
1266 @rtype tuple of (bool, str)
1267 """
1268 if self._deviceData["wifi_type"] in ("picow", "picowireless"):
1269 return self.deactivateInterface("AP")
1270 else:
1271 return super().stopAccessPoint()
1272
1273 def getConnectedClients(self):
1274 """
1275 Public method to get a list of connected clients.
1276
1277 @return a tuple containing a list of tuples containing the client MAC-Address
1278 and the RSSI (if supported and available) and an error message
1279 @rtype tuple of ([(bytes, int)], str)
1280 """
1281 if self._deviceData["wifi_type"] == "picow":
1282 command = """
1283 def get_stations():
1284 import network
1285
1286 ap = network.WLAN(network.AP_IF)
1287 stations = ap.status('stations')
1288 print(stations)
1289
1290 get_stations()
1291 del get_stations
1292 """
1293 elif self._deviceData["wifi_type"] == "picowireless":
1294 return (
1295 [],
1296 self.tr(
1297 "Pico Wireless does not support reporting of connected clients."
1298 ),
1299 )
1300 else:
1301 return super().checkInternet()
1302
1303 out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
1304 if err:
1305 return [], err
1306
1307 clientsList = ast.literal_eval(out.decode("utf-8"))
1308 return clientsList, ""
1309
1310 def enableWebrepl(self, password):
1311 """
1312 Public method to write the given WebREPL password to the connected device and
1313 modify the start script to start the WebREPL server.
1314
1315 @param password password needed to authenticate
1316 @type str
1317 @return tuple containing a flag indicating success and an error message
1318 @rtype tuple of (bool, str)
1319 """
1320 command = """
1321 def modify_boot():
1322 import os
1323
1324 try:
1325 with open('/boot.py', 'r') as old_f, open('/boot.py.tmp', 'w') as new_f:
1326 found = False
1327 for l in old_f.read().splitlines():
1328 if 'webrepl' in l:
1329 found = True
1330 if l.startswith('#'):
1331 l = l[1:]
1332 new_f.write(l + '\\n')
1333 if not found:
1334 new_f.write('\\nimport webrepl\\nwebrepl.start()\\n')
1335
1336 os.remove('/boot.py')
1337 os.rename('/boot.py.tmp', '/boot.py')
1338 except:
1339 pass
1340
1341 print(True)
1342
1343 modify_boot()
1344 del modify_boot
1345 """
1346
1347 if self._deviceData["wifi_type"] == "picow":
1348 config = "PASS = {0}\n".format(repr(password))
1349 else:
1350 return False, self.tr("WebREPL is not supported on this device.")
1351
1352 try:
1353 # write config file
1354 self.putData("/webrepl_cfg.py", config.encode("utf-8"))
1355 except OSError as err:
1356 return False, str(err)
1357
1358 # modify boot.py
1359 out, err = self.executeCommands(command, mode=self._submitMode)
1360 if err:
1361 return False, err
1362
1363 return out.decode("utf-8").strip() == "True", ""
1364
1365 def disableWebrepl(self):
1366 """
1367 Public method to write the given WebREPL password to the connected device and
1368 modify the start script to start the WebREPL server.
1369
1370 @return tuple containing a flag indicating success and an error message
1371 @rtype tuple of (bool, str)
1372 """
1373 command = """
1374 def modify_boot():
1375 import os
1376
1377 try:
1378 with open('/boot.py', 'r') as old_f, open('/boot.py.tmp', 'w') as new_f:
1379 for l in old_f.read().splitlines():
1380 if 'webrepl' in l:
1381 if not l.startswith('#'):
1382 l = '#' + l
1383 new_f.write(l + '\\n')
1384
1385 os.remove('/boot.py')
1386 os.rename('/boot.py.tmp', '/boot.py')
1387 except:
1388 pass
1389
1390 print(True)
1391
1392 modify_boot()
1393 del modify_boot
1394 """
1395
1396 # modify boot.py
1397 out, err = self.executeCommands(command, mode=self._submitMode)
1398 if err:
1399 return False, err
1400
1401 return out.decode("utf-8").strip() == "True", ""
1402
1403 @pyqtSlot()
1404 def __setCountry(self):
1405 """
1406 Private slot to configure the country of the connected device.
1407
1408 The country is the two-letter ISO 3166-1 Alpha-2 country code.
1409 """
1410 from ..WifiDialogs.WifiCountryDialog import WifiCountryDialog
1411
1412 dlg = WifiCountryDialog()
1413 if dlg.exec() == QDialog.DialogCode.Accepted:
1414 country, remember = dlg.getCountry()
1415 if remember:
1416 Preferences.setMicroPython("WifiCountry", country)
1417
1418 command = """
1419 try:
1420 import network
1421 network.country({0})
1422 except AttributeError:
1423 import rp2
1424 rp2.country({0})
1425 """.format(
1426 repr(country)
1427 )
1428
1429 out, err = self.executeCommands(command, mode=self._submitMode)
1430 if err:
1431 self.microPython.showError("country()", err)
1432
1433 @pyqtSlot()
1434 def __resetCountry(self):
1435 """
1436 Private slot to reset the country of the connected ESP32 device.
1437
1438 The country is the two-letter ISO 3166-1 Alpha-2 country code. This method
1439 resets it to the default code 'XX' representing the "worldwide" region.
1440 """
1441 command = """
1442 try:
1443 import network
1444 network.country('XX')
1445 except AttributeError:
1446 pass
1447 """
1448
1449 out, err = self.executeCommands(command, mode=self._submitMode)
1450 if err:
1451 self.microPython.showError("country()", err)
1452
1453 ##################################################################
1454 ## Methods below implement Bluetooth related methods
1455 ##################################################################
1456
1457 def hasBluetooth(self):
1458 """
1459 Public method to check the availability of Bluetooth.
1460
1461 @return flag indicating the availability of Bluetooth
1462 @rtype bool
1463 @exception OSError raised to indicate an issue with the device
1464 """
1465 command = """
1466 def has_bt():
1467 try:
1468 import bluetooth
1469 if hasattr(bluetooth, 'BLE'):
1470 return True
1471 except ImportError:
1472 pass
1473
1474 return False
1475
1476 print(has_bt())
1477 del has_bt
1478 """
1479 out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
1480 if err:
1481 raise OSError(self._shortError(err))
1482 return out.strip() == b"True"
1483
1484 def getBluetoothStatus(self):
1485 """
1486 Public method to get Bluetooth status data of the connected board.
1487
1488 @return list of tuples containing the translated status data label and
1489 the associated value
1490 @rtype list of tuples of (str, str)
1491 @exception OSError raised to indicate an issue with the device
1492 """
1493 command = """
1494 def ble_status():
1495 import bluetooth
1496 import ubinascii
1497 import ujson
1498
1499 ble = bluetooth.BLE()
1500
1501 ble_active = ble.active()
1502 if not ble_active:
1503 ble.active(True)
1504
1505 res = {
1506 'active': ble_active,
1507 'mac': ubinascii.hexlify(ble.config('mac')[1], ':').decode(),
1508 'addr_type': ble.config('mac')[0],
1509 'name': ble.config('gap_name'),
1510 'mtu': ble.config('mtu'),
1511 }
1512
1513 if not ble_active:
1514 ble.active(False)
1515
1516 print(ujson.dumps(res))
1517
1518 ble_status()
1519 del ble_status
1520 """
1521 out, err = self.executeCommands(command, mode=self._submitMode)
1522 if err:
1523 raise OSError(self._shortError(err))
1524
1525 status = []
1526 bleStatus = json.loads(out.decode("utf-8"))
1527 status.append((self.tr("Active"), self.bool2str(bleStatus["active"])))
1528 status.append((self.tr("Name"), bleStatus["name"]))
1529 status.append((self.tr("MAC-Address"), bleStatus["mac"]))
1530 status.append(
1531 (
1532 self.tr("Address Type"),
1533 self.tr("Public") if bleStatus == 0 else self.tr("Random"),
1534 )
1535 )
1536 status.append((self.tr("MTU"), self.tr("{0} Bytes").format(bleStatus["mtu"])))
1537
1538 return status
1539
1540 def activateBluetoothInterface(self):
1541 """
1542 Public method to activate the Bluetooth interface.
1543
1544 @return flag indicating the new state of the Bluetooth interface
1545 @rtype bool
1546 @exception OSError raised to indicate an issue with the device
1547 """
1548 command = """
1549 def activate_ble():
1550 import bluetooth
1551
1552 ble = bluetooth.BLE()
1553 if not ble.active():
1554 ble.active(True)
1555 print(ble.active())
1556
1557 activate_ble()
1558 del activate_ble
1559 """
1560 out, err = self.executeCommands(command, mode=self._submitMode)
1561 if err:
1562 raise OSError(self._shortError(err))
1563
1564 return out.strip() == b"True"
1565
1566 def deactivateBluetoothInterface(self):
1567 """
1568 Public method to deactivate the Bluetooth interface.
1569
1570 @return flag indicating the new state of the Bluetooth interface
1571 @rtype bool
1572 @exception OSError raised to indicate an issue with the device
1573 """
1574 command = """
1575 def deactivate_ble():
1576 import bluetooth
1577
1578 ble = bluetooth.BLE()
1579 if ble.active():
1580 ble.active(False)
1581 print(ble.active())
1582
1583 deactivate_ble()
1584 del deactivate_ble
1585 """
1586 out, err = self.executeCommands(command, mode=self._submitMode)
1587 if err:
1588 raise OSError(self._shortError(err))
1589
1590 return out.strip() == b"True"
1591
1592 def getDeviceScan(self, timeout=10):
1593 """
1594 Public method to perform a Bluetooth device scan.
1595
1596 @param timeout duration of the device scan in seconds (defaults
1597 to 10)
1598 @type int (optional)
1599 @return tuple containing a dictionary with the scan results and
1600 an error string
1601 @rtype tuple of (dict, str)
1602 """
1603 from ..BluetoothDialogs.BluetoothAdvertisement import BluetoothAdvertisement
1604
1605 command = """
1606 _scan_done = False
1607
1608 def ble_scan():
1609 import bluetooth
1610 import time
1611 import ubinascii
1612
1613 IRQ_SCAN_RESULT = 5
1614 IRQ_SCAN_DONE = 6
1615
1616 def _bleIrq(event, data):
1617 global _scan_done
1618 if event == IRQ_SCAN_RESULT:
1619 addr_type, addr, adv_type, rssi, adv_data = data
1620 if addr:
1621 print({{
1622 'address': ubinascii.hexlify(addr,':').decode('utf-8'),
1623 'rssi': rssi,
1624 'adv_type': adv_type,
1625 'advertisement': bytes(adv_data),
1626 }})
1627 elif event == IRQ_SCAN_DONE:
1628 _scan_done = True
1629
1630 ble = bluetooth.BLE()
1631
1632 ble_active = ble.active()
1633 if not ble_active:
1634 ble.active(True)
1635
1636 ble.irq(_bleIrq)
1637 ble.gap_scan({0} * 1000, 1000000, 50000, True)
1638 while not _scan_done:
1639 time.sleep(0.2)
1640
1641 if not ble_active:
1642 ble.active(False)
1643
1644 ble_scan()
1645 del ble_scan, _scan_done
1646 """.format(
1647 timeout
1648 )
1649 out, err = self.executeCommands(
1650 command, mode=self._submitMode, timeout=(timeout + 5) * 1000
1651 )
1652 if err:
1653 return {}, err
1654
1655 scanResults = {}
1656 for line in out.decode("utf-8").splitlines():
1657 res = ast.literal_eval(line)
1658 address = res["address"]
1659 if address not in scanResults:
1660 scanResults[address] = BluetoothAdvertisement(address)
1661 scanResults[address].update(
1662 res["adv_type"], res["rssi"], res["advertisement"]
1663 )
1664
1665 return scanResults, ""
1666
1667 ##################################################################
1668 ## Methods below implement Ethernet related methods
1669 ##################################################################
1670
1671 def hasEthernet(self):
1672 """
1673 Public method to check the availability of Ethernet.
1674
1675 @return tuple containing a flag indicating the availability of Ethernet
1676 and the Ethernet type (picowiz)
1677 @rtype tuple of (bool, str)
1678 @exception OSError raised to indicate an issue with the device
1679 """
1680 command = """
1681 def has_eth():
1682 try:
1683 import network
1684 if hasattr(network, 'WIZNET5K'):
1685 return True, 'picowiz'
1686 except ImportError:
1687 pass
1688
1689 return False, ''
1690
1691 print(has_eth())
1692 del has_eth
1693 """
1694
1695 out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
1696 if err:
1697 raise OSError(self._shortError(err))
1698
1699 return ast.literal_eval(out.decode("utf-8"))
1700
1701 def getEthernetStatus(self):
1702 """
1703 Public method to get Ethernet status data of the connected board.
1704
1705 @return list of tuples containing the translated status data label and
1706 the associated value
1707 @rtype list of tuples of (str, str)
1708 @exception OSError raised to indicate an issue with the device
1709 """
1710 command = """{0}
1711 def ethernet_status():
1712 import network
1713 import ubinascii
1714 import ujson
1715
1716 w5x00_init()
1717
1718 res = {{
1719 'active': nic.active(),
1720 'connected': nic.isconnected(),
1721 'status': nic.status(),
1722 'ifconfig': nic.ifconfig(),
1723 'mac': ubinascii.hexlify(nic.config('mac'), ':').decode(),
1724 }}
1725 try:
1726 res['hostname'] = network.hostname()
1727 except AttributeError:
1728 res['hostname'] = ''
1729 print(ujson.dumps(res))
1730
1731 ethernet_status()
1732 del ethernet_status, w5x00_init
1733 """.format(
1734 WiznetUtilities.mpyWiznetInit()
1735 )
1736
1737 out, err = self.executeCommands(command, mode=self._submitMode)
1738 if err:
1739 raise OSError(self._shortError(err))
1740
1741 status = []
1742 ethStatus = json.loads(out.decode("utf-8"))
1743 status.append((self.tr("Active"), self.bool2str(ethStatus["active"])))
1744 status.append((self.tr("Connected"), self.bool2str(ethStatus["connected"])))
1745 status.append(
1746 (
1747 self.tr("Status"),
1748 self.__statusTranslations["picowiz"][ethStatus["status"]],
1749 )
1750 )
1751 status.append(
1752 (
1753 self.tr("Hostname"),
1754 ethStatus["hostname"] if ethStatus["hostname"] else self.tr("unknown"),
1755 )
1756 )
1757 status.append((self.tr("IPv4 Address"), ethStatus["ifconfig"][0]))
1758 status.append((self.tr("Netmask"), ethStatus["ifconfig"][1]))
1759 status.append((self.tr("Gateway"), ethStatus["ifconfig"][2]))
1760 status.append((self.tr("DNS"), ethStatus["ifconfig"][3]))
1761 status.append((self.tr("MAC-Address"), ethStatus["mac"]))
1762
1763 return status
1764
1765 def connectToLan(self, config, hostname):
1766 """
1767 Public method to connect the connected device to the LAN.
1768
1769 @param config configuration for the connection (either the string 'dhcp'
1770 for a dynamic address or a tuple of four strings with the IPv4 parameters.
1771 @type str or tuple of (str, str, str, str)
1772 @param hostname host name of the device
1773 @type str
1774 @return tuple containing a flag indicating success and an error message
1775 @rtype tuple of (bool, str)
1776 """
1777 command = """{0}
1778 def connect_lan(config, hostname):
1779 import network
1780 import time
1781
1782 if hostname:
1783 try:
1784 network.hostname(hostname)
1785 except AttributeError:
1786 pass
1787
1788 w5x00_init()
1789
1790 nic.active(False)
1791 nic.active(True)
1792 nic.ifconfig(config)
1793 max_wait = 140
1794 while max_wait:
1795 if nic.isconnected():
1796 break
1797 max_wait -= 1
1798 time.sleep(0.1)
1799 print(nic.isconnected())
1800
1801 connect_lan({1}, {2})
1802 del connect_lan, w5x00_init
1803 """.format(
1804 WiznetUtilities.mpyWiznetInit(),
1805 "'dhcp'" if config == "dhcp" else config,
1806 repr(hostname) if hostname else "''",
1807 )
1808
1809 with EricOverrideCursor():
1810 out, err = self.executeCommands(
1811 command, mode=self._submitMode, timeout=15000
1812 )
1813 if err:
1814 return False, err
1815
1816 return out.strip() == b"True", ""
1817
1818 def disconnectFromLan(self):
1819 """
1820 Public method to disconnect from the LAN.
1821
1822 @return tuple containing a flag indicating success and an error message
1823 @rtype tuple of (bool, str)
1824 """
1825 command = """{0}
1826 def disconnect_lan():
1827 import time
1828
1829 w5x00_init()
1830
1831 nic.active(False)
1832 time.sleep(0.1)
1833 print(not nic.isconnected())
1834
1835 disconnect_lan()
1836 del disconnect_lan, w5x00_init
1837 """.format(
1838 WiznetUtilities.mpyWiznetInit(),
1839 )
1840
1841 with EricOverrideCursor():
1842 out, err = self.executeCommands(
1843 command, mode=self._submitMode, timeout=15000
1844 )
1845 if err:
1846 return False, err
1847
1848 return out.strip() == b"True", ""
1849
1850 def isLanConnected(self):
1851 """
1852 Public method to check the LAN connection status.
1853
1854 @return flag indicating that the device is connected to the LAN
1855 @rtype bool
1856 """
1857 command = """{0}
1858 def is_connected():
1859 import network
1860
1861 w5x00_init()
1862
1863 print(nic.isconnected())
1864
1865 is_connected()
1866 del is_connected, w5x00_init
1867 """.format(
1868 WiznetUtilities.mpyWiznetInit(),
1869 )
1870
1871 out, err = self.executeCommands(command, mode=self._submitMode)
1872 if err:
1873 return False
1874
1875 return out.strip() == b"True"
1876
1877 def checkInternetViaLan(self):
1878 """
1879 Public method to check, if the internet can be reached (LAN variant).
1880
1881 @return tuple containing a flag indicating reachability and an error string
1882 @rtype tuple of (bool, str)
1883 """
1884 command = """{0}
1885 def check_internet():
1886 import network
1887 import socket
1888
1889 w5x00_init()
1890
1891 if nic.isconnected():
1892 s = socket.socket()
1893 try:
1894 s.connect(socket.getaddrinfo('quad9.net', 443)[0][-1])
1895 s.close()
1896 print(True)
1897 except:
1898 print(False)
1899 else:
1900 print(False)
1901
1902 check_internet()
1903 del check_internet, w5x00_init
1904 """.format(
1905 WiznetUtilities.mpyWiznetInit(),
1906 )
1907
1908 out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
1909 if err:
1910 return False, err
1911
1912 return out.strip() == b"True", ""
1913
1914 def deactivateEthernet(self):
1915 """
1916 Public method to deactivate the Ethernet interface of the connected device.
1917
1918 @return tuple containg a flag indicating success and an error message
1919 @rtype tuple of (bool, str)
1920 """
1921 # The WIZnet 5x00 interface cannot be switched off explicitly. That means,
1922 # disconnect from the LAN is all we can do.
1923
1924 return self.disconnectFromLan()
1925
1926 def writeLanAutoConnect(self, config, hostname):
1927 """
1928 Public method to generate a script and associated configuration to connect the
1929 device to the LAN during boot time.
1930
1931 @param config configuration for the connection (either the string 'dhcp'
1932 for a dynamic address or a tuple of four strings with the IPv4 parameters.
1933 @type str or tuple of (str, str, str, str)
1934 @param hostname host name of the device
1935 @type str
1936 @return tuple containing a flag indicating success and an error message
1937 @rtype tuple of (bool, str)
1938 """
1939 command = """
1940 def modify_boot():
1941 add = True
1942 try:
1943 with open('/boot.py', 'r') as f:
1944 for ln in f.readlines():
1945 if 'wiznet_connect' in ln:
1946 add = False
1947 break
1948 except:
1949 pass
1950 if add:
1951 with open('/boot.py', 'a') as f:
1952 f.write('\\n')
1953 f.write('import wiznet_connect\\n')
1954 f.write('nic = wiznet_connect.connect_lan()\\n')
1955 print(True)
1956
1957 modify_boot()
1958 del modify_boot
1959 """
1960 devconfig = "ifconfig = {0}\nhostname = {1}".format(
1961 "'dhcp'" if config == "dhcp" else config,
1962 repr(hostname) if hostname else "''",
1963 )
1964 try:
1965 # write secrets file
1966 self.putData("/wiznet_config.py", devconfig.encode("utf-8"))
1967 # copy auto-connect file
1968 self.put(
1969 os.path.join(
1970 os.path.dirname(__file__), "MCUScripts", "picoWiznetConnect.py"
1971 ),
1972 "/wiznet_connect.py",
1973 )
1974 except OSError as err:
1975 return False, str(err)
1976
1977 # modify boot.py
1978 out, err = self.executeCommands(command, mode=self._submitMode)
1979 if err:
1980 return False, err
1981
1982 return out.decode("utf-8").strip() == "True", ""
1983
1984 def removeLanAutoConnect(self):
1985 """
1986 Public method to remove the saved IPv4 parameters from the connected device.
1987
1988 Note: This disables the LAN auto-connect feature.
1989
1990 @return tuple containing a flag indicating success and an error message
1991 @rtype tuple of (bool, str)
1992 """
1993 try:
1994 self.rm("/wiznet_config.py")
1995 except OSError as err:
1996 return False, str(err)
1997
1998 return True, ""
1999
2000 ##################################################################
2001 ## Methods below implement NTP related methods
2002 ##################################################################
2003
2004 def hasNetworkTime(self):
2005 """
2006 Public method to check the availability of network time functions.
2007
2008 @return flag indicating the availability of network time functions
2009 @rtype bool
2010 @exception OSError raised to indicate an issue with the device
2011 """
2012 command = """
2013 def has_ntp():
2014 try:
2015 import ntptime
2016 return True
2017 except ImportError:
2018 return False
2019
2020 print(has_ntp())
2021 del has_ntp
2022 """
2023 out, err = self.executeCommands(command, mode=self._submitMode)
2024 if err:
2025 raise OSError(self._shortError(err))
2026 return out.strip() == b"True"
2027
2028 def setNetworkTime(self, server="pool.ntp.org", tzOffset=0, timeout=10):
2029 """
2030 Public method to set the time to the network time retrieved from an
2031 NTP server.
2032
2033 @param server name of the NTP server to get the network time from
2034 (defaults to "0.pool.ntp.org")
2035 @type str (optional)
2036 @param tzOffset offset with respect to UTC (defaults to 0)
2037 @type int (optional)
2038 @param timeout maximum time to wait for a server response in seconds
2039 (defaults to 10)
2040 @type int
2041 @return tuple containing a flag indicating success and an error string
2042 @rtype tuple of (bool, str)
2043 """
2044 command = """
2045 def set_ntp_time(server, tz_offset, timeout):
2046 import network
2047 import ntptime
2048 import machine
2049
2050 if hasattr(network, 'WLAN') and not network.WLAN(network.STA_IF).isconnected():
2051 return False
2052 elif hasattr(network, 'WIZNET5K'):
2053 try:
2054 if not nic.isconnected():
2055 return False
2056 except NameError:
2057 return False
2058
2059 ntptime.host = server
2060 ntptime.timeout = timeout
2061 ntptime.settime()
2062
2063 rtc = machine.RTC()
2064 t = list(rtc.datetime())
2065 t[4] += tz_offset
2066 rtc.datetime(t)
2067
2068 return True
2069
2070 try:
2071 print({{
2072 'result': set_ntp_time({0}, {1}, {2}),
2073 'error': '',
2074 }})
2075 except Exception as err:
2076 print({{
2077 'result': False,
2078 'error': str(err),
2079 }})
2080 del set_ntp_time
2081 """.format(
2082 repr(server), tzOffset, timeout
2083 )
2084 out, err = self.executeCommands(
2085 command, mode=self._submitMode, timeout=(timeout + 2) * 1000
2086 )
2087 if err:
2088 return False, err
2089 else:
2090 res = ast.literal_eval(out.decode("utf-8"))
2091 return res["result"], res["error"]
2092
2093
2094 def createDevice(microPythonWidget, deviceType, _vid, _pid, _boardName, _serialNumber):
2095 """
2096 Function to instantiate a MicroPython device object.
2097
2098 @param microPythonWidget reference to the main MicroPython widget
2099 @type MicroPythonWidget
2100 @param deviceType device type assigned to this device interface
2101 @type str
2102 @param _vid vendor ID (unused)
2103 @type int
2104 @param _pid product ID (unused)
2105 @type int
2106 @param _boardName name of the board (unused)
2107 @type str
2108 @param _serialNumber serial number of the board (unused)
2109 @type str
2110 @return reference to the instantiated device object
2111 @rtype RP2Device
2112 """
2113 return RP2Device(microPythonWidget, deviceType)

eric ide

mercurial