src/eric7/MicroPython/MicroPythonSerialDeviceInterface.py

branch
mpy_network
changeset 9990
54c614d91eff
parent 9989
286c2a21f36f
child 10008
c5bcafe3485c
equal deleted inserted replaced
9989:286c2a21f36f 9990:54c614d91eff
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing an interface to talk to a connected MicroPython device via
8 a serial link.
9 """
10
11 from PyQt6.QtCore import QCoreApplication, QEventLoop, QThread, QTimer, pyqtSlot
12
13 from eric7 import Preferences
14
15 from .MicroPythonDeviceInterface import MicroPythonDeviceInterface
16 from .MicroPythonSerialPort import MicroPythonSerialPort
17
18
19 class MicroPythonSerialDeviceInterface(MicroPythonDeviceInterface):
20 """
21 Class implementing an interface to talk to a connected MicroPython device via
22 a serial link.
23 """
24
25 PasteModePrompt = b"=== "
26 TracebackMarker = b"Traceback (most recent call last):"
27
28 def __init__(self, parent=None):
29 """
30 Constructor
31
32 @param parent reference to the parent object
33 @type QObject
34 """
35 super().__init__(parent)
36
37 self.__blockReadyRead = False
38
39 self.__serial = MicroPythonSerialPort(
40 timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
41 )
42 self.__serial.readyRead.connect(self.__readSerial)
43
44 @pyqtSlot()
45 def __readSerial(self):
46 """
47 Private slot to read all available serial data and emit it with the
48 "dataReceived" signal for further processing.
49 """
50 if not self.__blockReadyRead:
51 data = bytes(self.__serial.readAll())
52 self.dataReceived.emit(data)
53
54 @pyqtSlot()
55 def connectToDevice(self, connection):
56 """
57 Public slot to connect to the device.
58
59 @param connection name of the connection to be used
60 @type str
61 @return flag indicating success
62 @rtype bool
63 """
64 return self.__serial.openSerialLink(connection)
65
66 @pyqtSlot()
67 def disconnectFromDevice(self):
68 """
69 Public slot to disconnect from the device.
70 """
71 self.__serial.closeSerialLink()
72
73 def isConnected(self):
74 """
75 Public method to get the connection status.
76
77 @return flag indicating the connection status
78 @rtype bool
79 """
80 return self.__serial.isConnected()
81
82 @pyqtSlot()
83 def handlePreferencesChanged(self):
84 """
85 Public slot to handle a change of the preferences.
86 """
87 self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))
88
89 def write(self, data):
90 """
91 Public method to write data to the connected device.
92
93 @param data data to be written
94 @type bytes or bytearray
95 """
96 self.__serial.isConnected() and self.__serial.write(data)
97
98 def __pasteOn(self):
99 """
100 Private method to switch the connected device to 'paste' mode.
101
102 Note: switching to paste mode is done with synchronous writes.
103
104 @return flag indicating success
105 @rtype bool
106 """
107 if not self.__serial:
108 return False
109
110 pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== "
111
112 self.__serial.clear() # clear any buffered output before entering paste mode
113 self.__serial.write(b"\x02") # end raw mode if required
114 written = self.__serial.waitForBytesWritten(500)
115 # time out after 500ms if device is not responding
116 if not written:
117 return False
118 for _i in range(3):
119 # CTRL-C three times to break out of loops
120 self.__serial.write(b"\r\x03")
121 written = self.__serial.waitForBytesWritten(500)
122 # time out after 500ms if device is not responding
123 if not written:
124 return False
125 QThread.msleep(10)
126 self.__serial.readAll() # read all data and discard it
127 self.__serial.write(b"\r\x05") # send CTRL-E to enter paste mode
128 self.__serial.readUntil(pasteMessage)
129
130 if self.__serial.hasTimedOut():
131 # it timed out; try it again and than fail
132 self.__serial.write(b"\r\x05") # send CTRL-E again
133 self.__serial.readUntil(pasteMessage)
134 if self.__serial.hasTimedOut():
135 return False
136
137 QCoreApplication.processEvents(
138 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
139 )
140 self.__serial.readAll() # read all data and discard it
141 return True
142
143 def __pasteOff(self):
144 """
145 Private method to switch 'paste' mode off.
146 """
147 if self.__serial:
148 self.__serial.write(b"\x04") # send CTRL-D to cancel paste mode
149
150 def __rawOn(self):
151 """
152 Private method to switch the connected device to 'raw' mode.
153
154 Note: switching to raw mode is done with synchronous writes.
155
156 @return flag indicating success
157 @rtype bool
158 """
159 if not self.__serial:
160 return False
161
162 rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"
163
164 self.__serial.write(b"\x02") # end raw mode if required
165 written = self.__serial.waitForBytesWritten(500)
166 # time out after 500ms if device is not responding
167 if not written:
168 return False
169 for _i in range(3):
170 # CTRL-C three times to break out of loops
171 self.__serial.write(b"\r\x03")
172 written = self.__serial.waitForBytesWritten(500)
173 # time out after 500ms if device is not responding
174 if not written:
175 return False
176 QThread.msleep(10)
177 self.__serial.readAll() # read all data and discard it
178 self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode
179 self.__serial.readUntil(rawReplMessage)
180 if self.__serial.hasTimedOut():
181 # it timed out; try it again and than fail
182 self.__serial.write(b"\r\x01") # send CTRL-A again
183 self.__serial.readUntil(rawReplMessage)
184 if self.__serial.hasTimedOut():
185 return False
186
187 QCoreApplication.processEvents(
188 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
189 )
190 self.__serial.readAll() # read all data and discard it
191 return True
192
193 def __rawOff(self):
194 """
195 Private method to switch 'raw' mode off.
196 """
197 if self.__serial:
198 self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode
199 self.__serial.readUntil(b">>> ") # read until Python prompt
200 self.__serial.readAll() # read all data and discard it
201
202 def probeDevice(self):
203 """
204 Public method to check the device is responding.
205
206 If the device has not been flashed with a MicroPython firmware, the
207 probe will fail.
208
209 @return flag indicating a communicating MicroPython device
210 @rtype bool
211 """
212 if not self.__serial:
213 return False
214
215 if not self.__serial.isConnected():
216 return False
217
218 # switch on raw mode
219 self.__blockReadyRead = True
220 ok = self.__pasteOn()
221 if not ok:
222 self.__blockReadyRead = False
223 return False
224
225 # switch off raw mode
226 QThread.msleep(10)
227 self.__pasteOff()
228 self.__blockReadyRead = False
229
230 return True
231
232 def execute(self, commands, *, mode="raw", timeout=0):
233 """
234 Public method to send commands to the connected device and return the
235 result.
236
237 If no serial connection is available, empty results will be returned.
238
239 @param commands list of commands to be executed
240 @type str or list of str
241 @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to
242 'raw')
243 @type str
244 @keyparam timeout per command timeout in milliseconds (0 for configured default)
245 (defaults to 0)
246 @type int (optional)
247 @return tuple containing stdout and stderr output of the device
248 @rtype tuple of (bytes, bytes)
249 @exception ValueError raised in case of an unsupported submit mode
250 """
251 if mode not in ("paste", "raw"):
252 raise ValueError("Unsupported submit mode given ('{0}').".format(mode))
253
254 if mode == "raw":
255 return self.__execute_raw(commands, timeout=timeout)
256 elif mode == "paste":
257 return self.__execute_paste(commands, timeout=timeout)
258 else:
259 # just in case
260 return b"", b""
261
262 def __execute_raw(self, commands, timeout=0):
263 """
264 Private method to send commands to the connected device using 'raw REPL' mode
265 and return the result.
266
267 If no serial connection is available, empty results will be returned.
268
269 @param commands list of commands to be executed
270 @type str or list of str
271 @param timeout per command timeout in milliseconds (0 for configured default)
272 (defaults to 0)
273 @type int (optional)
274 @return tuple containing stdout and stderr output of the device
275 @rtype tuple of (bytes, bytes)
276 """
277 if not self.__serial:
278 return b"", b""
279
280 if not self.__serial.isConnected():
281 return b"", b"Device not connected or not switched on."
282
283 result = bytearray()
284 err = b""
285
286 if isinstance(commands, str):
287 commands = [commands]
288
289 # switch on raw mode
290 self.__blockReadyRead = True
291 ok = self.__rawOn()
292 if not ok:
293 self.__blockReadyRead = False
294 return (b"", b"Could not switch to raw mode. Is the device switched on?")
295
296 # send commands
297 QThread.msleep(10)
298 for command in commands:
299 if command:
300 commandBytes = command.encode("utf-8")
301 self.__serial.write(commandBytes + b"\x04")
302 QCoreApplication.processEvents(
303 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
304 )
305 ok = self.__serial.readUntil(b"OK")
306 if ok != b"OK":
307 self.__blockReadyRead = False
308 return (
309 b"",
310 "Expected 'OK', got '{0}', followed by '{1}'".format(
311 ok, self.__serial.readAll()
312 ).encode("utf-8"),
313 )
314
315 # read until prompt
316 response = self.__serial.readUntil(b"\x04>", timeout=timeout)
317 if self.__serial.hasTimedOut():
318 self.__blockReadyRead = False
319 return b"", b"Timeout while processing commands."
320 if b"\x04" in response[:-2]:
321 # split stdout, stderr
322 out, err = response[:-2].split(b"\x04")
323 result += out
324 else:
325 err = b"invalid response received: " + response
326 if err:
327 result = b""
328 break
329
330 # switch off raw mode
331 QThread.msleep(10)
332 self.__rawOff()
333 self.__blockReadyRead = False
334
335 return bytes(result), err
336
337 def __execute_paste(self, commands, timeout=0):
338 """
339 Private method to send commands to the connected device using 'paste' mode
340 and return the result.
341
342 If no serial connection is available, empty results will be returned.
343
344 @param commands list of commands to be executed
345 @type str or list of str
346 @param timeout per command timeout in milliseconds (0 for configured default)
347 (defaults to 0)
348 @type int (optional)
349 @return tuple containing stdout and stderr output of the device
350 @rtype tuple of (bytes, bytes)
351 """
352 if not self.__serial:
353 return b"", b""
354
355 if not self.__serial.isConnected():
356 return b"", b"Device not connected or not switched on."
357
358 if isinstance(commands, list):
359 commands = "\n".join(commands)
360
361 # switch on paste mode
362 self.__blockReadyRead = True
363 ok = self.__pasteOn()
364 if not ok:
365 self.__blockReadyRead = False
366 return (b"", b"Could not switch to paste mode. Is the device switched on?")
367
368 # send commands
369 QThread.msleep(10)
370 for command in commands.splitlines(keepends=True):
371 # send the data as single lines
372 commandBytes = command.encode("utf-8")
373 self.__serial.write(commandBytes)
374 QCoreApplication.processEvents(
375 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
376 )
377 QThread.msleep(10)
378 ok = self.__serial.readUntil(commandBytes)
379 if ok != commandBytes:
380 self.__blockReadyRead = False
381 return (
382 b"",
383 "Expected '{0}', got '{1}', followed by '{2}'".format(
384 commandBytes, ok, self.__serial.readAll()
385 ).encode("utf-8"),
386 )
387
388 # switch off paste mode causing the commands to be executed
389 self.__pasteOff()
390 QThread.msleep(10)
391 # read until Python prompt
392 result = (
393 self.__serial.readUntil(b">>> ", timeout=timeout)
394 .replace(b">>> ", b"")
395 .strip()
396 )
397 if self.__serial.hasTimedOut():
398 self.__blockReadyRead = False
399 return b"", b"Timeout while processing commands."
400
401 # get rid of any OSD string
402 if result.startswith(b"\x1b]0;"):
403 result = result.split(b"\x1b\\")[-1]
404
405 if self.TracebackMarker in result:
406 errorIndex = result.find(self.TracebackMarker)
407 out, err = result[:errorIndex], result[errorIndex:]
408 else:
409 out = result
410 err = b""
411
412 self.__blockReadyRead = False
413 return out, err
414
415 def executeAsync(self, commandsList, submitMode):
416 """
417 Public method to execute a series of commands over a period of time
418 without returning any result (asynchronous execution).
419
420 @param commandsList list of commands to be execute on the device
421 @type list of str
422 @param submitMode mode to be used to submit the commands
423 @type str (one of 'raw' or 'paste')
424 @exception ValueError raised to indicate an unknown submit mode
425 """
426 if submitMode not in ("raw", "paste"):
427 raise ValueError("Illegal submit mode given ({0})".format(submitMode))
428
429 if submitMode == "raw":
430 startSequence = [ # sequence of commands to enter raw mode
431 b"\x02", # Ctrl-B: exit raw repl (just in case)
432 b"\r\x03\x03\x03", # Ctrl-C three times: interrupt any running program
433 b"\r\x01", # Ctrl-A: enter raw REPL
434 b'print("\\n")\r',
435 ]
436 endSequence = [
437 b"\r",
438 b"\x04",
439 ]
440 self.__executeAsyncRaw(
441 startSequence
442 + [c.encode("utf-8") + b"\r" for c in commandsList]
443 + endSequence
444 )
445 elif submitMode == "paste":
446 self.__executeAsyncPaste(commandsList)
447
448 def __executeAsyncRaw(self, commandsList):
449 """
450 Private method to execute a series of commands over a period of time
451 without returning any result (asynchronous execution).
452
453 @param commandsList list of commands to be execute on the device
454 @type list of bytes
455 """
456 if commandsList:
457 command = commandsList.pop(0)
458 self.__serial.write(command)
459 QTimer.singleShot(2, lambda: self.__executeAsyncRaw(commandsList))
460 else:
461 self.__rawOff()
462 self.executeAsyncFinished.emit()
463
464 def __executeAsyncPaste(self, commandsList):
465 """
466 Private method to execute a series of commands over a period of time
467 without returning any result (asynchronous execution).
468
469 @param commandsList list of commands to be execute on the device
470 @type list of str
471 """
472 self.__blockReadyRead = True
473 self.__pasteOn()
474 command = b"\n".join(c.encode("utf-8)") for c in commandsList)
475 self.__serial.write(command)
476 self.__serial.readUntil(command)
477 self.__blockReadyRead = False
478 self.__pasteOff()
479 self.executeAsyncFinished.emit

eric ide

mercurial