|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2021 - 2024 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing the device interface class for RP2040/RP2350 based boards |
|
8 (e.g. Raspberry Pi Pico / Pico 2). |
|
9 """ |
|
10 |
|
11 import ast |
|
12 import binascii |
|
13 import json |
|
14 import os |
|
15 |
|
16 from PyQt6.QtCore import QUrl, pyqtSlot |
|
17 from PyQt6.QtNetwork import QNetworkReply, QNetworkRequest |
|
18 from PyQt6.QtWidgets import QDialog, QMenu |
|
19 |
|
20 from eric7 import EricUtilities, Preferences |
|
21 from eric7.EricGui.EricOverrideCursor import EricOverrideCursor |
|
22 from eric7.EricWidgets import EricMessageBox |
|
23 from eric7.EricWidgets.EricApplication import ericApp |
|
24 |
|
25 from ..EthernetDialogs import WiznetUtilities |
|
26 from ..MicroPythonWidget import HAS_QTCHART |
|
27 from . import FirmwareGithubUrls |
|
28 from .DeviceBase import BaseDevice |
|
29 |
|
30 |
|
31 class RP2Device(BaseDevice): |
|
32 """ |
|
33 Class implementing the device for RP2040/RP2350 based boards. |
|
34 """ |
|
35 |
|
def __init__(self, microPythonWidget, deviceType, parent=None):
    """
    Constructor

    @param microPythonWidget reference to the main MicroPython widget
    @type MicroPythonWidget
    @param deviceType device type assigned to this device interface
    @type str
    @param parent reference to the parent object (defaults to None)
    @type QObject
    """
    super().__init__(microPythonWidget, deviceType, parent)

    self.__createRP2Menu()

    # Texts for the numeric connection states reported by the device. One
    # table per WiFi/network type handled by this class.
    picowStates = {
        -3: self.tr("authentication failed"),
        -2: self.tr("no matching access point found"),
        -1: self.tr("connection failed"),
        0: self.tr("idle"),
        1: self.tr("connecting"),
        2: self.tr("connected, waiting for IP address"),
        3: self.tr("connected"),
    }
    picoWirelessStates = {
        0: self.tr("idle"),
        1: self.tr("no matching access point found"),
        2: self.tr("network scan completed"),
        3: self.tr("connected"),
        4: self.tr("connection failed"),
        5: self.tr("connection lost"),
        6: self.tr("disconnected"),
        7: self.tr("AP listening"),
        8: self.tr("AP connected"),
        9: self.tr("AP failed"),
    }
    picoWizStates = {
        0: self.tr("switched off"),
        1: self.tr("switched on, inactive"),
        2: self.tr("switched on, active"),
    }
    self.__statusTranslations = {
        "picow": picowStates,
        "picowireless": picoWirelessStates,
        "picowiz": picoWizStates,
    }

    # Texts for the numeric security (encryption) codes, keyed by WiFi type.
    picowSecurity = {
        0: self.tr("open", "open WiFi network"),
        1: "WEP",
        2: "WPA",
        3: "WPA2",
        4: "WPA/WPA2",
        5: "WPA2 (CCMP)",
        6: "WPA3",
        7: "WPA2/WPA3",
    }
    picoWirelessSecurity = {
        2: "WPA",
        4: "WPA2 (CCMP)",
        5: "WEP",
        7: self.tr("open", "open WiFi network"),
        8: self.tr("automatic"),
    }
    self.__securityTranslations = {
        "picow": picowSecurity,
        "picowireless": picoWirelessSecurity,
    }
|
99 |
|
def setButtons(self):
    """
    Public method to enable the action buttons supported by this device.
    """
    super().setButtons()

    # run, REPL and file manager are always offered; the chart button
    # follows the HAS_QTCHART flag
    supportedButtons = {
        "run": True,
        "repl": True,
        "files": True,
        "chart": HAS_QTCHART,
    }
    self.microPython.setActionButtons(**supportedButtons)
|
109 |
|
def forceInterrupt(self):
    """
    Public method to tell, whether an interrupt has to be sent when the
    serial connection is opened.

    @return flag indicating an interrupt is needed (always False for this
        device)
    @rtype bool
    """
    interruptNeeded = False
    return interruptNeeded
|
119 |
|
def deviceName(self):
    """
    Public method to get the (translated) name of the device.

    @return name of the device
    @rtype str
    """
    name = self.tr("RP2040/RP2350")
    return name
|
128 |
|
def canStartRepl(self):
    """
    Public method to determine, if a REPL can be started.

    @return tuple containing a flag indicating it is safe to start a REPL
        and a reason why it cannot (always True and an empty reason here)
    @rtype tuple of (bool, str)
    """
    return (True, "")
|
138 |
|
def canStartPlotter(self):
    """
    Public method to determine, if a Plotter can be started.

    @return tuple containing a flag indicating it is safe to start a
        Plotter and a reason why it cannot (always True and an empty reason
        here)
    @rtype tuple of (bool, str)
    """
    return (True, "")
|
148 |
|
def canRunScript(self):
    """
    Public method to determine, if a script can be executed.

    @return tuple containing a flag indicating it is safe to run a script
        and a reason why it cannot (always True and an empty reason here)
    @rtype tuple of (bool, str)
    """
    return (True, "")
|
158 |
|
def runScript(self, script):
    """
    Public method to run the given Python script.

    The script text is split at newline characters and the resulting list
    of lines is handed to sendCommands().

    @param script script to be executed
    @type str
    """
    self.sendCommands(script.split("\n"))
|
168 |
|
def canStartFileManager(self):
    """
    Public method to determine, if a File Manager can be started.

    @return tuple containing a flag indicating it is safe to start a
        File Manager and a reason why it cannot (always True and an empty
        reason here)
    @rtype tuple of (bool, str)
    """
    return (True, "")
|
178 |
|
def __createRP2Menu(self):
    """
    Private method to build the submenu with the RP2 specific actions.

    The created actions are kept as attributes because their enabled state
    is adjusted in addDeviceMenuEntries().
    """
    rp2Menu = QMenu(self.tr("RP2 Functions"))

    # firmware information
    self.__showMpyAct = rp2Menu.addAction(
        self.tr("Show MicroPython Versions"), self.__showFirmwareVersions
    )
    rp2Menu.addSeparator()

    # firmware flashing
    self.__bootloaderAct = rp2Menu.addAction(
        self.tr("Activate Bootloader"), self.__activateBootloader
    )
    self.__flashMpyAct = rp2Menu.addAction(
        self.tr("Flash MicroPython Firmware"), self.__flashPython
    )
    rp2Menu.addSeparator()

    # device reset
    self.__resetAct = rp2Menu.addAction(
        self.tr("Reset Device"), self.__resetDevice
    )

    self.__rp2Menu = rp2Menu
|
199 |
|
def addDeviceMenuEntries(self, menu):
    """
    Public method to add device specific entries to the given menu.

    The enabled state of the RP2 actions is refreshed from the current
    connection state before the submenu is added.

    @param menu reference to the context menu
    @type QMenu
    """
    isConnected = self.microPython.isConnected()
    isLinkConnected = self.microPython.isLinkConnected()

    # flashing is only offered while no link is connected; everything else
    # needs a connected device
    actionStates = (
        (self.__showMpyAct, isConnected),
        (self.__bootloaderAct, isConnected),
        (self.__flashMpyAct, not isLinkConnected),
        (self.__resetAct, isConnected),
    )
    for action, enabled in actionStates:
        action.setEnabled(enabled)

    menu.addMenu(self.__rp2Menu)
|
216 |
|
def hasFlashMenuEntry(self):
    """
    Public method to check, if the device has its own flash menu entry.

    @return flag indicating a specific flash menu entry (always True for
        this device)
    @rtype bool
    """
    ownFlashEntry = True
    return ownFlashEntry
|
225 |
|
@pyqtSlot()
def __flashPython(self):
    """
    Private slot to flash a MicroPython firmware to the device.

    This opens the UF2 flash dialog preselected for the 'rp2' board type
    and waits until it is closed.
    """
    # imported here and not at module level, as in the original code
    from ..UF2FlashDialog import UF2FlashDialog

    UF2FlashDialog(boardType="rp2").exec()
|
235 |
|
@pyqtSlot()
def __activateBootloader(self):
    """
    Private slot to switch the board into 'bootloader' mode.

    Nothing is done, if no device is connected.
    """
    if not self.microPython.isConnected():
        return

    bootloaderCommands = [
        "import machine",
        "machine.bootloader()",
    ]
    self.executeCommands(bootloaderCommands, mode=self._submitMode)

    # simulate pressing the disconnect button
    self.microPython.on_connectButton_clicked()
|
251 |
|
@pyqtSlot()
def __showFirmwareVersions(self):
    """
    Private slot to show the firmware version of the connected device and the
    available firmware version.

    A HEAD request is sent to the firmware URL of the detected variant. The
    answer is processed by __firmwareVersionResponse() once the request has
    finished. Nothing is done, if no device is connected.
    """
    if not self.microPython.isConnected():
        return

    if self._deviceData["mpy_name"] != "micropython":
        EricMessageBox.critical(
            None,
            self.tr("Show MicroPython Versions"),
            self.tr(
                """The firmware of the connected device cannot be"""
                """ determined or the board does not run MicroPython."""
                """ Aborting..."""
            ),
        )
        return

    # MicroPython with Pimoroni add-on libraries has its own firmware URL
    urlKey = (
        "pimoroni_pico"
        if self._deviceData["mpy_variant"] == "Pimoroni Pico"
        else "micropython"
    )
    url = QUrl(FirmwareGithubUrls[urlKey])

    ui = ericApp().getObject("UserInterface")
    reply = ui.networkAccessManager().head(QNetworkRequest(url))
    reply.finished.connect(lambda: self.__firmwareVersionResponse(reply))
|
279 |
|
@pyqtSlot(QNetworkReply)
def __firmwareVersionResponse(self, reply):
    """
    Private slot handling the response of the latest version request.

    The latest version is derived from the last path component of the URL
    of the reply. It is shown together with the version installed on the
    device in an information message box.

    @param reply reference to the reply object
    @type QNetworkReply
    """
    latestUrl = reply.url().toString()
    # Only the URL of the reply is needed. Qt leaves the disposal of a
    # finished reply to the caller, so schedule it for deletion in order
    # not to accumulate reply objects.
    reply.deleteLater()

    tag = latestUrl.rsplit("/", 1)[-1]
    while tag and not tag[0].isdecimal():
        # get rid of leading non-decimal characters
        tag = tag[1:]
    latestVersion = EricUtilities.versionToTuple(tag)

    variant = self._deviceData["mpy_variant"]
    variantVersion = self._deviceData["mpy_variant_version"]

    if self._deviceData["mpy_version"] == "unknown":
        currentVersionStr = self.tr("unknown")
        currentVersion = (0, 0, 0)
    else:
        # prefer the version of the variant, if the device reported one
        currentVersionStr = (
            variantVersion if bool(variantVersion) else self._deviceData["mpy_version"]
        )
        currentVersion = EricUtilities.versionToTuple(currentVersionStr)

    msg = self.tr(
        "<h4>MicroPython Version Information</h4>"
        "<table>"
        "<tr><td>Installed:</td><td>{0}</td></tr>"
        "<tr><td>Available:</td><td>{1}</td></tr>"
        "{2}"
        "</table>"
    ).format(
        currentVersionStr,
        tag,
        (
            self.tr("<tr><td>Variant:</td><td>{0}</td></tr>").format(variant)
            if variant
            else ""
        ),
    )
    if variant in ["Pimoroni Pico"] and not bool(variantVersion):
        # cannot derive update info
        msg += self.tr("<p>Update may be available.</p>")
    elif currentVersion < latestVersion:
        msg += self.tr("<p><b>Update available!</b></p>")

    EricMessageBox.information(
        None,
        self.tr("MicroPython Version"),
        msg,
    )
|
337 |
|
@pyqtSlot()
def __resetDevice(self):
    """
    Private slot to reset the connected device.

    Nothing is done, if no device is connected.
    """
    if not self.microPython.isConnected():
        return

    resetCommand = "import machine\nmachine.reset()\n"
    self.executeCommands(resetCommand, mode=self._submitMode)
|
347 |
|
def getDocumentationUrl(self):
    """
    Public method to get the device documentation URL.

    The URL is read from the 'MicroPythonDocuUrl' MicroPython preference.

    @return documentation URL of the device
    @rtype str
    """
    docuUrl = Preferences.getMicroPython("MicroPythonDocuUrl")
    return docuUrl
|
356 |
|
def getDownloadMenuEntries(self):
    """
    Public method to retrieve the entries for the downloads menu.

    Entries with the text '<separator>' and an empty URL mark the positions
    of the separators between the firmware groups.

    @return list of tuples with menu text and URL to be opened for each
        entry
    @rtype list of tuple of (str, str)
    """
    separator = ("<separator>", "")

    microPythonEntry = (
        self.tr("MicroPython Firmware"),
        Preferences.getMicroPython("MicroPythonFirmwareUrl"),
    )
    pimoroniEntry = (
        self.tr("Pimoroni Pico Firmware"),
        FirmwareGithubUrls["pimoroni_pico"],
    )
    circuitPythonFirmwareEntry = (
        self.tr("CircuitPython Firmware"),
        Preferences.getMicroPython("CircuitPythonFirmwareUrl"),
    )
    circuitPythonLibrariesEntry = (
        self.tr("CircuitPython Libraries"),
        Preferences.getMicroPython("CircuitPythonLibrariesUrl"),
    )

    return [
        microPythonEntry,
        separator,
        pimoroniEntry,
        separator,
        circuitPythonFirmwareEntry,
        circuitPythonLibrariesEntry,
    ]
|
382 |
|
383 ################################################################## |
|
384 ## time related methods below |
|
385 ################################################################## |
|
386 |
|
387 def _getSetTimeCode(self): |
|
388 """ |
|
389 Protected method to get the device code to set the time. |
|
390 |
|
391 Note: This method must be implemented in the various device specific |
|
392 subclasses. |
|
393 |
|
394 @return code to be executed on the connected device to set the time |
|
395 @rtype str |
|
396 """ |
|
397 # rtc_time[0] - year 4 digit |
|
398 # rtc_time[1] - month 1..12 |
|
399 # rtc_time[2] - day 1..31 |
|
400 # rtc_time[3] - weekday 1..7 1=Monday |
|
401 # rtc_time[4] - hour 0..23 |
|
402 # rtc_time[5] - minute 0..59 |
|
403 # rtc_time[6] - second 0..59 |
|
404 # rtc_time[7] - yearday 1..366 |
|
405 # rtc_time[8] - isdst 0, 1, or -1 |
|
406 |
|
407 # The machine.rtc.datetime() function takes the arguments in the order: |
|
408 # (year, month, day, weekday, hour, minute, second, subseconds) |
|
409 # __IGNORE_WARNING_M891__ |
|
410 # https://docs.micropython.org/en/latest/library/machine.RTC.html#machine-rtc |
|
411 return """ |
|
412 def set_time(rtc_time): |
|
413 import machine |
|
414 rtc = machine.RTC() |
|
415 rtc.datetime(rtc_time[:7] + (0,)) |
|
416 """ |
|
417 |
|
418 ################################################################## |
|
419 ## Methods below implement WiFi related methods |
|
420 ################################################################## |
|
421 |
|
def addDeviceWifiEntries(self, menu):
    """
    Public method to add device specific entries to the given menu.

    A separator and the two country related actions are appended. The
    actions are enabled for the 'picow' WiFi type only.

    @param menu reference to the context menu
    @type QMenu
    """
    menu.addSeparator()

    countryEntries = (
        (self.tr("Set Country"), self.__setCountry),
        (self.tr("Reset Country"), self.__resetCountry),
    )
    for title, handler in countryEntries:
        action = menu.addAction(title, handler)
        action.setEnabled(self._deviceData["wifi_type"] == "picow")
|
436 |
|
def hasWifi(self):
    """
    Public method to check the availability of WiFi.

    A probe script is executed on the device and its printed result tuple
    is evaluated. A timeout of the probe is reported as 'no WiFi'.

    @return tuple containing a flag indicating the availability of WiFi
        and the WiFi type (picow or picowireless)
    @rtype tuple of (bool, str)
    @exception OSError raised to indicate an issue with the device
    """
    # picowireless:
    # It seems to take up to 20 sec to detect, that no Pico Wireless Pack is
    # attached. Therefore the command will timeout before.
    probeScript = """
