src/eric7/MicroPython/MicroPythonCommandsInterface.py

branch
eric7
changeset 9765
6378da868bb0
parent 9764
57496966803c
child 9766
f0e22f3a5878
equal deleted inserted replaced
9764:57496966803c 9765:6378da868bb0
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing some file system commands for MicroPython.
8 """
9
10 import ast
11 import os
12 import time
13
14 from PyQt6.QtCore import (
15 QCoreApplication,
16 QEventLoop,
17 QObject,
18 QThread,
19 QTimer,
20 pyqtSignal,
21 pyqtSlot,
22 )
23
24 from eric7 import Preferences
25
26 from .MicroPythonSerialPort import MicroPythonSerialPort
27
28
29 class MicroPythonCommandsInterface(QObject):
30 """
31 Class implementing some file system commands for MicroPython.
32
33 Commands are provided to perform operations on the file system of a
34 connected MicroPython device. Supported commands are:
35 <ul>
36 <li>ls: directory listing</li>
37 <li>lls: directory listing with meta data</li>
38 <li>cd: change directory</li>
39 <li>pwd: get the current directory</li>
40 <li>put: copy a file to the connected device</li>
41 <li>get: get a file from the connected device</li>
42 <li>rm: remove a file from the connected device</li>
43 <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash)</li>
44 <li>mkdir: create a new directory</li>
45 <li>rmdir: remove an empty directory</li>
46 </ul>
47
48 There are additional commands related to time and version.
49 <ul>
50 <li>version: get version info about MicroPython</li>
51 <li>getImplementation: get some implementation information</li>
52 <li>syncTime: synchronize the time of the connected device</li>
53 <li>showTime: show the current time of the connected device</li>
54 </ul>
55
56 @signal executeAsyncFinished() emitted to indicate the end of an
57 asynchronously executed list of commands (e.g. a script)
58 @signal dataReceived(data) emitted to send data received via the serial
59 connection for further processing
60 """
61
62 executeAsyncFinished = pyqtSignal()
63 dataReceived = pyqtSignal(bytes)
64
def __init__(self, parent=None):
    """
    Constructor

    @param parent reference to the parent object; it is also kept as the
        REPL reference used by the file system commands
    @type QObject
    """
    super().__init__(parent)

    # while set, the readyRead slot does not forward incoming data
    self.__blockReadyRead = False
    self.__repl = parent

    serialTimeout = Preferences.getMicroPython("SerialTimeout")
    self.__serial = MicroPythonSerialPort(timeout=serialTimeout, parent=self)
    self.__serial.readyRead.connect(self.__readSerial)
82
@pyqtSlot()
def __readSerial(self):
    """
    Private slot forwarding all pending serial data via the "dataReceived"
    signal unless forwarding is currently blocked.
    """
    if self.__blockReadyRead:
        # forwarding is suspended; leave the data in the serial buffer
        return

    self.dataReceived.emit(bytes(self.__serial.readAll()))
92
@pyqtSlot()
def connectToDevice(self, port):
    """
    Public slot to open the serial link to the device.

    @param port name of the port to be used
    @type str
    @return flag indicating success
    @rtype bool
    """
    # NOTE(review): the slot is declared without arguments although the
    # method needs 'port' - confirm it is never connected to a signal.
    linkOpened = self.__serial.openSerialLink(port)
    return linkOpened
104
@pyqtSlot()
def disconnectFromDevice(self):
    """
    Public slot to close the serial link to the device.
    """
    serialPort = self.__serial
    serialPort.closeSerialLink()
111
def isConnected(self):
    """
    Public method reporting whether the serial link is connected.

    @return flag indicating the connection status
    @rtype bool
    """
    connected = self.__serial.isConnected()
    return connected
120
@pyqtSlot()
def handlePreferencesChanged(self):
    """
    Public slot applying the configured serial timeout after a change of
    the preferences.
    """
    newTimeout = Preferences.getMicroPython("SerialTimeout")
    self.__serial.setTimeout(newTimeout)
127
def write(self, data):
    """
    Public method to write data to the connected device.

    Nothing is written while the serial link is not connected.

    @param data data to be written
    @type bytes or bytearray
    """
    if self.__serial.isConnected():
        self.__serial.write(data)
136
def __rawOn(self):
    """
    Private method to switch the connected device to 'raw' mode.

    Note: switching to raw mode is done with synchronous writes.

    @return flag indicating success
    @rtype bool
    """
    serial = self.__serial
    if not serial:
        return False

    rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"

    # CTRL-B: end raw mode if required; give up after 500ms if the device
    # is not responding
    serial.write(b"\x02")
    if not serial.waitForBytesWritten(500):
        return False

    # CTRL-C three times to break out of loops
    for _attempt in range(3):
        serial.write(b"\r\x03")
        if not serial.waitForBytesWritten(500):
            # device is not responding
            return False
        QThread.msleep(10)
    serial.readAll()  # discard whatever the device sent so far

    # CTRL-A: enter raw mode
    serial.write(b"\r\x01")
    serial.readUntil(rawReplMessage)
    if serial.hasTimedOut():
        # it timed out; try it once more and then fail
        serial.write(b"\r\x01")
        serial.readUntil(rawReplMessage)
        if serial.hasTimedOut():
            return False

    QCoreApplication.processEvents(
        QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
    )
    serial.readAll()  # discard any remaining data
    return True
179
def __rawOff(self):
    """
    Private method to switch 'raw' mode off.
    """
    serial = self.__serial
    if not serial:
        return

    serial.write(b"\x02")  # CTRL-B cancels raw mode
    serial.readUntil(b">>> ")  # wait for the Python prompt
    serial.readAll()  # discard any remaining data
188
def probeDevice(self):
    """
    Public method to check the device is responding.

    The probe switches the device to raw mode and back again. If the
    device has not been flashed with a MicroPython firmware, the probe
    will fail.

    @return flag indicating a communicating MicroPython device
    @rtype bool
    """
    if not self.__serial or not self.__serial.isConnected():
        return False

    # suspend data forwarding while talking to the device synchronously
    self.__blockReadyRead = True
    enteredRawMode = self.__rawOn()
    if enteredRawMode:
        # leave raw mode again
        QThread.msleep(10)
        self.__rawOff()
    self.__blockReadyRead = False

    return enteredRawMode
218
def execute(self, commands):
    """
    Public method to send commands to the connected device and return the
    result.

    Each non-empty command is sent in raw mode and terminated by CTRL-D.
    Processing stops at the first command reporting an error; in this case
    the collected output is dropped. If no serial connection is available,
    empty results will be returned.

    @param commands list of commands to be executed
    @type list of str
    @return tuple containing stdout and stderr output of the device
    @rtype tuple of (bytes, bytes)
    """
    if not self.__serial:
        return b"", b""

    if not self.__serial.isConnected():
        return b"", b"Device not connected or not switched on."

    result = bytearray()
    err = b""

    # Suspend data forwarding for the whole exchange. The 'finally' clause
    # guarantees it is resumed on every exit path, including the early
    # error returns and unexpected exceptions.
    self.__blockReadyRead = True
    try:
        # switch on raw mode
        if not self.__rawOn():
            return (
                b"",
                b"Could not switch to raw mode. Is the device switched on?",
            )

        # send commands
        QThread.msleep(10)
        for command in commands:
            if not command:
                continue

            commandBytes = command.encode("utf-8")
            self.__serial.write(commandBytes + b"\x04")
            QCoreApplication.processEvents(
                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
            )
            ok = self.__serial.readUntil(b"OK")
            if ok != b"OK":
                return (
                    b"",
                    "Expected 'OK', got '{0}', followed by '{1}'".format(
                        ok, self.__serial.readAll()
                    ).encode("utf-8"),
                )

            # read until prompt
            response = self.__serial.readUntil(b"\x04>")
            if self.__serial.hasTimedOut():
                return b"", b"Timeout while processing commands."

            payload = response[:-2]
            if b"\x04" in payload:
                # split stdout, stderr at the last separator so that a
                # stray 0x04 byte in the output does not break unpacking
                out, err = payload.rsplit(b"\x04", 1)
                result += out
            else:
                err = b"invalid response received: " + response
            if err:
                result = b""
                break

        # switch off raw mode
        QThread.msleep(10)
        self.__rawOff()
    finally:
        self.__blockReadyRead = False

    return bytes(result), err
286
def executeAsync(self, commandsList):
    """
    Public method to execute a series of commands over a period of time
    without returning any result (asynchronous execution).

    The given list is consumed: one command is removed and written per
    invocation and the next invocation is scheduled via a timer. Once the
    list is empty, the 'executeAsyncFinished' signal is emitted.

    @param commandsList list of commands to be executed on the device
    @type list of bytes
    """
    if not commandsList:
        self.executeAsyncFinished.emit()
        return

    nextCommand = commandsList.pop(0)
    self.__serial.write(nextCommand)
    QTimer.singleShot(2, lambda: self.executeAsync(commandsList))
302
def __shortError(self, error):
    """
    Private method to create a shortened error message.

    The second to last line of the error text is used as the short
    message. If the text has fewer lines, it is returned completely.

    @param error verbose error message
    @type bytes
    @return shortened error message
    @rtype str
    """
    if error:
        decodedError = error.decode("utf-8")
        try:
            # the text ends with a line break, so the last element of the
            # split is empty and the message is the one before it
            return decodedError.split("\r\n")[-2]
        except IndexError:
            return decodedError
    return self.tr("Detected an error without indications.")
319
320 # TODO: move these methods to the devices.
321 ##################################################################
322 ## Methods below implement the file system commands
323 ##################################################################
324
def ls(self, dirname=""):
    """
    Public method to get a directory listing of the connected device.

    @param dirname name of the directory to be listed (ignored for the
        BBC micro:bit)
    @type str
    @return directory listing as printed by the device
    @rtype tuple of str
    @exception OSError raised to indicate an issue with the device
    """
    if self.__repl.isMicrobit():
        # BBC micro:bit does not support directories
        listCommand = "print(__os_.listdir())"
    else:
        listCommand = "print(__os_.listdir('{0}'))".format(dirname)

    out, err = self.execute(["import os as __os_", listCommand, "del __os_"])
    if err:
        raise OSError(self.__shortError(err))

    listing = out.decode("utf-8")
    return ast.literal_eval(listing)
353
def lls(self, dirname="", fullstat=False, showHidden=False):
    """
    Public method to get a long directory listing of the connected device
    including meta data.

    @param dirname name of the directory to be listed (ignored for the
        BBC micro:bit)
    @type str
    @param fullstat flag indicating to return the full stat() tuple
    @type bool
    @param showHidden flag indicating to show hidden files as well
    @type bool
    @return list containing the directory listing with tuple entries of
        the name and a tuple of mode, size and time (if fullstat is
        false) or the complete stat() tuple. The device script reports an
        empty list for a directory it cannot list. 'None' is passed on if
        the device printed 'None'.
    @rtype list of tuple of (str, tuple)
    @exception OSError raised to indicate an issue with the device
    """
    # The scripts below are executed on the device; the indentation inside
    # the string literals is therefore significant.
    isVisibleDef = "\n".join(
        [
            "def is_visible(filename, showHidden):",
            "    return showHidden or "
            "(filename[0] != '.' and filename[-1] != '~')",
        ]
    )

    if self.__repl.isMicrobit():
        # BBC micro:bit does not support directories
        commands = [
            "import os as __os_",
            isVisibleDef,
            "\n".join(
                [
                    "def stat(filename):",
                    "    size = __os_.size(filename)",
                    "    return (0, 0, 0, 0, 0, 0, size, 0, 0, 0)",
                ]
            ),
            "\n".join(
                [
                    "def listdir_stat(showHidden):",
                    "    files = __os_.listdir()",
                    "    return list((f, stat(f)) for f in files if"
                    " is_visible(f,showHidden))",
                ]
            ),
            "print(listdir_stat({0}))".format(showHidden),
            "del __os_, stat, listdir_stat, is_visible",
        ]
    else:
        commands = [
            "import os as __os_",
            isVisibleDef,
            "\n".join(
                [
                    "def stat(filename):",
                    "    try:",
                    "        rstat = __os_.lstat(filename)",
                    "    except:",
                    "        rstat = __os_.stat(filename)",
                    "    return tuple(rstat)",
                ]
            ),
            "\n".join(
                [
                    "def listdir_stat(dirname, showHidden):",
                    "    try:",
                    "        files = __os_.listdir(dirname)",
                    "    except OSError:",
                    "        return []",
                    "    if dirname in ('', '/'):",
                    "        return list((f, stat(f)) for f in files if"
                    " is_visible(f, showHidden))",
                    "    return list((f, stat(dirname + '/' + f))"
                    " for f in files if is_visible(f, showHidden))",
                ]
            ),
            "print(listdir_stat('{0}', {1}))".format(dirname, showHidden),
            "del __os_, stat, listdir_stat, is_visible",
        ]

    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))

    fileslist = ast.literal_eval(out.decode("utf-8"))
    if fileslist is None:
        return None
    if fullstat:
        return fileslist

    # reduce the stat() tuple to (mode, size, mtime)
    return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
450
def cd(self, dirname):
    """
    Public method to change the current directory on the connected device.

    An empty directory name is ignored.

    @param dirname directory to change to
    @type str
    @exception OSError raised to indicate an issue with the device
    """
    if not dirname:
        return

    chdirCommand = "__os_.chdir('{0}')".format(dirname)
    _out, err = self.execute(["import os as __os_", chdirCommand, "del __os_"])
    if err:
        raise OSError(self.__shortError(err))
468
def pwd(self):
    """
    Public method to get the current directory of the connected device.

    @return current directory (always empty for the BBC micro:bit)
    @rtype str
    @exception OSError raised to indicate an issue with the device
    """
    if self.__repl.isMicrobit():
        # BBC micro:bit does not support directories
        return ""

    out, err = self.execute(
        ["import os as __os_", "print(__os_.getcwd())", "del __os_"]
    )
    if err:
        raise OSError(self.__shortError(err))

    currentDirectory = out.decode("utf-8")
    return currentDirectory.strip()
490
def rm(self, filename):
    """
    Public method to remove a file from the connected device.

    An empty file name is ignored.

    @param filename name of the file to be removed
    @type str
    @exception OSError raised to indicate an issue with the device
    """
    if not filename:
        return

    removeCommand = "__os_.remove('{0}')".format(filename)
    _out, err = self.execute(["import os as __os_", removeCommand, "del __os_"])
    if err:
        raise OSError(self.__shortError(err))
508
def rmrf(self, name, recursive=False, force=False):
    """
    Public method to remove a file or directory recursively.

    @param name of the file or directory to remove
    @type str
    @param recursive flag indicating a recursive deletion
    @type bool
    @param force flag indicating to ignore errors
    @type bool
    @return flag indicating success ('False' for an empty name)
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    if not name:
        return False

    # The script below is executed on the device; the indentation inside
    # the string literals is therefore significant.
    removeFileDef = "\n".join(
        [
            "def remove_file(name, recursive=False, force=False):",
            "    try:",
            "        mode = __os_.stat(name)[0]",
            "        if mode & 0x4000 != 0:",
            "            if recursive:",
            "                for file in __os_.listdir(name):",
            "                    success = remove_file("
            "name + '/' + file, recursive, force)",
            "                    if not success and not force:",
            "                        return False",
            "                __os_.rmdir(name)",
            "            else:",
            "                if not force:",
            "                    return False",
            "        else:",
            "            __os_.remove(name)",
            "    except:",
            "        if not force:",
            "            return False",
            "    return True",
        ]
    )
    commands = [
        "import os as __os_",
        removeFileDef,
        "print(remove_file('{0}', {1}, {2}))".format(name, recursive, force),
        "del __os_, remove_file",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
    return ast.literal_eval(out.decode("utf-8"))
559
def mkdir(self, dirname):
    """
    Public method to create a new directory.

    An empty directory name is ignored.

    @param dirname name of the directory to create
    @type str
    @exception OSError raised to indicate an issue with the device
    """
    if not dirname:
        return

    mkdirCommand = "__os_.mkdir('{0}')".format(dirname)
    _out, err = self.execute(["import os as __os_", mkdirCommand, "del __os_"])
    if err:
        raise OSError(self.__shortError(err))
577
def rmdir(self, dirname):
    """
    Public method to remove a directory.

    An empty directory name is ignored.

    @param dirname name of the directory to be removed
    @type str
    @exception OSError raised to indicate an issue with the device
    """
    if not dirname:
        return

    rmdirCommand = "__os_.rmdir('{0}')".format(dirname)
    _out, err = self.execute(["import os as __os_", rmdirCommand, "del __os_"])
    if err:
        raise OSError(self.__shortError(err))
595
def put(self, hostFileName, deviceFileName=None):
    """
    Public method to copy a local file to the connected device.

    @param hostFileName name of the file to be copied
    @type str
    @param deviceFileName name of the file to copy to; the base name of
        the host file is used if it is not given
    @type str
    @return flag indicating success
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    if not os.path.isfile(hostFileName):
        raise OSError("No such file: {0}".format(hostFileName))

    targetName = deviceFileName if deviceFileName else os.path.basename(hostFileName)

    with open(hostFileName, "rb") as hostFile:
        data = hostFile.read()

    return self.putData(targetName, data)
618
def putData(self, deviceFileName, content):
    """
    Public method to write the given data to the connected device.

    All line ends are converted to '\\r' and the data is transferred in
    chunks of 64 bytes.

    @param deviceFileName name of the file to write to
    @type str
    @param content data to write
    @type bytes
    @return flag indicating success
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    if not deviceFileName:
        raise OSError("Missing device file name")

    # convert eol '\r'
    content = content.replace(b"\r\n", b"\r").replace(b"\n", b"\r")

    chunkSize = 64
    commands = [
        "fd = open('{0}', 'wb')".format(deviceFileName),
        "f = fd.write",
    ]
    commands.extend(
        "f(" + repr(content[start : start + chunkSize]) + ")"
        for start in range(0, len(content), chunkSize)
    )
    commands.append("fd.close()")
    commands.append("del f, fd")

    _out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
    return True
657
def get(self, deviceFileName, hostFileName=None):
    """
    Public method to copy a file from the connected device.

    @param deviceFileName name of the file to copy
    @type str
    @param hostFileName name of the file to copy to; the device file name
        is used if it is not given
    @type str
    @return flag indicating success
    @rtype bool
    @exception OSError raised to indicate an issue with the device
    """
    if not deviceFileName:
        raise OSError("Missing device file name")

    targetName = hostFileName if hostFileName else deviceFileName

    data = self.getData(deviceFileName)
    with open(targetName, "wb") as hostFile:
        hostFile.write(data)

    return True
681
def getData(self, deviceFileName):
    """
    Public method to read data from the connected device.

    The file is sent by the device in chunks of 32 bytes. All line ends of
    the received data are converted to '\\n'.

    @param deviceFileName name of the file to read from
    @type str
    @return data read from the device
    @rtype bytes
    @exception OSError raised to indicate an issue with the device
    """
    if not deviceFileName:
        raise OSError("Missing device file name")

    # The script below is executed on the device; the indentation inside
    # the string literals is therefore significant.
    sendDataDef = "\n".join(
        [
            "def send_data():",
            "    try:",
            "        from microbit import uart as u",
            "    except ImportError:",
            "        try:",
            "            from sys import stdout as u",
            "        except ImportError:",
            "            try:",
            "                from machine import UART",
            "                u = UART(0, {0})".format(115200),
            "            except Exception:",
            "                raise Exception('Could not find UART module"
            " in device.')",
            "    f = open('{0}', 'rb')".format(deviceFileName),
            "    r = f.read",
            "    result = True",
            "    while result:",
            "        result = r(32)",
            "        if result:",
            "            u.write(result)",
            "    f.close()",
        ]
    )
    out, err = self.execute([sendDataDef, "send_data()"])
    if err:
        raise OSError(self.__shortError(err))

    # convert eol to "\n"
    out = out.replace(b"\r\n", b"\n")
    out = out.replace(b"\r", b"\n")

    return out
733
def fileSystemInfo(self):
    """
    Public method to obtain information about the currently mounted file
    systems.

    @return tuple of tuples containing the file system name, the total
        size, the used size and the free size
    @rtype tuple of tuples of (str, int, int, int)
    @exception OSError raised to indicate an issue with the device
    """
    # The script below is executed on the device; the indentation inside
    # the string literals is therefore significant.
    fsinfoDef = "\n".join(
        [
            "def fsinfo():",
            "    infolist = []",
            "    info = __os_.statvfs('/')",
            "    if info[0] == 0:",
            # assume it is just mount points
            "        fsnames = __os_.listdir('/')",
            "        for fs in fsnames:",
            "            fs = '/' + fs",
            "            infolist.append((fs, __os_.statvfs(fs)))",
            "    else:",
            "        infolist.append(('/', info))",
            "    return infolist",
        ]
    )
    commands = [
        "import os as __os_",
        fsinfoDef,
        "print(fsinfo())",
        "del __os_, fsinfo",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))

    infolist = ast.literal_eval(out.decode("utf-8"))
    if infolist is None:
        return None

    filesystemInfos = []
    for fs, info in infolist:
        # sizes are the block counts (info[2], info[4]) times info[1]
        totalSize = info[2] * info[1]
        freeSize = info[4] * info[1]
        usedSize = totalSize - freeSize
        filesystemInfos.append((fs, totalSize, usedSize, freeSize))

    return tuple(filesystemInfos)
