eric7/MicroPython/MicroPythonCommandsInterface.py

branch
eric7
changeset 8312
800c432b34c8
parent 8257
28146736bbfc
child 8318
962bce857696
equal deleted inserted replaced
8311:4e8b98454baa 8312:800c432b34c8
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2019 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing some file system commands for MicroPython.
8 """
9
10 import ast
11 import time
12 import os
13
14 from PyQt5.QtCore import (
15 pyqtSlot, pyqtSignal, QObject, QThread, QTimer, QCoreApplication,
16 QEventLoop
17 )
18
19 from .MicroPythonSerialPort import MicroPythonSerialPort
20
21 import Preferences
22
23
class MicroPythonCommandsInterface(QObject):
    """
    Class implementing some file system commands for MicroPython.

    Commands are provided to perform operations on the file system of a
    connected MicroPython device. Supported commands are:
    <ul>
    <li>ls: directory listing</li>
    <li>lls: directory listing with meta data</li>
    <li>cd: change directory</li>
    <li>pwd: get the current directory</li>
    <li>put: copy a file to the connected device</li>
    <li>get: get a file from the connected device</li>
    <li>rm: remove a file from the connected device</li>
    <li>rmrf: remove a file/directory recursively (like 'rm -rf' in
        bash)</li>
    <li>mkdir: create a new directory</li>
    <li>rmdir: remove an empty directory</li>
    <li>fileSystemInfo: get size information of the file systems</li>
    </ul>

    There are additional commands related to time and version.
    <ul>
    <li>version: get version info about MicroPython</li>
    <li>getImplementation: get some implementation information</li>
    <li>syncTime: synchronize the time of the connected device</li>
    <li>getTime: get the current time of the connected device</li>
    </ul>

    All commands are sent through the 'raw' REPL of the device (see
    execute()); the snippets executed on the device are plain strings
    assembled by the individual methods.

    @signal executeAsyncFinished() emitted to indicate the end of an
        asynchronously executed list of commands (e.g. a script)
    @signal dataReceived(data) emitted to send data received via the serial
        connection for further processing
    """
    executeAsyncFinished = pyqtSignal()
    dataReceived = pyqtSignal(bytes)

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object (it is also used as the
            REPL object queried via isMicrobit())
        @type QObject
        """
        super().__init__(parent)

        self.__repl = parent

        # While set, incoming serial data is consumed by execute() and not
        # forwarded through the dataReceived signal.
        self.__blockReadyRead = False

        self.__serial = MicroPythonSerialPort(
            timeout=Preferences.getMicroPython("SerialTimeout"),
            parent=self)
        self.__serial.readyRead.connect(self.__readSerial)

    @pyqtSlot()
    def __readSerial(self):
        """
        Private slot to read all available serial data and emit it with the
        "dataReceived" signal for further processing.
        """
        if not self.__blockReadyRead:
            data = bytes(self.__serial.readAll())
            self.dataReceived.emit(data)

    @pyqtSlot()
    def connectToDevice(self, port):
        """
        Public slot to open the serial link to the device.

        @param port name of the port to be used
        @type str
        @return flag indicating success
        @rtype bool
        """
        return self.__serial.openSerialLink(port)

    @pyqtSlot()
    def disconnectFromDevice(self):
        """
        Public slot to close the serial link to the device.
        """
        self.__serial.closeSerialLink()

    def isConnected(self):
        """
        Public method to get the connection status.

        @return flag indicating the connection status
        @rtype bool
        """
        return self.__serial.isConnected()

    @pyqtSlot()
    def handlePreferencesChanged(self):
        """
        Public slot to handle a change of the preferences.
        """
        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))

    def write(self, data):
        """
        Public method to write data to the connected device.

        Nothing is written, if the serial link is not connected.

        @param data data to be written
        @type bytes or bytearray
        """
        if self.__serial.isConnected():
            self.__serial.write(data)

    def __rawOn(self):
        """
        Private method to switch the connected device to 'raw' mode.

        Note: switching to raw mode is done with synchronous writes.

        @return flag indicating success
        @rtype bool
        """
        if not self.__serial:
            return False

        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"

        self.__serial.write(b"\x02")        # end raw mode if required
        written = self.__serial.waitForBytesWritten(500)
        # time out after 500ms if device is not responding
        if not written:
            return False
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            written = self.__serial.waitForBytesWritten(500)
            # time out after 500ms if device is not responding
            if not written:
                return False
            QThread.msleep(10)
        self.__serial.readAll()             # read all data and discard it
        self.__serial.write(b"\r\x01")      # send CTRL-A to enter raw mode
        self.__serial.readUntil(rawReplMessage)
        if self.__serial.hasTimedOut():
            # it timed out; try it again and then fail
            self.__serial.write(b"\r\x01")  # send CTRL-A again
            self.__serial.readUntil(rawReplMessage)
            if self.__serial.hasTimedOut():
                return False

        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
        self.__serial.readAll()             # read all data and discard it
        return True

    def __rawOff(self):
        """
        Private method to switch 'raw' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x02")      # send CTRL-B to cancel raw mode
            self.__serial.readUntil(b">>> ")  # read until Python prompt
            self.__serial.readAll()           # read all data and discard it

    def execute(self, commands):
        """
        Public method to send commands to the connected device and return the
        result.

        If no serial connection is available, empty results will be returned.
        While the commands are processed, received serial data is not
        forwarded via the dataReceived signal; forwarding is re-enabled on
        every exit path of this method.

        @param commands list of commands to be executed
        @type list of str
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""

        if not self.__serial.isConnected():
            return b"", b"Device not connected or not switched on."

        result = bytearray()
        err = b""

        self.__blockReadyRead = True
        try:
            # switch on raw mode
            if not self.__rawOn():
                return (
                    b"",
                    b"Could not switch to raw mode. Is the device switched on?"
                )

            # send commands
            QThread.msleep(10)
            for command in commands:
                if not command:
                    continue

                commandBytes = command.encode("utf-8")
                # CTRL-D terminates the command in raw mode
                self.__serial.write(commandBytes + b"\x04")
                QCoreApplication.processEvents(
                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
                ok = self.__serial.readUntil(b"OK")
                if ok != b"OK":
                    # the return value is built before the 'finally' clause
                    # runs, i.e. readAll() is still shielded from the
                    # readyRead handler
                    return (
                        b"",
                        "Expected 'OK', got '{0}', followed by '{1}'".format(
                            ok, self.__serial.readAll()).encode("utf-8")
                    )

                # read until prompt
                response = self.__serial.readUntil(b"\x04>")
                if self.__serial.hasTimedOut():
                    return b"", b"Timeout while processing commands."

                payload = response[:-2]
                if b"\x04" in payload:
                    # split stdout, stderr at the last separator; stdout may
                    # contain 0x04 bytes itself (e.g. binary file content)
                    out, err = payload.rsplit(b"\x04", 1)
                    result += out
                else:
                    err = b"invalid response received: " + response
                if err:
                    return b"", err

            # switch off raw mode
            QThread.msleep(10)
            self.__rawOff()
        finally:
            self.__blockReadyRead = False

        return bytes(result), err

    def executeAsync(self, commandsList):
        """
        Public method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        One command is written per invocation; the remaining commands are
        scheduled via a single shot timer. The executeAsyncFinished signal
        is emitted once the list is exhausted.

        @param commandsList list of commands to be executed on the device
        @type list of bytes
        """
        if commandsList:
            self.__serial.write(commandsList[0])
            remainder = commandsList[1:]
            QTimer.singleShot(2, lambda: self.executeAsync(remainder))
        else:
            self.executeAsyncFinished.emit()

    def __shortError(self, error):
        """
        Private method to create a shortened error message.

        The shortened message is the last line of the given error output
        (i.e. the line before the trailing line break). The complete
        decoded output is returned, if no such line exists.

        @param error verbose error message
        @type bytes
        @return shortened error message
        @rtype str
        """
        if error:
            decodedError = error.decode("utf-8")
            try:
                return decodedError.split("\r\n")[-2]
            except IndexError:
                # no line break contained; return the complete text
                return decodedError
        return self.tr("Detected an error without indications.")

    ##################################################################
    ## Methods below implement the file system commands
    ##################################################################

    def ls(self, dirname=""):
        """
        Public method to get a directory listing of the connected device.

        @param dirname name of the directory to be listed
        @type str
        @return tuple containing the directory listing
        @rtype tuple of str
        @exception OSError raised to indicate an issue with the device
        """
        commands = (
            # BBC micro:bit does not support directories
            [
                "import os as __os_",
                "print(__os_.listdir())",
                "del __os_",
            ]
            if self.__repl.isMicrobit() else
            [
                "import os as __os_",
                # {0!r} yields a correctly quoted/escaped string literal
                "print(__os_.listdir({0!r}))".format(dirname),
                "del __os_",
            ]
        )
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        return ast.literal_eval(out.decode("utf-8"))

    def lls(self, dirname="", fullstat=False, showHidden=False):
        """
        Public method to get a long directory listing of the connected device
        including meta data.

        @param dirname name of the directory to be listed
        @type str
        @param fullstat flag indicating to return the full stat() tuple
        @type bool
        @param showHidden flag indicating to show hidden files as well
        @type bool
        @return list containing the directory listing with tuple entries of
            the name and a tuple of mode, size and time (if fullstat is
            false) or the complete stat() tuple. The device side code
            reports an empty list, if the directory cannot be listed.
        @rtype list of tuple of (str, tuple)
        @exception OSError raised to indicate an issue with the device
        """
        commands = (
            # BBC micro:bit does not support directories
            [
                "import os as __os_",
                "\n".join([
                    "def is_visible(filename, showHidden):",
                    "    return showHidden or "
                    "(filename[0] != '.' and filename[-1] != '~')",
                ]),
                "\n".join([
                    "def stat(filename):",
                    "    size = __os_.size(filename)",
                    "    return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)"
                ]),
                "\n".join([
                    "def listdir_stat(showHidden):",
                    "    files = __os_.listdir()",
                    "    return list((f, stat(f)) for f in files if"
                    " is_visible(f,showHidden))",
                ]),
                "print(listdir_stat({0}))".format(showHidden),
                "del __os_, stat, listdir_stat, is_visible",
            ]
            if self.__repl.isMicrobit() else
            [
                "import os as __os_",
                "\n".join([
                    "def is_visible(filename, showHidden):",
                    "    return showHidden or "
                    "(filename[0] != '.' and filename[-1] != '~')",
                ]),
                "\n".join([
                    "def stat(filename):",
                    "    try:",
                    "        rstat = __os_.lstat(filename)",
                    "    except:",
                    "        rstat = __os_.stat(filename)",
                    "    return tuple(rstat)",
                ]),
                "\n".join([
                    "def listdir_stat(dirname, showHidden):",
                    "    try:",
                    "        files = __os_.listdir(dirname)",
                    "    except OSError:",
                    "        return []",
                    "    if dirname in ('', '/'):",
                    "        return list((f, stat(f)) for f in files if"
                    " is_visible(f, showHidden))",
                    "    return list((f, stat(dirname + '/' + f))"
                    " for f in files if is_visible(f, showHidden))",
                ]),
                "print(listdir_stat({0!r}, {1}))".format(dirname, showHidden),
                "del __os_, stat, listdir_stat, is_visible",
            ]
        )
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        fileslist = ast.literal_eval(out.decode("utf-8"))
        if fileslist is None:
            return None
        if fullstat:
            return fileslist
        # reduce the stat() tuple to (mode, size, mtime)
        return [(f, (s[0], s[6], s[8])) for f, s in fileslist]

    def cd(self, dirname):
        """
        Public method to change the current directory on the connected device.

        Nothing is done, if an empty directory name is given.

        @param dirname directory to change to
        @type str
        @exception OSError raised to indicate an issue with the device
        """
        if dirname:
            commands = [
                "import os as __os_",
                "__os_.chdir({0!r})".format(dirname),
                "del __os_",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))

    def pwd(self):
        """
        Public method to get the current directory of the connected device.

        @return current directory
        @rtype str
        @exception OSError raised to indicate an issue with the device
        """
        if self.__repl.isMicrobit():
            # BBC micro:bit does not support directories
            return ""

        commands = [
            "import os as __os_",
            "print(__os_.getcwd())",
            "del __os_",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        return out.decode("utf-8").strip()

    def rm(self, filename):
        """
        Public method to remove a file from the connected device.

        Nothing is done, if an empty file name is given.

        @param filename name of the file to be removed
        @type str
        @exception OSError raised to indicate an issue with the device
        """
        if filename:
            commands = [
                "import os as __os_",
                "__os_.remove({0!r})".format(filename),
                "del __os_",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))

    def rmrf(self, name, recursive=False, force=False):
        """
        Public method to remove a file or directory recursively.

        @param name of the file or directory to remove
        @type str
        @param recursive flag indicating a recursive deletion
        @type bool
        @param force flag indicating to ignore errors
        @type bool
        @return flag indicating success (always False for an empty name)
        @rtype bool
        @exception OSError raised to indicate an issue with the device
        """
        if name:
            commands = [
                "import os as __os_",
                "\n".join([
                    "def remove_file(name, recursive=False, force=False):",
                    "    try:",
                    "        mode = __os_.stat(name)[0]",
                    "        if mode & 0x4000 != 0:",
                    "            if recursive:",
                    "                for file in __os_.listdir(name):",
                    "                    success = remove_file("
                    "name + '/' + file, recursive, force)",
                    "                    if not success and not force:",
                    "                        return False",
                    "                __os_.rmdir(name)",
                    "            else:",
                    "                if not force:",
                    "                    return False",
                    "        else:",
                    "            __os_.remove(name)",
                    "    except:",
                    "        if not force:",
                    "            return False",
                    "    return True",
                ]),
                "print(remove_file({0!r}, {1}, {2}))".format(
                    name, recursive, force),
                "del __os_, remove_file",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))
            return ast.literal_eval(out.decode("utf-8"))

        return False

    def mkdir(self, dirname):
        """
        Public method to create a new directory.

        Nothing is done, if an empty directory name is given.

        @param dirname name of the directory to create
        @type str
        @exception OSError raised to indicate an issue with the device
        """
        if dirname:
            commands = [
                "import os as __os_",
                "__os_.mkdir({0!r})".format(dirname),
                "del __os_",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))

    def rmdir(self, dirname):
        """
        Public method to remove a directory.

        Nothing is done, if an empty directory name is given.

        @param dirname name of the directory to be removed
        @type str
        @exception OSError raised to indicate an issue with the device
        """
        if dirname:
            commands = [
                "import os as __os_",
                "__os_.rmdir({0!r})".format(dirname),
                "del __os_",
            ]
            out, err = self.execute(commands)
            if err:
                raise OSError(self.__shortError(err))

    def put(self, hostFileName, deviceFileName=None):
        """
        Public method to copy a local file to the connected device.

        The line endings of the file are converted to '\\r' and the content
        is transferred in chunks of 64 bytes.

        @param hostFileName name of the file to be copied
        @type str
        @param deviceFileName name of the file to copy to (defaults to the
            base name of the host file)
        @type str
        @return flag indicating success
        @rtype bool
        @exception OSError raised to indicate an issue with the device
        """
        if not os.path.isfile(hostFileName):
            raise OSError("No such file: {0}".format(hostFileName))

        with open(hostFileName, "rb") as hostFile:
            content = hostFile.read()
            # convert eol '\r'
            content = content.replace(b"\r\n", b"\r")
            content = content.replace(b"\n", b"\r")

        if not deviceFileName:
            deviceFileName = os.path.basename(hostFileName)

        commands = [
            "fd = open({0!r}, 'wb')".format(deviceFileName),
            "f = fd.write",
        ]
        while content:
            chunk = content[:64]
            commands.append("f(" + repr(chunk) + ")")
            content = content[64:]
        commands.extend([
            "fd.close()",
            "del f, fd",
        ])

        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        return True

    def get(self, deviceFileName, hostFileName=None):
        """
        Public method to copy a file from the connected device.

        The line endings of the received data are converted to '\\n' before
        it is written to the host file.

        @param deviceFileName name of the file to copy
        @type str
        @param hostFileName name of the file to copy to (defaults to the
            device file name)
        @type str
        @return flag indicating success
        @rtype bool
        @exception OSError raised to indicate an issue with the device
        """
        if not hostFileName:
            hostFileName = deviceFileName

        commands = [
            "\n".join([
                "def send_data():",
                "    try:",
                "        from microbit import uart as u",
                "    except ImportError:",
                "        try:",
                "            from machine import UART",
                "            u = UART(0, {0})".format(115200),
                "        except Exception:",
                "            try:",
                "                from sys import stdout as u",
                "            except Exception:",
                "                raise Exception('Could not find UART module"
                " in device.')",
                "    f = open({0!r}, 'rb')".format(deviceFileName),
                "    r = f.read",
                "    result = True",
                "    while result:",
                "        result = r(32)",
                "        if result:",
                "            u.write(result)",
                "    f.close()",
            ]),
            "send_data()",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))

        # write the received bytes to the local file
        # convert eol to "\n"
        out = out.replace(b"\r\n", b"\n")
        out = out.replace(b"\r", b"\n")
        with open(hostFileName, "wb") as hostFile:
            hostFile.write(out)
        return True

    def fileSystemInfo(self):
        """
        Public method to obtain information about the currently mounted file
        systems.

        @return tuple of tuples containing the file system name, the total
            size, the used size and the free size
        @rtype tuple of tuples of (str, int, int, int)
        @exception OSError raised to indicate an issue with the device
        """
        commands = [
            "import os as __os_",
            "\n".join([
                "def fsinfo():",
                "    infolist = []",
                "    info = __os_.statvfs('/')",
                "    if info[0] == 0:",
                # assume it is just mount points
                "        fsnames = __os_.listdir('/')",
                "        for fs in fsnames:",
                "            fs = '/' + fs",
                "            infolist.append((fs, __os_.statvfs(fs)))",
                "    else:",
                "        infolist.append(('/', info))",
                "    return infolist",
            ]),
            "print(fsinfo())",
            "del __os_, fsinfo",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        infolist = ast.literal_eval(out.decode("utf-8"))
        if infolist is None:
            return None

        filesystemInfos = []
        for fs, info in infolist:
            # sizes are derived from statvfs() entries 1, 2 and 4
            totalSize = info[2] * info[1]
            freeSize = info[4] * info[1]
            usedSize = totalSize - freeSize
            filesystemInfos.append((fs, totalSize, usedSize, freeSize))

        return tuple(filesystemInfos)

    ##################################################################
    ## non-filesystem related methods below
    ##################################################################

    def version(self):
        """
        Public method to get the MicroPython version information of the
        connected device.

        The printed os.uname() result of the device is parsed into a
        dictionary of its 'key=value' entries.

        @return dictionary containing the version information
        @rtype dict
        @exception OSError raised to indicate an issue with the device
        """
        commands = [
            "import os as __os_",
            "print(__os_.uname())",
            "del __os_",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))

        rawOutput = out.decode("utf-8").strip()
        rawOutput = rawOutput[1:-1]     # strip the enclosing parentheses
        items = rawOutput.split(",")
        result = {}
        for item in items:
            # split at the first '=' only; the value may contain more
            key, value = item.strip().split("=", 1)
            result[key.strip()] = value.strip()[1:-1]
        return result

    def getImplementation(self):
        """
        Public method to get some implementation information of the connected
        device.

        @return dictionary containing the implementation information (keys
            'name' and 'version')
        @rtype dict
        @exception OSError raised to indicate an issue with the device
        """
        commands = [
            "import sys as __sys_",
            "res = {}",                             # __IGNORE_WARNING_M613__
            "\n".join([
                "try:",
                "    res['name'] = __sys_.implementation.name",
                "except AttributeError:",
                "    res['name'] = 'unknown'",
            ]),
            "\n".join([
                "try:",
                "    res['version'] = '.'.join((str(i) for i in"
                " __sys_.implementation.version))",
                "except AttributeError:",
                "    res['version'] = 'unknown'",
            ]),
            "print(res)",
            "del res, __sys_",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))
        return ast.literal_eval(out.decode("utf-8"))

    def syncTime(self, deviceType):
        """
        Public method to set the time of the connected device to the local
        computer's time.

        Nothing is done for device types without time support ("bbc_microbit"
        and "calliope") and for unknown device types.

        @param deviceType type of board to sync time to
        @type str
        @exception OSError raised to indicate an issue with the device
        """
        # rtc_time[0] - year    4 digit
        # rtc_time[1] - month   1..12
        # rtc_time[2] - day     1..31
        # rtc_time[3] - weekday 1..7 1=Monday
        # rtc_time[4] - hour    0..23
        # rtc_time[5] - minute  0..59
        # rtc_time[6] - second  0..59
        # rtc_time[7] - yearday 1..366
        # rtc_time[8] - isdst   0, 1, or -1
        if deviceType == "pyboard":
            # Pyboard (pyboard doesn't have machine.RTC()).
            # The pyb.RTC.datetime function takes the arguments in the
            # order: (year, month, day, weekday, hour, minute, second,
            # subseconds)
            # http://docs.micropython.org/en/latest/library/pyb.RTC.html
            # #pyb.RTC.datetime
            set_time = "\n".join([
                "def set_time(rtc_time):",
                "    import pyb",
                "    rtc = pyb.RTC()",
                "    rtc.datetime(rtc_time[:7] + (0,))",
            ])
        elif deviceType == "esp":
            # The machine.RTC documentation was incorrect and doesn't agree
            # with the code, so no link is presented here. The order of the
            # arguments is the same as the pyboard except for LoBo MPy.
            set_time = "\n".join([
                "def set_time(rtc_time):",
                "    import machine",
                "    rtc = machine.RTC()",
                "    try:",             # ESP8266 may use rtc.datetime()
                "        rtc.datetime(rtc_time[:7] + (0,))",
                "    except Exception:",        # ESP32 uses rtc.init()
                "        import os",
                "        if 'LoBo' in os.uname()[0]:",      # LoBo MPy
                "            clock_time = rtc_time[:3] +"
                " rtc_time[4:7] + (rtc_time[3], rtc_time[7])",
                "        else:",
                "            clock_time = rtc_time[:7] + (0,)",
                "        rtc.init(clock_time)",
            ])
        elif deviceType == "circuitpython":
            set_time = "\n".join([
                "def set_time(rtc_time):",
                "    import rtc",
                "    import time",
                "    clock = rtc.RTC()",
                "    clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3],"
                " rtc_time[7], rtc_time[8])",
                "    clock.datetime = time.struct_time(clock_time)",
            ])
        elif deviceType in ("bbc_microbit", "calliope"):
            # BBC micro:bit and Calliope mini don't support time commands
            return
        elif deviceType == "rp2040":
            # Raspberry Pi Pico (RP2040) - machine.RTC doesn't exist
            set_time = "\n".join([
                "def set_time(rtc_time):",
                # import locally; the REPL namespace may not contain it
                "    import machine",
                "    setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |"
                " rtc_time[2]",
                "    setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |"
                " rtc_time[5] << 8 | rtc_time[6]",
                "    machine.mem32[0x4005c004] = setup_0",
                "    machine.mem32[0x4005c008] = setup_1",
                "    machine.mem32[0x4005c00c] |= 0x10",
            ])
        elif deviceType == "pycom":
            # PyCom's machine.RTC takes its arguments in a slightly
            # different order than the official machine.RTC.
            # (year, month, day, hour, minute, second[, microsecond[,
            # tzinfo]])
            # https://docs.pycom.io/firmwareapi/pycom/machine/rtc/
            # #rtc-init-datetime-none-source-rtc-internal-rc
            set_time = "\n".join([
                "def set_time(rtc_time):",
                "    import pycom",
                "    rtc_time2 = rtc_time[:3] + rtc_time[4:7]",
                "    import machine",
                "    rtc = machine.RTC()",
                "    rtc.init(rtc_time2)",
            ])
        else:
            # no set_time() support for generic boards
            return

        now = time.localtime(time.time())
        commands = [
            set_time,
            # tm_wday is 0..6 (0=Monday); the device expects 1..7
            "set_time({0})".format((
                now.tm_year, now.tm_mon, now.tm_mday, now.tm_wday + 1,
                now.tm_hour, now.tm_min, now.tm_sec, now.tm_yday, now.tm_isdst
            )),
            "del set_time",
        ]
        out, err = self.execute(commands)
        if err:
            raise OSError(self.__shortError(err))

    def getTime(self):
        """
        Public method to get the current time of the device.

        @return time of the device (a placeholder text is returned, if the
            device reports a NotImplementedError)
        @rtype str
        @exception OSError raised to indicate an issue with the device
        """
        commands = [
            "\n".join([
                "try:",
                "    import rtc as __rtc_",
                "    print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'"
                ".format(*__rtc_.RTC().datetime[:6]))",
                "    del __rtc_",
                "except:",
                "    import time as __time_",
                "    try:",
                "        print(__time_.strftime('%Y-%m-%d %H:%M:%S',"
                # __IGNORE_WARNING_M601__
                " __time_.localtime()))",
                "    except AttributeError:",
                "        tm = __time_.localtime()",
                "        print('{0:04d}-{1:02d}-{2:02d}"
                " {3:02d}:{4:02d}:{5:02d}'"
                ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))",
                "        del tm",
                "    del __time_"
            ]),
        ]
        out, err = self.execute(commands)
        if err:
            if b"NotImplementedError" in err:
                return "&lt;unsupported&gt; &lt;unsupported&gt;"
            raise OSError(self.__shortError(err))
        return out.decode("utf-8").strip()

eric ide

mercurial