def has_wifi():
    try:
        import network
        if hasattr(network, 'WLAN'):
            return True, 'picow'
    except ImportError:
        try:
            import picowireless as pw
            try:
                if pw.get_fw_version() != '':
                    return True, 'picowireless'
            except RuntimeError:
                pw.init()
            return True, 'picowireless'
        except ImportError:
            pass

    return False, ''

print(has_wifi())
del has_wifi
"""
    out, err = self.executeCommands(
        probeScript, mode=self._submitMode, timeout=20000
    )
    if err:
        if err.startswith(b"Timeout "):
            # pimoroni firmware loaded but no pico wireless present
            return False, ""
        raise OSError(self._shortError(err))

    return ast.literal_eval(out.decode("utf-8"))
|
480 |
|
def hasWifiCountry(self):
    """
    Public method to check, if the device has support to set the WiFi country.

    This is the case for the 'picow' WiFi type only.

    @return flag indicating the support of WiFi country
    @rtype bool
    """
    wifiType = self._deviceData["wifi_type"]
    return wifiType == "picow"
|
489 |
|
def getWifiData(self):
    """
    Public method to get data related to the current WiFi status.

    A status script matching the WiFi type is executed on the device. It
    prints three JSON objects (station, access point, overall), one per
    line. Numeric status and security codes are replaced by texts
    afterwards. Other WiFi types are delegated to the base class.

    @return tuple of three dictionaries containing the WiFi status data
        for the WiFi client, access point and overall data
    @rtype tuple of (dict, dict, dict)
    @exception OSError raised to indicate an issue with the device
    """
    wifiType = self._deviceData["wifi_type"]
    if wifiType == "picow":
        command = """
