|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing an interface to talk to a connected MicroPython device via |
|
8 a webrepl connection. |
|
9 """ |
|
10 |
|
11 from PyQt6.QtCore import QThread, pyqtSlot |
|
12 from PyQt6.QtWidgets import QInputDialog, QLineEdit |
|
13 |
|
14 from eric7 import Preferences |
|
15 from eric7.EricWidgets import EricMessageBox |
|
16 |
|
17 from .MicroPythonDeviceInterface import MicroPythonDeviceInterface |
|
18 from .MicroPythonWebreplSocket import MicroPythonWebreplSocket |
|
19 |
|
20 |
|
class MicroPythonWebreplDeviceInterface(MicroPythonDeviceInterface):
    """
    Class implementing an interface to talk to a connected MicroPython device via
    a webrepl connection.
    """

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        # While this flag is set, the 'readyRead' handler does not forward data
        # via "dataReceived"; the command methods read the socket themselves.
        self.__blockReadyRead = False

        self.__socket = MicroPythonWebreplSocket(
            timeout=Preferences.getMicroPython("WebreplTimeout"), parent=self
        )
        self.__connected = False
        self.__socket.readyRead.connect(self.__readSocket)

    @pyqtSlot()
    def __readSocket(self):
        """
        Private slot to read all available data and emit it with the
        "dataReceived" signal for further processing.

        Nothing is read or emitted while a command method has blocked the
        'readyRead' handling.
        """
        if not self.__blockReadyRead:
            data = bytes(self.__socket.readAll())
            self.dataReceived.emit(data)

    def __readAll(self):
        """
        Private method to read all data and emit it for further processing.
        """
        # convert to bytes like __readSocket() does, so that "dataReceived"
        # always carries the same data type
        data = bytes(self.__socket.readAll())
        self.dataReceived.emit(data)

    @pyqtSlot(str)
    def connectToDevice(self, connection):
        """
        Public slot to connect to the device.

        If the connection string contains no password, the user is asked for it.
        If no valid port is given, the default webrepl port 8266 is used.

        @param connection name of the connection to be used in the form of an URL string
            (ws://password@host:port)
        @type str
        @return flag indicating success
        @rtype bool
        """
        # strip the scheme only where it is a prefix; a plain replace() would
        # also remove the text from within the password
        scheme = "ws://"
        if connection.startswith(scheme):
            connection = connection[len(scheme) :]

        if "@" in connection:
            # split at the last '@' because the password may contain this
            # character while the 'host:port' part does not
            password, hostPort = connection.rsplit("@", 1)
        else:
            hostPort = connection
            password, ok = QInputDialog.getText(
                None,
                self.tr("WebRepl Password"),
                self.tr("Enter the WebRepl password:"),
                QLineEdit.EchoMode.Password,
            )
            if not ok:
                return False

        # the host name is always the part before the first ':'; a missing or
        # invalid port falls back to the default
        host, _sep, portStr = hostPort.partition(":")
        try:
            port = int(portStr)
        except ValueError:
            port = 8266  # default port is 8266

        self.__blockReadyRead = True
        try:
            ok = self.__socket.connectToDevice(host, port)
            if ok:
                ok = self.__socket.login(password)
                if not ok:
                    EricMessageBox.warning(
                        None,
                        self.tr("WebRepl Login"),
                        self.tr(
                            "The login to the selected device 'webrepl' failed. The given"
                            " password may be incorrect."
                        ),
                    )

            self.__connected = ok
        finally:
            # never leave the 'readyRead' handling blocked, even on errors
            self.__blockReadyRead = False

        return self.__connected

    @pyqtSlot()
    def disconnectFromDevice(self):
        """
        Public slot to disconnect from the device.
        """
        self.__socket.disconnect()
        self.__connected = False

    def isConnected(self):
        """
        Public method to get the connection status.

        @return flag indicating the connection status
        @rtype bool
        """
        return self.__connected

    @pyqtSlot()
    def handlePreferencesChanged(self):
        """
        Public slot to handle a change of the preferences.

        The configured "WebreplTimeout" value is applied to the socket.
        """
        self.__socket.setTimeout(Preferences.getMicroPython("WebreplTimeout"))

    def write(self, data):
        """
        Public method to write data to the connected device.

        The data is silently dropped, if there is no connection.

        @param data data to be written
        @type bytes or bytearray
        """
        if self.__connected:
            self.__socket.writeTextMessage(data)

    def probeDevice(self):
        """
        Public method to check the device is responding.

        If the device has not been flashed with a MicroPython firmware, the
        probe will fail.

        @return flag indicating a communicating MicroPython device
        @rtype bool
        """
        if not self.__connected:
            return False

        self.__blockReadyRead = True
        try:
            # switch on paste mode
            if not self.__pasteOn():
                return False

            # switch off paste mode
            QThread.msleep(10)
            self.__pasteOff()
        finally:
            self.__blockReadyRead = False

        return True

    def execute(self, commands, *, mode="raw", timeout=0):
        """
        Public method to send commands to the connected device and return the
        result.

        @param commands list of commands to be executed
        @type str or list of str
        @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to
            'raw'). This is ignored because webrepl always uses 'paste' mode.
        @type str
        @keyparam timeout per command timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__connected:
            return b"", b"Device is not connected."

        if isinstance(commands, list):
            commands = "\n".join(commands)

        self.__blockReadyRead = True
        try:
            # switch on paste mode
            if not self.__pasteOn():
                return (
                    b"",
                    b"Could not switch to paste mode. Is the device switched on?",
                )

            # send commands and wait for them to be echoed by the device
            commandBytes = commands.encode("utf-8")
            self.__socket.writeTextMessage(commandBytes)
            echo = self.__socket.readUntil(commandBytes)
            if echo != commandBytes:
                return (
                    b"",
                    "Expected '{0}', got '{1}', followed by '{2}'".format(
                        commandBytes, echo, self.__socket.readAll()
                    ).encode("utf-8"),
                )

            # switch off paste mode causing the commands to be executed
            self.__pasteOff()

            # read until Python prompt
            result = (
                self.__socket.readUntil(b">>> ", timeout=timeout)
                .replace(b">>> ", b"")
                .strip()
            )
            if self.__socket.hasTimedOut():
                return b"", b"Timeout while processing commands."
        finally:
            # re-enable the 'readyRead' handling on every exit path
            self.__blockReadyRead = False

        # get rid of any OSD string
        # TODO: emit the OSD data
        if result.startswith(b"\x1b]0;"):
            result = result.split(b"\x1b\\")[-1]

        # everything from the traceback marker onwards is error output
        errorIndex = result.find(self.TracebackMarker)
        if errorIndex >= 0:
            out = result[:errorIndex]
            # 'result' is a bytes object, so the prompt must be given as bytes
            err = result[errorIndex:].replace(b">>> ", b"")
        else:
            out, err = result, b""

        return out, err

    def executeAsync(self, commandsList, submitMode):
        """
        Public method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        The "executeAsyncFinished" signal is emitted in any case, even if the
        commands could not be sent because paste mode could not be entered.

        @param commandsList list of commands to be execute on the device
        @type list of str
        @param submitMode mode to be used to submit the commands. This is not
            evaluated by this method; the commands are always sent in 'paste' mode.
        @type str (one of 'raw' or 'paste')
        """
        pasteModeActive = False
        self.__blockReadyRead = True
        try:
            pasteModeActive = self.__pasteOn()
            if pasteModeActive:
                command = b"\n".join(c.encode("utf-8") for c in commandsList)
                self.__socket.writeTextMessage(command)
                # consume the echo of the commands
                self.__socket.readUntil(command)
        finally:
            # unblock before leaving paste mode, so that the output generated by
            # the commands is forwarded by the 'readyRead' handler
            self.__blockReadyRead = False

        if pasteModeActive:
            self.__pasteOff()
        self.executeAsyncFinished.emit()

    def __pasteOn(self):
        """
        Private method to switch the connected device to 'paste' mode.

        Note: switching to paste mode is done with synchronous writes.

        @return flag indicating success
        @rtype bool
        """
        if not self.__connected:
            return False

        pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== "

        self.__socket.writeTextMessage(b"\x02")  # end raw mode if required
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__socket.writeTextMessage(b"\r\x03")
            self.__socket.readAll()  # read all data and discard it

        # send CTRL-E to enter paste mode; if waiting for the paste mode banner
        # timed out, try it once more and then fail
        for _attempt in range(2):
            self.__socket.writeTextMessage(b"\r\x05")
            self.__socket.readUntil(pasteMessage)
            if not self.__socket.hasTimedOut():
                break
        else:
            return False

        self.__socket.readAll()  # read all data and discard it
        return True

    def __pasteOff(self):
        """
        Private method to switch 'paste' mode off.

        Nothing is sent, if there is no connection.
        """
        if self.__connected:
            self.__socket.writeTextMessage(b"\x04")  # send CTRL-D to finish paste mode