src/eric7/MicroPython/MicroPythonDeviceInterface.py

branch
eric7
changeset 9765
6378da868bb0
parent 9764
57496966803c
child 9766
f0e22f3a5878
equal deleted inserted replaced
9764:57496966803c 9765:6378da868bb0
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing some file system commands for MicroPython.
8 """
9
10 from PyQt6.QtCore import (
11 QCoreApplication,
12 QEventLoop,
13 QObject,
14 QThread,
15 QTimer,
16 pyqtSignal,
17 pyqtSlot,
18 )
19
20 from eric7 import Preferences
21
22 from .MicroPythonSerialPort import MicroPythonSerialPort
23
24
class MicroPythonDeviceInterface(QObject):
    """
    Class implementing the communication interface to a MicroPython device
    connected via a serial port.

    The class manages the serial link (connect, disconnect, connection
    status), forwards unsolicited device output via the 'dataReceived'
    signal and offers methods to
    <ul>
    <li>write: send raw data to the device</li>
    <li>probeDevice: check, that the device enters and leaves the raw
        REPL</li>
    <li>execute: execute commands via the raw REPL and return their
        output</li>
    <li>executeAsync: send a list of commands over a period of time without
        collecting any result</li>
    </ul>

    @signal executeAsyncFinished() emitted to indicate the end of an
        asynchronously executed list of commands (e.g. a script)
    @signal dataReceived(data) emitted to send data received via the serial
        connection for further processing
    """

    executeAsyncFinished = pyqtSignal()
    dataReceived = pyqtSignal(bytes)

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        self.__repl = parent

        # While this flag is set, incoming serial data is consumed by the
        # synchronous methods of this class and is not forwarded via the
        # 'dataReceived' signal.
        self.__blockReadyRead = False

        self.__serial = MicroPythonSerialPort(
            timeout=Preferences.getMicroPython("SerialTimeout"), parent=self
        )
        self.__serial.readyRead.connect(self.__readSerial)

    @pyqtSlot()
    def __readSerial(self):
        """
        Private slot to read all available serial data and emit it with the
        "dataReceived" signal for further processing.

        Nothing is read while a synchronous operation blocks the forwarding.
        """
        if not self.__blockReadyRead:
            data = bytes(self.__serial.readAll())
            self.dataReceived.emit(data)

    @pyqtSlot(str)
    def connectToDevice(self, port):
        """
        Public slot to open the serial link to the device.

        @param port name of the port to be used
        @type str
        @return flag indicating success
        @rtype bool
        """
        return self.__serial.openSerialLink(port)

    @pyqtSlot()
    def disconnectFromDevice(self):
        """
        Public slot to close the serial link to the device.
        """
        self.__serial.closeSerialLink()

    def isConnected(self):
        """
        Public method to get the connection status.

        @return flag indicating the connection status
        @rtype bool
        """
        return self.__serial.isConnected()

    @pyqtSlot()
    def handlePreferencesChanged(self):
        """
        Public slot to handle a change of the preferences.

        The "SerialTimeout" preference is applied to the serial port.
        """
        self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout"))

    def write(self, data):
        """
        Public method to write data to the connected device.

        The data is dropped silently, if the device is not connected.

        @param data data to be written
        @type bytes or bytearray
        """
        if self.__serial.isConnected():
            self.__serial.write(data)

    def __rawOn(self):
        """
        Private method to switch the connected device to 'raw' mode.

        Note: switching to raw mode is done with synchronous writes.

        @return flag indicating success
        @rtype bool
        """
        if not self.__serial:
            return False

        rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>"

        self.__serial.write(b"\x02")  # end raw mode if required
        # time out after 500ms if device is not responding
        if not self.__serial.waitForBytesWritten(500):
            return False

        for _ in range(3):
            # CTRL-C three times to break out of loops
            self.__serial.write(b"\r\x03")
            # time out after 500ms if device is not responding
            if not self.__serial.waitForBytesWritten(500):
                return False
            QThread.msleep(10)
        self.__serial.readAll()  # read all data and discard it

        self.__serial.write(b"\r\x01")  # send CTRL-A to enter raw mode
        self.__serial.readUntil(rawReplMessage)
        if self.__serial.hasTimedOut():
            # it timed out; try it once more and then fail
            self.__serial.write(b"\r\x01")  # send CTRL-A again
            self.__serial.readUntil(rawReplMessage)
            if self.__serial.hasTimedOut():
                return False

        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        )
        self.__serial.readAll()  # read all data and discard it
        return True

    def __rawOff(self):
        """
        Private method to switch 'raw' mode off.
        """
        if self.__serial:
            self.__serial.write(b"\x02")  # send CTRL-B to cancel raw mode
            self.__serial.readUntil(b">>> ")  # read until Python prompt
            self.__serial.readAll()  # read all data and discard it

    def probeDevice(self):
        """
        Public method to check the device is responding.

        If the device has not been flashed with a MicroPython firmware, the
        probe will fail.

        @return flag indicating a communicating MicroPython device
        @rtype bool
        """
        if not self.__serial:
            return False

        if not self.__serial.isConnected():
            return False

        # Keep the device output away from the 'dataReceived' signal while
        # probing; the 'finally' clause guarantees forwarding is re-enabled.
        self.__blockReadyRead = True
        try:
            # switch on raw mode
            if not self.__rawOn():
                return False

            # switch off raw mode
            QThread.msleep(10)
            self.__rawOff()
        finally:
            self.__blockReadyRead = False

        return True

    def execute(self, commands):
        """
        Public method to send commands to the connected device and return the
        result.

        If no serial connection is available, empty results will be returned.
        Processing stops at the first command producing error output; in this
        case the collected standard output is discarded.

        @param commands list of commands to be executed
        @type str or list of str
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__serial:
            return b"", b""

        if not self.__serial.isConnected():
            return b"", b"Device not connected or not switched on."

        result = bytearray()
        err = b""

        if isinstance(commands, str):
            commands = [commands]

        # Forwarding of serial data is blocked for the whole exchange. The
        # 'finally' clause re-enables it on every exit path, including the
        # early error returns.
        self.__blockReadyRead = True
        try:
            # switch on raw mode
            if not self.__rawOn():
                return (
                    b"",
                    b"Could not switch to raw mode. Is the device switched on?",
                )

            # send commands
            QThread.msleep(10)
            for command in commands:
                if not command:
                    continue

                # CTRL-D terminates the command in raw mode
                self.__serial.write(command.encode("utf-8") + b"\x04")
                QCoreApplication.processEvents(
                    QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
                )
                acknowledge = self.__serial.readUntil(b"OK")
                if acknowledge != b"OK":
                    return (
                        b"",
                        "Expected 'OK', got '{0}', followed by '{1}'".format(
                            acknowledge, self.__serial.readAll()
                        ).encode("utf-8"),
                    )

                # read until prompt
                response = self.__serial.readUntil(b"\x04>")
                if self.__serial.hasTimedOut():
                    return b"", b"Timeout while processing commands."

                payload = response[:-2]  # strip the trailing b"\x04>"
                if b"\x04" in payload:
                    # split stdout, stderr at the first separator only, so
                    # an additional b"\x04" cannot break the unpacking
                    out, err = payload.split(b"\x04", 1)
                    result += out
                else:
                    err = b"invalid response received: " + response
                if err:
                    result = b""
                    break

            # switch off raw mode
            QThread.msleep(10)
            self.__rawOff()
        finally:
            self.__blockReadyRead = False

        return bytes(result), err

    def executeAsync(self, commandsList):
        """
        Public method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        One command is written per invocation; the remaining ones are
        scheduled via a single shot timer. The given list is not modified.
        The 'executeAsyncFinished' signal is emitted once all commands have
        been written.

        @param commandsList list of commands to be executed on the device
        @type list of bytes
        """
        if commandsList:
            # slice instead of pop() to leave the caller's list untouched
            command, remaining = commandsList[0], commandsList[1:]
            self.__serial.write(command)
            QTimer.singleShot(2, lambda: self.executeAsync(remaining))
        else:
            self.executeAsyncFinished.emit()

eric ide

mercurial