780
781 ##################################################################
782 ## non-filesystem related methods below
783 ##################################################################
784
def getDeviceData(self):
    """
    Public method to get some essential data for the connected board.

    The data is collected on the device in a dictionary, which is printed
    and evaluated on the host. All helper names created on the device are
    deleted afterwards.

    @return dictionary containing the determined data
    @rtype dict
    @exception OSError raised to indicate an issue with the device
    """
    commands = [
        "res = {}",  # __IGNORE_WARNING_M613__
        "import os as __os_",
        "uname = __os_.uname()",
        "res['sysname'] = uname.sysname",
        "res['nodename'] = uname.nodename",
        "res['release'] = uname.release",
        "res['version'] = uname.version",
        "res['machine'] = uname.machine",
        "import sys as __sys_",
        "res['py_platform'] = __sys_.platform",
        "res['py_version'] = __sys_.version",
        "\n".join(
            [
                "try:",
                " res['mpy_name'] = __sys_.implementation.name",
                "except AttributeError:",
                " res['mpy_name'] = 'unknown'",
            ]
        ),
        "\n".join(
            [
                "try:",
                " res['mpy_version'] = '.'.join((str(i) for i in"
                " __sys_.implementation.version))",
                "except AttributeError:",
                " res['mpy_version'] = 'unknown'",
            ]
        ),
        "\n".join(
            [
                "try:",
                " import pimoroni as __pimoroni_",
                " res['mpy_variant'] = 'Pimoroni'",
                " del __pimoroni_",
                "except ImportError:",
                " res['mpy_variant'] = ''",
            ]
        ),
        "print(res)",
        # 'uname' is removed as well to not leave it behind on the device
        "del res, uname, __os_, __sys_",
    ]
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
    return ast.literal_eval(out.decode("utf-8"))
