src/eric7/MicroPython/Devices/DeviceBase.py

branch
eric7
changeset 9765
6378da868bb0
parent 9763
52f982c08301
child 9766
f0e22f3a5878
equal deleted inserted replaced
9764:57496966803c 9765:6378da868bb0
6 """ 6 """
7 Module implementing some utility functions and the MicroPythonDevice base 7 Module implementing some utility functions and the MicroPythonDevice base
8 class. 8 class.
9 """ 9 """
10 10
11 import ast
11 import contextlib 12 import contextlib
12 import copy 13 import copy
13 import os 14 import os
15 import time
14 16
15 from PyQt6.QtCore import QObject, pyqtSlot 17 from PyQt6.QtCore import QObject, pyqtSlot
16 from PyQt6.QtWidgets import QInputDialog 18 from PyQt6.QtWidgets import QInputDialog
17 19
18 from eric7 import Preferences 20 from eric7 import Preferences
37 @type QObject 39 @type QObject
38 """ 40 """
39 super().__init__(parent) 41 super().__init__(parent)
40 42
41 self._deviceType = deviceType 43 self._deviceType = deviceType
44 self._interface = microPythonWidget.deviceInterface()
42 self.microPython = microPythonWidget 45 self.microPython = microPythonWidget
43 self._deviceData = {} # dictionary with essential device data 46 self._deviceData = {} # dictionary with essential device data
44 47
45 def setConnected(self, connected): 48 def setConnected(self, connected):
46 """ 49 """
54 """ 57 """
55 self._deviceData = {} 58 self._deviceData = {}
56 59
57 if connected: 60 if connected:
58 with contextlib.suppress(OSError): 61 with contextlib.suppress(OSError):
59 self._deviceData = self.microPython.commandsInterface().getDeviceData() 62 self._deviceData = self.__getDeviceData()
60 63
61 def getDeviceType(self): 64 def getDeviceType(self):
62 """ 65 """
63 Public method to get the device type. 66 Public method to get the device type.
64 67
94 """<p>The device data is not available. Try to connect to the""" 97 """<p>The device data is not available. Try to connect to the"""
95 """ device again. Aborting...</p>""" 98 """ device again. Aborting...</p>"""
96 ).format(self.getDeviceType()), 99 ).format(self.getDeviceType()),
97 ) 100 )
98 return False 101 return False
102
103 def hasCircuitPython(self):
104 """
105 Public method to check, if the connected device is flashed with CircuitPython.
106
107 @return flag indicating CircuitPython
108 @rtype bool
109 """
110 return (
111 self.checkDeviceData()
112 and self._deviceData["mpy_name"].lower() == "circuitpython"
113 )
99 114
100 def setButtons(self): 115 def setButtons(self):
101 """ 116 """
102 Public method to enable the supported action buttons. 117 Public method to enable the supported action buttons.
103 """ 118 """
265 commands = [c.encode("utf-8)") + b"\r" for c in commandsList] 280 commands = [c.encode("utf-8)") + b"\r" for c in commandsList]
266 commands.append(b"\r") 281 commands.append(b"\r")
267 commands.append(b"\x04") 282 commands.append(b"\x04")
268 rawOff = [b"\x02", b"\x02"] 283 rawOff = [b"\x02", b"\x02"]
269 commandSequence = rawOn + newLine + commands + rawOff 284 commandSequence = rawOn + newLine + commands + rawOff
270 self.microPython.commandsInterface().executeAsync(commandSequence) 285 self._interface.executeAsync(commandSequence)
271 286
272 @pyqtSlot() 287 @pyqtSlot()
273 def handleDataFlood(self): 288 def handleDataFlood(self):
274 """ 289 """
275 Public slot handling a data floof from the device. 290 Public slot handling a data floof from the device.
358 @return list of tuples with menu text and URL to be opened for each 373 @return list of tuples with menu text and URL to be opened for each
359 entry 374 entry
360 @rtype list of tuple of (str, str) 375 @rtype list of tuple of (str, str)
361 """ 376 """
362 return [] 377 return []
378
379 ##################################################################
380 ## Methods below implement the file system commands
381 ##################################################################
382
383 def _shortError(self, error):
384 """
385 Protected method to create a shortened error message.
386
387 @param error verbose error message
388 @type bytes
389 @return shortened error message
390 @rtype str
391 """
392 if error:
393 decodedError = error.decode("utf-8")
394 try:
395 return decodedError.split["\r\n"][-2]
396 except Exception:
397 return decodedError
398
399 return self.tr("Detected an error without indications.")
400
401 def ls(self, dirname=""):
402 """
403 Public method to get a directory listing of the connected device.
404
405 @param dirname name of the directory to be listed
406 @type str
407 @return tuple containg the directory listing
408 @rtype tuple of str
409 @exception OSError raised to indicate an issue with the device
410 """
411 command = """
412 import os as __os_
413 print(__os_.listdir('{0}'))
414 del __os_
415 """.format(
416 dirname
417 )
418 out, err = self._interface.execute(command)
419 if err:
420 raise OSError(self._shortError(err))
421 return ast.literal_eval(out.decode("utf-8"))
422
423 def lls(self, dirname="", fullstat=False, showHidden=False):
424 """
425 Public method to get a long directory listing of the connected device
426 including meta data.
427
428 @param dirname name of the directory to be listed
429 @type str
430 @param fullstat flag indicating to return the full stat() tuple
431 @type bool
432 @param showHidden flag indicating to show hidden files as well
433 @type bool
434 @return list containing the directory listing with tuple entries of
435 the name and and a tuple of mode, size and time (if fullstat is
436 false) or the complete stat() tuple. 'None' is returned in case the
437 directory doesn't exist.
438 @rtype tuple of (str, tuple)
439 @exception OSError raised to indicate an issue with the device
440 """
441 command = """
442 import os as __os_
443
444 def is_visible(filename, showHidden):
445 return showHidden or (filename[0] != '.' and filename[-1] != '~')
446
447 def stat(filename):
448 try:
449 rstat = __os_.lstat(filename)
450 except:
451 rstat = __os_.stat(filename)
452 return tuple(rstat)
453
454 def listdir_stat(dirname, showHidden):
455 try:
456 files = __os_.listdir(dirname)
457 except OSError:
458 return []
459 if dirname in ('', '/'):
460 return list((f, stat(f)) for f in files if is_visible(f, showHidden))
461 return list(
462 (f, stat(dirname + '/' + f)) for f in files if is_visible(f, showHidden)
463 )
464
465 print(listdir_stat('{0}', {1}))
466 del __os_, stat, listdir_stat, is_visible
467 """.format(
468 dirname, showHidden
469 )
470 out, err = self._interface.execute(command)
471 if err:
472 raise OSError(self._shortError(err))
473 fileslist = ast.literal_eval(out.decode("utf-8"))
474 if fileslist is None:
475 return None
476 else:
477 if fullstat:
478 return fileslist
479 else:
480 return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
481
482 def cd(self, dirname):
483 """
484 Public method to change the current directory on the connected device.
485
486 @param dirname directory to change to
487 @type str
488 @exception OSError raised to indicate an issue with the device
489 """
490 if dirname:
491 command = """
492 import os as __os_
493 __os_.chdir('{0}')
494 del __os_
495 """.format(
496 dirname
497 )
498 out, err = self._interface.execute(command)
499 if err:
500 raise OSError(self._shortError(err))
501
502 def pwd(self):
503 """
504 Public method to get the current directory of the connected device.
505
506 @return current directory
507 @rtype str
508 @exception OSError raised to indicate an issue with the device
509 """
510 command = """
511 import os as __os_
512 print(__os_.getcwd())
513 del __os_
514 """
515 out, err = self._interface.execute(command)
516 if err:
517 raise OSError(self._shortError(err))
518 return out.decode("utf-8").strip()
519
520 def rm(self, filename):
521 """
522 Public method to remove a file from the connected device.
523
524 @param filename name of the file to be removed
525 @type str
526 @exception OSError raised to indicate an issue with the device
527 """
528 if filename:
529 command = """
530 import os as __os_
531 __os_.remove('{0}')
532 del __os_
533 """.format(
534 filename
535 )
536 out, err = self._interface.execute(command)
537 if err:
538 raise OSError(self._shortError(err))
539
540 def rmrf(self, name, recursive=False, force=False):
541 """
542 Public method to remove a file or directory recursively.
543
544 @param name of the file or directory to remove
545 @type str
546 @param recursive flag indicating a recursive deletion
547 @type bool
548 @param force flag indicating to ignore errors
549 @type bool
550 @return flag indicating success
551 @rtype bool
552 @exception OSError raised to indicate an issue with the device
553 """
554 if name:
555 command = """
556 import os as __os_
557
558 def remove_file(name, recursive=False, force=False):
559 try:
560 mode = __os_.stat(name)[0]
561 if mode & 0x4000 != 0:
562 if recursive:
563 for file in __os_.listdir(name):
564 success = remove_file(name + '/' + file, recursive, force)
565 if not success and not force:
566 return False
567 __os_.rmdir(name)
568 else:
569 if not force:
570 return False
571 else:
572 __os_.remove(name)
573 except:
574 if not force:
575 return False
576 return True
577
578 print(remove_file('{0}', {1}, {2}))
579 del __os_, remove_file
580 """.format(
581 name, recursive, force
582 )
583 out, err = self._interface.execute(command)
584 if err:
585 raise OSError(self._shortError(err))
586 return ast.literal_eval(out.decode("utf-8"))
587
588 return False
589
590 def mkdir(self, dirname):
591 """
592 Public method to create a new directory.
593
594 @param dirname name of the directory to create
595 @type str
596 @exception OSError raised to indicate an issue with the device
597 """
598 if dirname:
599 command = """
600 import os as __os_
601 __os_.mkdir('{0}')
602 del __os_
603 """.format(
604 dirname
605 )
606 out, err = self._interface.execute(command)
607 if err:
608 raise OSError(self._shortError(err))
609
610 def rmdir(self, dirname):
611 """
612 Public method to remove a directory.
613
614 @param dirname name of the directory to be removed
615 @type str
616 @exception OSError raised to indicate an issue with the device
617 """
618 if dirname:
619 command = """
620 import os as __os_
621 __os_.rmdir('{0}')
622 del __os_
623 """.format(
624 dirname
625 )
626 out, err = self._interface.execute(command)
627 if err:
628 raise OSError(self._shortError(err))
629
630 def put(self, hostFileName, deviceFileName=None):
631 """
632 Public method to copy a local file to the connected device.
633
634 @param hostFileName name of the file to be copied
635 @type str
636 @param deviceFileName name of the file to copy to
637 @type str
638 @return flag indicating success
639 @rtype bool
640 @exception OSError raised to indicate an issue with the device
641 """
642 if not os.path.isfile(hostFileName):
643 raise OSError("No such file: {0}".format(hostFileName))
644
645 if not deviceFileName:
646 deviceFileName = os.path.basename(hostFileName)
647
648 with open(hostFileName, "rb") as hostFile:
649 content = hostFile.read()
650
651 return self.putData(deviceFileName, content)
652
653 def putData(self, deviceFileName, content):
654 """
655 Public method to write the given data to the connected device.
656
657 @param deviceFileName name of the file to write to
658 @type str
659 @param content data to write
660 @type bytes
661 @return flag indicating success
662 @rtype bool
663 @exception OSError raised to indicate an issue with the device
664 """
665 if not deviceFileName:
666 raise OSError("Missing device file name")
667
668 # convert eol '\r'
669 content = content.replace(b"\r\n", b"\r")
670 content = content.replace(b"\n", b"\r")
671
672 commands = [
673 "fd = open('{0}', 'wb')".format(deviceFileName),
674 "f = fd.write",
675 ]
676 while content:
677 chunk = content[:64]
678 commands.append("f(" + repr(chunk) + ")")
679 content = content[64:]
680 commands.extend(
681 [
682 "fd.close()",
683 "del f, fd",
684 ]
685 )
686 command = "\n".join(commands)
687
688 out, err = self._interface.execute(command)
689 if err:
690 raise OSError(self._shortError(err))
691 return True
692
693 def get(self, deviceFileName, hostFileName=None):
694 """
695 Public method to copy a file from the connected device.
696
697 @param deviceFileName name of the file to copy
698 @type str
699 @param hostFileName name of the file to copy to
700 @type str
701 @return flag indicating success
702 @rtype bool
703 @exception OSError raised to indicate an issue with the device
704 """
705 if not deviceFileName:
706 raise OSError("Missing device file name")
707
708 if not hostFileName:
709 hostFileName = deviceFileName
710
711 out = self.getData(deviceFileName)
712 with open(hostFileName, "wb") as hostFile:
713 hostFile.write(out)
714
715 return True
716
717 def getData(self, deviceFileName):
718 """
719 Public method to read data from the connected device.
720
721 @param deviceFileName name of the file to read from
722 @type str
723 @return data read from the device
724 @rtype bytes
725 @exception OSError raised to indicate an issue with the device
726 """
727 if not deviceFileName:
728 raise OSError("Missing device file name")
729
730 command = """
731 def send_data():
732 try:
733 from microbit import uart as u
734 except ImportError:
735 try:
736 from sys import stdout as u
737 except ImportError:
738 try:
739 from machine import UART
740 u = UART(0, 115200)
741 except Exception:
742 raise Exception('Could not find UART module in device.')
743 f = open('{0}', 'rb')
744 r = f.read
745 result = True
746 while result:
747 result = r(32)
748 if result:
749 u.write(result)
750 f.close()
751
752 send_data()
753 del send_data
754 """.format(
755 deviceFileName
756 )
757 out, err = self._interface.execute(command)
758 if err:
759 raise OSError(self._shortError(err))
760
761 # write the received bytes to the local file
762 # convert eol to "\n"
763 out = out.replace(b"\r\n", b"\n")
764 out = out.replace(b"\r", b"\n")
765
766 return out
767
768 def fileSystemInfo(self):
769 """
770 Public method to obtain information about the currently mounted file
771 systems.
772
773 @return tuple of tuples containing the file system name, the total
774 size, the used size and the free size
775 @rtype tuple of tuples of (str, int, int, int)
776 @exception OSError raised to indicate an issue with the device
777 """
778 command = """
779 import os as __os_
780
781 def fsinfo():
782 infolist = []
783 info = __os_.statvfs('/')
784 if info[0] == 0:
785 fsnames = __os_.listdir('/')
786 for fs in fsnames:
787 fs = '/' + fs
788 infolist.append((fs, __os_.statvfs(fs)))
789 else:
790 infolist.append(('/', info))
791 return infolist
792
793 print(fsinfo())
794 del __os_, fsinfo
795 """
796 out, err = self._interface.execute(command)
797 if err:
798 raise OSError(self._shortError(err))
799 infolist = ast.literal_eval(out.decode("utf-8"))
800 if infolist is None:
801 return None
802 else:
803 filesystemInfos = []
804 for fs, info in infolist:
805 totalSize = info[2] * info[1]
806 freeSize = info[4] * info[1]
807 usedSize = totalSize - freeSize
808 filesystemInfos.append((fs, totalSize, usedSize, freeSize))
809
810 return tuple(filesystemInfos)
811
812 ##################################################################
813 ## board information related methods below
814 ##################################################################
815
816 def __getDeviceData(self):
817 """
818 Private method to get some essential data for the connected board.
819
820 @return dictionary containing the determined data
821 @rtype dict
822 @exception OSError raised to indicate an issue with the device
823 """
824 command = """
825 res = {}
826
827 import os as __os_
828 uname = __os_.uname()
829 res['sysname'] = uname.sysname
830 res['nodename'] = uname.nodename
831 res['release'] = uname.release
832 res['version'] = uname.version
833 res['machine'] = uname.machine
834
835 import sys as __sys_
836 res['py_platform'] = __sys_.platform
837 res['py_version'] = __sys_.version
838
839 try:
840 res['mpy_name'] = __sys_.implementation.name
841 except AttributeError:
842 res['mpy_name'] = 'unknown'
843
844 try:
845 res['mpy_version'] = '.'.join((str(i) for i in __sys_.implementation.version))
846 except AttributeError:
847 res['mpy_version'] = 'unknown'
848
849 try:
850 import pimoroni as __pimoroni_
851 res['mpy_variant'] = 'Pimoroni'
852 del __pimoroni_
853 except ImportError:
854 res['mpy_variant'] = ''
855
856 print(res)
857 del res, uname, __os_, __sys_
858 """
859 out, err = self._interface.execute(command)
860 if err:
861 raise OSError(self._shortError(err))
862 return ast.literal_eval(out.decode("utf-8"))
863
864 def getBoardInformation(self):
865 """
866 Public method to get some information data of the connected board.
867
868 @return dictionary containing the determined data
869 @rtype dict
870 @exception OSError raised to indicate an issue with the device
871 """
872 command = """
873 res = {}
874
875 import gc as __gc_
876 __gc_.enable()
877 __gc_.collect()
878 mem_alloc = __gc_.mem_alloc()
879 mem_free = __gc_.mem_free()
880 mem_total = mem_alloc + mem_free
881 res['mem_total_kb'] = mem_total / 1024.0
882 res['mem_used_kb'] = mem_alloc / 1024.0
883 res['mem_used_pc'] = mem_alloc / mem_total * 100.0
884 res['mem_free_kb'] = mem_free / 1024.0
885 res['mem_free_pc'] = mem_free / mem_total * 100.0
886 del __gc_, mem_alloc, mem_free, mem_total
887
888 import os as __os_
889 uname = __os_.uname()
890 res['sysname'] = uname.sysname
891 res['nodename'] = uname.nodename
892 res['release'] = uname.release
893 res['version'] = uname.version
894 res['machine'] = uname.machine
895
896 import sys as __sys_
897 res['py_platform'] = __sys_.platform
898 res['py_version'] = __sys_.version
899
900 try:
901 res['mpy_name'] = __sys_.implementation.name
902 except AttributeError:
903 res['mpy_name'] = 'unknown'
904 try:
905 res['mpy_version'] = '.'.join((str(i) for i in __sys_.implementation.version))
906 except AttributeError:
907 res['mpy_version'] = 'unknown'
908 try:
909 import pimoroni as __pimoroni_
910 res['mpy_variant'] = 'Pimoroni'
911 del __pimoroni_
912 except ImportError:
913 res['mpy_variant'] = ''
914
915 try:
916 stat_ = __os_.statvfs('/flash')
917 res['flash_info_available'] = True
918 res['flash_total_kb'] = stat_[2] * stat_[0] / 1024.0
919 res['flash_free_kb'] = stat_[3] * stat_[0] / 1024.0
920 res['flash_used_kb'] = res['flash_total_kb'] - res['flash_free_kb']
921 res['flash_free_pc'] = res['flash_free_kb'] / res['flash_total_kb'] * 100.0
922 res['flash_used_pc'] = res['flash_used_kb'] / res['flash_total_kb'] * 100.0
923 del stat_
924 except AttributeError:
925 res['flash_info_available'] = False
926
927 try:
928 import machine as __mc_
929 if isinstance(__mc_.freq(), tuple):
930 res['mc_frequency_mhz'] = __mc_.freq()[0] / 1000000.0
931 else:
932 res['mc_frequency_mhz'] = __mc_.freq() / 1000000.0
933 res['mc_id'] = ':'.join(['{0:X}'.format(x) for x in __mc_.unique_id()])
934 del __mc_
935 except ImportError:
936 try:
937 import microcontroller as __mc_
938 res['mc_frequency_mhz'] = __mc_.cpu.frequency / 1000000.0
939 res['mc_temp_c'] = __mc_.cpu.temperature
940 res['mc_id'] = ':'.join(['{0:X}'.format(x) for x in __mc_.cpu.uid])
941 del __mc_
942 except ImportError:
943 res['mc_frequency'] = None
944 res['mc_temp'] = None
945
946 try:
947 import ulab as __ulab_
948 res['ulab'] = __ulab_.__version__
949 del __ulab_
950 except ImportError:
951 res['ulab'] = None
952
953 print(res)
954 del res, __os_, __sys_
955 """
956 out, err = self._interface.execute(command)
957 if err:
958 raise OSError(self._shortError(err))
959 return ast.literal_eval(out.decode("utf-8"))
960
961 def getModules(self):
962 """
963 Public method to show a list of modules built into the firmware.
964
965 @return list of builtin modules
966 @rtype list of str
967 @exception OSError raised to indicate an issue with the device
968 """
969 commands = ["help('modules')"]
970 out, err = self._interface.execute(commands)
971 if err:
972 raise OSError(self._shortError(err))
973
974 modules = []
975 for line in out.decode("utf-8").splitlines()[:-1]:
976 modules.extend(line.split())
977 return modules
978
979 ##################################################################
980 ## time related methods below
981 ##################################################################
982
983 def getTime(self):
984 """
985 Public method to get the current time of the device.
986
987 @return time of the device
988 @rtype str
989 @exception OSError raised to indicate an issue with the device
990 """
991 command = """
992 try:
993 import rtc as __rtc_
994 print(
995 '{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'
996 .format(*__rtc_.RTC().datetime[:6])
997 )
998 del __rtc_
999 except:
1000 import time as __time_
1001 try:
1002 print(__time_.strftime('%Y-%m-%d %H:%M:%S', __time_.localtime()))
1003 except AttributeError:
1004 tm = __time_.localtime()
1005 print(
1006 '{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}'
1007 .format(tm[0], tm[1], tm[2], tm[3], tm[4], tm[5])
1008 )
1009 del tm
1010 del __time_
1011 """
1012 out, err = self._interface.execute(command)
1013 if err:
1014 if b"NotImplementedError" in err:
1015 return "&lt;unsupported&gt; &lt;unsupported&gt;"
1016 raise OSError(self._shortError(err))
1017 return out.decode("utf-8").strip()
1018
1019 def _getSetTimeCode(self):
1020 """
1021 Protected method to get the device code to set the time.
1022
1023 Note: This method must be implemented in the various device specific
1024 subclasses.
1025
1026 @return code to be executed on the connected device to set the time
1027 @rtype str
1028 """
1029 # rtc_time[0] - year 4 digit
1030 # rtc_time[1] - month 1..12
1031 # rtc_time[2] - day 1..31
1032 # rtc_time[3] - weekday 1..7 1=Monday
1033 # rtc_time[4] - hour 0..23
1034 # rtc_time[5] - minute 0..59
1035 # rtc_time[6] - second 0..59
1036 # rtc_time[7] - yearday 1..366
1037 # rtc_time[8] - isdst 0, 1, or -1
1038 if self.hasCircuitPython():
1039 # CircuitPython is handled here in order to not duplicate the code in all
1040 # specific boards able to be flashed with CircuitPython or MicroPython
1041 return """
1042 def set_time(rtc_time):
1043 import rtc
1044 import time
1045 clock = rtc.RTC()
1046 clock_time = rtc_time[:3] + rtc_time[4:7] + (rtc_time[3], rtc_time[7], rtc_time[8])
1047 clock.datetime = time.struct_time(clock_time)
1048 """
1049 else:
1050 return ""
1051
1052 def syncTime(self, deviceType, hasCPy=False):
1053 """
1054 Public method to set the time of the connected device to the local
1055 computer's time.
1056
1057 @param deviceType type of board to sync time to
1058 @type str
1059 @param hasCPy flag indicating that the device has CircuitPython loadede
1060 (defaults to False)
1061 @type bool
1062 @exception OSError raised to indicate an issue with the device
1063 """
1064 setTimeCode = self._getSetTimeCode()
1065 if setTimeCode:
1066 now = time.localtime(time.time())
1067 command = """{0}
1068 set_time({1})
1069 del set_time
1070 """.format(
1071 setTimeCode,
1072 (
1073 now.tm_year,
1074 now.tm_mon,
1075 now.tm_mday,
1076 now.tm_wday + 1,
1077 now.tm_hour,
1078 now.tm_min,
1079 now.tm_sec,
1080 now.tm_yday,
1081 now.tm_isdst,
1082 ),
1083 )
1084 out, err = self._interface.execute(command)
1085 if err:
1086 raise OSError(self._shortError(err))
1087
1088
1089 #
1090 # eflag: noqa = M613

eric ide

mercurial