|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing some file system commands for MicroPython. |
|
8 """ |
|
9 |
|
10 from __future__ import unicode_literals |
|
11 |
|
12 import ast |
|
13 import time |
|
14 import os |
|
15 |
|
16 from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject, QThread, QTimer |
|
17 |
|
18 from .MicroPythonSerialPort import MicroPythonSerialPort |
|
19 |
|
20 import Preferences |
|
21 |
|
22 |
|
23 class MicroPythonCommandsInterface(QObject): |
|
24 """ |
|
25 Class implementing some file system commands for MicroPython. |
|
26 |
|
27 Commands are provided to perform operations on the file system of a |
|
28 connected MicroPython device. Supported commands are: |
|
29 <ul> |
|
30 <li>ls: directory listing</li> |
|
31 <li>lls: directory listing with meta data</li> |
|
32 <li>cd: change directory</li> |
|
33 <li>pwd: get the current directory</li> |
|
34 <li>put: copy a file to the connected device</li> |
|
35 <li>get: get a file from the connected device</li> |
|
36 <li>rm: remove a file from the connected device</li> |
|
37 <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash) |
|
38 <li>mkdir: create a new directory</li> |
|
39 <li>rmdir: remove an empty directory</li> |
|
40 </ul> |
|
41 |
|
42 There are additional commands related to time and version. |
|
43 <ul> |
|
44 <li>version: get version info about MicroPython</li> |
|
45 <li>getImplementation: get some implementation information</li> |
|
46 <li>syncTime: synchronize the time of the connected device</li> |
|
47 <li>showTime: show the current time of the connected device</li> |
|
48 </ul> |
|
49 |
|
50 @signal executeAsyncFinished() emitted to indicate the end of an |
|
51 asynchronously executed list of commands (e.g. a script) |
|
52 @signal dataReceived(data) emitted to send data received via the serial |
|
53 connection for further processing |
|
54 """ |
|
55 executeAsyncFinished = pyqtSignal() |
|
56 dataReceived = pyqtSignal(bytes) |
|
57 |
|
58 def __init__(self, parent=None): |
|
59 """ |
|
60 Constructor |
|
61 |
|
62 @param parent reference to the parent object |
|
63 @type QObject |
|
64 """ |
|
65 super(MicroPythonCommandsInterface, self).__init__(parent) |
|
66 |
|
67 self.__blockReadyRead = False |
|
68 |
|
69 self.__serial = MicroPythonSerialPort( |
|
70 timeout=Preferences.getMicroPython("SerialTimeout"), |
|
71 parent=self) |
|
72 self.__serial.readyRead.connect(self.__readSerial) |
|
73 |
|
74 @pyqtSlot() |
|
75 def __readSerial(self): |
|
76 """ |
|
77 Private slot to read all available serial data and emit it with the |
|
78 "dataReceived" signal for further processing. |
|
79 """ |
|
80 if not self.__blockReadyRead: |
|
81 data = bytes(self.__serial.readAll()) |
|
82 self.dataReceived.emit(data) |
|
83 |
|
84 @pyqtSlot() |
|
85 def connectToDevice(self, port): |
|
86 """ |
|
87 Public slot to start the manager. |
|
88 |
|
89 @param port name of the port to be used |
|
90 @type str |
|
91 @return flag indicating success |
|
92 @rtype bool |
|
93 """ |
|
94 return self.__serial.openSerialLink(port) |
|
95 |
|
96 @pyqtSlot() |
|
97 def disconnectFromDevice(self): |
|
98 """ |
|
99 Public slot to stop the thread. |
|
100 """ |
|
101 self.__serial.closeSerialLink() |
|
102 |
|
103 def isConnected(self): |
|
104 """ |
|
105 Public method to get the connection status. |
|
106 |
|
107 @return flag indicating the connection status |
|
108 @rtype bool |
|
109 """ |
|
110 return self.__serial.isConnected() |
|
111 |
|
112 @pyqtSlot() |
|
113 def handlePreferencesChanged(self): |
|
114 """ |
|
115 Public slot to handle a change of the preferences. |
|
116 """ |
|
117 self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout")) |
|
118 |
|
119 def write(self, data): |
|
120 """ |
|
121 Public method to write data to the connected device. |
|
122 |
|
123 @param data data to be written |
|
124 @type bytes or bytearray |
|
125 """ |
|
126 self.__serial.isConnected() and self.__serial.write(data) |
|
127 |
|
128 def __rawOn(self): |
|
129 """ |
|
130 Private method to switch the connected device to 'raw' mode. |
|
131 |
|
132 Note: switching to raw mode is done with synchronous writes. |
|
133 |
|
134 @return flag indicating success |
|
135 @@rtype bool |
|
136 """ |
|
137 if not self.__serial: |
|
138 return False |
|
139 |
|
140 rawReplMessage = b"raw REPL; CTRL-B to exit\r\n" |
|
141 softRebootMessage = b"soft reboot\r\n" |
|
142 |
|
143 self.__serial.write(b"\x02") # end raw mode if required |
|
144 self.__serial.waitForBytesWritten() |
|
145 for _i in range(3): |
|
146 # CTRL-C three times to break out of loops |
|
147 self.__serial.write(b"\r\x03") |
|
148 self.__serial.waitForBytesWritten() |
|
149 QThread.msleep(10) |
|
150 self.__serial.readAll() # read all data and discard it |
|
151 self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode |
|
152 self.__serial.readUntil(rawReplMessage) |
|
153 if self.__serial.hasTimedOut(): |
|
154 return False |
|
155 self.__serial.write(b"\x04") # send CTRL-D to soft reset |
|
156 self.__serial.readUntil(softRebootMessage) |
|
157 if self.__serial.hasTimedOut(): |
|
158 return False |
|
159 |
|
160 # some MicroPython devices seem to need to be convinced in some |
|
161 # special way |
|
162 data = self.__serial.readUntil(rawReplMessage) |
|
163 if self.__serial.hasTimedOut(): |
|
164 return False |
|
165 if not data.endswith(rawReplMessage): |
|
166 self.__serial.write(b"\r\x01") # send CTRL-A again |
|
167 self.__serial.readUntil(rawReplMessage) |
|
168 if self.__serial.hasTimedOut(): |
|
169 return False |
|
170 self.__serial.readAll() # read all data and discard it |
|
171 return True |
|
172 |
|
173 def __rawOff(self): |
|
174 """ |
|
175 Private method to switch 'raw' mode off. |
|
176 """ |
|
177 if self.__serial: |
|
178 self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode |
|
179 |
|
180 def execute(self, commands): |
|
181 """ |
|
182 Public method to send commands to the connected device and return the |
|
183 result. |
|
184 |
|
185 If no serial connection is available, empty results will be returned. |
|
186 |
|
187 @param commands list of commands to be executed |
|
188 @type str |
|
189 @return tuple containing stdout and stderr output of the device |
|
190 @rtype tuple of (bytes, bytes) |
|
191 """ |
|
192 if not self.__serial: |
|
193 return b"", b"" |
|
194 |
|
195 if not self.__serial.isConnected(): |
|
196 return b"", b"Device not connected or not switched on." |
|
197 |
|
198 result = bytearray() |
|
199 err = b"" |
|
200 |
|
201 self.__blockReadyRead = True |
|
202 ok = self.__rawOn() |
|
203 if not ok: |
|
204 self.__blockReadyRead = False |
|
205 return ( |
|
206 b"", |
|
207 b"Could not switch to raw mode. Is the device switched on?" |
|
208 ) |
|
209 |
|
210 QThread.msleep(10) |
|
211 for command in commands: |
|
212 if command: |
|
213 commandBytes = command.encode("utf-8") |
|
214 self.__serial.write(commandBytes + b"\x04") |
|
215 # read until prompt |
|
216 response = self.__serial.readUntil(b"\x04>") |
|
217 if self.__serial.hasTimedOut(): |
|
218 self.__blockReadyRead = False |
|
219 return b"", b"Timeout while processing commands." |
|
220 if b"\x04" in response[2:-2]: |
|
221 # split stdout, stderr |
|
222 out, err = response[2:-2].split(b"\x04") |
|
223 result += out |
|
224 else: |
|
225 err = b"invalid response received: " + response |
|
226 if err: |
|
227 self.__blockReadyRead = False |
|
228 return b"", err |
|
229 QThread.msleep(10) |
|
230 self.__rawOff() |
|
231 self.__blockReadyRead = False |
|
232 |
|
233 return bytes(result), err |
|
234 |
|
235 def executeAsync(self, commandsList): |
|
236 """ |
|
237 Public method to execute a series of commands over a period of time |
|
238 without returning any result (asynchronous execution). |
|
239 |
|
240 @param commandsList list of commands to be execute on the device |
|
241 @type list of bytes |
|
242 """ |
|
243 def remainingTask(commands): |
|
244 self.executeAsync(commands) |
|
245 |
|
246 if commandsList: |
|
247 command = commandsList[0] |
|
248 self.__serial.write(command) |
|
249 remainder = commandsList[1:] |
|
250 QTimer.singleShot(2, lambda: remainingTask(remainder)) |
|
251 else: |
|
252 self.executeAsyncFinished.emit() |
|
253 |
|
254 def __shortError(self, error): |
|
255 """ |
|
256 Private method to create a shortened error message. |
|
257 |
|
258 @param error verbose error message |
|
259 @type bytes |
|
260 @return shortened error message |
|
261 @rtype str |
|
262 """ |
|
263 if error: |
|
264 decodedError = error.decode("utf-8") |
|
265 try: |
|
266 return decodedError.split["\r\n"][-2] |
|
267 except Exception: |
|
268 return decodedError |
|
269 return self.tr("Detected an error without indications.") |
|
270 |
|
271 ################################################################## |
|
272 ## Methods below implement the file system commands |
|
273 ################################################################## |
|
274 |
|
275 def ls(self, dirname=""): |
|
276 """ |
|
277 Public method to get a directory listing of the connected device. |
|
278 |
|
279 @param dirname name of the directory to be listed |
|
280 @type str |
|
281 @return tuple containg the directory listing |
|
282 @rtype tuple of str |
|
283 @exception IOError raised to indicate an issue with the device |
|
284 """ |
|
285 commands = [ |
|
286 "import os", |
|
287 "print(os.listdir('{0}'))".format(dirname), |
|
288 ] |
|
289 out, err = self.execute(commands) |
|
290 if err: |
|
291 raise IOError(self.__shortError(err)) |
|
292 return ast.literal_eval(out.decode("utf-8")) |
|
293 |
|
294 def lls(self, dirname="", fullstat=False): |
|
295 """ |
|
296 Public method to get a long directory listing of the connected device |
|
297 including meta data. |
|
298 |
|
299 @param dirname name of the directory to be listed |
|
300 @type str |
|
301 @param fullstat flag indicating to return the full stat() tuple |
|
302 @type bool |
|
303 @return list containing the directory listing with tuple entries of |
|
304 the name and and a tuple of mode, size and time (if fullstat is |
|
305 false) or the complete stat() tuple. 'None' is returned in case the |
|
306 directory doesn't exist. |
|
307 @rtype tuple of (str, tuple) |
|
308 @exception IOError raised to indicate an issue with the device |
|
309 """ |
|
310 commands = [ |
|
311 "import os", |
|
312 "\n".join([ |
|
313 "def stat(filename):", |
|
314 " try:", |
|
315 " rstat = os.lstat(filename)", |
|
316 " except:", |
|
317 " rstat = os.stat(filename)", |
|
318 " return tuple(rstat)", |
|
319 ]), |
|
320 "\n".join([ |
|
321 "def listdir_stat(dirname):", |
|
322 " try:", |
|
323 " files = os.listdir(dirname)", |
|
324 " except OSError:", |
|
325 " return None", |
|
326 " if dirname in ('', '/'):", |
|
327 " return list((f, stat(f)) for f in files)", |
|
328 " return list((f, stat(dirname + '/' + f)) for f in files)", |
|
329 ]), |
|
330 "print(listdir_stat('{0}'))".format(dirname), |
|
331 ] |
|
332 out, err = self.execute(commands) |
|
333 if err: |
|
334 raise IOError(self.__shortError(err)) |
|
335 fileslist = ast.literal_eval(out.decode("utf-8")) |
|
336 if fileslist is None: |
|
337 return None |
|
338 else: |
|
339 if fullstat: |
|
340 return fileslist |
|
341 else: |
|
342 return [(f, (s[0], s[6], s[8])) for f, s in fileslist] |
|
343 |
|
344 def cd(self, dirname): |
|
345 """ |
|
346 Public method to change the current directory on the connected device. |
|
347 |
|
348 @param dirname directory to change to |
|
349 @type str |
|
350 @exception IOError raised to indicate an issue with the device |
|
351 """ |
|
352 assert dirname |
|
353 |
|
354 commands = [ |
|
355 "import os", |
|
356 "os.chdir('{0}')".format(dirname), |
|
357 ] |
|
358 out, err = self.execute(commands) |
|
359 if err: |
|
360 raise IOError(self.__shortError(err)) |
|
361 |
|
362 def pwd(self): |
|
363 """ |
|
364 Public method to get the current directory of the connected device. |
|
365 |
|
366 @return current directory |
|
367 @rtype str |
|
368 @exception IOError raised to indicate an issue with the device |
|
369 """ |
|
370 commands = [ |
|
371 "import os", |
|
372 "print(os.getcwd())", |
|
373 ] |
|
374 out, err = self.execute(commands) |
|
375 if err: |
|
376 raise IOError(self.__shortError(err)) |
|
377 return out.decode("utf-8").strip() |
|
378 |
|
379 def rm(self, filename): |
|
380 """ |
|
381 Public method to remove a file from the connected device. |
|
382 |
|
383 @param filename name of the file to be removed |
|
384 @type str |
|
385 @exception IOError raised to indicate an issue with the device |
|
386 """ |
|
387 assert filename |
|
388 |
|
389 commands = [ |
|
390 "import os", |
|
391 "os.remove('{0}')".format(filename), |
|
392 ] |
|
393 out, err = self.execute(commands) |
|
394 if err: |
|
395 raise IOError(self.__shortError(err)) |
|
396 |
|
397 def rmrf(self, name, recursive=False, force=False): |
|
398 """ |
|
399 Public method to remove a file or directory recursively. |
|
400 |
|
401 @param name of the file or directory to remove |
|
402 @type str |
|
403 @param recursive flag indicating a recursive deletion |
|
404 @type bool |
|
405 @param force flag indicating to ignore errors |
|
406 @type bool |
|
407 @return flag indicating success |
|
408 @rtype bool |
|
409 @exception IOError raised to indicate an issue with the device |
|
410 """ |
|
411 assert name |
|
412 |
|
413 commands = [ |
|
414 "import os", |
|
415 "\n".join([ |
|
416 "def remove_file(name, recursive=False, force=False):", |
|
417 " try:", |
|
418 " mode = os.stat(name)[0]", |
|
419 " if mode & 0x4000 != 0:", |
|
420 " if recursive:", |
|
421 " for file in os.listdir(name):", |
|
422 " success = remove_file(name + '/' + file," |
|
423 " recursive, force)", |
|
424 " if not success and not force:", |
|
425 " return False", |
|
426 " os.rmdir(name)", |
|
427 " else:", |
|
428 " if not force:", |
|
429 " return False", |
|
430 " else:", |
|
431 " os.remove(name)", |
|
432 " except:", |
|
433 " if not force:", |
|
434 " return False", |
|
435 " return True", |
|
436 ]), |
|
437 "print(remove_file('{0}', {1}, {2}))".format(name, recursive, |
|
438 force), |
|
439 ] |
|
440 out, err = self.execute(commands) |
|
441 if err: |
|
442 raise IOError(self.__shortError(err)) |
|
443 return ast.literal_eval(out.decode("utf-8")) |
|
444 |
|
445 def mkdir(self, dirname): |
|
446 """ |
|
447 Public method to create a new directory. |
|
448 |
|
449 @param dirname name of the directory to create |
|
450 @type str |
|
451 @exception IOError raised to indicate an issue with the device |
|
452 """ |
|
453 assert dirname |
|
454 |
|
455 commands = [ |
|
456 "import os", |
|
457 "os.mkdir('{0}')".format(dirname), |
|
458 ] |
|
459 out, err = self.execute(commands) |
|
460 if err: |
|
461 raise IOError(self.__shortError(err)) |
|
462 |
|
463 def rmdir(self, dirname): |
|
464 """ |
|
465 Public method to remove a directory. |
|
466 |
|
467 @param dirname name of the directory to be removed |
|
468 @type str |
|
469 @exception IOError raised to indicate an issue with the device |
|
470 """ |
|
471 assert dirname |
|
472 |
|
473 commands = [ |
|
474 "import os", |
|
475 "os.rmdir('{0}')".format(dirname), |
|
476 ] |
|
477 out, err = self.execute(commands) |
|
478 if err: |
|
479 raise IOError(self.__shortError(err)) |
|
480 |
|
481 def put(self, hostFileName, deviceFileName=None): |
|
482 """ |
|
483 Public method to copy a local file to the connected device. |
|
484 |
|
485 @param hostFileName name of the file to be copied |
|
486 @type str |
|
487 @param deviceFileName name of the file to copy to |
|
488 @type str |
|
489 @return flag indicating success |
|
490 @rtype bool |
|
491 @exception IOError raised to indicate an issue with the device |
|
492 """ |
|
493 if not os.path.isfile(hostFileName): |
|
494 raise IOError("No such file: {0}".format(hostFileName)) |
|
495 |
|
496 with open(hostFileName, "rb") as hostFile: |
|
497 content = hostFile.read() |
|
498 # convert eol '\r' |
|
499 content = content.replace(b"\r\n", b"\r") |
|
500 content = content.replace(b"\n", b"\r") |
|
501 |
|
502 if not deviceFileName: |
|
503 deviceFileName = os.path.basename(hostFileName) |
|
504 |
|
505 commands = [ |
|
506 "fd = open('{0}', 'wb')".format(deviceFileName), |
|
507 "f = fd.write", |
|
508 ] |
|
509 while content: |
|
510 chunk = content[:64] |
|
511 commands.append("f(" + repr(chunk) + ")") |
|
512 content = content[64:] |
|
513 commands.append("fd.close()") |
|
514 |
|
515 out, err = self.execute(commands) |
|
516 if err: |
|
517 raise IOError(self.__shortError(err)) |
|
518 return True |
|
519 |
|
520 def get(self, deviceFileName, hostFileName=None): |
|
521 """ |
|
522 Public method to copy a file from the connected device. |
|
523 |
|
524 @param deviceFileName name of the file to copy |
|
525 @type str |
|
526 @param hostFileName name of the file to copy to |
|
527 @type str |
|
528 @return flag indicating success |
|
529 @rtype bool |
|
530 @exception IOError raised to indicate an issue with the device |
|
531 """ |
|
532 if not hostFileName: |
|
533 hostFileName = deviceFileName |
|
534 |
|
535 commands = [ |
|
536 "\n".join([ |
|
537 "try:", |
|
538 " from microbit import uart as u", |
|
539 "except ImportError:", |
|
540 " try:", |
|
541 " from machine import UART", |
|
542 " u = UART(0, {0})".format(115200), |
|
543 " except Exception:", |
|
544 " try:", |
|
545 " from sys import stdout as u", |
|
546 " except Exception:", |
|
547 " raise Exception('Could not find UART module in" |
|
548 " device.')", |
|
549 ]), |
|
550 "f = open('{0}', 'rb')".format(deviceFileName), |
|
551 "r = f.read", |
|
552 "result = True", |
|
553 "\n".join([ |
|
554 "while result:", |
|
555 " result = r(32)", |
|
556 " if result:", |
|
557 " u.write(result)", |
|
558 ]), |
|
559 "f.close()", |
|
560 ] |
|
561 out, err = self.execute(commands) |
|
562 if err: |
|
563 raise IOError(self.__shortError(err)) |
|
564 |
|
565 # write the received bytes to the local file |
|
566 # convert eol to "\n" |
|
567 out = out.replace(b"\r\n", b"\n") |
|
568 out = out.replace(b"\r", b"\n") |
|
569 with open(hostFileName, "wb") as hostFile: |
|
570 hostFile.write(out) |
|
571 return True |
|
572 |
|
573 def fileSystemInfo(self): |
|
574 """ |
|
575 Public method to obtain information about the currently mounted file |
|
576 systems. |
|
577 |
|
578 @return tuple of tuples containing the file system name, the total |
|
579 size, the used size and the free size |
|
580 @rtype tuple of tuples of (str, int, int, int) |
|
581 @exception IOError raised to indicate an issue with the device |
|
582 """ |
|
583 commands = [ |
|
584 "import os", |
|
585 "\n".join([ |
|
586 "def fsinfo():", |
|
587 " infolist = []", |
|
588 " fsnames = os.listdir('/')", |
|
589 " for fs in fsnames:", |
|
590 " fs = '/' + fs", |
|
591 " infolist.append((fs, os.statvfs(fs)))", |
|
592 " return infolist", |
|
593 ]), |
|
594 "print(fsinfo())", |
|
595 ] |
|
596 out, err = self.execute(commands) |
|
597 if err: |
|
598 raise IOError(self.__shortError(err)) |
|
599 infolist = ast.literal_eval(out.decode("utf-8")) |
|
600 if infolist is None: |
|
601 return None |
|
602 else: |
|
603 filesystemInfos = [] |
|
604 for fs, info in infolist: |
|
605 totalSize = info[2] * info[1] |
|
606 freeSize = info[4] * info[1] |
|
607 usedSize = totalSize - freeSize |
|
608 filesystemInfos.append((fs, totalSize, usedSize, freeSize)) |
|
609 |
|
610 return tuple(filesystemInfos) |
|
611 |
|
612 ################################################################## |
|
613 ## non-filesystem related methods below |
|
614 ################################################################## |
|
615 |
|
616 def version(self): |
|
617 """ |
|
618 Public method to get the MicroPython version information of the |
|
619 connected device. |
|
620 |
|
621 @return dictionary containing the version information |
|
622 @rtype dict |
|
623 @exception IOError raised to indicate an issue with the device |
|
624 """ |
|
625 commands = [ |
|
626 "import os", |
|
627 "print(os.uname())", |
|
628 ] |
|
629 out, err = self.execute(commands) |
|
630 if err: |
|
631 raise IOError(self.__shortError(err)) |
|
632 |
|
633 rawOutput = out.decode("utf-8").strip() |
|
634 rawOutput = rawOutput[1:-1] |
|
635 items = rawOutput.split(",") |
|
636 result = {} |
|
637 for item in items: |
|
638 key, value = item.strip().split("=") |
|
639 result[key.strip()] = value.strip()[1:-1] |
|
640 return result |
|
641 |
|
642 def getImplementation(self): |
|
643 """ |
|
644 Public method to get some implementation information of the connected |
|
645 device. |
|
646 |
|
647 @return dictionary containing the implementation information |
|
648 @rtype dict |
|
649 @exception IOError raised to indicate an issue with the device |
|
650 """ |
|
651 commands = [ |
|
652 "import sys", |
|
653 "res = {}", # __IGNORE_WARNING_M613__ |
|
654 "\n".join([ |
|
655 "try:", |
|
656 " res['name'] = sys.implementation.name", |
|
657 "except AttributeError:", |
|
658 " res['name'] = 'unknown'", |
|
659 ]), |
|
660 "\n".join([ |
|
661 "try:", |
|
662 " res['version'] = '.'.join((str(i) for i in" |
|
663 " sys.implementation.version))", |
|
664 "except AttributeError:", |
|
665 " res['version'] = 'unknown'", |
|
666 ]), |
|
667 "print(res)", |
|
668 ] |
|
669 out, err = self.execute(commands) |
|
670 if err: |
|
671 raise IOError(self.__shortError(err)) |
|
672 return ast.literal_eval(out.decode("utf-8")) |
|
673 |
|
674 def syncTime(self): |
|
675 """ |
|
676 Public method to set the time of the connected device to the local |
|
677 computer's time. |
|
678 |
|
679 @exception IOError raised to indicate an issue with the device |
|
680 """ |
|
681 now = time.localtime(time.time()) |
|
682 commands = [ |
|
683 "\n".join([ |
|
684 "def set_time(rtc_time):", |
|
685 " rtc = None", |
|
686 " try:", # Pyboard (it doesn't have machine.RTC()) |
|
687 " import pyb", |
|
688 " rtc = pyb.RTC()", |
|
689 " clock_time = rtc_time[:6] + (rtc_time[6] + 1, 0)", |
|
690 " rtc.datetime(clock_time)", |
|
691 " except:", |
|
692 " try:", |
|
693 " import machine", |
|
694 " rtc = machine.RTC()", |
|
695 " try:", # ESP8266 may use rtc.datetime() |
|
696 " clock_time = rtc_time[:6] +" |
|
697 " (rtc_time[6] + 1, 0)", |
|
698 " rtc.datetime(clock_time)", |
|
699 " except:", # ESP32 uses rtc.init() |
|
700 " rtc.init(rtc_time[:6])", |
|
701 " except:", |
|
702 " try:", |
|
703 " import rtc, time", |
|
704 " clock=rtc.RTC()", |
|
705 " clock.datetime = time.struct_time(rtc_time +" |
|
706 " (-1, -1))", |
|
707 " except:", |
|
708 " pass", |
|
709 ]), |
|
710 "set_time({0})".format((now.tm_year, now.tm_mon, now.tm_mday, |
|
711 now.tm_hour, now.tm_min, now.tm_sec, |
|
712 now.tm_wday)) |
|
713 ] |
|
714 out, err = self.execute(commands) |
|
715 if err: |
|
716 raise IOError(self.__shortError(err)) |
|
717 |
|
718 def showTime(self): |
|
719 """ |
|
720 Public method to get the current time of the device. |
|
721 |
|
722 @return time of the device |
|
723 @rtype str |
|
724 @exception IOError raised to indicate an issue with the device |
|
725 """ |
|
726 commands = [ |
|
727 "import time", |
|
728 "print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))", |
|
729 # __IGNORE_WARNING_M601__ |
|
730 ] |
|
731 out, err = self.execute(commands) |
|
732 if err: |
|
733 raise IOError(self.__shortError(err)) |
|
734 return out.decode("utf-8").strip() |