def wifi_status():
    import ubinascii
    import ujson
    import network
    import rp2

    wifi = network.WLAN(network.STA_IF)
    station = {
        'active': wifi.active(),
        'connected': wifi.isconnected(),
        'status': wifi.status(),
        'ifconfig': wifi.ifconfig(),
        'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
        'channel': wifi.config('channel'),
        'txpower': wifi.config('txpower'),
    }
    print(ujson.dumps(station))

    wifi = network.WLAN(network.AP_IF)
    ap = {
        'active': wifi.active(),
        'connected': wifi.isconnected(),
        'status': wifi.status(),
        'ifconfig': wifi.ifconfig(),
        'mac': ubinascii.hexlify(wifi.config('mac'), ':').decode(),
        'channel': wifi.config('channel'),
        'txpower': wifi.config('txpower'),
        'essid': wifi.config('essid'),
    }
    print(ujson.dumps(ap))

    overall = {
        'active': station['active'] or ap['active']
    }
    try:
        overall['country'] = network.country()
    except AttributeError:
        overall['country'] = rp2.country()
    try:
        overall['hostname'] = network.hostname()
    except AttributeError:
        pass
    print(ujson.dumps(overall))

wifi_status()
del wifi_status
"""
    elif wifiType == "picowireless":
        command = """
def wifi_status():
    import picowireless as pw
    import ubinascii
    import ujson

    def ip_str(ip):
        return '.'.join(str(i) for i in ip)

    station = {
        'active': pw.get_connection_status() not in (0, 7, 8, 9),
        'connected': pw.get_connection_status() == 3,
        'status': pw.get_connection_status(),
        'ifconfig': (
            ip_str(pw.get_ip_address()),
            ip_str(pw.get_subnet_mask()),
            ip_str(pw.get_gateway_ip()),
            '0.0.0.0'
        ),
        'mac': ubinascii.hexlify(pw.get_mac_address(), ':').decode(),
    }
    if station['connected']:
        station.update({
            'ap_ssid': pw.get_current_ssid(),
            'ap_bssid': ubinascii.hexlify(pw.get_current_bssid(), ':'),
            'ap_rssi': pw.get_current_rssi(),
            'ap_security': pw.get_current_encryption_type(),
        })
    print(ujson.dumps(station))

    ap = {
        'active': pw.get_connection_status() in (7, 8, 9),
        'connected': pw.get_connection_status() == 8,
        'status': pw.get_connection_status(),
        'mac': ubinascii.hexlify(pw.get_mac_address(), ':').decode(),
    }
    if ap['active']:
        ap['essid'] = pw.get_current_ssid()
        ap['ifconfig'] = (
            ip_str(pw.get_ip_address()),
            ip_str(pw.get_subnet_mask()),
            ip_str(pw.get_gateway_ip()),
            '0.0.0.0'
        )
    print(ujson.dumps(ap))

    overall = {
        'active': pw.get_connection_status() != 0
    }
    print(ujson.dumps(overall))

wifi_status()
del wifi_status
"""
    else:
        return super().getWifiData()

    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        raise OSError(self._shortError(err))

    # the script prints exactly three lines: station, access point, overall
    stationLine, apLine, overallLine = out.decode("utf-8").splitlines()
    station = json.loads(stationLine)
    ap = json.loads(apLine)
    overall = json.loads(overallLine)

    # translate the numerical status to a string; unknown codes are shown
    # as plain numbers
    for info in (station, ap):
        if "status" in info:
            try:
                info["status"] = self.__statusTranslations[
                    self._deviceData["wifi_type"]
                ][info["status"]]
            except KeyError:
                info["status"] = str(info["status"])

    if "ap_security" in station:
        # translate the numerical AP security to a string
        try:
            station["ap_security"] = self.__securityTranslations[
                self._deviceData["wifi_type"]
            ][station["ap_security"]]
        except KeyError:
            station["ap_security"] = self.tr("unknown ({0})").format(
                station["ap_security"]
            )

    return station, ap, overall
|
641 |
|
def connectWifi(self, ssid, password, hostname):
    """
    Public method to connect a device to a WiFi network.

    A connect script matching the WiFi type is executed on the device. It
    polls the connection status up to 140 times with a delay of 0.1 s and
    prints the outcome as a JSON object. Other WiFi types are delegated to
    the base class.

    @param ssid name (SSID) of the WiFi network
    @type str
    @param password password needed to connect
    @type str
    @param hostname host name of the device
    @type str
    @return tuple containing the connection status and an error string
    @rtype tuple of (bool, str)
    """
    wifiType = self._deviceData["wifi_type"]
    if wifiType == "picow":
        country = Preferences.getMicroPython("WifiCountry").upper()
        command = """
