eric6/MicroPython/MicroPythonCommandsInterface.py

branch
micropython
changeset 7095
8e10acb1cd85
child 7102
5e77aa4671e6
equal deleted inserted replaced
7094:d5f340dfb986 7095:8e10acb1cd85
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing some file system commands for MicroPython.
8 """
9
10 from __future__ import unicode_literals
11
12 import ast
13 import time
14 import os
15
16 from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject, QThread, QTimer
17
18 from .MicroPythonSerialPort import MicroPythonSerialPort
19
20 import Preferences
21
22
23 class MicroPythonCommandsInterface(QObject):
24 """
25 Class implementing some file system commands for MicroPython.
26
27 Commands are provided to perform operations on the file system of a
28 connected MicroPython device. Supported commands are:
29 <ul>
30 <li>ls: directory listing</li>
31 <li>lls: directory listing with meta data</li>
32 <li>cd: change directory</li>
33 <li>pwd: get the current directory</li>
34 <li>put: copy a file to the connected device</li>
35 <li>get: get a file from the connected device</li>
36 <li>rm: remove a file from the connected device</li>
37 <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash)
38 <li>mkdir: create a new directory</li>
39 <li>rmdir: remove an empty directory</li>
40 </ul>
41
42 There are additional commands related to time and version.
43 <ul>
44 <li>version: get version info about MicroPython</li>
45 <li>getImplementation: get some implementation information</li>
46 <li>syncTime: synchronize the time of the connected device</li>
47 <li>showTime: show the current time of the connected device</li>
48 </ul>
49
50 @signal executeAsyncFinished() emitted to indicate the end of an
51 asynchronously executed list of commands (e.g. a script)
52 @signal dataReceived(data) emitted to send data received via the serial
53 connection for further processing
54 """
55 executeAsyncFinished = pyqtSignal()
56 dataReceived = pyqtSignal(bytes)
57
58 def __init__(self, parent=None):
59 """
60 Constructor
61
62 @param parent reference to the parent object
63 @type QObject
64 """
65 super(MicroPythonCommandsInterface, self).__init__(parent)
66
67 self.__blockReadyRead = False
68
69 self.__serial = MicroPythonSerialPort(
70 timeout=Preferences.getMicroPython("SerialTimeout"),
71 parent=self)
72 self.__serial.readyRead.connect(self.__readSerial)
73
74 @pyqtSlot()
75 def __readSerial(self):
76 """
77 Private slot to read all available serial data and emit it with the
78 "dataReceived" signal for further processing.
79 """
80 if not self.__blockReadyRead:
81 data = bytes(self.__serial.readAll())
82 self.dataReceived.emit(data)
83
84 @pyqtSlot()
85 def connectToDevice(self, port):
86 """
87 Public slot to start the manager.
88
89 @param port name of the port to be used
90 @type str
91 @return flag indicating success
92 @rtype bool
93 """
94 return self.__serial.openSerialLink(port)
95
96 @pyqtSlot()
97 def disconnectFromDevice(self):
98 """
99 Public slot to stop the thread.
100 """
101 self.__serial.closeSerialLink()
102
103 def isConnected(self):
104 """
105 Public method to get the connection status.
106
107 @return flag indicating the connection status
108 @rtype bool
109 """
110 return self.__serial.isConnected()
111
112 @pyqtSlot()
113 def handlePreferencesChanged(self):
114 """
115 Public slot to handle a change of the preferences.
116 """
117 self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))
118
119 def write(self, data):
120 """
121 Public method to write data to the connected device.
122
123 @param data data to be written
124 @type bytes or bytearray
125 """
126 self.__serial.isConnected() and self.__serial.write(data)
127
128 def __rawOn(self):
129 """
130 Private method to switch the connected device to 'raw' mode.
131
132 Note: switching to raw mode is done with synchronous writes.
133
134 @return flag indicating success
135 @@rtype bool
136 """
137 if not self.__serial:
138 return False
139
140 rawReplMessage = b"raw REPL; CTRL-B to exit\r\n"
141 softRebootMessage = b"soft reboot\r\n"
142
143 self.__serial.write(b"\x02") # end raw mode if required
144 self.__serial.waitForBytesWritten()
145 for _i in range(3):
146 # CTRL-C three times to break out of loops
147 self.__serial.write(b"\r\x03")
148 self.__serial.waitForBytesWritten()
149 QThread.msleep(10)
150 self.__serial.readAll() # read all data and discard it
151 self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode
152 self.__serial.readUntil(rawReplMessage)
153 if self.__serial.hasTimedOut():
154 return False
155 self.__serial.write(b"\x04") # send CTRL-D to soft reset
156 self.__serial.readUntil(softRebootMessage)
157 if self.__serial.hasTimedOut():
158 return False
159
160 # some MicroPython devices seem to need to be convinced in some
161 # special way
162 data = self.__serial.readUntil(rawReplMessage)
163 if self.__serial.hasTimedOut():
164 return False
165 if not data.endswith(rawReplMessage):
166 self.__serial.write(b"\r\x01") # send CTRL-A again
167 self.__serial.readUntil(rawReplMessage)
168 if self.__serial.hasTimedOut():
169 return False
170 self.__serial.readAll() # read all data and discard it
171 return True
172
173 def __rawOff(self):
174 """
175 Private method to switch 'raw' mode off.
176 """
177 if self.__serial:
178 self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode
179
180 def execute(self, commands):
181 """
182 Public method to send commands to the connected device and return the
183 result.
184
185 If no serial connection is available, empty results will be returned.
186
187 @param commands list of commands to be executed
188 @type str
189 @return tuple containing stdout and stderr output of the device
190 @rtype tuple of (bytes, bytes)
191 """
192 if not self.__serial:
193 return b"", b""
194
195 if not self.__serial.isConnected():
196 return b"", b"Device not connected or not switched on."
197
198 result = bytearray()
199 err = b""
200
201 self.__blockReadyRead = True
202 ok = self.__rawOn()
203 if not ok:
204 self.__blockReadyRead = False
205 return (
206 b"",
207 b"Could not switch to raw mode. Is the device switched on?"
208 )
209
210 QThread.msleep(10)
211 for command in commands:
212 if command:
213 commandBytes = command.encode("utf-8")
214 self.__serial.write(commandBytes + b"\x04")
215 # read until prompt
216 response = self.__serial.readUntil(b"\x04>")
217 if self.__serial.hasTimedOut():
218 self.__blockReadyRead = False
219 return b"", b"Timeout while processing commands."
220 if b"\x04" in response[2:-2]:
221 # split stdout, stderr
222 out, err = response[2:-2].split(b"\x04")
223 result += out
224 else:
225 err = b"invalid response received: " + response
226 if err:
227 self.__blockReadyRead = False
228 return b"", err
229 QThread.msleep(10)
230 self.__rawOff()
231 self.__blockReadyRead = False
232
233 return bytes(result), err
234
235 def executeAsync(self, commandsList):
236 """
237 Public method to execute a series of commands over a period of time
238 without returning any result (asynchronous execution).
239
240 @param commandsList list of commands to be execute on the device
241 @type list of bytes
242 """
243 def remainingTask(commands):
244 self.executeAsync(commands)
245
246 if commandsList:
247 command = commandsList[0]
248 self.__serial.write(command)
249 remainder = commandsList[1:]
250 QTimer.singleShot(2, lambda: remainingTask(remainder))
251 else:
252 self.executeAsyncFinished.emit()
253
254 def __shortError(self, error):
255 """
256 Private method to create a shortened error message.
257
258 @param error verbose error message
259 @type bytes
260 @return shortened error message
261 @rtype str
262 """
263 if error:
264 decodedError = error.decode("utf-8")
265 try:
266 return decodedError.split["\r\n"][-2]
267 except Exception:
268 return decodedError
269 return self.tr("Detected an error without indications.")
270
271 ##################################################################
272 ## Methods below implement the file system commands
273 ##################################################################
274
275 def ls(self, dirname=""):
276 """
277 Public method to get a directory listing of the connected device.
278
279 @param dirname name of the directory to be listed
280 @type str
281 @return tuple containg the directory listing
282 @rtype tuple of str
283 @exception IOError raised to indicate an issue with the device
284 """
285 commands = [
286 "import os",
287 "print(os.listdir('{0}'))".format(dirname),
288 ]
289 out, err = self.execute(commands)
290 if err:
291 raise IOError(self.__shortError(err))
292 return ast.literal_eval(out.decode("utf-8"))
293
294 def lls(self, dirname="", fullstat=False):
295 """
296 Public method to get a long directory listing of the connected device
297 including meta data.
298
299 @param dirname name of the directory to be listed
300 @type str
301 @param fullstat flag indicating to return the full stat() tuple
302 @type bool
303 @return list containing the directory listing with tuple entries of
304 the name and and a tuple of mode, size and time (if fullstat is
305 false) or the complete stat() tuple. 'None' is returned in case the
306 directory doesn't exist.
307 @rtype tuple of (str, tuple)
308 @exception IOError raised to indicate an issue with the device
309 """
310 commands = [
311 "import os",
312 "\n".join([
313 "def stat(filename):",
314 " try:",
315 " rstat = os.lstat(filename)",
316 " except:",
317 " rstat = os.stat(filename)",
318 " return tuple(rstat)",
319 ]),
320 "\n".join([
321 "def listdir_stat(dirname):",
322 " try:",
323 " files = os.listdir(dirname)",
324 " except OSError:",
325 " return None",
326 " if dirname in ('', '/'):",
327 " return list((f, stat(f)) for f in files)",
328 " return list((f, stat(dirname + '/' + f)) for f in files)",
329 ]),
330 "print(listdir_stat('{0}'))".format(dirname),
331 ]
332 out, err = self.execute(commands)
333 if err:
334 raise IOError(self.__shortError(err))
335 fileslist = ast.literal_eval(out.decode("utf-8"))
336 if fileslist is None:
337 return None
338 else:
339 if fullstat:
340 return fileslist
341 else:
342 return [(f, (s[0], s[6], s[8])) for f, s in fileslist]
343
344 def cd(self, dirname):
345 """
346 Public method to change the current directory on the connected device.
347
348 @param dirname directory to change to
349 @type str
350 @exception IOError raised to indicate an issue with the device
351 """
352 assert dirname
353
354 commands = [
355 "import os",
356 "os.chdir('{0}')".format(dirname),
357 ]
358 out, err = self.execute(commands)
359 if err:
360 raise IOError(self.__shortError(err))
361
362 def pwd(self):
363 """
364 Public method to get the current directory of the connected device.
365
366 @return current directory
367 @rtype str
368 @exception IOError raised to indicate an issue with the device
369 """
370 commands = [
371 "import os",
372 "print(os.getcwd())",
373 ]
374 out, err = self.execute(commands)
375 if err:
376 raise IOError(self.__shortError(err))
377 return out.decode("utf-8").strip()
378
379 def rm(self, filename):
380 """
381 Public method to remove a file from the connected device.
382
383 @param filename name of the file to be removed
384 @type str
385 @exception IOError raised to indicate an issue with the device
386 """
387 assert filename
388
389 commands = [
390 "import os",
391 "os.remove('{0}')".format(filename),
392 ]
393 out, err = self.execute(commands)
394 if err:
395 raise IOError(self.__shortError(err))
396
397 def rmrf(self, name, recursive=False, force=False):
398 """
399 Public method to remove a file or directory recursively.
400
401 @param name of the file or directory to remove
402 @type str
403 @param recursive flag indicating a recursive deletion
404 @type bool
405 @param force flag indicating to ignore errors
406 @type bool
407 @return flag indicating success
408 @rtype bool
409 @exception IOError raised to indicate an issue with the device
410 """
411 assert name
412
413 commands = [
414 "import os",
415 "\n".join([
416 "def remove_file(name, recursive=False, force=False):",
417 " try:",
418 " mode = os.stat(name)[0]",
419 " if mode & 0x4000 != 0:",
420 " if recursive:",
421 " for file in os.listdir(name):",
422 " success = remove_file(name + '/' + file,"
423 " recursive, force)",
424 " if not success and not force:",
425 " return False",
426 " os.rmdir(name)",
427 " else:",
428 " if not force:",
429 " return False",
430 " else:",
431 " os.remove(name)",
432 " except:",
433 " if not force:",
434 " return False",
435 " return True",
436 ]),
437 "print(remove_file('{0}', {1}, {2}))".format(name, recursive,
438 force),
439 ]
440 out, err = self.execute(commands)
441 if err:
442 raise IOError(self.__shortError(err))
443 return ast.literal_eval(out.decode("utf-8"))
444
445 def mkdir(self, dirname):
446 """
447 Public method to create a new directory.
448
449 @param dirname name of the directory to create
450 @type str
451 @exception IOError raised to indicate an issue with the device
452 """
453 assert dirname
454
455 commands = [
456 "import os",
457 "os.mkdir('{0}')".format(dirname),
458 ]
459 out, err = self.execute(commands)
460 if err:
461 raise IOError(self.__shortError(err))
462
463 def rmdir(self, dirname):
464 """
465 Public method to remove a directory.
466
467 @param dirname name of the directory to be removed
468 @type str
469 @exception IOError raised to indicate an issue with the device
470 """
471 assert dirname
472
473 commands = [
474 "import os",
475 "os.rmdir('{0}')".format(dirname),
476 ]
477 out, err = self.execute(commands)
478 if err:
479 raise IOError(self.__shortError(err))
480
481 def put(self, hostFileName, deviceFileName=None):
482 """
483 Public method to copy a local file to the connected device.
484
485 @param hostFileName name of the file to be copied
486 @type str
487 @param deviceFileName name of the file to copy to
488 @type str
489 @return flag indicating success
490 @rtype bool
491 @exception IOError raised to indicate an issue with the device
492 """
493 if not os.path.isfile(hostFileName):
494 raise IOError("No such file: {0}".format(hostFileName))
495
496 with open(hostFileName, "rb") as hostFile:
497 content = hostFile.read()
498 # convert eol '\r'
499 content = content.replace(b"\r\n", b"\r")
500 content = content.replace(b"\n", b"\r")
501
502 if not deviceFileName:
503 deviceFileName = os.path.basename(hostFileName)
504
505 commands = [
506 "fd = open('{0}', 'wb')".format(deviceFileName),
507 "f = fd.write",
508 ]
509 while content:
510 chunk = content[:64]
511 commands.append("f(" + repr(chunk) + ")")
512 content = content[64:]
513 commands.append("fd.close()")
514
515 out, err = self.execute(commands)
516 if err:
517 raise IOError(self.__shortError(err))
518 return True
519
520 def get(self, deviceFileName, hostFileName=None):
521 """
522 Public method to copy a file from the connected device.
523
524 @param deviceFileName name of the file to copy
525 @type str
526 @param hostFileName name of the file to copy to
527 @type str
528 @return flag indicating success
529 @rtype bool
530 @exception IOError raised to indicate an issue with the device
531 """
532 if not hostFileName:
533 hostFileName = deviceFileName
534
535 commands = [
536 "\n".join([
537 "try:",
538 " from microbit import uart as u",
539 "except ImportError:",
540 " try:",
541 " from machine import UART",
542 " u = UART(0, {0})".format(115200),
543 " except Exception:",
544 " try:",
545 " from sys import stdout as u",
546 " except Exception:",
547 " raise Exception('Could not find UART module in"
548 " device.')",
549 ]),
550 "f = open('{0}', 'rb')".format(deviceFileName),
551 "r = f.read",
552 "result = True",
553 "\n".join([
554 "while result:",
555 " result = r(32)",
556 " if result:",
557 " u.write(result)",
558 ]),
559 "f.close()",
560 ]
561 out, err = self.execute(commands)
562 if err:
563 raise IOError(self.__shortError(err))
564
565 # write the received bytes to the local file
566 # convert eol to "\n"
567 out = out.replace(b"\r\n", b"\n")
568 out = out.replace(b"\r", b"\n")
569 with open(hostFileName, "wb") as hostFile:
570 hostFile.write(out)
571 return True
572
573 def fileSystemInfo(self):
574 """
575 Public method to obtain information about the currently mounted file
576 systems.
577
578 @return tuple of tuples containing the file system name, the total
579 size, the used size and the free size
580 @rtype tuple of tuples of (str, int, int, int)
581 @exception IOError raised to indicate an issue with the device
582 """
583 commands = [
584 "import os",
585 "\n".join([
586 "def fsinfo():",
587 " infolist = []",
588 " fsnames = os.listdir('/')",
589 " for fs in fsnames:",
590 " fs = '/' + fs",
591 " infolist.append((fs, os.statvfs(fs)))",
592 " return infolist",
593 ]),
594 "print(fsinfo())",
595 ]
596 out, err = self.execute(commands)
597 if err:
598 raise IOError(self.__shortError(err))
599 infolist = ast.literal_eval(out.decode("utf-8"))
600 if infolist is None:
601 return None
602 else:
603 filesystemInfos = []
604 for fs, info in infolist:
605 totalSize = info[2] * info[1]
606 freeSize = info[4] * info[1]
607 usedSize = totalSize - freeSize
608 filesystemInfos.append((fs, totalSize, usedSize, freeSize))
609
610 return tuple(filesystemInfos)
611
612 ##################################################################
613 ## non-filesystem related methods below
614 ##################################################################
615
616 def version(self):
617 """
618 Public method to get the MicroPython version information of the
619 connected device.
620
621 @return dictionary containing the version information
622 @rtype dict
623 @exception IOError raised to indicate an issue with the device
624 """
625 commands = [
626 "import os",
627 "print(os.uname())",
628 ]
629 out, err = self.execute(commands)
630 if err:
631 raise IOError(self.__shortError(err))
632
633 rawOutput = out.decode("utf-8").strip()
634 rawOutput = rawOutput[1:-1]
635 items = rawOutput.split(",")
636 result = {}
637 for item in items:
638 key, value = item.strip().split("=")
639 result[key.strip()] = value.strip()[1:-1]
640 return result
641
642 def getImplementation(self):
643 """
644 Public method to get some implementation information of the connected
645 device.
646
647 @return dictionary containing the implementation information
648 @rtype dict
649 @exception IOError raised to indicate an issue with the device
650 """
651 commands = [
652 "import sys",
653 "res = {}", # __IGNORE_WARNING_M613__
654 "\n".join([
655 "try:",
656 " res['name'] = sys.implementation.name",
657 "except AttributeError:",
658 " res['name'] = 'unknown'",
659 ]),
660 "\n".join([
661 "try:",
662 " res['version'] = '.'.join((str(i) for i in"
663 " sys.implementation.version))",
664 "except AttributeError:",
665 " res['version'] = 'unknown'",
666 ]),
667 "print(res)",
668 ]
669 out, err = self.execute(commands)
670 if err:
671 raise IOError(self.__shortError(err))
672 return ast.literal_eval(out.decode("utf-8"))
673
674 def syncTime(self):
675 """
676 Public method to set the time of the connected device to the local
677 computer's time.
678
679 @exception IOError raised to indicate an issue with the device
680 """
681 now = time.localtime(time.time())
682 commands = [
683 "\n".join([
684 "def set_time(rtc_time):",
685 " rtc = None",
686 " try:", # Pyboard (it doesn't have machine.RTC())
687 " import pyb",
688 " rtc = pyb.RTC()",
689 " clock_time = rtc_time[:6] + (rtc_time[6] + 1, 0)",
690 " rtc.datetime(clock_time)",
691 " except:",
692 " try:",
693 " import machine",
694 " rtc = machine.RTC()",
695 " try:", # ESP8266 may use rtc.datetime()
696 " clock_time = rtc_time[:6] +"
697 " (rtc_time[6] + 1, 0)",
698 " rtc.datetime(clock_time)",
699 " except:", # ESP32 uses rtc.init()
700 " rtc.init(rtc_time[:6])",
701 " except:",
702 " try:",
703 " import rtc, time",
704 " clock=rtc.RTC()",
705 " clock.datetime = time.struct_time(rtc_time +"
706 " (-1, -1))",
707 " except:",
708 " pass",
709 ]),
710 "set_time({0})".format((now.tm_year, now.tm_mon, now.tm_mday,
711 now.tm_hour, now.tm_min, now.tm_sec,
712 now.tm_wday))
713 ]
714 out, err = self.execute(commands)
715 if err:
716 raise IOError(self.__shortError(err))
717
718 def showTime(self):
719 """
720 Public method to get the current time of the device.
721
722 @return time of the device
723 @rtype str
724 @exception IOError raised to indicate an issue with the device
725 """
726 commands = [
727 "import time",
728 "print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))",
729 # __IGNORE_WARNING_M601__
730 ]
731 out, err = self.execute(commands)
732 if err:
733 raise IOError(self.__shortError(err))
734 return out.decode("utf-8").strip()

eric ide

mercurial