1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing some file system commands for MicroPython. |
|
8 """ |
|
9 |
|
10 import ast |
|
11 import os |
|
12 import time |
|
13 |
|
14 from PyQt6.QtCore import ( |
|
15 QCoreApplication, |
|
16 QEventLoop, |
|
17 QObject, |
|
18 QThread, |
|
19 QTimer, |
|
20 pyqtSignal, |
|
21 pyqtSlot, |
|
22 ) |
|
23 |
|
24 from eric7 import Preferences |
|
25 |
|
26 from .MicroPythonSerialPort import MicroPythonSerialPort |
|
27 |
|
28 |
|
29 class MicroPythonCommandsInterface(QObject): |
|
30 """ |
|
31 Class implementing some file system commands for MicroPython. |
|
32 |
|
33 Commands are provided to perform operations on the file system of a |
|
34 connected MicroPython device. Supported commands are: |
|
35 <ul> |
|
36 <li>ls: directory listing</li> |
|
37 <li>lls: directory listing with meta data</li> |
|
38 <li>cd: change directory</li> |
|
39 <li>pwd: get the current directory</li> |
|
40 <li>put: copy a file to the connected device</li> |
|
41 <li>get: get a file from the connected device</li> |
|
42 <li>rm: remove a file from the connected device</li> |
|
43 <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash) |
|
44 <li>mkdir: create a new directory</li> |
|
45 <li>rmdir: remove an empty directory</li> |
|
46 </ul> |
|
47 |
|
48 There are additional commands related to time and version. |
|
49 <ul> |
|
50 <li>version: get version info about MicroPython</li> |
|
51 <li>getImplementation: get some implementation information</li> |
|
52 <li>syncTime: synchronize the time of the connected device</li> |
|
53 <li>showTime: show the current time of the connected device</li> |
|
54 </ul> |
|
55 |
|
56 @signal executeAsyncFinished() emitted to indicate the end of an |
|
57 asynchronously executed list of commands (e.g. a script) |
|
58 @signal dataReceived(data) emitted to send data received via the serial |
|
59 connection for further processing |
|
60 """ |
|
61 |
|
62 executeAsyncFinished = pyqtSignal() |
|
63 dataReceived = pyqtSignal(bytes) |
|
64 |
|
65 def __init__(self, parent=None): |
|
66 """ |
|
67 Constructor |
|
68 |
|
69 @param parent reference to the parent object |
|
70 @type QObject |
|
71 """ |
|
72 super().__init__(parent) |
|
73 |
|
74 self.__repl = parent |
|
75 |
|
76 self.__blockReadyRead = False |
|
77 |
|
78 self.__serial = MicroPythonSerialPort( |
|
79 timeout=Preferences.getMicroPython("SerialTimeout"), parent=self |
|
80 ) |
|
81 self.__serial.readyRead.connect(self.__readSerial) |
|
82 |
|
83 @pyqtSlot() |
|
84 def __readSerial(self): |
|
85 """ |
|
86 Private slot to read all available serial data and emit it with the |
|
87 "dataReceived" signal for further processing. |
|
88 """ |
|
89 if not self.__blockReadyRead: |
|
90 data = bytes(self.__serial.readAll()) |
|
91 self.dataReceived.emit(data) |
|
92 |
|
93 @pyqtSlot() |
|
94 def connectToDevice(self, port): |
|
95 """ |
|
96 Public slot to start the manager. |
|
97 |
|
98 @param port name of the port to be used |
|
99 @type str |
|
100 @return flag indicating success |
|
101 @rtype bool |
|
102 """ |
|
103 return self.__serial.openSerialLink(port) |
|
104 |
|
105 @pyqtSlot() |
|
106 def disconnectFromDevice(self): |
|
107 """ |
|
108 Public slot to stop the thread. |
|
109 """ |
|
110 self.__serial.closeSerialLink() |
|
111 |
|
112 def isConnected(self): |
|
113 """ |
|
114 Public method to get the connection status. |
|
115 |
|
116 @return flag indicating the connection status |
|
117 @rtype bool |
|
118 """ |
|
119 return self.__serial.isConnected() |
|
120 |
|
121 @pyqtSlot() |
|
122 def handlePreferencesChanged(self): |
|
123 """ |
|
124 Public slot to handle a change of the preferences. |
|
125 """ |
|
126 self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout")) |
|
127 |
|
128 def write(self, data): |
|
129 """ |
|
130 Public method to write data to the connected device. |
|
131 |
|
132 @param data data to be written |
|
133 @type bytes or bytearray |
|
134 """ |
|
135 self.__serial.isConnected() and self.__serial.write(data) |
|
136 |
|
137 def __rawOn(self): |
|
138 """ |
|
139 Private method to switch the connected device to 'raw' mode. |
|
140 |
|
141 Note: switching to raw mode is done with synchronous writes. |
|
142 |
|
143 @return flag indicating success |
|
144 @@rtype bool |
|
145 """ |
|
146 if not self.__serial: |
|
147 return False |
|
148 |
|
149 rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>" |
|
150 |
|
151 self.__serial.write(b"\x02") # end raw mode if required |
|
152 written = self.__serial.waitForBytesWritten(500) |
|
153 # time out after 500ms if device is not responding |
|
154 if not written: |
|
155 return False |
|
156 for _i in range(3): |
|
157 # CTRL-C three times to break out of loops |
|
158 self.__serial.write(b"\r\x03") |
|
159 written = self.__serial.waitForBytesWritten(500) |
|
160 # time out after 500ms if device is not responding |
|
161 if not written: |
|
162 return False |
|
163 QThread.msleep(10) |
|
164 self.__serial.readAll() # read all data and discard it |
|
165 self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode |
|
166 self.__serial.readUntil(rawReplMessage) |
|
167 if self.__serial.hasTimedOut(): |
|
168 # it timed out; try it again and than fail |
|
169 self.__serial.write(b"\r\x01") # send CTRL-A again |
|
170 self.__serial.readUntil(rawReplMessage) |
|
171 if self.__serial.hasTimedOut(): |
|
172 return False |
|
173 |
|
174 QCoreApplication.processEvents( |
|
175 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents |
|
176 ) |
|
177 self.__serial.readAll() # read all data and discard it |
|
178 return True |
|
179 |
|
180 def __rawOff(self): |
|
181 """ |
|
182 Private method to switch 'raw' mode off. |
|
183 """ |
|
184 if self.__serial: |
|
185 self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode |
|
186 self.__serial.readUntil(b">>> ") # read until Python prompt |
|
187 self.__serial.readAll() # read all data and discard it |
|
188 |
|
189 def probeDevice(self): |
|
190 """ |
|
191 Public method to check the device is responding. |
|
192 |
|
193 If the device has not been flashed with a MicroPython formware, the |
|
194 probe will fail. |
|
195 |
|
196 @return flag indicating a communicating MicroPython device |
|
197 @rtype bool |
|
198 """ |
|
199 if not self.__serial: |
|
200 return False |
|
201 |
|
202 if not self.__serial.isConnected(): |
|
203 return False |
|
204 |
|
205 # switch on raw mode |
|
206 self.__blockReadyRead = True |
|
207 ok = self.__rawOn() |
|
208 if not ok: |
|
209 self.__blockReadyRead = False |
|
210 return False |
|
211 |
|
212 # switch off raw mode |
|
213 QThread.msleep(10) |
|
214 self.__rawOff() |
|
215 self.__blockReadyRead = False |
|
216 |
|
217 return True |
|
218 |
|
219 def execute(self, commands): |
|
220 """ |
|
221 Public method to send commands to the connected device and return the |
|
222 result. |
|
223 |
|
224 If no serial connection is available, empty results will be returned. |
|
225 |
|
226 @param commands list of commands to be executed |
|
227 @type str |
|
228 @return tuple containing stdout and stderr output of the device |
|
229 @rtype tuple of (bytes, bytes) |
|
230 """ |
|
231 if not self.__serial: |
|
232 return b"", b"" |
|
233 |
|
234 if not self.__serial.isConnected(): |
|
235 return b"", b"Device not connected or not switched on." |
|
236 |
|
237 result = bytearray() |
|
238 err = b"" |
|
239 |
|
240 # switch on raw mode |
|
241 self.__blockReadyRead = True |
|
242 ok = self.__rawOn() |
|
243 if not ok: |
|
244 self.__blockReadyRead = False |
|
245 return (b"", b"Could not switch to raw mode. Is the device switched on?") |
|
246 |
|
247 # send commands |
|
248 QThread.msleep(10) |
|
249 for command in commands: |
|
250 if command: |
|
251 commandBytes = command.encode("utf-8") |
|
252 self.__serial.write(commandBytes + b"\x04") |
|
253 QCoreApplication.processEvents( |
|
254 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents |
|
255 ) |
|
256 ok = self.__serial.readUntil(b"OK") |
|
257 if ok != b"OK": |
|
258 return ( |
|
259 b"", |
|
260 "Expected 'OK', got '{0}', followed by '{1}'".format( |
|
261 ok, self.__serial.readAll() |
|
262 ).encode("utf-8"), |
|
263 ) |
|
264 |
|
265 # read until prompt |
|
266 response = self.__serial.readUntil(b"\x04>") |
|
267 if self.__serial.hasTimedOut(): |
|
268 self.__blockReadyRead = False |
|
269 return b"", b"Timeout while processing commands." |
|
270 if b"\x04" in response[:-2]: |
|
271 # split stdout, stderr |
|
272 out, err = response[:-2].split(b"\x04") |
|
273 result += out |
|
274 else: |
|
275 err = b"invalid response received: " + response |
|
276 if err: |
|
277 result = b"" |
|
278 break |
|
279 |
|
280 # switch off raw mode |
|
281 QThread.msleep(10) |
|
282 self.__rawOff() |
|
283 self.__blockReadyRead = False |
|
284 |
|
285 return bytes(result), err |
|
286 |
|
287 def executeAsync(self, commandsList): |
|
288 """ |
|
289 Public method to execute a series of commands over a period of time |
|
290 without returning any result (asynchronous execution). |
|
291 |
|
292 @param commandsList list of commands to be execute on the device |
|
293 @type list of bytes |
|
294 """ |
|
295 |
|
296 if commandsList: |
|
297 command = commandsList.pop(0) |
|
298 self.__serial.write(command) |
|
299 QTimer.singleShot(2, lambda: self.executeAsync(commandsList)) |
|
300 else: |
|
301 self.executeAsyncFinished.emit() |
|
302 |
|
303 def __shortError(self, error): |
|
304 """ |
|
305 Private method to create a shortened error message. |
|
306 |
|
307 @param error verbose error message |
|
308 @type bytes |
|
309 @return shortened error message |
|
310 @rtype str |
|
311 """ |
|
312 if error: |
|
313 decodedError = error.decode("utf-8") |
|
314 try: |
|
315 return decodedError.split["\r\n"][-2] |
|
316 except Exception: |
|
317 return decodedError |
|
318 return self.tr("Detected an error without indications.") |
|
319 |
|
320 # TODO: move these methods to the devices. |
|
321 ################################################################## |
|
322 ## Methods below implement the file system commands |
|
323 ################################################################## |
|
324 |
|
325 def ls(self, dirname=""): |
|
326 """ |
|
327 Public method to get a directory listing of the connected device. |
|
328 |
|
329 @param dirname name of the directory to be listed |
|
330 @type str |
|
331 @return tuple containg the directory listing |
|
332 @rtype tuple of str |
|
333 @exception OSError raised to indicate an issue with the device |
|
334 """ |
|
335 commands = ( |
|
336 # BBC micro:bit does not support directories |
|
337 [ |
|
338 "import os as __os_", |
|
339 "print(__os_.listdir())", |
|
340 "del __os_", |
|
341 ] |
|
342 if self.__repl.isMicrobit() |
|
343 else [ |
|
344 "import os as __os_", |
|
345 "print(__os_.listdir('{0}'))".format(dirname), |
|
346 "del __os_", |
|
347 ] |
|
348 ) |
|
349 out, err = self.execute(commands) |
|
350 if err: |
|
351 raise OSError(self.__shortError(err)) |
|
352 return ast.literal_eval(out.decode("utf-8")) |
|
353 |
|
354 def lls(self, dirname="", fullstat=False, showHidden=False): |
|
355 """ |
|
356 Public method to get a long directory listing of the connected device |
|
357 including meta data. |
|
358 |
|
359 @param dirname name of the directory to be listed |
|
360 @type str |
|
361 @param fullstat flag indicating to return the full stat() tuple |
|
362 @type bool |
|
363 @param showHidden flag indicating to show hidden files as well |
|
364 @type bool |
|
365 @return list containing the directory listing with tuple entries of |
|
366 the name and and a tuple of mode, size and time (if fullstat is |
|
367 false) or the complete stat() tuple. 'None' is returned in case the |
|
368 directory doesn't exist. |
|
369 @rtype tuple of (str, tuple) |
|
370 @exception OSError raised to indicate an issue with the device |
|
371 """ |
|
372 commands = ( |
|
373 # BBC micro:bit does not support directories |
|
374 [ |
|
375 "import os as __os_", |
|
376 "\n".join( |
|
377 [ |
|
378 "def is_visible(filename, showHidden):", |
|
379 " return showHidden or " |
|
380 "(filename[0] != '.' and filename[-1] != '~')", |
|
381 ] |
|
382 ), |
|
383 "\n".join( |
|
384 [ |
|
385 "def stat(filename):", |
|
386 " size = __os_.size(filename)", |
|
387 " return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)", |
|
388 ] |
|
389 ), |
|
390 "\n".join( |
|
391 [ |
|
392 "def listdir_stat(showHidden):", |
|
393 " files = __os_.listdir()", |
|
394 " return list((f, stat(f)) for f in files if" |
|
395 " is_visible(f,showHidden))", |
|
396 ] |
|
397 ), |
|
398 "print(listdir_stat({0}))".format(showHidden), |
|
399 "del __os_, stat, listdir_stat, is_visible", |
|
400 ] |
|
401 if self.__repl.isMicrobit() |
|
402 else [ |
|
403 "import os as __os_", |
|
404 "\n".join( |
|
405 [ |
|
406 "def is_visible(filename, showHidden):", |
|
407 " return showHidden or " |
|
408 "(filename[0] != '.' and filename[-1] != '~')", |
|
409 ] |
|
410 ), |
|
411 "\n".join( |
|
412 [ |
|
413 "def stat(filename):", |
|
414 " try:", |
|
415 " rstat = __os_.lstat(filename)", |
|
416 " except:", |
|
417 " rstat = __os_.stat(filename)", |
|
418 " return tuple(rstat)", |
|
419 ] |
|
420 ), |
|
421 "\n".join( |
|
422 [ |
|
423 "def listdir_stat(dirname, showHidden):", |
|
424 " try:", |
|
425 " files = __os_.listdir(dirname)", |
|
426 " except OSError:", |
|
427 " return []", |
|
428 " if dirname in ('', '/'):", |
|
429 " return list((f, stat(f)) for f in files if" |
|
430 " is_visible(f, showHidden))", |
|
431 " return list((f, stat(dirname + '/' + f))" |
|
432 " for f in files if is_visible(f, showHidden))", |
|
433 ] |
|
434 ), |
|
435 "print(listdir_stat('{0}', {1}))".format(dirname, showHidden), |
|
436 "del __os_, stat, listdir_stat, is_visible", |
|
437 ] |
|
438 ) |
|
439 out, err = self.execute(commands) |
|
440 if err: |
|
441 raise OSError(self.__shortError(err)) |
|
442 fileslist = ast.literal_eval(out.decode("utf-8")) |
|
443 if fileslist is None: |
|
444 return None |
|
445 else: |
|
446 if fullstat: |
|
447 return fileslist |
|
448 else: |
|
449 return [(f, (s[0], s[6], s[8])) for f, s in fileslist] |
|
450 |
|
451 def cd(self, dirname): |
|
452 """ |
|
453 Public method to change the current directory on the connected device. |
|
454 |
|
455 @param dirname directory to change to |
|
456 @type str |
|
457 @exception OSError raised to indicate an issue with the device |
|
458 """ |
|
459 if dirname: |
|
460 commands = [ |
|
461 "import os as __os_", |
|
462 "__os_.chdir('{0}')".format(dirname), |
|
463 "del __os_", |
|
464 ] |
|
465 out, err = self.execute(commands) |
|
466 if err: |
|
467 raise OSError(self.__shortError(err)) |
|
468 |
|
469 def pwd(self): |
|
470 """ |
|
471 Public method to get the current directory of the connected device. |
|
472 |
|
473 @return current directory |
|
474 @rtype str |
|
475 @exception OSError raised to indicate an issue with the device |
|
476 """ |
|
477 if self.__repl.isMicrobit(): |
|
478 # BBC micro:bit does not support directories |
|
479 return "" |
|
480 |
|
481 commands = [ |
|
482 "import os as __os_", |
|
483 "print(__os_.getcwd())", |
|
484 "del __os_", |
|
485 ] |
|
486 out, err = self.execute(commands) |
|
487 if err: |
|
488 raise OSError(self.__shortError(err)) |
|
489 return out.decode("utf-8").strip() |
|
490 |
|
491 def rm(self, filename): |
|
492 """ |
|
493 Public method to remove a file from the connected device. |
|
494 |
|
495 @param filename name of the file to be removed |
|
496 @type str |
|
497 @exception OSError raised to indicate an issue with the device |
|
498 """ |
|
499 if filename: |
|
500 commands = [ |
|
501 "import os as __os_", |
|
502 "__os_.remove('{0}')".format(filename), |
|
503 "del __os_", |
|
504 ] |
|
505 out, err = self.execute(commands) |
|
506 if err: |
|
507 raise OSError(self.__shortError(err)) |
|
508 |
|
509 def rmrf(self, name, recursive=False, force=False): |
|
510 """ |
|
511 Public method to remove a file or directory recursively. |
|
512 |
|
513 @param name of the file or directory to remove |
|
514 @type str |
|
515 @param recursive flag indicating a recursive deletion |
|
516 @type bool |
|
517 @param force flag indicating to ignore errors |
|
518 @type bool |
|
519 @return flag indicating success |
|
520 @rtype bool |
|
521 @exception OSError raised to indicate an issue with the device |
|
522 """ |
|
523 if name: |
|
524 commands = [ |
|
525 "import os as __os_", |
|
526 "\n".join( |
|
527 [ |
|
528 "def remove_file(name, recursive=False, force=False):", |
|
529 " try:", |
|
530 " mode = __os_.stat(name)[0]", |
|
531 " if mode & 0x4000 != 0:", |
|
532 " if recursive:", |
|
533 " for file in __os_.listdir(name):", |
|
534 " success = remove_file(" |
|
535 "name + '/' + file, recursive, force)", |
|
536 " if not success and not force:", |
|
537 " return False", |
|
538 " __os_.rmdir(name)", |
|
539 " else:", |
|
540 " if not force:", |
|
541 " return False", |
|
542 " else:", |
|
543 " __os_.remove(name)", |
|
544 " except:", |
|
545 " if not force:", |
|
546 " return False", |
|
547 " return True", |
|
548 ] |
|
549 ), |
|
550 "print(remove_file('{0}', {1}, {2}))".format(name, recursive, force), |
|
551 "del __os_, remove_file", |
|
552 ] |
|
553 out, err = self.execute(commands) |
|
554 if err: |
|
555 raise OSError(self.__shortError(err)) |
|
556 return ast.literal_eval(out.decode("utf-8")) |
|
557 |
|
558 return False |
|
559 |
|
560 def mkdir(self, dirname): |
|
561 """ |
|
562 Public method to create a new directory. |
|
563 |
|
564 @param dirname name of the directory to create |
|
565 @type str |
|
566 @exception OSError raised to indicate an issue with the device |
|
567 """ |
|
568 if dirname: |
|
569 commands = [ |
|
570 "import os as __os_", |
|
571 "__os_.mkdir('{0}')".format(dirname), |
|
572 "del __os_", |
|
573 ] |
|
574 out, err = self.execute(commands) |
|
575 if err: |
|
576 raise OSError(self.__shortError(err)) |
|
577 |
|
578 def rmdir(self, dirname): |
|
579 """ |
|
580 Public method to remove a directory. |
|
581 |
|
582 @param dirname name of the directory to be removed |
|
583 @type str |
|
584 @exception OSError raised to indicate an issue with the device |
|
585 """ |
|
586 if dirname: |
|
587 commands = [ |
|
588 "import os as __os_", |
|
589 "__os_.rmdir('{0}')".format(dirname), |
|
590 "del __os_", |
|
591 ] |
|
592 out, err = self.execute(commands) |
|
593 if err: |
|
594 raise OSError(self.__shortError(err)) |
|
595 |
|
596 def put(self, hostFileName, deviceFileName=None): |
|
597 """ |
|
598 Public method to copy a local file to the connected device. |
|
599 |
|
600 @param hostFileName name of the file to be copied |
|
601 @type str |
|
602 @param deviceFileName name of the file to copy to |
|
603 @type str |
|
604 @return flag indicating success |
|
605 @rtype bool |
|
606 @exception OSError raised to indicate an issue with the device |
|
607 """ |
|
608 if not os.path.isfile(hostFileName): |
|
609 raise OSError("No such file: {0}".format(hostFileName)) |
|
610 |
|
611 if not deviceFileName: |
|
612 deviceFileName = os.path.basename(hostFileName) |
|
613 |
|
614 with open(hostFileName, "rb") as hostFile: |
|
615 content = hostFile.read() |
|
616 |
|
617 return self.putData(deviceFileName, content) |
|
618 |
|
619 def putData(self, deviceFileName, content): |
|
620 """ |
|
621 Public method to write the given data to the connected device. |
|
622 |
|
623 @param deviceFileName name of the file to write to |
|
624 @type str |
|
625 @param content data to write |
|
626 @type bytes |
|
627 @return flag indicating success |
|
628 @rtype bool |
|
629 @exception OSError raised to indicate an issue with the device |
|
630 """ |
|
631 if not deviceFileName: |
|
632 raise OSError("Missing device file name") |
|
633 |
|
634 # convert eol '\r' |
|
635 content = content.replace(b"\r\n", b"\r") |
|
636 content = content.replace(b"\n", b"\r") |
|
637 |
|
638 commands = [ |
|
639 "fd = open('{0}', 'wb')".format(deviceFileName), |
|
640 "f = fd.write", |
|
641 ] |
|
642 while content: |
|
643 chunk = content[:64] |
|
644 commands.append("f(" + repr(chunk) + ")") |
|
645 content = content[64:] |
|
646 commands.extend( |
|
647 [ |
|
648 "fd.close()", |
|
649 "del f, fd", |
|
650 ] |
|
651 ) |
|
652 |
|
653 out, err = self.execute(commands) |
|
654 if err: |
|
655 raise OSError(self.__shortError(err)) |
|
656 return True |
|
657 |
|
658 def get(self, deviceFileName, hostFileName=None): |
|
659 """ |
|
660 Public method to copy a file from the connected device. |
|
661 |
|
662 @param deviceFileName name of the file to copy |
|
663 @type str |
|
664 @param hostFileName name of the file to copy to |
|
665 @type str |
|
666 @return flag indicating success |
|
667 @rtype bool |
|
668 @exception OSError raised to indicate an issue with the device |
|
669 """ |
|
670 if not deviceFileName: |
|
671 raise OSError("Missing device file name") |
|
672 |
|
673 if not hostFileName: |
|
674 hostFileName = deviceFileName |
|
675 |
|
676 out = self.getData(deviceFileName) |
|
677 with open(hostFileName, "wb") as hostFile: |
|
678 hostFile.write(out) |
|
679 |
|
680 return True |
|
681 |
|
682 def getData(self, deviceFileName): |
|
683 """ |
|
684 Public method to read data from the connected device. |
|
685 |
|
686 @param deviceFileName name of the file to read from |
|
687 @type str |
|
688 @return data read from the device |
|
689 @rtype bytes |
|
690 @exception OSError raised to indicate an issue with the device |
|
691 """ |
|
692 if not deviceFileName: |
|
693 raise OSError("Missing device file name") |
|
694 |
|
695 commands = [ |
|
696 "\n".join( |
|
697 [ |
|
698 "def send_data():", |
|
699 " try:", |
|
700 " from microbit import uart as u", |
|
701 " except ImportError:", |
|
702 " try:", |
|
703 " from sys import stdout as u", |
|
704 " except ImportError:", |
|
705 " try:", |
|
706 " from machine import UART", |
|
707 " u = UART(0, {0})".format(115200), |
|
708 " except Exception:", |
|
709 " raise Exception('Could not find UART module" |
|
710 " in device.')", |
|
711 " f = open('{0}', 'rb')".format(deviceFileName), |
|
712 " r = f.read", |
|
713 " result = True", |
|
714 " while result:", |
|
715 " result = r(32)", |
|
716 " if result:", |
|
717 " u.write(result)", |
|
718 " f.close()", |
|
719 ] |
|
720 ), |
|
721 "send_data()", |
|
722 ] |
|
723 out, err = self.execute(commands) |
|
724 if err: |
|
725 raise OSError(self.__shortError(err)) |
|
726 |
|
727 # write the received bytes to the local file |
|
728 # convert eol to "\n" |
|
729 out = out.replace(b"\r\n", b"\n") |
|
730 out = out.replace(b"\r", b"\n") |
|
731 |
|
732 return out |
|
733 |
|
734 def fileSystemInfo(self): |
|
735 """ |
|
736 Public method to obtain information about the currently mounted file |
|
737 systems. |
|
738 |
|
739 @return tuple of tuples containing the file system name, the total |
|
740 size, the used size and the free size |
|
741 @rtype tuple of tuples of (str, int, int, int) |
|
742 @exception OSError raised to indicate an issue with the device |
|
743 """ |
|
744 commands = [ |
|
745 "import os as __os_", |
|
746 "\n".join( |
|
747 [ |
|
748 "def fsinfo():", |
|
749 " infolist = []", |
|
750 " info = __os_.statvfs('/')", |
|
751 " if info[0] == 0:", |
|
752 # assume it is just mount points |
|
753 " fsnames = __os_.listdir('/')", |
|
754 " for fs in fsnames:", |
|
755 " fs = '/' + fs", |
|
756 " infolist.append((fs, __os_.statvfs(fs)))", |
|
757 " else:", |
|
758 " infolist.append(('/', info))", |
|
759 " return infolist", |
|
760 ] |
|
761 ), |
|
762 "print(fsinfo())", |
|
763 "del __os_, fsinfo", |
|
764 ] |
|
765 out, err = self.execute(commands) |
|
766 if err: |
|
767 raise OSError(self.__shortError(err)) |
|
768 infolist = ast.literal_eval(out.decode("utf-8")) |
|
769 if infolist is None: |
|
770 return None |
|
771 else: |
|
772 filesystemInfos = [] |
|
773 for fs, info in infolist: |
|
774 totalSize = info[2] * info[1] |
|
775 freeSize = info[4] * info[1] |
|
776 usedSize = totalSize - freeSize |
|
777 filesystemInfos.append((fs, totalSize, usedSize, freeSize)) |
|
778 |
|
779 return tuple(filesystemInfos) |
|
780 |
|
781 ################################################################## |
|
782 ## non-filesystem related methods below |
|
783 ################################################################## |
|
784 |
|
785 def getDeviceData(self): |
|
786 """ |
|
787 Public method to get some essential data for the connected board. |
|
788 |
|
789 @return dictionary containing the determined data |
|
790 @rtype dict |
|
791 @exception OSError raised to indicate an issue with the device |
|
792 """ |
|
793 commands = [ |
|
794 "res = {}", # __IGNORE_WARNING_M613__ |
|
795 "import os as __os_", |
|
796 "uname = __os_.uname()", |
|
797 "res['sysname'] = uname.sysname", |
|
798 "res['nodename'] = uname.nodename", |
|
799 "res['release'] = uname.release", |
|
800 "res['version'] = uname.version", |
|
801 "res['machine'] = uname.machine", |
|
802 "import sys as __sys_", |
|
803 "res['py_platform'] = __sys_.platform", |
|
804 "res['py_version'] = __sys_.version", |
|
805 "\n".join( |
|
806 [ |
|
807 "try:", |
|
808 " res['mpy_name'] = __sys_.implementation.name", |
|
809 "except AttributeError:", |
|
810 " res['mpy_name'] = 'unknown'", |
|
811 ] |
|
812 ), |
|
813 "\n".join( |
|
814 [ |
|
815 "try:", |
|
816 " res['mpy_version'] = '.'.join((str(i) for i in" |
|
817 " __sys_.implementation.version))", |
|
818 "except AttributeError:", |
|
819 " res['mpy_version'] = 'unknown'", |
|
820 ] |
|
821 ), |
|
822 "\n".join( |
|
823 [ |
|
824 "try:", |
|
825 " import pimoroni as __pimoroni_", |
|
826 " res['mpy_variant'] = 'Pimoroni'", |
|
827 " del __pimoroni_", |
|
828 "except ImportError:", |
|
829 " res['mpy_variant'] = ''", |
|
830 ] |
|
831 ), |
|
832 "print(res)", |
|
833 "del res, __os_, __sys_", |
|
834 ] |
|
835 out, err = self.execute(commands) |
|
836 if err: |
|
837 raise OSError(self.__shortError(err)) |
|
838 return ast.literal_eval(out.decode("utf-8")) |
|
839 |
|
840 def getBoardInformation(self): |
|
841 """ |
|
842 Public method to get some information data of the connected board. |
|
843 |
|
844 @return dictionary containing the determined data |
|
845 @rtype dict |
|
846 @exception OSError raised to indicate an issue with the device |
|
847 """ |
|
848 commands = [ |
|
849 "res = {}", # __IGNORE_WARNING_M613__ |
|
850 "import gc as __gc_", |
|
851 "__gc_.enable()", |
|
852 "__gc_.collect()", |
|
853 "mem_alloc = __gc_.mem_alloc()", |
|
854 "mem_free = __gc_.mem_free()", |
|
855 "mem_total = mem_alloc + mem_free", |
|
856 "res['mem_total_kb'] = mem_total / 1024.0", |
|
857 "res['mem_used_kb'] = mem_alloc / 1024.0", |
|
858 "res['mem_used_pc'] = mem_alloc / mem_total * 100.0", |
|
859 "res['mem_free_kb'] = mem_free / 1024.0", |
|
860 "res['mem_free_pc'] = mem_free / mem_total * 100.0", |
|
861 "del __gc_, mem_alloc, mem_free, mem_total", |
|
862 "import os as __os_", |
|
863 "uname = __os_.uname()", |
|
864 "res['sysname'] = uname.sysname", |
|
865 "res['nodename'] = uname.nodename", |
|
866 "res['release'] = uname.release", |
|
867 "res['version'] = uname.version", |
|
868 "res['machine'] = uname.machine", |
|
869 "import sys as __sys_", |
|
870 "res['py_platform'] = __sys_.platform", |
|
871 "res['py_version'] = __sys_.version", |
|
872 "\n".join( |
|
873 [ |
|
874 "try:", |
|
875 " res['mpy_name'] = __sys_.implementation.name", |
|
876 "except AttributeError:", |
|
877 " res['mpy_name'] = 'unknown'", |
|
878 ] |
|
879 ), |
|
880 "\n".join( |
|
881 [ |
|
882 "try:", |
|
883 " res['mpy_version'] = '.'.join((str(i) for i in" |
|
884 " __sys_.implementation.version))", |
|
885 "except AttributeError:", |
|
886 " res['mpy_version'] = 'unknown'", |
|
887 ] |
|
888 ), |
|
889 "\n".join( |
|
890 [ |
|
891 "try:", |
|
892 " import pimoroni as __pimoroni_", |
|
893 " res['mpy_variant'] = 'Pimoroni'", |
|
894 " del __pimoroni_", |
|
895 "except ImportError:", |
|
896 " res['mpy_variant'] = ''", |
|
897 ] |
|
898 ), |
|
899 "\n".join( |
|
900 [ |
|
901 "try:", |
|
902 " stat_ = __os_.statvfs('/flash')", |
|
903 " res['flash_info_available'] = True", |
|
904 " res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0", |
|
905 " res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0", |
|
906 " res['flash_used_kb'] = res['flash_total_kb'] -" |
|
907 " res['flash_free_kb']", |
|
908 " res['flash_free_pc'] = res['flash_free_kb'] /" |
|
909 " res['flash_total_kb'] * 100.0", |
|
910 " res['flash_used_pc'] = res['flash_used_kb'] /" |
|
911 " res['flash_total_kb'] * 100.0", |
|
912 " del stat_", |
|
913 "except AttributeError:", |
|
914 " res['flash_info_available'] = False", |
|
915 ] |
|
916 ), |
|
917 "\n".join( |
|
918 [ |
|
919 "try:", |
|
920 " import machine as __mc_", |
|
921 " if isinstance(__mc_.freq(), tuple):", |
|
922 " res['mc_frequency_mhz'] = __mc_.freq()[0] / 1000000.0", |
|
923 " else:", |
|
924 " res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0", |
|
925 " res['mc_id'] = ':'.join(['{0:X}'.format(x)" |
|
926 " for x in __mc_.unique_id()])", |
|
927 " del __mc_", |
|
928 "except ImportError:", |
|
929 "\n".join( |
|
930 [ |
|
931 " try:", |
|
932 " import microcontroller as __mc_", |
|
933 " res['mc_frequency_mhz'] = __mc_.cpu.frequency" |
|
934 " / 1000000.0", |
|
935 " res['mc_temp_c'] = __mc_.cpu.temperature", |
|
936 " res['mc_id'] = ':'.join(['{0:X}'.format(x)" |
|
937 " for x in __mc_.cpu.uid])", |
|
938 " del __mc_", |
|
939 " except ImportError:", |
|
940 " res['mc_frequency'] = None", |
|
941 " res['mc_temp'] = None", |
|
942 ] |
|
943 ), |
|
944 ] |
|
945 ), |
|
946 "\n".join( |
|
947 [ |
|
948 "try:", |
|
949 " import ulab as __ulab_", |
|
950 " res['ulab'] = __ulab_.__version__", |
|
951 " del __ulab_", |
|
952 "except ImportError:", |
|
953 " res['ulab'] = None", |
|
954 ] |
|
955 ), |
|
956 "print(res)", |
|
957 "del res, __os_, __sys_", |
|
958 ] |
|
959 out, err = self.execute(commands) |
|
960 if err: |
|
961 raise OSError(self.__shortError(err)) |
|
962 return ast.literal_eval(out.decode("utf-8")) |
|
963 |
|
964 def syncTime(self, deviceType, hasCPy=False): |
|
965 """ |
|
966 Public method to set the time of the connected device to the local |
|
967 computer's time. |
|
968 |
|
969 @param deviceType type of board to sync time to |
|
970 @type str |
|
971 @param hasCPy flag indicating that the device has CircuitPython loadede |
|
972 (defaults to False) |
|
973 @type bool |
|
974 @exception OSError raised to indicate an issue with the device |
|
975 """ |
|
976 # rtc_time[0] - year 4 digit |
|
977 # rtc_time[1] - month 1..12 |
|
978 # rtc_time[2] - day 1..31 |
|
979 # rtc_time[3] - weekday 1..7 1=Monday |
|
980 # rtc_time[4] - hour 0..23 |
|
981 # rtc_time[5] - minute 0..59 |
|
982 # rtc_time[6] - second 0..59 |
|
983 # rtc_time[7] - yearday 1..366 |
|
984 # rtc_time[8] - isdst 0, 1, or -1 |
|
985 if deviceType == "circuitpython" or hasCPy: |
|
986 set_time = "\n".join( |
|
987 [ |
|
988 "def set_time(rtc_time):", |
|
989 " import rtc", |
|
990 " import time", |
|
991 " clock = rtc.RTC()", |
|
992 " clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3]," |
|
993 " rtc_time[7], rtc_time[8])", |
|
994 " clock.datetime = time.struct_time(clock_time)", |
|
995 ] |
|
996 ) |
|
997 elif deviceType == "pyboard": |
|
998 # Pyboard (pyboard doesn't have machine.RTC()). |
|
999 # The pyb.RTC.datetime function takes the arguments in the |
|
1000 # order: (year, month, day, weekday, hour, minute, second, |
|
1001 # subseconds) |
|
1002 # http://docs.micropython.org/en/latest/library/pyb.RTC.html |
|
1003 # #pyb.RTC.datetime |
|
1004 set_time = "\n".join( |
|
1005 [ |
|
1006 "def set_time(rtc_time):", |
|
1007 " import pyb", |
|
1008 " rtc = pyb.RTC()", |
|
1009 " rtc.datetime(rtc_time[:7] + (0,))", |
|
1010 ] |
|
1011 ) |
|
1012 elif deviceType == "teensy": |
|
1013 # The pyb.RTC.datetime function takes the arguments in the |
|
1014 # order: (year, month, day, weekday, hour, minute, second, |
|
1015 # subseconds) |
|
1016 # https://docs.micropython.org/en/latest/library/machine.RTC.html |
|
1017 # #machine-rtc |
|
1018 set_time = "\n".join( |
|
1019 [ |
|
1020 "def set_time(rtc_time):", |
|
1021 " import machine", |
|
1022 " rtc = machine.RTC()", |
|
1023 " rtc.init(rtc_time[:7] + (0,))", |
|
1024 ] |
|
1025 ) |
|
1026 elif deviceType == "esp": |
|
1027 # The machine.RTC documentation was incorrect and doesn't agree |
|
1028 # with the code, so no link is presented here. The order of the |
|
1029 # arguments is the same as the pyboard except for LoBo MPy. |
|
1030 set_time = "\n".join( |
|
1031 [ |
|
1032 "def set_time(rtc_time):", |
|
1033 " import machine", |
|
1034 " rtc = machine.RTC()", |
|
1035 " try:", # ESP8266 may use rtc.datetime() |
|
1036 " rtc.datetime(rtc_time[:7] + (0,))", |
|
1037 " except Exception:", # ESP32 uses rtc.init() |
|
1038 " import os", |
|
1039 " if 'LoBo' in os.uname()[0]:", # LoBo MPy |
|
1040 " clock_time = rtc_time[:3] +" |
|
1041 " rtc_time[4:7] + (rtc_time[3], rtc_time[7])", |
|
1042 " else:", |
|
1043 " clock_time = rtc_time[:7] + (0,)", |
|
1044 " rtc.init(clock_time)", |
|
1045 ] |
|
1046 ) |
|
1047 elif deviceType in ("bbc_microbit", "calliope"): |
|
1048 # BBC micro:bit and Calliope mini with MicroPython don't support |
|
1049 # time commands. |
|
1050 return |
|
1051 elif deviceType == "rp2040": |
|
1052 # Raspberry Pi Pico (RP2040) - machine.RTC doesn't exist |
|
1053 set_time = "\n".join( |
|
1054 [ |
|
1055 "def set_time(rtc_time):", |
|
1056 " setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |" |
|
1057 " rtc_time[2]", |
|
1058 " setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |" |
|
1059 " rtc_time[5] << 8 | rtc_time[6]", |
|
1060 " machine.mem32[0x4005c004] = setup_0", |
|
1061 " machine.mem32[0x4005c008] = setup_1", |
|
1062 " machine.mem32[0x4005c00c] |= 0x10", |
|
1063 ] |
|
1064 ) |
|
1065 elif deviceType == "pycom": |
|
1066 # PyCom's machine.RTC takes its arguments in a slightly |
|
1067 # different order than the official machine.RTC. |
|
1068 # (year, month, day, hour, minute, second[, microsecond[, |
|
1069 # tzinfo]]) |
|
1070 # https://docs.pycom.io/firmwareapi/pycom/machine/rtc/ |
|
1071 # #rtc-init-datetime-none-source-rtc-internal-rc |
|
1072 set_time = "\n".join( |
|
1073 [ |
|
1074 "def set_time(rtc_time):", |
|
1075 " import pycom", |
|
1076 " rtc_time2 = rtc_time[:3] + rtc_time[4:7]", |
|
1077 " import machine", |
|
1078 " rtc = machine.RTC()", |
|
1079 " rtc.init(rtc_time2)", |
|
1080 ] |
|
1081 ) |
|
1082 else: |
|
1083 # no set_time() support for generic boards |
|
1084 return |
|
1085 |
|
1086 now = time.localtime(time.time()) |
|
1087 commands = [ |
|
1088 set_time, |
|
1089 "set_time({0})".format( |
|
1090 ( |
|
1091 now.tm_year, |
|
1092 now.tm_mon, |
|
1093 now.tm_mday, |
|
1094 now.tm_wday + 1, |
|
1095 now.tm_hour, |
|
1096 now.tm_min, |
|
1097 now.tm_sec, |
|
1098 now.tm_yday, |
|
1099 now.tm_isdst, |
|
1100 ) |
|
1101 ), |
|
1102 "del set_time", |
|
1103 ] |
|
1104 out, err = self.execute(commands) |
|
1105 if err: |
|
1106 raise OSError(self.__shortError(err)) |
|
1107 |
|
1108 def getTime(self): |
|
1109 """ |
|
1110 Public method to get the current time of the device. |
|
1111 |
|
1112 @return time of the device |
|
1113 @rtype str |
|
1114 @exception OSError raised to indicate an issue with the device |
|
1115 """ |
|
1116 commands = [ |
|
1117 "\n".join( |
|
1118 [ |
|
1119 "try:", |
|
1120 " import rtc as __rtc_", |
|
1121 " print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'" |
|
1122 ".format(*__rtc_.RTC().datetime[:6]))", |
|
1123 " del __rtc_", |
|
1124 "except:", |
|
1125 " import time as __time_", |
|
1126 " try:", |
|
1127 " print(__time_.strftime('%Y-%m-%d %H:%M:%S'," |
|
1128 # __IGNORE_WARNING_M601__ |
|
1129 " __time_.localtime()))", |
|
1130 " except AttributeError:", |
|
1131 " tm = __time_.localtime()", |
|
1132 " print('{0:04d}-{1:02d}-{2:02d}" |
|
1133 " {3:02d}:{4:02d}:{5:02d}'" |
|
1134 ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))", |
|
1135 " del tm", |
|
1136 " del __time_", |
|
1137 ] |
|
1138 ), |
|
1139 ] |
|
1140 out, err = self.execute(commands) |
|
1141 if err: |
|
1142 if b"NotImplementedError" in err: |
|
1143 return "<unsupported> <unsupported>" |
|
1144 raise OSError(self.__shortError(err)) |
|
1145 return out.decode("utf-8").strip() |
|
1146 |
|
1147 def getModules(self): |
|
1148 """ |
|
1149 Public method to show a list of modules built into the firmware. |
|
1150 |
|
1151 @return list of builtin modules |
|
1152 @rtype list of str |
|
1153 @exception OSError raised to indicate an issue with the device |
|
1154 """ |
|
1155 commands = ["help('modules')"] |
|
1156 out, err = self.execute(commands) |
|
1157 if err: |
|
1158 raise OSError(self.__shortError(err)) |
|
1159 |
|
1160 modules = [] |
|
1161 for line in out.decode("utf-8").splitlines()[:-1]: |
|
1162 modules.extend(line.split()) |
|
1163 return modules |
|