def connect_wifi(ssid, password, hostname, country):
    import network
    import rp2
    import ujson
    from time import sleep

    rp2.country(country)

    if hostname:
        try:
            network.hostname(hostname)
        except AttributeError:
            pass

    wifi = network.WLAN(network.STA_IF)
    wifi.active(False)
    wifi.active(True)
    wifi.connect(ssid, password)
    max_wait = 140
    while max_wait:
        if wifi.status() < 0 or wifi.status() >= 3:
            break
        max_wait -= 1
        sleep(0.1)
    status = wifi.status()
    print(ujson.dumps({{'connected': wifi.isconnected(), 'status': status}}))

connect_wifi({0}, {1}, {2}, {3})
del connect_wifi
""".format(
            repr(ssid),
            repr(password or ""),
            repr(hostname),
            # 'XX' is passed, if no country is configured
            repr(country or "XX"),
        )
    elif wifiType == "picowireless":
        command = """
def connect_wifi(ssid, password):
    import picowireless as pw
    import ujson
    from time import sleep

    pw.init()
    if bool(password):
        pw.wifi_set_passphrase(ssid, password)
    else:
        pw.wifi_set_network(ssid)

    max_wait = 140
    while max_wait:
        if pw.get_connection_status() == 3:
            break
        max_wait -= 1
        sleep(0.1)
    status = pw.get_connection_status()
    if status == 3:
        pw.set_led(0, 64, 0)
    else:
        pw.set_led(64, 0, 0)
    print(ujson.dumps({{'connected': status == 3, 'status': status}}))

connect_wifi({0}, {1})
del connect_wifi
""".format(
            repr(ssid),
            repr(password or ""),
        )
    else:
        return super().connectWifi(ssid, password, hostname)

    # connecting may take a while; show a busy cursor in the meantime
    with EricOverrideCursor():
        out, err = self.executeCommands(
            command, mode=self._submitMode, timeout=15000
        )
    if err:
        return False, err

    outcome = json.loads(out.decode("utf-8").strip())
    if outcome["connected"]:
        return outcome["connected"], ""

    # not connected: report the translated status as the error reason
    try:
        reason = self.__statusTranslations[self._deviceData["wifi_type"]][
            outcome["status"]
        ]
    except KeyError:
        reason = str(outcome["status"])

    return outcome["connected"], reason
|
747 |
|
def disconnectWifi(self):
    """
    Public method to disconnect a device from the WiFi network.

    The executed script prints 'True', if the device reports not being
    connected after the disconnect. Other WiFi types are delegated to the
    base class.

    @return tuple containing a flag indicating success and an error string
    @rtype tuple of (bool, str)
    """
    wifiType = self._deviceData["wifi_type"]
    if wifiType not in ("picow", "picowireless"):
        return super().disconnectWifi()

    if wifiType == "picow":
        command = """
def disconnect_wifi():
    import network
    from time import sleep

    wifi = network.WLAN(network.STA_IF)
    wifi.disconnect()
    wifi.active(False)
    sleep(0.1)
    print(not wifi.isconnected())

disconnect_wifi()
del disconnect_wifi
"""
    else:
        command = """
def disconnect_wifi():
    import picowireless as pw
    from time import sleep

    pw.disconnect()
    sleep(0.1)
    print(pw.get_connection_status() != 3)
    pw.set_led(0, 0, 0)

disconnect_wifi()
del disconnect_wifi
"""

    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        return False, err

    disconnected = out.decode("utf-8").strip() == "True"
    return disconnected, ""
|
792 |
|
def isWifiClientConnected(self):
    """
    Public method to check the WiFi connection status as client.

    The device is asked via a short script, which prints 'True' or 'False'.
    A failing script is reported as 'not connected'. Other WiFi types are
    delegated to the base class.

    @return flag indicating the WiFi connection status
    @rtype bool
    """
    wifiType = self._deviceData["wifi_type"]
    if wifiType not in ("picow", "picowireless"):
        return super().isWifiClientConnected()

    if wifiType == "picow":
        command = """
def wifi_connected():
    import network

    wifi = network.WLAN(network.STA_IF)
    print(wifi.isconnected())

wifi_connected()
del wifi_connected
"""
    else:
        command = """
def wifi_connected():
    import picowireless as pw

    print(pw.get_connection_status() == 3)

wifi_connected()
del wifi_connected
"""

    out, err = self.executeCommands(command, mode=self._submitMode)
    return False if err else out.strip() == b"True"
|
829 |
|
def isWifiApConnected(self):
    """
    Public method to check the WiFi connection status as access point.

    The device is asked via a short script, which prints 'True' or 'False'.
    A failing script is reported as 'not connected'. Other WiFi types are
    delegated to the access point check of the base class.

    @return flag indicating the WiFi connection status
    @rtype bool
    """
    if self._deviceData["wifi_type"] == "picow":
        command = """
def wifi_connected():
    import network

    wifi = network.WLAN(network.AP_IF)
    print(wifi.isconnected())

wifi_connected()
del wifi_connected
"""
    elif self._deviceData["wifi_type"] == "picowireless":
        command = """
def wifi_connected():
    import picowireless as pw

    print(pw.get_connection_status() == 8)

wifi_connected()
del wifi_connected
"""
    else:
        # delegate to the access point variant of the base class (the
        # client variant was called here before, a copy and paste error)
        return super().isWifiApConnected()

    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        return False

    return out.strip() == b"True"
|
866 |
|
def writeCredentials(self, ssid, password, hostname, country):
    """
    Public method to write the given credentials to the connected device and modify
    the start script to connect automatically.

    The credentials are written to '/secrets.py', the auto-connect script
    matching the WiFi type is copied to '/wifi_connect.py' and an import of
    it is appended to '/boot.py', unless 'wifi_connect' is mentioned there
    already.

    @param ssid SSID of the network to connect to
    @type str
    @param password password needed to authenticate
    @type str
    @param hostname host name of the device
    @type str
    @param country WiFi country code
    @type str
    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    command = """
def modify_boot():
    add = True
    try:
        with open('/boot.py', 'r') as f:
            for ln in f.readlines():
                if 'wifi_connect' in ln:
                    add = False
                    break
    except:
        pass
    if add:
        with open('/boot.py', 'a') as f:
            f.write('\\nimport wifi_connect\\n')
    print(True)

modify_boot()
del modify_boot
"""

    # Empty values are written as the literal "" (an empty string). The
    # host name of the generic variant was written as repr('""') before,
    # i.e. as a string consisting of two quote characters.
    ssidLiteral = repr(ssid)
    passwordLiteral = repr(password) if password else '""'
    hostnameLiteral = repr(hostname) if hostname else '""'

    wifiType = self._deviceData["wifi_type"]
    if wifiType == "picow":
        secrets = (
            "WIFI_SSID = {0}\nWIFI_KEY = {1}\nWIFI_COUNTRY={2}\n"
            "WIFI_HOSTNAME = {3}\n"
        ).format(
            ssidLiteral,
            passwordLiteral,
            repr(country.upper()) if country else '""',
            hostnameLiteral,
        )
        wifiConnectFile = "picowWiFiConnect.py"
    else:
        secrets = "WIFI_SSID = {0}\nWIFI_KEY = {1}\n".format(
            ssidLiteral,
            passwordLiteral,
        )
        if wifiType == "picowireless":
            wifiConnectFile = "pimoroniWiFiConnect.py"
        else:
            secrets += "WIFI_HOSTNAME = {0}\n".format(hostnameLiteral)
            wifiConnectFile = "mpyWiFiConnect.py"

    try:
        # write secrets file
        self.putData("/secrets.py", secrets.encode("utf-8"))
        # copy auto-connect file
        self.put(
            os.path.join(os.path.dirname(__file__), "MCUScripts", wifiConnectFile),
            "/wifi_connect.py",
        )
    except OSError as err:
        return False, str(err)

    # modify boot.py
    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        return False, err

    return out.decode("utf-8").strip() == "True", ""