839
def getBoardInformation(self):
    """
    Public method to get some information data of the connected board.

    The data (memory usage, system and implementation data, flash usage,
    controller data and ulab version) is collected on the device in a
    dictionary, which is printed and evaluated on the host.

    @return dictionary containing the determined data
    @rtype dict
    @exception OSError raised to indicate an issue with the device
    """
    # The scripts below are executed on the device; the indentation inside
    # the string literals is therefore significant.
    memoryCommands = [
        "import gc as __gc_",
        "__gc_.enable()",
        "__gc_.collect()",
        "mem_alloc = __gc_.mem_alloc()",
        "mem_free = __gc_.mem_free()",
        "mem_total = mem_alloc + mem_free",
        "res['mem_total_kb'] = mem_total / 1024.0",
        "res['mem_used_kb'] = mem_alloc / 1024.0",
        "res['mem_used_pc'] = mem_alloc / mem_total * 100.0",
        "res['mem_free_kb'] = mem_free / 1024.0",
        "res['mem_free_pc'] = mem_free / mem_total * 100.0",
        "del __gc_, mem_alloc, mem_free, mem_total",
    ]
    systemCommands = [
        "import os as __os_",
        "uname = __os_.uname()",
        "res['sysname'] = uname.sysname",
        "res['nodename'] = uname.nodename",
        "res['release'] = uname.release",
        "res['version'] = uname.version",
        "res['machine'] = uname.machine",
        "import sys as __sys_",
        "res['py_platform'] = __sys_.platform",
        "res['py_version'] = __sys_.version",
        "\n".join(
            [
                "try:",
                "    res['mpy_name'] = __sys_.implementation.name",
                "except AttributeError:",
                "    res['mpy_name'] = 'unknown'",
            ]
        ),
        "\n".join(
            [
                "try:",
                "    res['mpy_version'] = '.'.join((str(i) for i in"
                " __sys_.implementation.version))",
                "except AttributeError:",
                "    res['mpy_version'] = 'unknown'",
            ]
        ),
        "\n".join(
            [
                "try:",
                "    import pimoroni as __pimoroni_",
                "    res['mpy_variant'] = 'Pimoroni'",
                "    del __pimoroni_",
                "except ImportError:",
                "    res['mpy_variant'] = ''",
            ]
        ),
    ]
    flashCommand = "\n".join(
        [
            "try:",
            "    stat_ = __os_.statvfs('/flash')",
            "    res['flash_info_available'] = True",
            "    res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0",
            "    res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0",
            "    res['flash_used_kb'] = res['flash_total_kb'] -"
            " res['flash_free_kb']",
            "    res['flash_free_pc'] = res['flash_free_kb'] /"
            " res['flash_total_kb'] * 100.0",
            "    res['flash_used_pc'] = res['flash_used_kb'] /"
            " res['flash_total_kb'] * 100.0",
            "    del stat_",
            # OSError covers devices without a '/flash' file system
            "except (AttributeError, OSError):",
            "    res['flash_info_available'] = False",
        ]
    )
    # NOTE(review): the last fallback sets 'mc_frequency'/'mc_temp' while
    # the other branches set 'mc_frequency_mhz'/'mc_temp_c' - confirm
    # which keys the callers expect.
    controllerCommand = "\n".join(
        [
            "try:",
            "    import machine as __mc_",
            "    if isinstance(__mc_.freq(), tuple):",
            "        res['mc_frequency_mhz'] = __mc_.freq()[0] / 1000000.0",
            "    else:",
            "        res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0",
            "    res['mc_id'] = ':'.join(['{0:X}'.format(x)"
            " for x in __mc_.unique_id()])",
            "    del __mc_",
            "except ImportError:",
            "    try:",
            "        import microcontroller as __mc_",
            "        res['mc_frequency_mhz'] = __mc_.cpu.frequency"
            " / 1000000.0",
            "        res['mc_temp_c'] = __mc_.cpu.temperature",
            "        res['mc_id'] = ':'.join(['{0:X}'.format(x)"
            " for x in __mc_.cpu.uid])",
            "        del __mc_",
            "    except ImportError:",
            "        res['mc_frequency'] = None",
            "        res['mc_temp'] = None",
        ]
    )
    ulabCommand = "\n".join(
        [
            "try:",
            "    import ulab as __ulab_",
            "    res['ulab'] = __ulab_.__version__",
            "    del __ulab_",
            "except ImportError:",
            "    res['ulab'] = None",
        ]
    )

    commands = (
        ["res = {}"]  # __IGNORE_WARNING_M613__
        + memoryCommands
        + systemCommands
        + [
            flashCommand,
            controllerCommand,
            ulabCommand,
            "print(res)",
            # 'uname' is removed as well to not leave it behind on the
            # device
            "del res, uname, __os_, __sys_",
        ]
    )
    out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
    return ast.literal_eval(out.decode("utf-8"))
