|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing an interface to talk to a connected MicroPython device via |
|
8 a serial link. |
|
9 """ |
|
10 |
|
11 from PyQt6.QtCore import QCoreApplication, QEventLoop, QThread, QTimer, pyqtSlot |
|
12 |
|
13 from eric7 import Preferences |
|
14 |
|
15 from .MicroPythonDeviceInterface import MicroPythonDeviceInterface |
|
16 from .MicroPythonSerialPort import MicroPythonSerialPort |
|
17 |
|
18 |
|
19 class MicroPythonSerialDeviceInterface(MicroPythonDeviceInterface): |
|
20 """ |
|
21 Class implementing an interface to talk to a connected MicroPython device via |
|
22 a serial link. |
|
23 """ |
|
24 |
|
25 PasteModePrompt = b"=== " |
|
26 TracebackMarker = b"Traceback (most recent call last):" |
|
27 |
|
28 def __init__(self, parent=None): |
|
29 """ |
|
30 Constructor |
|
31 |
|
32 @param parent reference to the parent object |
|
33 @type QObject |
|
34 """ |
|
35 super().__init__(parent) |
|
36 |
|
37 self.__blockReadyRead = False |
|
38 |
|
39 self.__serial = MicroPythonSerialPort( |
|
40 timeout=Preferences.getMicroPython("SerialTimeout"), parent=self |
|
41 ) |
|
42 self.__serial.readyRead.connect(self.__readSerial) |
|
43 |
|
44 @pyqtSlot() |
|
45 def __readSerial(self): |
|
46 """ |
|
47 Private slot to read all available serial data and emit it with the |
|
48 "dataReceived" signal for further processing. |
|
49 """ |
|
50 if not self.__blockReadyRead: |
|
51 data = bytes(self.__serial.readAll()) |
|
52 self.dataReceived.emit(data) |
|
53 |
|
54 @pyqtSlot() |
|
55 def connectToDevice(self, connection): |
|
56 """ |
|
57 Public slot to connect to the device. |
|
58 |
|
59 @param connection name of the connection to be used |
|
60 @type str |
|
61 @return flag indicating success |
|
62 @rtype bool |
|
63 """ |
|
64 return self.__serial.openSerialLink(connection) |
|
65 |
|
66 @pyqtSlot() |
|
67 def disconnectFromDevice(self): |
|
68 """ |
|
69 Public slot to disconnect from the device. |
|
70 """ |
|
71 self.__serial.closeSerialLink() |
|
72 |
|
73 def isConnected(self): |
|
74 """ |
|
75 Public method to get the connection status. |
|
76 |
|
77 @return flag indicating the connection status |
|
78 @rtype bool |
|
79 """ |
|
80 return self.__serial.isConnected() |
|
81 |
|
82 @pyqtSlot() |
|
83 def handlePreferencesChanged(self): |
|
84 """ |
|
85 Public slot to handle a change of the preferences. |
|
86 """ |
|
87 self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout")) |
|
88 |
|
89 def write(self, data): |
|
90 """ |
|
91 Public method to write data to the connected device. |
|
92 |
|
93 @param data data to be written |
|
94 @type bytes or bytearray |
|
95 """ |
|
96 self.__serial.isConnected() and self.__serial.write(data) |
|
97 |
|
98 def __pasteOn(self): |
|
99 """ |
|
100 Private method to switch the connected device to 'paste' mode. |
|
101 |
|
102 Note: switching to paste mode is done with synchronous writes. |
|
103 |
|
104 @return flag indicating success |
|
105 @rtype bool |
|
106 """ |
|
107 if not self.__serial: |
|
108 return False |
|
109 |
|
110 pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== " |
|
111 |
|
112 self.__serial.clear() # clear any buffered output before entering paste mode |
|
113 self.__serial.write(b"\x02") # end raw mode if required |
|
114 written = self.__serial.waitForBytesWritten(500) |
|
115 # time out after 500ms if device is not responding |
|
116 if not written: |
|
117 return False |
|
118 for _i in range(3): |
|
119 # CTRL-C three times to break out of loops |
|
120 self.__serial.write(b"\r\x03") |
|
121 written = self.__serial.waitForBytesWritten(500) |
|
122 # time out after 500ms if device is not responding |
|
123 if not written: |
|
124 return False |
|
125 QThread.msleep(10) |
|
126 self.__serial.readAll() # read all data and discard it |
|
127 self.__serial.write(b"\r\x05") # send CTRL-E to enter paste mode |
|
128 self.__serial.readUntil(pasteMessage) |
|
129 |
|
130 if self.__serial.hasTimedOut(): |
|
131 # it timed out; try it again and than fail |
|
132 self.__serial.write(b"\r\x05") # send CTRL-E again |
|
133 self.__serial.readUntil(pasteMessage) |
|
134 if self.__serial.hasTimedOut(): |
|
135 return False |
|
136 |
|
137 QCoreApplication.processEvents( |
|
138 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents |
|
139 ) |
|
140 self.__serial.readAll() # read all data and discard it |
|
141 return True |
|
142 |
|
143 def __pasteOff(self): |
|
144 """ |
|
145 Private method to switch 'paste' mode off. |
|
146 """ |
|
147 if self.__serial: |
|
148 self.__serial.write(b"\x04") # send CTRL-D to cancel paste mode |
|
149 |
|
150 def __rawOn(self): |
|
151 """ |
|
152 Private method to switch the connected device to 'raw' mode. |
|
153 |
|
154 Note: switching to raw mode is done with synchronous writes. |
|
155 |
|
156 @return flag indicating success |
|
157 @rtype bool |
|
158 """ |
|
159 if not self.__serial: |
|
160 return False |
|
161 |
|
162 rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>" |
|
163 |
|
164 self.__serial.write(b"\x02") # end raw mode if required |
|
165 written = self.__serial.waitForBytesWritten(500) |
|
166 # time out after 500ms if device is not responding |
|
167 if not written: |
|
168 return False |
|
169 for _i in range(3): |
|
170 # CTRL-C three times to break out of loops |
|
171 self.__serial.write(b"\r\x03") |
|
172 written = self.__serial.waitForBytesWritten(500) |
|
173 # time out after 500ms if device is not responding |
|
174 if not written: |
|
175 return False |
|
176 QThread.msleep(10) |
|
177 self.__serial.readAll() # read all data and discard it |
|
178 self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode |
|
179 self.__serial.readUntil(rawReplMessage) |
|
180 if self.__serial.hasTimedOut(): |
|
181 # it timed out; try it again and than fail |
|
182 self.__serial.write(b"\r\x01") # send CTRL-A again |
|
183 self.__serial.readUntil(rawReplMessage) |
|
184 if self.__serial.hasTimedOut(): |
|
185 return False |
|
186 |
|
187 QCoreApplication.processEvents( |
|
188 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents |
|
189 ) |
|
190 self.__serial.readAll() # read all data and discard it |
|
191 return True |
|
192 |
|
193 def __rawOff(self): |
|
194 """ |
|
195 Private method to switch 'raw' mode off. |
|
196 """ |
|
197 if self.__serial: |
|
198 self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode |
|
199 self.__serial.readUntil(b">>> ") # read until Python prompt |
|
200 self.__serial.readAll() # read all data and discard it |
|
201 |
|
202 def probeDevice(self): |
|
203 """ |
|
204 Public method to check the device is responding. |
|
205 |
|
206 If the device has not been flashed with a MicroPython firmware, the |
|
207 probe will fail. |
|
208 |
|
209 @return flag indicating a communicating MicroPython device |
|
210 @rtype bool |
|
211 """ |
|
212 if not self.__serial: |
|
213 return False |
|
214 |
|
215 if not self.__serial.isConnected(): |
|
216 return False |
|
217 |
|
218 # switch on raw mode |
|
219 self.__blockReadyRead = True |
|
220 ok = self.__pasteOn() |
|
221 if not ok: |
|
222 self.__blockReadyRead = False |
|
223 return False |
|
224 |
|
225 # switch off raw mode |
|
226 QThread.msleep(10) |
|
227 self.__pasteOff() |
|
228 self.__blockReadyRead = False |
|
229 |
|
230 return True |
|
231 |
|
232 def execute(self, commands, *, mode="raw", timeout=0): |
|
233 """ |
|
234 Public method to send commands to the connected device and return the |
|
235 result. |
|
236 |
|
237 If no serial connection is available, empty results will be returned. |
|
238 |
|
239 @param commands list of commands to be executed |
|
240 @type str or list of str |
|
241 @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to |
|
242 'raw') |
|
243 @type str |
|
244 @keyparam timeout per command timeout in milliseconds (0 for configured default) |
|
245 (defaults to 0) |
|
246 @type int (optional) |
|
247 @return tuple containing stdout and stderr output of the device |
|
248 @rtype tuple of (bytes, bytes) |
|
249 @exception ValueError raised in case of an unsupported submit mode |
|
250 """ |
|
251 if mode not in ("paste", "raw"): |
|
252 raise ValueError("Unsupported submit mode given ('{0}').".format(mode)) |
|
253 |
|
254 if mode == "raw": |
|
255 return self.__execute_raw(commands, timeout=timeout) |
|
256 elif mode == "paste": |
|
257 return self.__execute_paste(commands, timeout=timeout) |
|
258 else: |
|
259 # just in case |
|
260 return b"", b"" |
|
261 |
|
262 def __execute_raw(self, commands, timeout=0): |
|
263 """ |
|
264 Private method to send commands to the connected device using 'raw REPL' mode |
|
265 and return the result. |
|
266 |
|
267 If no serial connection is available, empty results will be returned. |
|
268 |
|
269 @param commands list of commands to be executed |
|
270 @type str or list of str |
|
271 @param timeout per command timeout in milliseconds (0 for configured default) |
|
272 (defaults to 0) |
|
273 @type int (optional) |
|
274 @return tuple containing stdout and stderr output of the device |
|
275 @rtype tuple of (bytes, bytes) |
|
276 """ |
|
277 if not self.__serial: |
|
278 return b"", b"" |
|
279 |
|
280 if not self.__serial.isConnected(): |
|
281 return b"", b"Device not connected or not switched on." |
|
282 |
|
283 result = bytearray() |
|
284 err = b"" |
|
285 |
|
286 if isinstance(commands, str): |
|
287 commands = [commands] |
|
288 |
|
289 # switch on raw mode |
|
290 self.__blockReadyRead = True |
|
291 ok = self.__rawOn() |
|
292 if not ok: |
|
293 self.__blockReadyRead = False |
|
294 return (b"", b"Could not switch to raw mode. Is the device switched on?") |
|
295 |
|
296 # send commands |
|
297 QThread.msleep(10) |
|
298 for command in commands: |
|
299 if command: |
|
300 commandBytes = command.encode("utf-8") |
|
301 self.__serial.write(commandBytes + b"\x04") |
|
302 QCoreApplication.processEvents( |
|
303 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents |
|
304 ) |
|
305 ok = self.__serial.readUntil(b"OK") |
|
306 if ok != b"OK": |
|
307 self.__blockReadyRead = False |
|
308 return ( |
|
309 b"", |
|
310 "Expected 'OK', got '{0}', followed by '{1}'".format( |
|
311 ok, self.__serial.readAll() |
|
312 ).encode("utf-8"), |
|
313 ) |
|
314 |
|
315 # read until prompt |
|
316 response = self.__serial.readUntil(b"\x04>", timeout=timeout) |
|
317 if self.__serial.hasTimedOut(): |
|
318 self.__blockReadyRead = False |
|
319 return b"", b"Timeout while processing commands." |
|
320 if b"\x04" in response[:-2]: |
|
321 # split stdout, stderr |
|
322 out, err = response[:-2].split(b"\x04") |
|
323 result += out |
|
324 else: |
|
325 err = b"invalid response received: " + response |
|
326 if err: |
|
327 result = b"" |
|
328 break |
|
329 |
|
330 # switch off raw mode |
|
331 QThread.msleep(10) |
|
332 self.__rawOff() |
|
333 self.__blockReadyRead = False |
|
334 |
|
335 return bytes(result), err |
|
336 |
|
337 def __execute_paste(self, commands, timeout=0): |
|
338 """ |
|
339 Private method to send commands to the connected device using 'paste' mode |
|
340 and return the result. |
|
341 |
|
342 If no serial connection is available, empty results will be returned. |
|
343 |
|
344 @param commands list of commands to be executed |
|
345 @type str or list of str |
|
346 @param timeout per command timeout in milliseconds (0 for configured default) |
|
347 (defaults to 0) |
|
348 @type int (optional) |
|
349 @return tuple containing stdout and stderr output of the device |
|
350 @rtype tuple of (bytes, bytes) |
|
351 """ |
|
352 if not self.__serial: |
|
353 return b"", b"" |
|
354 |
|
355 if not self.__serial.isConnected(): |
|
356 return b"", b"Device not connected or not switched on." |
|
357 |
|
358 if isinstance(commands, list): |
|
359 commands = "\n".join(commands) |
|
360 |
|
361 # switch on paste mode |
|
362 self.__blockReadyRead = True |
|
363 ok = self.__pasteOn() |
|
364 if not ok: |
|
365 self.__blockReadyRead = False |
|
366 return (b"", b"Could not switch to paste mode. Is the device switched on?") |
|
367 |
|
368 # send commands |
|
369 QThread.msleep(10) |
|
370 for command in commands.splitlines(keepends=True): |
|
371 # send the data as single lines |
|
372 commandBytes = command.encode("utf-8") |
|
373 self.__serial.write(commandBytes) |
|
374 QCoreApplication.processEvents( |
|
375 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents |
|
376 ) |
|
377 QThread.msleep(10) |
|
378 ok = self.__serial.readUntil(commandBytes) |
|
379 if ok != commandBytes: |
|
380 self.__blockReadyRead = False |
|
381 return ( |
|
382 b"", |
|
383 "Expected '{0}', got '{1}', followed by '{2}'".format( |
|
384 commandBytes, ok, self.__serial.readAll() |
|
385 ).encode("utf-8"), |
|
386 ) |
|
387 |
|
388 # switch off paste mode causing the commands to be executed |
|
389 self.__pasteOff() |
|
390 QThread.msleep(10) |
|
391 # read until Python prompt |
|
392 result = ( |
|
393 self.__serial.readUntil(b">>> ", timeout=timeout) |
|
394 .replace(b">>> ", b"") |
|
395 .strip() |
|
396 ) |
|
397 if self.__serial.hasTimedOut(): |
|
398 self.__blockReadyRead = False |
|
399 return b"", b"Timeout while processing commands." |
|
400 |
|
401 # get rid of any OSD string |
|
402 if result.startswith(b"\x1b]0;"): |
|
403 result = result.split(b"\x1b\\")[-1] |
|
404 |
|
405 if self.TracebackMarker in result: |
|
406 errorIndex = result.find(self.TracebackMarker) |
|
407 out, err = result[:errorIndex], result[errorIndex:] |
|
408 else: |
|
409 out = result |
|
410 err = b"" |
|
411 |
|
412 self.__blockReadyRead = False |
|
413 return out, err |
|
414 |
|
415 def executeAsync(self, commandsList, submitMode): |
|
416 """ |
|
417 Public method to execute a series of commands over a period of time |
|
418 without returning any result (asynchronous execution). |
|
419 |
|
420 @param commandsList list of commands to be execute on the device |
|
421 @type list of str |
|
422 @param submitMode mode to be used to submit the commands |
|
423 @type str (one of 'raw' or 'paste') |
|
424 @exception ValueError raised to indicate an unknown submit mode |
|
425 """ |
|
426 if submitMode not in ("raw", "paste"): |
|
427 raise ValueError("Illegal submit mode given ({0})".format(submitMode)) |
|
428 |
|
429 if submitMode == "raw": |
|
430 startSequence = [ # sequence of commands to enter raw mode |
|
431 b"\x02", # Ctrl-B: exit raw repl (just in case) |
|
432 b"\r\x03\x03\x03", # Ctrl-C three times: interrupt any running program |
|
433 b"\r\x01", # Ctrl-A: enter raw REPL |
|
434 b'print("\\n")\r', |
|
435 ] |
|
436 endSequence = [ |
|
437 b"\r", |
|
438 b"\x04", |
|
439 ] |
|
440 self.__executeAsyncRaw( |
|
441 startSequence |
|
442 + [c.encode("utf-8") + b"\r" for c in commandsList] |
|
443 + endSequence |
|
444 ) |
|
445 elif submitMode == "paste": |
|
446 self.__executeAsyncPaste(commandsList) |
|
447 |
|
448 def __executeAsyncRaw(self, commandsList): |
|
449 """ |
|
450 Private method to execute a series of commands over a period of time |
|
451 without returning any result (asynchronous execution). |
|
452 |
|
453 @param commandsList list of commands to be execute on the device |
|
454 @type list of bytes |
|
455 """ |
|
456 if commandsList: |
|
457 command = commandsList.pop(0) |
|
458 self.__serial.write(command) |
|
459 QTimer.singleShot(2, lambda: self.__executeAsyncRaw(commandsList)) |
|
460 else: |
|
461 self.__rawOff() |
|
462 self.executeAsyncFinished.emit() |
|
463 |
|
464 def __executeAsyncPaste(self, commandsList): |
|
465 """ |
|
466 Private method to execute a series of commands over a period of time |
|
467 without returning any result (asynchronous execution). |
|
468 |
|
469 @param commandsList list of commands to be execute on the device |
|
470 @type list of str |
|
471 """ |
|
472 self.__blockReadyRead = True |
|
473 self.__pasteOn() |
|
474 command = b"\n".join(c.encode("utf-8)") for c in commandsList) |
|
475 self.__serial.write(command) |
|
476 self.__serial.readUntil(command) |
|
477 self.__blockReadyRead = False |
|
478 self.__pasteOff() |
|
479 self.executeAsyncFinished.emit |