|
943 |
|
def removeCredentials(self):
    """
    Public method to remove the saved credentials from the connected device.

    Only the file '/secrets.py' is deleted by this method.

    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    try:
        self.rm("/secrets.py")
    except OSError as exc:
        return False, str(exc)
    else:
        return True, ""
|
957 |
|
def checkInternet(self):
    """
    Public method to check, if the internet can be reached.

    On 'picow' devices a connection to 'quad9.net' port 443 is attempted,
    'picowireless' devices ping 9.9.9.9. Both scripts print 'False' without
    trying, if the device is not connected. Other WiFi types are delegated
    to the base class.

    @return tuple containing a flag indicating reachability and an error string
    @rtype tuple of (bool, str)
    """
    wifiType = self._deviceData["wifi_type"]
    if wifiType not in ("picow", "picowireless"):
        return super().checkInternet()

    if wifiType == "picow":
        command = """
def check_internet():
    import network
    import socket

    wifi = network.WLAN(network.STA_IF)
    if wifi.isconnected():
        s = socket.socket()
        try:
            s.connect(socket.getaddrinfo('quad9.net', 443)[0][-1])
            s.close()
            print(True)
        except:
            print(False)
    else:
        print(False)

check_internet()
del check_internet
"""
    else:
        command = """
def check_internet():
    import picowireless as pw

    if pw.get_connection_status() == 3:
        res = pw.ping((9, 9, 9, 9), 300)
        print(res >= 0)
    else:
        print(False)

check_internet()
del check_internet
"""

    out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
    if err:
        return False, err

    reachable = out.decode("utf-8").strip() == "True"
    return reachable, ""
|
1008 |
|
def scanNetworks(self):
    """
    Public method to perform a scan for WiFi networks on the device.

    Entries reported by the device without an SSID are left out of the result.

    @return tuple containing the list of available networks as a tuple of 'Name',
        'MAC-Address', 'channel', 'RSSI' and 'security' and an error string
    @rtype tuple of (list of tuple of (str, str, int, int, str), str)
    """
    wifiType = self._deviceData["wifi_type"]
    if wifiType == "picow":
        country = Preferences.getMicroPython("WifiCountry").upper()
        # the interface is activated for the scan only, if it was inactive
        command = """
def scan_networks():
    import network
    import rp2

    rp2.country({0})

    wifi = network.WLAN(network.STA_IF)
    active = wifi.active()
    if not active:
        wifi.active(True)
    network_list = wifi.scan()
    if not active:
        wifi.active(False)
    print(network_list)

scan_networks()
del scan_networks
""".format(
            repr(country or "XX")
        )
    elif wifiType == "picowireless":
        command = """
def scan_networks():
    import picowireless as pw

    network_list = []
    pw.init()
    pw.start_scan_networks()
    networks = pw.get_scan_networks()
    for n in range(networks):
        network_list.append((
            pw.get_ssid_networks(n),
            pw.get_bssid_networks(n),
            pw.get_channel_networks(n),
            pw.get_rssi_networks(n),
            pw.get_enc_type_networks(n),
        ))
    print(network_list)

scan_networks()
del scan_networks
"""
    else:
        return super().scanNetworks()

    out, err = self.executeCommands(command, mode=self._submitMode, timeout=15000)
    if err:
        return [], err

    networks = []
    for entry in ast.literal_eval(out.decode("utf-8")):
        rawSsid = entry[0]
        if not rawSsid:
            continue

        ssid = rawSsid.decode("utf-8") if isinstance(rawSsid, bytes) else rawSsid
        bssid = entry[1]
        mac = "" if bssid is None else binascii.hexlify(bssid, ":").decode("utf-8")
        channel, rssi, securityCode = entry[2], entry[3], entry[4]
        try:
            security = self.__securityTranslations[wifiType][securityCode]
        except KeyError:
            # no translation available; show the raw code instead
            security = self.tr("unknown ({0})").format(securityCode)
        networks.append((ssid, mac, channel, rssi, security))

    return networks, ""
|
1095 |
|
def deactivateInterface(self, interface):
    """
    Public method to switch off one of the WiFi interfaces of the device.

    @param interface designation of the interface to be deactivated (one of 'AP'
        or 'STA')
    @type str
    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    @exception ValueError raised to indicate a wrong value for the interface type
    """
    if interface not in ("STA", "AP"):
        raise ValueError(
            "interface must be 'AP' or 'STA', got '{0}'".format(interface)
        )

    wifiType = self._deviceData["wifi_type"]
    if wifiType == "picow":
        command = """
def deactivate():
    import network
    from time import sleep

    wifi = network.WLAN(network.{0}_IF)
    wifi.active(False)
    sleep(0.1)
    print(not wifi.active())

deactivate()
del deactivate
""".format(
            interface
        )
    elif wifiType == "picowireless":
        # the script just re-initializes the pack and reports success
        command = """
def deactivate():
    import picowireless as pw

    pw.init()
    print(True)

deactivate()
del deactivate
"""
    else:
        return super().deactivateInterface(interface)

    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        return False, err

    return out.decode("utf-8").strip() == "True", ""
|
1147 |
|
def startAccessPoint(
    self,
    ssid,
    security=None,
    password=None,
    hostname=None,
    ifconfig=None,
):
    """
    Public method to bring up the access point interface of the device.

    If either the security method or the password is missing, an open access
    point (security 0, empty password) is requested.

    @param ssid SSID of the access point
    @type str
    @param security security method (defaults to None)
    @type int (optional)
    @param password password (defaults to None)
    @type str (optional)
    @param hostname host name of the device (defaults to None)
    @type str (optional)
    @param ifconfig IPv4 configuration for the access point if not default
        (IPv4 address, netmask, gateway address, DNS server address)
    @type tuple of (str, str, str, str)
    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    if security is None or password is None:
        security, password = 0, ""  # secok

    wifiType = self._deviceData["wifi_type"]
    if wifiType == "picow":
        country = Preferences.getMicroPython("WifiCountry").upper()
        if security:
            security = 4  # Pico W supports just WPA/WPA2
        command = """
def start_ap(ssid, security, password, hostname, ifconfig, country):
    import network
    import rp2
    from time import sleep

    rp2.country(country)

    if hostname:
        try:
            network.hostname(hostname)
        except AttributeError:
            pass

    ap = network.WLAN(network.AP_IF)
    ap.active(True)
    if ifconfig:
        ap.ifconfig(ifconfig)
    ap.config(ssid=ssid, security=security, password=password)
    sleep(0.1)
    print(ap.isconnected())

start_ap({0}, {1}, {2}, {3}, {4}, {5})
del start_ap
""".format(
            repr(ssid),
            security,
            repr(password),
            repr(hostname),
            ifconfig,
            repr(country or "XX"),
        )
    elif wifiType == "picowireless":
        if ifconfig:
            return (
                False,
                self.tr(
                    "Pico Wireless does not support setting the IPv4 parameters of"
                    " the WiFi access point."
                ),
            )

        # AP is fixed at channel 6
        command = """
def start_ap(ssid, password):
    import picowireless as pw

    pw.init()
    if bool(password):
        res = pw.wifi_set_ap_passphrase(ssid, password, 6)
    else:
        res = pw.wifi_set_ap_network(ssid, 6)
    status = pw.get_connection_status()
    if status in (7, 8):
        pw.set_led(0, 64, 0)
    else:
        pw.set_led(64, 0, 0)
    print(res >= 0)

start_ap({0}, {1})
del start_ap
""".format(
            repr(ssid),
            repr(password or ""),
        )
    else:
        return super().startAccessPoint(
            ssid,
            security=security,
            password=password,
            hostname=hostname,
            ifconfig=ifconfig,
        )

    out, err = self.executeCommands(command, mode=self._submitMode, timeout=15000)
    if err:
        return False, err

    return out.decode("utf-8").strip() == "True", ""
