|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2019 - 2021 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing some file system commands for MicroPython. |
|
8 """ |
|
9 |
|
10 import ast |
|
11 import time |
|
12 import os |
|
13 |
|
14 from PyQt5.QtCore import ( |
|
15 pyqtSlot, pyqtSignal, QObject, QThread, QTimer, QCoreApplication, |
|
16 QEventLoop |
|
17 ) |
|
18 |
|
19 from .MicroPythonSerialPort import MicroPythonSerialPort |
|
20 |
|
21 import Preferences |
|
22 |
|
23 |
|
class MicroPythonCommandsInterface(QObject):
    """
    Class implementing some file system commands for MicroPython.

    Commands are provided to perform operations on the file system of a
    connected MicroPython device. Supported commands are:
    <ul>
    <li>ls: directory listing</li>
    <li>lls: directory listing with meta data</li>
    <li>cd: change directory</li>
    <li>pwd: get the current directory</li>
    <li>put: copy a file to the connected device</li>
    <li>get: get a file from the connected device</li>
    <li>rm: remove a file from the connected device</li>
    <li>rmrf: remove a file/directory recursively (like 'rm -rf' in
        bash)</li>
    <li>mkdir: create a new directory</li>
    <li>rmdir: remove an empty directory</li>
    <li>fileSystemInfo: get size information about the mounted file
        systems</li>
    </ul>

    There are additional commands related to time and version.
    <ul>
    <li>version: get version info about MicroPython</li>
    <li>getImplementation: get some implementation information</li>
    <li>syncTime: synchronize the time of the connected device</li>
    <li>getTime: get the current time of the connected device</li>
    </ul>

    @signal executeAsyncFinished() emitted to indicate the end of an
        asynchronously executed list of commands (e.g. a script)
    @signal dataReceived(data) emitted to send data received via the serial
        connection for further processing
    """
    # emitted by executeAsync() once its list of commands is exhausted
    executeAsyncFinished = pyqtSignal()
    # carries the raw bytes read from the serial port
    dataReceived = pyqtSignal(bytes)
|
58 |
|
def __init__(self, parent=None):
    """
    Constructor

    @param parent reference to the parent object; it is kept as well and
        later asked whether the device is a BBC micro:bit
    @type QObject
    """
    super().__init__(parent)

    # while set, the readyRead slot leaves the incoming data untouched
    self.__blockReadyRead = False
    self.__repl = parent

    serialTimeout = Preferences.getMicroPython("SerialTimeout")
    self.__serial = MicroPythonSerialPort(
        timeout=serialTimeout, parent=self)
    self.__serial.readyRead.connect(self.__readSerial)
|
76 |
|
@pyqtSlot()
def __readSerial(self):
    """
    Private slot forwarding all pending serial data through the
    "dataReceived" signal.

    Nothing is read while the slot is blocked by the '__blockReadyRead'
    flag.
    """
    if self.__blockReadyRead:
        return

    received = self.__serial.readAll()
    self.dataReceived.emit(bytes(received))
|
86 |
|
# NOTE(review): the decorator declares no argument types although the
# method takes 'port' - confirm this is intended before connecting a
# signal to it.
@pyqtSlot()
def connectToDevice(self, port):
    """
    Public slot to open the serial link to the device.

    @param port name of the port to be used
    @type str
    @return flag indicating success
    @rtype bool
    """
    linkOpened = self.__serial.openSerialLink(port)
    return linkOpened
|
98 |
|
@pyqtSlot()
def disconnectFromDevice(self):
    """
    Public slot to close the serial link to the device.
    """
    serialPort = self.__serial
    serialPort.closeSerialLink()
|
105 |
|
def isConnected(self):
    """
    Public method reporting whether the serial link is connected.

    @return flag indicating the connection status
    @rtype bool
    """
    connected = self.__serial.isConnected()
    return connected
|
114 |
|
@pyqtSlot()
def handlePreferencesChanged(self):
    """
    Public slot applying the configured serial timeout after the
    preferences were changed.
    """
    newTimeout = Preferences.getMicroPython("SerialTimeout")
    self.__serial.setTimeout(newTimeout)
|
121 |
|
def write(self, data):
    """
    Public method to write data to the connected device.

    The data is dropped silently if the serial link is not connected.

    @param data data to be written
    @type bytes or bytearray
    """
    if self.__serial.isConnected():
        self.__serial.write(data)
|
130 |
|
def __rawOn(self):
    """
    Private method to switch the connected device to 'raw' mode.

    Note: switching to raw mode is done with synchronous writes.

    @return flag indicating success
    @rtype bool
    """
    if not self.__serial:
        return False

    def sendSync(data):
        # write and give up after 500ms if the device is not responding
        self.__serial.write(data)
        return self.__serial.waitForBytesWritten(500)

    rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"

    # end raw mode if required
    if not sendSync(b"\x02"):
        return False

    # CTRL-C three times to break out of loops
    for _count in range(3):
        if not sendSync(b"\r\x03"):
            return False
        QThread.msleep(10)
    self.__serial.readAll()         # read all data and discard it

    # send CTRL-A to enter raw mode; try it a second time after a timeout
    # and then fail
    for _attempt in range(2):
        self.__serial.write(b"\r\x01")
        self.__serial.readUntil(rawReplMessage)
        if not self.__serial.hasTimedOut():
            break
    else:
        return False

    QCoreApplication.processEvents(
        QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
    self.__serial.readAll()         # read all data and discard it
    return True
|
172 |
|
def __rawOff(self):
    """
    Private method to leave 'raw' mode again.
    """
    if not self.__serial:
        return

    # CTRL-B cancels raw mode
    self.__serial.write(b"\x02")
    # wait for the Python prompt ...
    self.__serial.readUntil(b">>> ")
    # ... and discard whatever else has arrived
    self.__serial.readAll()
|
181 |
|
def execute(self, commands):
    """
    Public method to send commands to the connected device and return the
    result.

    If no serial connection is available, empty results will be returned.
    The commands are executed one after the other in raw mode; execution
    stops at the first command that produces an error.

    @param commands list of commands to be executed
    @type list of str
    @return tuple containing stdout and stderr output of the device
    @rtype tuple of (bytes, bytes)
    """
    if not self.__serial:
        return b"", b""

    if not self.__serial.isConnected():
        return b"", b"Device not connected or not switched on."

    result = bytearray()
    err = b""

    # Keep the readyRead slot away from the data while the dialog with the
    # device is running. The 'finally' clause guarantees that the slot is
    # released on every exit path (including the "Expected 'OK'" one).
    self.__blockReadyRead = True
    try:
        # switch on raw mode
        if not self.__rawOn():
            return (
                b"",
                b"Could not switch to raw mode. Is the device switched on?"
            )

        # send commands
        QThread.msleep(10)
        for command in commands:
            if not command:
                continue

            commandBytes = command.encode("utf-8")
            # CTRL-D makes the device execute the command
            self.__serial.write(commandBytes + b"\x04")
            QCoreApplication.processEvents(
                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
            ok = self.__serial.readUntil(b"OK")
            if ok != b"OK":
                return (
                    b"",
                    "Expected 'OK', got '{0}', followed by '{1}'".format(
                        ok, self.__serial.readAll()).encode("utf-8")
                )

            # read until prompt
            response = self.__serial.readUntil(b"\x04>")
            if self.__serial.hasTimedOut():
                return b"", b"Timeout while processing commands."
            if b"\x04" in response[:-2]:
                # split stdout, stderr
                out, err = response[:-2].split(b"\x04")
                result += out
            else:
                err = b"invalid response received: " + response
            if err:
                return b"", err

        # switch off raw mode
        QThread.msleep(10)
        self.__rawOff()
    finally:
        self.__blockReadyRead = False

    return bytes(result), err
|
250 |
|
def executeAsync(self, commandsList):
    """
    Public method to execute a series of commands over a period of time
    without returning any result (asynchronous execution).

    One command is written per call; the rest is scheduled through a
    single shot timer. 'executeAsyncFinished' is emitted once the list is
    exhausted.

    @param commandsList list of commands to be executed on the device
    @type list of bytes
    """
    if not commandsList:
        self.executeAsyncFinished.emit()
        return

    self.__serial.write(commandsList[0])
    remainder = commandsList[1:]
    QTimer.singleShot(2, lambda: self.executeAsync(remainder))
|
269 |
|
def __shortError(self, error):
    """
    Private method to create a shortened error message.

    The second to last line of the message is returned. The complete
    message is returned if it consists of a single line only.

    @param error verbose error message
    @type bytes
    @return shortened error message
    @rtype str
    """
    if error:
        decodedError = error.decode("utf-8")
        try:
            # split() has to be called; subscripting the method itself
            # always failed and returned the verbose message
            return decodedError.split("\r\n")[-2]
        except IndexError:
            # just one line
            return decodedError
    return self.tr("Detected an error without indications.")
|
286 |
|
287 ################################################################## |
|
288 ## Methods below implement the file system commands |
|
289 ################################################################## |
|
290 |
|
def ls(self, dirname=""):
    """
    Public method to get a directory listing of the connected device.

    @param dirname name of the directory to be listed (not used for the
        BBC micro:bit)
    @type str
    @return directory listing as printed by the device
    @rtype list of str
    @exception OSError raised to indicate an issue with the device
    """
    if self.__repl.isMicrobit():
        # BBC micro:bit does not support directories
        listCommand = "print(__os_.listdir())"
    else:
        # '!r' quotes and escapes the name, so quotes or backslashes in it
        # cannot break the generated statement
        listCommand = "print(__os_.listdir({0!r}))".format(dirname)

    commands = [
        "import os as __os_",
        listCommand,
        "del __os_",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
    return ast.literal_eval(out.decode("utf-8"))
|
319 |
|
def lls(self, dirname="", fullstat=False, showHidden=False):
    """
    Public method to get a long directory listing of the connected device
    including meta data.

    @param dirname name of the directory to be listed (not used for the
        BBC micro:bit)
    @type str
    @param fullstat flag indicating to return the full stat() tuple
    @type bool
    @param showHidden flag indicating to show hidden files as well
    @type bool
    @return list containing the directory listing with tuple entries of
        the name and a tuple of mode, size and time (if fullstat is
        false) or the complete stat() tuple. The device code yields an
        empty list if the directory cannot be listed.
    @rtype list of tuple of (str, tuple)
    @exception OSError raised to indicate an issue with the device
    """
    isVisibleDef = "\n".join([
        "def is_visible(filename, showHidden):",
        "    return showHidden or "
        "(filename[0] != '.' and filename[-1] != '~')",
    ])

    if self.__repl.isMicrobit():
        # BBC micro:bit does not support directories
        commands = [
            "import os as __os_",
            isVisibleDef,
            "\n".join([
                "def stat(filename):",
                "    size = __os_.size(filename)",
                "    return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)"
            ]),
            "\n".join([
                "def listdir_stat(showHidden):",
                "    files = __os_.listdir()",
                "    return list((f, stat(f)) for f in files if"
                " is_visible(f,showHidden))",
            ]),
            "print(listdir_stat({0}))".format(showHidden),
            "del __os_, stat, listdir_stat, is_visible",
        ]
    else:
        commands = [
            "import os as __os_",
            isVisibleDef,
            "\n".join([
                "def stat(filename):",
                "    try:",
                "        rstat = __os_.lstat(filename)",
                "    except:",
                "        rstat = __os_.stat(filename)",
                "    return tuple(rstat)",
            ]),
            "\n".join([
                "def listdir_stat(dirname, showHidden):",
                "    try:",
                "        files = __os_.listdir(dirname)",
                "    except OSError:",
                "        return []",
                "    if dirname in ('', '/'):",
                "        return list((f, stat(f)) for f in files if"
                " is_visible(f, showHidden))",
                "    return list((f, stat(dirname + '/' + f))"
                " for f in files if is_visible(f, showHidden))",
            ]),
            # '!r' quotes and escapes the name, so quotes or backslashes
            # in it cannot break the generated statement
            "print(listdir_stat({0!r}, {1}))".format(dirname, showHidden),
            "del __os_, stat, listdir_stat, is_visible",
        ]

    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))

    fileslist = ast.literal_eval(out.decode("utf-8"))
    if fileslist is None or fullstat:
        return fileslist

    # reduce the stat() tuple to the entries at index 0, 6 and 8
    return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
|
404 |
|
def cd(self, dirname):
    """
    Public method to change the current directory on the connected device.

    Nothing is done if no directory name is given.

    @param dirname directory to change to
    @type str
    @exception OSError raised to indicate an issue with the device
    """
    if not dirname:
        return

    commands = [
        "import os as __os_",
        # '!r' quotes and escapes the name, so quotes or backslashes in it
        # cannot break the generated statement
        "__os_.chdir({0!r})".format(dirname),
        "del __os_",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
|
422 |
|
def pwd(self):
    """
    Public method to get the current directory of the connected device.

    @return current directory (always empty for the BBC micro:bit)
    @rtype str
    @exception OSError raised to indicate an issue with the device
    """
    # BBC micro:bit does not support directories
    if self.__repl.isMicrobit():
        return ""

    out, err = self.execute([
        "import os as __os_",
        "print(__os_.getcwd())",
        "del __os_",
    ])
    if err:
        raise OSError(self.__shortError(err))

    currentDirectory = out.decode("utf-8")
    return currentDirectory.strip()
|
444 |
|
def rm(self, filename):
    """
    Public method to remove a file from the connected device.

    Nothing is done if no file name is given.

    @param filename name of the file to be removed
    @type str
    @exception OSError raised to indicate an issue with the device
    """
    if not filename:
        return

    commands = [
        "import os as __os_",
        # '!r' quotes and escapes the name, so quotes or backslashes in it
        # cannot break the generated statement
        "__os_.remove({0!r})".format(filename),
        "del __os_",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
|
462 |
|
def rmrf(self, name, recursive=False, force=False):
    """
    Public method to remove a file or directory recursively.

    @param name of the file or directory to remove
    @type str
    @param recursive flag indicating a recursive deletion
    @type bool
    @param force flag indicating to ignore errors
    @type bool
    @return flag indicating success (always False if no name is given)
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    if not name:
        return False

    removeFileDef = "\n".join([
        "def remove_file(name, recursive=False, force=False):",
        "    try:",
        "        mode = __os_.stat(name)[0]",
        "        if mode & 0x4000 != 0:",
        "            if recursive:",
        "                for file in __os_.listdir(name):",
        "                    success = remove_file("
        "name + '/' + file, recursive, force)",
        "                    if not success and not force:",
        "                        return False",
        "                __os_.rmdir(name)",
        "            else:",
        "                if not force:",
        "                    return False",
        "        else:",
        "            __os_.remove(name)",
        "    except:",
        "        if not force:",
        "            return False",
        "    return True",
    ])
    commands = [
        "import os as __os_",
        removeFileDef,
        # '!r' quotes and escapes the name, so quotes or backslashes in it
        # cannot break the generated statement
        "print(remove_file({0!r}, {1}, {2}))".format(
            name, recursive, force),
        "del __os_, remove_file",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
    return ast.literal_eval(out.decode("utf-8"))
|
512 |
|
def mkdir(self, dirname):
    """
    Public method to create a new directory.

    Nothing is done if no directory name is given.

    @param dirname name of the directory to create
    @type str
    @exception OSError raised to indicate an issue with the device
    """
    if not dirname:
        return

    commands = [
        "import os as __os_",
        # '!r' quotes and escapes the name, so quotes or backslashes in it
        # cannot break the generated statement
        "__os_.mkdir({0!r})".format(dirname),
        "del __os_",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
|
530 |
|
def rmdir(self, dirname):
    """
    Public method to remove a directory.

    Nothing is done if no directory name is given.

    @param dirname name of the directory to be removed
    @type str
    @exception OSError raised to indicate an issue with the device
    """
    if not dirname:
        return

    commands = [
        "import os as __os_",
        # '!r' quotes and escapes the name, so quotes or backslashes in it
        # cannot break the generated statement
        "__os_.rmdir({0!r})".format(dirname),
        "del __os_",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
|
548 |
|
def put(self, hostFileName, deviceFileName=None):
    """
    Public method to copy a local file to the connected device.

    The file is transferred in chunks of 64 bytes; all line endings are
    converted to '\\r' before the transfer.

    @param hostFileName name of the file to be copied
    @type str
    @param deviceFileName name of the file to copy to (defaults to the
        base name of the host file)
    @type str
    @return flag indicating success
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    if not os.path.isfile(hostFileName):
        raise OSError("No such file: {0}".format(hostFileName))

    with open(hostFileName, "rb") as hostFile:
        content = hostFile.read()
    # convert eol '\r'
    content = content.replace(b"\r\n", b"\r")
    content = content.replace(b"\n", b"\r")

    if not deviceFileName:
        deviceFileName = os.path.basename(hostFileName)

    chunkSize = 64
    commands = [
        # '!r' quotes and escapes the name, so quotes or backslashes in it
        # cannot break the generated statement
        "fd = open({0!r}, 'wb')".format(deviceFileName),
        "f = fd.write",
    ]
    commands.extend(
        "f(" + repr(content[start:start + chunkSize]) + ")"
        for start in range(0, len(content), chunkSize)
    )
    commands.extend([
        "fd.close()",
        "del f, fd",
    ])

    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
    return True
|
590 |
|
def get(self, deviceFileName, hostFileName=None):
    """
    Public method to copy a file from the connected device.

    The device writes the file content to its serial output in chunks of
    32 bytes; all line endings are converted to '\\n' before the data is
    written to the local file.

    @param deviceFileName name of the file to copy
    @type str
    @param hostFileName name of the file to copy to (defaults to the
        device file name)
    @type str
    @return flag indicating success
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    if not hostFileName:
        hostFileName = deviceFileName

    commands = [
        "\n".join([
            "def send_data():",
            "    try:",
            "        from microbit import uart as u",
            "    except ImportError:",
            "        try:",
            "            from machine import UART",
            "            u = UART(0, {0})".format(115200),
            "        except Exception:",
            "            try:",
            "                from sys import stdout as u",
            "            except Exception:",
            "                raise Exception('Could not find UART module"
            " in device.')",
            # '!r' quotes and escapes the name, so quotes or backslashes
            # in it cannot break the generated statement
            "    f = open({0!r}, 'rb')".format(deviceFileName),
            "    r = f.read",
            "    result = True",
            "    while result:",
            "        result = r(32)",
            "        if result:",
            "            u.write(result)",
            "    f.close()",
        ]),
        "send_data()",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))

    # write the received bytes to the local file
    # convert eol to "\n"
    out = out.replace(b"\r\n", b"\n")
    out = out.replace(b"\r", b"\n")
    with open(hostFileName, "wb") as hostFile:
        hostFile.write(out)
    return True
|
643 |
|
def fileSystemInfo(self):
    """
    Public method to obtain information about the currently mounted file
    systems.

    @return tuple of tuples containing the file system name, the total
        size, the used size and the free size
    @rtype tuple of tuples of (str, int, int, int)
    @exception OSError raised to indicate an issue with the device
    """
    fsinfoDef = "\n".join([
        "def fsinfo():",
        "    infolist = []",
        "    info = __os_.statvfs('/')",
        "    if info[0] == 0:",
        # assume it is just mount points
        "        fsnames = __os_.listdir('/')",
        "        for fs in fsnames:",
        "            fs = '/' + fs",
        "            infolist.append((fs, __os_.statvfs(fs)))",
        "    else:",
        "        infolist.append(('/', info))",
        "    return infolist",
    ])
    out, err = self.execute([
        "import os as __os_",
        fsinfoDef,
        "print(fsinfo())",
        "del __os_, fsinfo",
    ])
    if err:
        raise OSError(self.__shortError(err))

    infolist = ast.literal_eval(out.decode("utf-8"))
    if infolist is None:
        return None

    def sizes(fsName, stats):
        # entry 1 scales the counts in entry 2 (total) and entry 4 (free)
        total = stats[2] * stats[1]
        free = stats[4] * stats[1]
        return fsName, total, total - free, free

    return tuple(sizes(fsName, stats) for fsName, stats in infolist)
|
688 |
|
689 ################################################################## |
|
690 ## non-filesystem related methods below |
|
691 ################################################################## |
|
692 |
|
def version(self):
    """
    Public method to get the MicroPython version information of the
    connected device.

    @return dictionary containing the version information
    @rtype dict
    @exception OSError raised to indicate an issue with the device
    """
    out, err = self.execute([
        "import os as __os_",
        "print(__os_.uname())",
        "del __os_",
    ])
    if err:
        raise OSError(self.__shortError(err))

    # drop the first and last character of the printed uname() result
    # (presumably the enclosing parentheses) and split the rest into
    # key=value items
    unameText = out.decode("utf-8").strip()[1:-1]
    pairs = (item.strip().split("=") for item in unameText.split(","))
    # the values lose their first and last character (presumably quotes)
    return {key.strip(): value.strip()[1:-1] for key, value in pairs}
|
719 |
|
def getImplementation(self):
    """
    Public method to get some implementation information of the connected
    device.

    @return dictionary containing the implementation information (keys
        'name' and 'version'; 'unknown' is reported for an entry the
        device cannot provide)
    @rtype dict
    @exception OSError raised to indicate an issue with the device
    """
    nameSnippet = "\n".join([
        "try:",
        "    res['name'] = __sys_.implementation.name",
        "except AttributeError:",
        "    res['name'] = 'unknown'",
    ])
    versionSnippet = "\n".join([
        "try:",
        "    res['version'] = '.'.join((str(i) for i in"
        " __sys_.implementation.version))",
        "except AttributeError:",
        "    res['version'] = 'unknown'",
    ])
    out, err = self.execute([
        "import sys as __sys_",
        "res = {}",                             # __IGNORE_WARNING_M613__
        nameSnippet,
        versionSnippet,
        "print(res)",
        "del res, __sys_",
    ])
    if err:
        raise OSError(self.__shortError(err))

    implementationInfo = ast.literal_eval(out.decode("utf-8"))
    return implementationInfo
|
752 |
|
def syncTime(self, deviceType):
    """
    Public method to set the time of the connected device to the local
    computer's time.

    Nothing is done for device types without time support ("bbc_microbit",
    "calliope") and for unknown device types.

    @param deviceType type of board to sync time to
    @type str
    @exception OSError raised to indicate an issue with the device
    """
    # rtc_time[0] - year    4 digit
    # rtc_time[1] - month   1..12
    # rtc_time[2] - day     1..31
    # rtc_time[3] - weekday 1..7 1=Monday
    # rtc_time[4] - hour    0..23
    # rtc_time[5] - minute  0..59
    # rtc_time[6] - second  0..59
    # rtc_time[7] - yearday 1..366
    # rtc_time[8] - isdst   0, 1, or -1
    if deviceType == "pyboard":
        # Pyboard (pyboard doesn't have machine.RTC()).
        # The pyb.RTC.datetime function takes the arguments in the
        # order: (year, month, day, weekday, hour, minute, second,
        # subseconds)
        # http://docs.micropython.org/en/latest/library/pyb.RTC.html
        # #pyb.RTC.datetime
        set_time = "\n".join([
            "def set_time(rtc_time):",
            "    import pyb",
            "    rtc = pyb.RTC()",
            "    rtc.datetime(rtc_time[:7] + (0,))",
        ])
    elif deviceType == "esp":
        # The machine.RTC documentation was incorrect and doesn't agree
        # with the code, so no link is presented here. The order of the
        # arguments is the same as the pyboard except for LoBo MPy.
        set_time = "\n".join([
            "def set_time(rtc_time):",
            "    import machine",
            "    rtc = machine.RTC()",
            "    try:",     # ESP8266 may use rtc.datetime()
            "        rtc.datetime(rtc_time[:7] + (0,))",
            "    except Exception:",    # ESP32 uses rtc.init()
            "        import os",
            "        if 'LoBo' in os.uname()[0]:",    # LoBo MPy
            "            clock_time = rtc_time[:3] +"
            " rtc_time[4:7] + (rtc_time[3], rtc_time[7])",
            "        else:",
            "            clock_time = rtc_time[:7] + (0,)",
            "        rtc.init(clock_time)",
        ])
    elif deviceType == "circuitpython":
        set_time = "\n".join([
            "def set_time(rtc_time):",
            "    import rtc",
            "    import time",
            "    clock = rtc.RTC()",
            "    clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3],"
            " rtc_time[7], rtc_time[8])",
            "    clock.datetime = time.struct_time(clock_time)",
        ])
    elif deviceType in ("bbc_microbit", "calliope"):
        # BBC micro:bit and Calliope mini don't support time commands
        return
    elif deviceType == "rp2040":
        # Raspberry Pi Pico (RP2040) - machine.RTC doesn't exist
        set_time = "\n".join([
            "def set_time(rtc_time):",
            # mem32 lives in the 'machine' module, which must be imported
            # by the snippet itself
            "    import machine",
            "    setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |"
            " rtc_time[2]",
            "    setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |"
            " rtc_time[5] << 8 | rtc_time[6]",
            "    machine.mem32[0x4005c004] = setup_0",
            "    machine.mem32[0x4005c008] = setup_1",
            "    machine.mem32[0x4005c00c] |= 0x10",
        ])
    elif deviceType == "pycom":
        # PyCom's machine.RTC takes its arguments in a slightly
        # different order than the official machine.RTC.
        # (year, month, day, hour, minute, second[, microsecond[,
        # tzinfo]])
        # https://docs.pycom.io/firmwareapi/pycom/machine/rtc/
        # #rtc-init-datetime-none-source-rtc-internal-rc
        set_time = "\n".join([
            "def set_time(rtc_time):",
            "    import pycom",
            "    rtc_time2 = rtc_time[:3] + rtc_time[4:7]",
            "    import machine",
            "    rtc = machine.RTC()",
            "    rtc.init(rtc_time2)",
        ])
    else:
        # no set_time() support for generic boards
        return

    now = time.localtime(time.time())
    # tm_wday counts from 0 (Monday); the device code expects 1..7
    rtcTime = (
        now.tm_year, now.tm_mon, now.tm_mday, now.tm_wday + 1,
        now.tm_hour, now.tm_min, now.tm_sec, now.tm_yday, now.tm_isdst
    )
    commands = [
        set_time,
        "set_time({0})".format(rtcTime),
        "del set_time",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
|
859 |
|
def getTime(self):
    """
    Public method to get the current time of the device.

    @return time of the device; a placeholder text is returned if the
        device reports a NotImplementedError
    @rtype str
    @exception OSError raised to indicate an issue with the device
    """
    # the 'rtc' module is tried first with the 'time' module as fallback
    timeSnippet = "\n".join([
        "try:",
        "    import rtc as __rtc_",
        "    print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'"
        ".format(*__rtc_.RTC().datetime[:6]))",
        "    del __rtc_",
        "except:",
        "    import time as __time_",
        "    try:",
        "        print(__time_.strftime('%Y-%m-%d %H:%M:%S',"
        # __IGNORE_WARNING_M601__
        " __time_.localtime()))",
        "    except AttributeError:",
        "        tm = __time_.localtime()",
        "        print('{0:04d}-{1:02d}-{2:02d}"
        " {3:02d}:{4:02d}:{5:02d}'"
        ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))",
        "        del tm",
        "    del __time_"
    ])
    out, err = self.execute([timeSnippet])
    if not err:
        return out.decode("utf-8").strip()

    if b"NotImplementedError" in err:
        return "<unsupported> <unsupported>"
    raise OSError(self.__shortError(err))