963
def syncTime(self, deviceType, hasCPy=False):
    """
    Public method to set the time of the connected device to the local
    computer's time.

    Nothing is done for device types without support for setting the time
    (BBC micro:bit, Calliope mini and unknown/generic boards).

    @param deviceType type of board to sync time to
    @type str
    @param hasCPy flag indicating that the device has CircuitPython loaded
        (defaults to False)
    @type bool
    @exception OSError raised to indicate an issue with the device
    """
    # rtc_time[0] - year    4 digit
    # rtc_time[1] - month   1..12
    # rtc_time[2] - day     1..31
    # rtc_time[3] - weekday 1..7 1=Monday
    # rtc_time[4] - hour    0..23
    # rtc_time[5] - minute  0..59
    # rtc_time[6] - second  0..59
    # rtc_time[7] - yearday 1..366
    # rtc_time[8] - isdst   0, 1, or -1
    #
    # The scripts below are executed on the device; the indentation inside
    # the string literals is therefore significant.
    if deviceType == "circuitpython" or hasCPy:
        set_time = "\n".join(
            [
                "def set_time(rtc_time):",
                "    import rtc",
                "    import time",
                "    clock = rtc.RTC()",
                "    clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3],"
                " rtc_time[7], rtc_time[8])",
                "    clock.datetime = time.struct_time(clock_time)",
            ]
        )
    elif deviceType == "pyboard":
        # Pyboard (pyboard doesn't have machine.RTC()).
        # The pyb.RTC.datetime function takes the arguments in the
        # order: (year, month, day, weekday, hour, minute, second,
        # subseconds)
        # http://docs.micropython.org/en/latest/library/pyb.RTC.html
        # #pyb.RTC.datetime
        set_time = "\n".join(
            [
                "def set_time(rtc_time):",
                "    import pyb",
                "    rtc = pyb.RTC()",
                "    rtc.datetime(rtc_time[:7] + (0,))",
            ]
        )
    elif deviceType == "teensy":
        # The machine.RTC.init function is given the arguments in the
        # order: (year, month, day, weekday, hour, minute, second,
        # subseconds)
        # https://docs.micropython.org/en/latest/library/machine.RTC.html
        # #machine-rtc
        set_time = "\n".join(
            [
                "def set_time(rtc_time):",
                "    import machine",
                "    rtc = machine.RTC()",
                "    rtc.init(rtc_time[:7] + (0,))",
            ]
        )
    elif deviceType == "esp":
        # The machine.RTC documentation was incorrect and doesn't agree
        # with the code, so no link is presented here. The order of the
        # arguments is the same as the pyboard except for LoBo MPy.
        set_time = "\n".join(
            [
                "def set_time(rtc_time):",
                "    import machine",
                "    rtc = machine.RTC()",
                "    try:",  # ESP8266 may use rtc.datetime()
                "        rtc.datetime(rtc_time[:7] + (0,))",
                "    except Exception:",  # ESP32 uses rtc.init()
                "        import os",
                "        if 'LoBo' in os.uname()[0]:",  # LoBo MPy
                "            clock_time = rtc_time[:3] +"
                " rtc_time[4:7] + (rtc_time[3], rtc_time[7])",
                "        else:",
                "            clock_time = rtc_time[:7] + (0,)",
                "        rtc.init(clock_time)",
            ]
        )
    elif deviceType in ("bbc_microbit", "calliope"):
        # BBC micro:bit and Calliope mini with MicroPython don't support
        # time commands.
        return
    elif deviceType == "rp2040":
        # Raspberry Pi Pico (RP2040) - machine.RTC doesn't exist; the
        # time is written through machine.mem32 instead.
        set_time = "\n".join(
            [
                "def set_time(rtc_time):",
                # 'machine' must be imported by the script itself; it is
                # not guaranteed to exist in the device's namespace
                "    import machine",
                "    setup_0 = rtc_time[0] << 12 | rtc_time[1] << 8 |"
                " rtc_time[2]",
                "    setup_1 = (rtc_time[3] % 7) << 24 | rtc_time[4] << 16 |"
                " rtc_time[5] << 8 | rtc_time[6]",
                "    machine.mem32[0x4005c004] = setup_0",
                "    machine.mem32[0x4005c008] = setup_1",
                "    machine.mem32[0x4005c00c] |= 0x10",
            ]
        )
    elif deviceType == "pycom":
        # PyCom's machine.RTC takes its arguments in a slightly
        # different order than the official machine.RTC.
        # (year, month, day, hour, minute, second[, microsecond[,
        # tzinfo]])
        # https://docs.pycom.io/firmwareapi/pycom/machine/rtc/
        # #rtc-init-datetime-none-source-rtc-internal-rc
        set_time = "\n".join(
            [
                "def set_time(rtc_time):",
                "    import pycom",
                "    rtc_time2 = rtc_time[:3] + rtc_time[4:7]",
                "    import machine",
                "    rtc = machine.RTC()",
                "    rtc.init(rtc_time2)",
            ]
        )
    else:
        # no set_time() support for generic boards
        return

    now = time.localtime(time.time())
    rtcTime = (
        now.tm_year,
        now.tm_mon,
        now.tm_mday,
        now.tm_wday + 1,  # tm_wday is 0..6 with 0=Monday
        now.tm_hour,
        now.tm_min,
        now.tm_sec,
        now.tm_yday,
        now.tm_isdst,
    )
    commands = [
        set_time,
        "set_time({0})".format(rtcTime),
        "del set_time",
    ]
    _out, err = self.execute(commands)
    if err:
        raise OSError(self.__shortError(err))