|
1260 |
|
def stopAccessPoint(self):
    """
    Public method to shut down the access point interface.

    For the WiFi types 'picow' and 'picowireless' this deactivates the 'AP'
    interface; everything else is handled by the base class.

    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    if self._deviceData["wifi_type"] not in ("picow", "picowireless"):
        return super().stopAccessPoint()

    return self.deactivateInterface("AP")
|
1272 |
|
def getConnectedClients(self):
    """
    Public method to get a list of clients connected to the access point.

    Only the 'picow' WiFi type is queried on the device. The Pico Wireless
    pack cannot report its clients, so an explanatory error message is
    returned for it. All other WiFi types are delegated to the base class.

    @return a tuple containing a list of tuples containing the client MAC-Address
        and the RSSI (if supported and available) and an error message
    @rtype tuple of ([(bytes, int)], str)
    """
    wifiType = self._deviceData["wifi_type"]
    if wifiType == "picow":
        command = """
def get_stations():
    import network

    ap = network.WLAN(network.AP_IF)
    stations = ap.status('stations')
    print(stations)

get_stations()
del get_stations
"""
    elif wifiType == "picowireless":
        return (
            [],
            self.tr(
                "Pico Wireless does not support reporting of connected clients."
            ),
        )
    else:
        # Delegate to the matching base class method; calling checkInternet()
        # here would return a (bool, str) tuple instead of (list, str).
        return super().getConnectedClients()

    out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
    if err:
        return [], err

    clientsList = ast.literal_eval(out.decode("utf-8"))
    return clientsList, ""
|
1309 |
|
def enableWebrepl(self, password):
    """
    Public method to store the WebREPL password on the connected device and
    to adjust the boot script such that the WebREPL server gets started.

    Only devices of WiFi type 'picow' are supported.

    @param password password needed to authenticate
    @type str
    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    if self._deviceData["wifi_type"] != "picow":
        return False, self.tr("WebREPL is not supported on this device.")

    config = "PASS = {0}\n".format(repr(password))
    try:
        # write config file
        self.putData("/webrepl_cfg.py", config.encode("utf-8"))
    except OSError as exc:
        return False, str(exc)

    # The device script uncomments existing 'webrepl' lines of boot.py or
    # appends the start code; it prints 'True' even if rewriting failed.
    command = """
def modify_boot():
    import os

    try:
        with open('/boot.py', 'r') as old_f, open('/boot.py.tmp', 'w') as new_f:
            found = False
            for l in old_f.read().splitlines():
                if 'webrepl' in l:
                    found = True
                    if l.startswith('#'):
                        l = l[1:]
                new_f.write(l + '\\n')
            if not found:
                new_f.write('\\nimport webrepl\\nwebrepl.start()\\n')

        os.remove('/boot.py')
        os.rename('/boot.py.tmp', '/boot.py')
    except:
        pass

    print(True)

modify_boot()
del modify_boot
"""

    # modify boot.py
    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        return False, err

    return out.decode("utf-8").strip() == "True", ""
|
1364 |
|
def disableWebrepl(self):
    """
    Public method to modify the start script of the connected device such
    that the WebREPL server is not started anymore.

    The device script comments out all lines of '/boot.py' containing
    'webrepl'. It prints 'True' even if rewriting the file failed.

    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    command = """
def modify_boot():
    import os

    try:
        with open('/boot.py', 'r') as old_f, open('/boot.py.tmp', 'w') as new_f:
            for l in old_f.read().splitlines():
                if 'webrepl' in l:
                    if not l.startswith('#'):
                        l = '#' + l
                new_f.write(l + '\\n')

        os.remove('/boot.py')
        os.rename('/boot.py.tmp', '/boot.py')
    except:
        pass

    print(True)

modify_boot()
del modify_boot
"""

    # modify boot.py
    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        return False, err

    succeeded = out.decode("utf-8").strip() == "True"
    return succeeded, ""
|
1402 |
|
@pyqtSlot()
def __setCountry(self):
    """
    Private slot to ask for a country code and configure it on the device.

    The country is the two-letter ISO 3166-1 Alpha-2 country code. If the user
    asks for it, the selection is stored in the preferences as well.
    """
    from ..WifiDialogs.WifiCountryDialog import WifiCountryDialog

    dlg = WifiCountryDialog()
    if dlg.exec() != QDialog.DialogCode.Accepted:
        return

    country, remember = dlg.getCountry()
    if remember:
        Preferences.setMicroPython("WifiCountry", country)

    # the device script falls back to 'rp2.country()', if the 'network'
    # module has no 'country()' function
    command = """
try:
    import network
    network.country({0})
except AttributeError:
    import rp2
    rp2.country({0})
""".format(
        repr(country)
    )

    _out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        self.microPython.showError("country()", err)
|
1432 |
|
@pyqtSlot()
def __resetCountry(self):
    """
    Private slot to reset the country of the connected RP2 device.

    The country is the two-letter ISO 3166-1 Alpha-2 country code. This method
    resets it to the default code 'XX' representing the "worldwide" region.
    """
    # Mirror the fallback used when setting the country: firmware without
    # 'network.country()' is handled through 'rp2.country()'. The reset stays
    # a best-effort operation, i.e. a missing function is ignored.
    command = """
try:
    import network
    network.country('XX')
except AttributeError:
    try:
        import rp2
        rp2.country('XX')
    except AttributeError:
        pass
"""

    _out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        self.microPython.showError("country()", err)
|
1452 |
|
1453 ################################################################## |
|
1454 ## Methods below implement Bluetooth related methods |
|
1455 ################################################################## |
|
1456 |
|
def hasBluetooth(self):
    """
    Public method to determine, if the device offers Bluetooth support.

    The device script reports 'True', if the 'bluetooth' module can be
    imported and has a 'BLE' attribute.

    @return flag indicating the availability of Bluetooth
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    command = """
def has_bt():
    try:
        import bluetooth
        if hasattr(bluetooth, 'BLE'):
            return True
    except ImportError:
        pass

    return False

print(has_bt())
del has_bt
"""
    out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
    if err:
        raise OSError(self._shortError(err))

    available = out.strip() == b"True"
    return available
|
1483 |
|
def getBluetoothStatus(self):
    """
    Public method to get Bluetooth status data of the connected board.

    The device script activates the BLE interface temporarily, if it is not
    active, in order to read the configuration values and restores the
    previous state afterwards.

    @return list of tuples containing the translated status data label and
        the associated value
    @rtype list of tuples of (str, str)
    @exception OSError raised to indicate an issue with the device
    """
    command = """
def ble_status():
    import bluetooth
    import ubinascii
    import ujson

    ble = bluetooth.BLE()

    ble_active = ble.active()
    if not ble_active:
        ble.active(True)

    res = {
        'active': ble_active,
        'mac': ubinascii.hexlify(ble.config('mac')[1], ':').decode(),
        'addr_type': ble.config('mac')[0],
        'name': ble.config('gap_name'),
        'mtu': ble.config('mtu'),
    }

    if not ble_active:
        ble.active(False)

    print(ujson.dumps(res))

ble_status()
del ble_status
"""
    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        raise OSError(self._shortError(err))

    status = []
    bleStatus = json.loads(out.decode("utf-8"))
    status.append((self.tr("Active"), self.bool2str(bleStatus["active"])))
    status.append((self.tr("Name"), bleStatus["name"]))
    status.append((self.tr("MAC-Address"), bleStatus["mac"]))
    status.append(
        (
            self.tr("Address Type"),
            # Evaluate the reported address type; comparing the whole status
            # dictionary with 0 would always yield "Random".
            (
                self.tr("Public")
                if bleStatus["addr_type"] == 0
                else self.tr("Random")
            ),
        )
    )
    status.append((self.tr("MTU"), self.tr("{0} Bytes").format(bleStatus["mtu"])))

    return status
