src/eric7/MicroPython/MicroPythonWebreplDeviceInterface.py

branch
mpy_network
changeset 10008
c5bcafe3485c
child 10012
d649d500a9a1
equal deleted inserted replaced
9990:54c614d91eff 10008:c5bcafe3485c
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing an interface to talk to a connected MicroPython device via
8 a webrepl connection.
9 """
10
11 from PyQt6.QtCore import QThread, pyqtSlot
12 from PyQt6.QtWidgets import QInputDialog, QLineEdit
13
14 from eric7 import Preferences
15 from eric7.EricWidgets import EricMessageBox
16
17 from .MicroPythonDeviceInterface import MicroPythonDeviceInterface
18 from .MicroPythonWebreplSocket import MicroPythonWebreplSocket
19
20
class MicroPythonWebreplDeviceInterface(MicroPythonDeviceInterface):
    """
    Class implementing an interface to talk to a connected MicroPython device via
    a webrepl connection.

    Methods that exchange data with the device synchronously set an internal
    flag while they are running. As long as this flag is set, the slot connected
    to the socket's "readyRead" signal does not read from the socket.
    """

    def __init__(self, parent=None):
        """
        Constructor

        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        # while True, __readSocket() does not touch the incoming data
        self.__blockReadyRead = False

        self.__socket = MicroPythonWebreplSocket(
            timeout=Preferences.getMicroPython("WebreplTimeout"), parent=self
        )
        self.__connected = False
        self.__socket.readyRead.connect(self.__readSocket)

    @pyqtSlot()
    def __readSocket(self):
        """
        Private slot to read all available data and emit it with the
        "dataReceived" signal for further processing.

        Nothing is read while a synchronous exchange is in progress.
        """
        if self.__blockReadyRead:
            return

        data = bytes(self.__socket.readAll())
        self.dataReceived.emit(data)

    def __readAll(self):
        """
        Private method to read all data and emit it for further processing.

        In contrast to __readSocket() this method ignores the blocking flag.
        """
        # convert to bytes like __readSocket() does, so that receivers of the
        # signal always get the same type
        data = bytes(self.__socket.readAll())
        self.dataReceived.emit(data)

    @pyqtSlot()
    def connectToDevice(self, connection):
        """
        Public slot to connect to the device.

        If the connection string does not contain a password, the user is asked
        to enter it. If it does not contain a valid port number, port 8266 is
        used.

        @param connection name of the connection to be used in the form of an URL
            string (ws://password@host:port)
        @type str
        @return flag indicating success
        @rtype bool
        """
        connection = connection.replace("ws://", "")
        try:
            password, hostPort = connection.split("@", 1)
        except ValueError:
            # no '@' contained, i.e. no password was given
            password, hostPort = None, connection
        if password is None:
            password, ok = QInputDialog.getText(
                None,
                self.tr("WebRepl Password"),
                self.tr("Enter the WebRepl password:"),
                QLineEdit.EchoMode.Password,
            )
            if not ok:
                # the user cancelled the password dialog
                return False

        try:
            host, port = hostPort.split(":", 1)
            port = int(port)
        except ValueError:
            # no ':' contained or the part after it is not a number; the complete
            # string is taken as the host name
            host, port = hostPort, 8266  # default port is 8266

        self.__blockReadyRead = True
        try:
            ok = self.__socket.connectToDevice(host, port)
            if ok:
                ok = self.__socket.login(password)
                if not ok:
                    EricMessageBox.warning(
                        None,
                        self.tr("WebRepl Login"),
                        self.tr(
                            "The login to the selected device 'webrepl' failed."
                            " The given password may be incorrect."
                        ),
                    )
        finally:
            # never leave the 'readyRead' handler blocked, even if the socket
            # raised an exception
            self.__blockReadyRead = False

        self.__connected = ok
        return self.__connected

    @pyqtSlot()
    def disconnectFromDevice(self):
        """
        Public slot to disconnect from the device.
        """
        self.__socket.disconnect()
        self.__connected = False

    def isConnected(self):
        """
        Public method to get the connection status.

        @return flag indicating the connection status
        @rtype bool
        """
        return self.__connected

    @pyqtSlot()
    def handlePreferencesChanged(self):
        """
        Public slot to handle a change of the preferences.

        The configured webrepl timeout is passed on to the socket.
        """
        self.__socket.setTimeout(Preferences.getMicroPython("WebreplTimeout"))

    def write(self, data):
        """
        Public method to write data to the connected device.

        The data is silently dropped, if the device is not connected.

        @param data data to be written
        @type bytes or bytearray
        """
        if self.__connected:
            self.__socket.writeTextMessage(data)

    def probeDevice(self):
        """
        Public method to check the device is responding.

        If the device has not been flashed with a MicroPython firmware, the
        probe will fail.

        @return flag indicating a communicating MicroPython device
        @rtype bool
        """
        if not self.__connected:
            return False

        self.__blockReadyRead = True
        try:
            # switch on paste mode
            if not self.__pasteOn():
                return False

            # switch off paste mode
            QThread.msleep(10)
            self.__pasteOff()
        finally:
            self.__blockReadyRead = False

        return True

    def execute(self, commands, *, mode="raw", timeout=0):
        """
        Public method to send commands to the connected device and return the
        result.

        @param commands list of commands to be executed
        @type str or list of str
        @keyparam mode submit mode to be used (one of 'raw' or 'paste') (defaults to
            'raw'). This is ignored because webrepl always uses 'paste' mode.
        @type str
        @keyparam timeout per command timeout in milliseconds (0 for configured
            default) (defaults to 0)
        @type int (optional)
        @return tuple containing stdout and stderr output of the device
        @rtype tuple of (bytes, bytes)
        """
        if not self.__connected:
            return b"", b"Device is not connected."

        if isinstance(commands, list):
            commands = "\n".join(commands)

        self.__blockReadyRead = True
        try:
            # switch on paste mode
            if not self.__pasteOn():
                return (
                    b"",
                    b"Could not switch to paste mode. Is the device switched on?",
                )

            # send commands; the data read back is expected to be identical to
            # the data sent
            commandBytes = commands.encode("utf-8")
            self.__socket.writeTextMessage(commandBytes)
            echo = self.__socket.readUntil(commandBytes)
            if echo != commandBytes:
                return (
                    b"",
                    "Expected '{0}', got '{1}', followed by '{2}'".format(
                        commandBytes, echo, self.__socket.readAll()
                    ).encode("utf-8"),
                )

            # switch off paste mode causing the commands to be executed
            self.__pasteOff()

            # read until Python prompt
            result = (
                self.__socket.readUntil(b">>> ", timeout=timeout)
                .replace(b">>> ", b"")
                .strip()
            )
            if self.__socket.hasTimedOut():
                return b"", b"Timeout while processing commands."
        finally:
            # covers all return paths above as well as exceptions
            self.__blockReadyRead = False

        # get rid of any OSD string, i.e. drop everything up to the last
        # 'ESC \' terminator, if the output starts with 'ESC ] 0 ;'
        # TODO: emit the OSD data
        if result.startswith(b"\x1b]0;"):
            result = result.split(b"\x1b\\")[-1]

        errorIndex = result.find(self.TracebackMarker)
        if errorIndex < 0:
            # no traceback contained; everything is regular output
            return result, b""

        # 'result' is a bytes object, so the replace() arguments must be bytes
        # as well (str arguments raise a TypeError)
        out = result[:errorIndex]
        err = result[errorIndex:].replace(b">>> ", b"")
        return out, err

    def executeAsync(self, commandsList, submitMode):
        """
        Public method to execute a series of commands over a period of time
        without returning any result (asynchronous execution).

        The "executeAsyncFinished" signal is emitted after the commands were
        submitted.

        @param commandsList list of commands to be execute on the device
        @type list of str
        @param submitMode mode to be used to submit the commands (not used by
            this implementation)
        @type str (one of 'raw' or 'paste')
        """
        self.__blockReadyRead = True
        try:
            self.__pasteOn()
            command = b"\n".join(c.encode("utf-8") for c in commandsList)
            self.__socket.writeTextMessage(command)
            self.__socket.readUntil(command)
        finally:
            # unblock before leaving paste mode, so that data arriving after the
            # commands got started is handled by __readSocket() again
            self.__blockReadyRead = False

        self.__pasteOff()
        self.executeAsyncFinished.emit()

    def __pasteOn(self):
        """
        Private method to switch the connected device to 'paste' mode.

        Note: switching to paste mode is done with synchronous writes.

        @return flag indicating success
        @rtype bool
        """
        if not self.__connected:
            return False

        pasteMessage = b"paste mode; Ctrl-C to cancel, Ctrl-D to finish\r\n=== "

        self.__socket.writeTextMessage(b"\x02")  # end raw mode if required
        for _i in range(3):
            # CTRL-C three times to break out of loops
            self.__socket.writeTextMessage(b"\r\x03")
            self.__socket.readAll()  # read all data and discard it
        self.__socket.writeTextMessage(b"\r\x05")  # send CTRL-E to enter paste mode
        self.__socket.readUntil(pasteMessage)

        if self.__socket.hasTimedOut():
            # it timed out; try it again and then fail
            self.__socket.writeTextMessage(b"\r\x05")  # send CTRL-E again
            self.__socket.readUntil(pasteMessage)
            if self.__socket.hasTimedOut():
                return False

        self.__socket.readAll()  # read all data and discard it
        return True

    def __pasteOff(self):
        """
        Private method to switch 'paste' mode off.

        Nothing is sent, if the device is not connected.
        """
        if self.__connected:
            # send CTRL-D to finish paste mode
            self.__socket.writeTextMessage(b"\x04")

eric ide

mercurial