1107
def getTime(self):
    """
    Public method to get the current time of the device.

    The device script tries the 'rtc' module first and falls back to the
    'time' module. If the device reports a 'NotImplementedError', a
    placeholder text is returned instead of raising an exception.

    @return time of the device
    @rtype str
    @exception OSError raised to indicate an issue with the device
    """
    # The script below is executed on the device; the indentation inside
    # the string literals is therefore significant.
    getTimeScript = "\n".join(
        [
            "try:",
            "    import rtc as __rtc_",
            "    print('{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'"
            ".format(*__rtc_.RTC().datetime[:6]))",
            "    del __rtc_",
            "except:",
            "    import time as __time_",
            "    try:",
            "        print(__time_.strftime('%Y-%m-%d %H:%M:%S',"
            # __IGNORE_WARNING_M601__
            " __time_.localtime()))",
            "    except AttributeError:",
            "        tm = __time_.localtime()",
            "        print('{0:04d}-{1:02d}-{2:02d}"
            " {3:02d}:{4:02d}:{5:02d}'"
            ".format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5]))",
            "        del tm",
            "    del __time_",
        ]
    )
    out, err = self.execute([getTimeScript])
    if err:
        if b"NotImplementedError" in err:
            return "&lt;unsupported&gt; &lt;unsupported&gt;"
        raise OSError(self.__shortError(err))
    return out.decode("utf-8").strip()
1146
def getModules(self):
    """
    Public method to show a list of modules built into the firmware.

    The output of "help('modules')" is split at white space; its last
    line is not part of the result.

    @return list of builtin modules
    @rtype list of str
    @exception OSError raised to indicate an issue with the device
    """
    out, err = self.execute(["help('modules')"])
    if err:
        raise OSError(self.__shortError(err))

    moduleLines = out.decode("utf-8").splitlines()[:-1]
    return [moduleName for line in moduleLines for moduleName in line.split()]

eric ide

mercurial