|
1539 |
|
def activateBluetoothInterface(self):
    """
    Public method to switch the Bluetooth interface on.

    @return flag indicating the new state of the Bluetooth interface
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    command = """
def activate_ble():
    import bluetooth

    ble = bluetooth.BLE()
    if not ble.active():
        ble.active(True)
    print(ble.active())

activate_ble()
del activate_ble
"""
    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        raise OSError(self._shortError(err))

    # the device prints the state of the interface after the activation
    isActive = out.strip() == b"True"
    return isActive
|
1565 |
|
def deactivateBluetoothInterface(self):
    """
    Public method to switch the Bluetooth interface off.

    @return flag indicating the new state of the Bluetooth interface
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    command = """
def deactivate_ble():
    import bluetooth

    ble = bluetooth.BLE()
    if ble.active():
        ble.active(False)
    print(ble.active())

deactivate_ble()
del deactivate_ble
"""
    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        raise OSError(self._shortError(err))

    # the device prints the state of the interface after the deactivation
    isActive = out.strip() == b"True"
    return isActive
|
1591 |
|
def getDeviceScan(self, timeout=10):
    """
    Public method to scan for Bluetooth devices.

    The device prints one dictionary per received advertisement. These are
    collected per device address into BluetoothAdvertisement objects.

    @param timeout duration of the device scan in seconds (defaults
        to 10)
    @type int (optional)
    @return tuple containing a dictionary with the scan results and
        an error string
    @rtype tuple of (dict, str)
    """
    from ..BluetoothDialogs.BluetoothAdvertisement import BluetoothAdvertisement

    command = """
_scan_done = False

def ble_scan():
    import bluetooth
    import time
    import ubinascii

    IRQ_SCAN_RESULT = 5
    IRQ_SCAN_DONE = 6

    def _bleIrq(event, data):
        global _scan_done
        if event == IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            if addr:
                print({{
                    'address': ubinascii.hexlify(addr,':').decode('utf-8'),
                    'rssi': rssi,
                    'adv_type': adv_type,
                    'advertisement': bytes(adv_data),
                }})
        elif event == IRQ_SCAN_DONE:
            _scan_done = True

    ble = bluetooth.BLE()

    ble_active = ble.active()
    if not ble_active:
        ble.active(True)

    ble.irq(_bleIrq)
    ble.gap_scan({0} * 1000, 1000000, 50000, True)
    while not _scan_done:
        time.sleep(0.2)

    if not ble_active:
        ble.active(False)

ble_scan()
del ble_scan, _scan_done
""".format(
        timeout
    )
    # allow some extra time on top of the scan duration
    waitTime = (timeout + 5) * 1000
    out, err = self.executeCommands(
        command, mode=self._submitMode, timeout=waitTime
    )
    if err:
        return {}, err

    scanResults = {}
    for line in out.decode("utf-8").splitlines():
        record = ast.literal_eval(line)
        address = record["address"]
        if address not in scanResults:
            scanResults[address] = BluetoothAdvertisement(address)
        advertisement = scanResults[address]
        advertisement.update(
            record["adv_type"], record["rssi"], record["advertisement"]
        )

    return scanResults, ""
|
1666 |
|
1667 ################################################################## |
|
1668 ## Methods below implement Ethernet related methods |
|
1669 ################################################################## |
|
1670 |
|
def hasEthernet(self):
    """
    Public method to determine, if the device offers Ethernet support.

    The device script reports Ethernet as available, if the 'network' module
    has a 'WIZNET5K' attribute.

    @return tuple containing a flag indicating the availability of Ethernet
        and the Ethernet type (picowiz)
    @rtype tuple of (bool, str)
    @exception OSError raised to indicate an issue with the device
    """
    command = """
def has_eth():
    try:
        import network
        if hasattr(network, 'WIZNET5K'):
            return True, 'picowiz'
    except ImportError:
        pass

    return False, ''

print(has_eth())
del has_eth
"""

    out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
    if err:
        raise OSError(self._shortError(err))

    # the device printed a Python tuple literal
    result = ast.literal_eval(out.decode("utf-8"))
    return result
|
1700 |
|
def getEthernetStatus(self):
    """
    Public method to read the Ethernet status data of the connected board.

    @return list of tuples containing the translated status data label and
        the associated value
    @rtype list of tuples of (str, str)
    @exception OSError raised to indicate an issue with the device
    """
    # the script is prefixed with the WIZnet initialization code
    command = """{0}
def ethernet_status():
    import network
    import ubinascii
    import ujson

    w5x00_init()

    res = {{
        'active': nic.active(),
        'connected': nic.isconnected(),
        'status': nic.status(),
        'ifconfig': nic.ifconfig(),
        'mac': ubinascii.hexlify(nic.config('mac'), ':').decode(),
    }}
    try:
        res['hostname'] = network.hostname()
    except AttributeError:
        res['hostname'] = ''
    print(ujson.dumps(res))

ethernet_status()
del ethernet_status, w5x00_init
""".format(
        WiznetUtilities.mpyWiznetInit()
    )

    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        raise OSError(self._shortError(err))

    ethStatus = json.loads(out.decode("utf-8"))
    ifconfig = ethStatus["ifconfig"]
    hostname = ethStatus["hostname"]
    return [
        (self.tr("Active"), self.bool2str(ethStatus["active"])),
        (self.tr("Connected"), self.bool2str(ethStatus["connected"])),
        (
            self.tr("Status"),
            self.__statusTranslations["picowiz"][ethStatus["status"]],
        ),
        (self.tr("Hostname"), hostname if hostname else self.tr("unknown")),
        (self.tr("IPv4 Address"), ifconfig[0]),
        (self.tr("Netmask"), ifconfig[1]),
        (self.tr("Gateway"), ifconfig[2]),
        (self.tr("DNS"), ifconfig[3]),
        (self.tr("MAC-Address"), ethStatus["mac"]),
    ]
|
1764 |
|
def connectToLan(self, config, hostname):
    """
    Public method to connect the device to the LAN.

    The device script waits for the connection for at most 140 iterations
    of 0.1 s each before it reports the connection state.

    @param config configuration for the connection (either the string 'dhcp'
        for a dynamic address or a tuple of four strings with the IPv4 parameters.
    @type str or tuple of (str, str, str, str)
    @param hostname host name of the device
    @type str
    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    command = """{0}
def connect_lan(config, hostname):
    import network
    import time

    if hostname:
        try:
            network.hostname(hostname)
        except AttributeError:
            pass

    w5x00_init()

    nic.active(False)
    nic.active(True)
    nic.ifconfig(config)
    max_wait = 140
    while max_wait:
        if nic.isconnected():
            break
        max_wait -= 1
        time.sleep(0.1)
    print(nic.isconnected())

connect_lan({1}, {2})
del connect_lan, w5x00_init
""".format(
        WiznetUtilities.mpyWiznetInit(),
        "'dhcp'" if config == "dhcp" else config,
        repr(hostname) if hostname else "''",
    )

    # show a busy cursor while the device tries to connect
    with EricOverrideCursor():
        out, err = self.executeCommands(
            command, mode=self._submitMode, timeout=15000
        )

    if err:
        return False, err

    connected = out.strip() == b"True"
    return connected, ""
|
1817 |
|
def disconnectFromLan(self):
    """
    Public method to disconnect the device from the LAN.

    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    initCode = WiznetUtilities.mpyWiznetInit()
    command = """{0}
def disconnect_lan():
    import time

    w5x00_init()

    nic.active(False)
    time.sleep(0.1)
    print(not nic.isconnected())

disconnect_lan()
del disconnect_lan, w5x00_init
""".format(
        initCode,
    )

    with EricOverrideCursor():
        out, err = self.executeCommands(
            command, mode=self._submitMode, timeout=15000
        )

    if err:
        return False, err

    disconnected = out.strip() == b"True"
    return disconnected, ""
|
1849 |
|
def isLanConnected(self):
    """
    Public method to query the LAN connection state of the device.

    An error reported by the device is treated as 'not connected'.

    @return flag indicating that the device is connected to the LAN
    @rtype bool
    """
    initCode = WiznetUtilities.mpyWiznetInit()
    command = """{0}
def is_connected():
    import network

    w5x00_init()

    print(nic.isconnected())

is_connected()
del is_connected, w5x00_init
""".format(
        initCode,
    )

    out, err = self.executeCommands(command, mode=self._submitMode)
    return False if err else out.strip() == b"True"
|
1876 |
|
def checkInternetViaLan(self):
    """
    Public method to test whether the internet can be reached via the LAN
    interface.

    @return tuple containing a flag indicating reachability and an error string
    @rtype tuple of (bool, str)
    """
    initCode = WiznetUtilities.mpyWiznetInit()
    # the device tries to open a TCP connection to a well known host
    command = """{0}
def check_internet():
    import network
    import socket

    w5x00_init()

    if nic.isconnected():
        s = socket.socket()
        try:
            s.connect(socket.getaddrinfo('quad9.net', 443)[0][-1])
            s.close()
            print(True)
        except:
            print(False)
    else:
        print(False)

check_internet()
del check_internet, w5x00_init
""".format(
        initCode,
    )

    out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
    if err:
        return False, err

    reachable = out.strip() == b"True"
    return reachable, ""
|
1913 |
|
def deactivateEthernet(self):
    """
    Public method to deactivate the Ethernet interface of the connected device.

    Note: The WIZnet 5x00 interface cannot be switched off explicitly. That
    means, disconnecting from the LAN is all that can be done.

    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    result = self.disconnectFromLan()
    return result
|
1925 |
|
def writeLanAutoConnect(self, config, hostname):
    """
    Public method to prepare the device for connecting to the LAN at boot time.

    This writes the connection parameters to '/wiznet_config.py', copies the
    auto-connect script to '/wiznet_connect.py' and extends '/boot.py' with
    the code calling it, if that is not contained already.

    @param config configuration for the connection (either the string 'dhcp'
        for a dynamic address or a tuple of four strings with the IPv4 parameters.
    @type str or tuple of (str, str, str, str)
    @param hostname host name of the device
    @type str
    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    ifconfigText = "'dhcp'" if config == "dhcp" else config
    hostnameText = repr(hostname) if hostname else "''"
    devconfig = "ifconfig = {0}\nhostname = {1}".format(ifconfigText, hostnameText)
    connectScript = os.path.join(
        os.path.dirname(__file__), "MCUScripts", "picoWiznetConnect.py"
    )

    try:
        # write the configuration file
        self.putData("/wiznet_config.py", devconfig.encode("utf-8"))
        # copy auto-connect file
        self.put(connectScript, "/wiznet_connect.py")
    except OSError as exc:
        return False, str(exc)

    command = """
def modify_boot():
    add = True
    try:
        with open('/boot.py', 'r') as f:
            for ln in f.readlines():
                if 'wiznet_connect' in ln:
                    add = False
                    break
    except:
        pass
    if add:
        with open('/boot.py', 'a') as f:
            f.write('\\n')
            f.write('import wiznet_connect\\n')
            f.write('nic = wiznet_connect.connect_lan()\\n')
    print(True)

modify_boot()
del modify_boot
"""

    # modify boot.py
    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        return False, err

    return out.decode("utf-8").strip() == "True", ""
|
1983 |
|
def removeLanAutoConnect(self):
    """
    Public method to delete the saved IPv4 parameters ('/wiznet_config.py')
    from the connected device.

    Note: This disables the LAN auto-connect feature.

    @return tuple containing a flag indicating success and an error message
    @rtype tuple of (bool, str)
    """
    try:
        self.rm("/wiznet_config.py")
    except OSError as exc:
        return False, str(exc)
    else:
        return True, ""
|
1999 |
|
2000 ################################################################## |
|
2001 ## Methods below implement NTP related methods |
|
2002 ################################################################## |
|
2003 |
|
def hasNetworkTime(self):
    """
    Public method to determine, if the device offers network time functions.

    The device script reports 'True', if the 'ntptime' module can be imported.

    @return flag indicating the availability of network time functions
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    command = """
def has_ntp():
    try:
        import ntptime
        return True
    except ImportError:
        return False

print(has_ntp())
del has_ntp
"""
    out, err = self.executeCommands(command, mode=self._submitMode)
    if err:
        raise OSError(self._shortError(err))

    available = out.strip() == b"True"
    return available
|
2027 |
|
def setNetworkTime(self, server="pool.ntp.org", tzOffset=0, timeout=10):
    """
    Public method to set the device time to the network time retrieved from
    an NTP server.

    The device script reports a dictionary with the keys 'result' and
    'error', which is converted to the returned tuple.

    @param server name of the NTP server to get the network time from
        (defaults to "pool.ntp.org")
    @type str (optional)
    @param tzOffset offset with respect to UTC (defaults to 0)
    @type int (optional)
    @param timeout maximum time to wait for a server response in seconds
        (defaults to 10)
    @type int
    @return tuple containing a flag indicating success and an error string
    @rtype tuple of (bool, str)
    """
    command = """
def set_ntp_time(server, tz_offset, timeout):
    import network
    import ntptime
    import machine

    if hasattr(network, 'WLAN') and not network.WLAN(network.STA_IF).isconnected():
        return False
    elif hasattr(network, 'WIZNET5K'):
        try:
            if not nic.isconnected():
                return False
        except NameError:
            return False

    ntptime.host = server
    ntptime.timeout = timeout
    ntptime.settime()

    rtc = machine.RTC()
    t = list(rtc.datetime())
    t[4] += tz_offset
    rtc.datetime(t)

    return True

try:
    print({{
        'result': set_ntp_time({0}, {1}, {2}),
        'error': '',
    }})
except Exception as err:
    print({{
        'result': False,
        'error': str(err),
    }})
del set_ntp_time
""".format(
        repr(server), tzOffset, timeout
    )
    # wait a bit longer than the NTP timeout used on the device
    waitTime = (timeout + 2) * 1000
    out, err = self.executeCommands(
        command, mode=self._submitMode, timeout=waitTime
    )
    if err:
        return False, err

    answer = ast.literal_eval(out.decode("utf-8"))
    return answer["result"], answer["error"]
|
2092 |
|
2093 |
|
def createDevice(microPythonWidget, deviceType, _vid, _pid, _boardName, _serialNumber):
    """
    Function to create the MicroPython device object for RP2 based boards.

    @param microPythonWidget reference to the main MicroPython widget
    @type MicroPythonWidget
    @param deviceType device type assigned to this device interface
    @type str
    @param _vid vendor ID (unused)
    @type int
    @param _pid product ID (unused)
    @type int
    @param _boardName name of the board (unused)
    @type str
    @param _serialNumber serial number of the board (unused)
    @type str
    @return reference to the instantiated device object
    @rtype RP2Device
    """
    device = RP2Device(microPythonWidget, deviceType)
    return device