|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing a websocket class to be connect to the MicroPython webrepl |
|
8 interface. |
|
9 """ |
|
10 |
|
11 from PyQt6.QtCore import ( |
|
12 QCoreApplication, QEventLoop, QMutex, QTime, QTimer, QUrl, pyqtSignal, pyqtSlot |
|
13 ) |
|
14 from PyQt6.QtNetwork import QAbstractSocket |
|
15 from PyQt6.QtWebSockets import QWebSocket |
|
16 |
|
17 from eric7.EricUtilities.EricMutexLocker import EricMutexLocker |
|
18 |
|
19 |
|
20 class MicroPythonWebreplSocket(QWebSocket): |
|
21 """ |
|
22 Class implementing a websocket client to be connected to the MicroPython webrepl |
|
23 interface. |
|
24 |
|
25 @signal readyRead() emitted to signal the availability of data |
|
26 """ |
|
27 |
|
28 readyRead = pyqtSignal() |
|
29 |
|
30 def __init__(self, timeout=10000, parent=None): |
|
31 """ |
|
32 Constructor |
|
33 |
|
34 @param timeout timout in milliseconds to be set |
|
35 @type int |
|
36 @param parent reference to the parent object |
|
37 @type QObject |
|
38 """ |
|
39 super().__init__(parent=parent) |
|
40 |
|
41 self.__connected = False |
|
42 self.__timeout = timeout # 10s default timeout |
|
43 self.__timedOut = False |
|
44 |
|
45 self.__mutex = QMutex() |
|
46 self.__buffer = b"" |
|
47 self.textMessageReceived.connect(self.__textDataReceived) |
|
48 |
|
49 @pyqtSlot(str) |
|
50 def __textDataReceived(self, strMessage): |
|
51 """ |
|
52 Private slot handling a received text message. |
|
53 |
|
54 @param strMessage received text message |
|
55 @type str |
|
56 """ |
|
57 with EricMutexLocker(self.__mutex): |
|
58 self.__buffer += strMessage.encode("utf-8") |
|
59 |
|
60 self.readyRead.emit() |
|
61 |
|
62 def setTimeout(self, timeout): |
|
63 """ |
|
64 Public method to set the socket timeout value. |
|
65 |
|
66 @param timeout timout in milliseconds to be set |
|
67 @type int |
|
68 """ |
|
69 self.__timeout = timeout |
|
70 |
|
71 def waitForConnected(self): |
|
72 """ |
|
73 Public method to wait for the websocket being connected. |
|
74 |
|
75 @return flag indicating the connect result |
|
76 @rtype bool |
|
77 """ |
|
78 loop = QEventLoop() |
|
79 self.connected.connect(loop.quit) |
|
80 self.errorOccurred.connect(loop.quit) |
|
81 |
|
82 def timeout(): |
|
83 loop.quit() |
|
84 self.__timedOut = True |
|
85 |
|
86 self.__timedOut = False |
|
87 timer = QTimer() |
|
88 timer.setSingleShot(True) |
|
89 timer.timeout.connect(timeout) |
|
90 timer.start(self.__timeout) |
|
91 |
|
92 loop.exec() |
|
93 timer.stop() |
|
94 if self.state() == QAbstractSocket.SocketState.ConnectedState: |
|
95 self.__connected = True |
|
96 return True |
|
97 else: |
|
98 self.__connected = False |
|
99 return False |
|
100 |
|
101 def connectToDevice(self, host, port): |
|
102 """ |
|
103 Public method to connect to the given host and port. |
|
104 |
|
105 @param host host name or IP address |
|
106 @type str |
|
107 @param port port number |
|
108 @type int |
|
109 @return flag indicating success |
|
110 @rtype bool |
|
111 """ |
|
112 if self.__connected: |
|
113 self.disconnectFromDevice() |
|
114 |
|
115 url = QUrl(f"ws://{host}:{port}") |
|
116 self.open(url) |
|
117 ok = self.waitForConnected() |
|
118 if not ok: |
|
119 return False |
|
120 |
|
121 self.__connected = True |
|
122 return True |
|
123 |
|
124 def disconnect(self): |
|
125 """ |
|
126 Public method to disconnect the websocket. |
|
127 """ |
|
128 if self.__connected: |
|
129 self.close() |
|
130 self.__connected = False |
|
131 |
|
132 def isConnected(self): |
|
133 """ |
|
134 Public method to check the connected state of the websocket. |
|
135 |
|
136 @return flag indicating the connected state |
|
137 @rtype bool |
|
138 """ |
|
139 return self.__connected |
|
140 |
|
141 def hasTimedOut(self): |
|
142 """ |
|
143 Public method to check, if the last 'readUntil()' has timed out. |
|
144 |
|
145 @return flag indicating a timeout |
|
146 @@rtype bool |
|
147 """ |
|
148 return self.__timedOut |
|
149 |
|
150 def login(self, password): |
|
151 """ |
|
152 Public method to login to the webrepl console of the device. |
|
153 |
|
154 @param password password |
|
155 @type str |
|
156 @return flag indicating a successful login |
|
157 @rtype bool |
|
158 """ |
|
159 self.readUntil(expected=b": ") |
|
160 self.writeTextMessage(password.encode("utf-8") + b"\r") |
|
161 data = self.readUntil([b">>> ", b"denied\r\n"]) |
|
162 |
|
163 return not data.endswith(b"denied\r\n") |
|
164 |
|
165 def writeTextMessage(self, data): |
|
166 """ |
|
167 Public method write some text data to the webrepl server of the connected |
|
168 device. |
|
169 |
|
170 @param data text data to be sent |
|
171 @type bytes |
|
172 """ |
|
173 self.sendTextMessage(data.decode("utf-8")) |
|
174 self.flush() |
|
175 |
|
176 def readAll(self, timeout=0): |
|
177 """ |
|
178 Public method to read all available data. |
|
179 |
|
180 @param timeout timeout in milliseconds (0 for no timeout) |
|
181 (defaults to 0) |
|
182 @type int (optional) |
|
183 @return received data |
|
184 @rtype bytes |
|
185 """ |
|
186 QCoreApplication.processEvents( |
|
187 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents |
|
188 ) |
|
189 if timeout > 0: |
|
190 # receive data for 'timeout' milliseconds |
|
191 loop = QEventLoop() |
|
192 QTimer.singleShot(timeout, loop.quit) |
|
193 loop.exec() |
|
194 |
|
195 # return all buffered data |
|
196 with EricMutexLocker(self.__mutex): |
|
197 data = self.__buffer |
|
198 self.__buffer = b"" |
|
199 |
|
200 return data |
|
201 |
|
202 def readUntil(self, expected=b"\n", size=None, timeout=0): |
|
203 r""" |
|
204 Public method to read data until an expected sequence is found |
|
205 (default: \n) or a specific size is exceeded. |
|
206 |
|
207 @param expected expected bytes sequence |
|
208 @type bytes |
|
209 @param size maximum data to be read (defaults to None) |
|
210 @type int (optional) |
|
211 @param timeout timeout in milliseconds (0 for configured default) |
|
212 (defaults to 0) |
|
213 @type int (optional) |
|
214 @return bytes read from the device including the expected sequence |
|
215 @rtype bytes |
|
216 """ |
|
217 data = b"" |
|
218 self.__timedOut = False |
|
219 |
|
220 if timeout == 0: |
|
221 timeout = self.__timeout |
|
222 |
|
223 if not isinstance(expected, list): |
|
224 expected = [expected] |
|
225 |
|
226 t = QTime.currentTime() |
|
227 while True: |
|
228 QCoreApplication.processEvents( |
|
229 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents, 500 |
|
230 ) |
|
231 with EricMutexLocker(self.__mutex): |
|
232 if any(e in self.__buffer for e in expected): |
|
233 for e in expected: |
|
234 index = self.__buffer.find(e) |
|
235 if index >= 0: |
|
236 endIndex = index + len(e) |
|
237 data = self.__buffer[:endIndex] |
|
238 self.__buffer = self.__buffer[endIndex:] |
|
239 break |
|
240 break |
|
241 if size is not None and len(self.__buffer) >= size: |
|
242 data = self.__buffer[:size] |
|
243 self.__buffer = self.__buffer[size:] |
|
244 break |
|
245 if t.msecsTo(QTime.currentTime()) > timeout: |
|
246 self.__timedOut = True |
|
247 data = self.__buffer |
|
248 self.__buffer = b"" |
|
249 break |
|
250 |
|
251 return data |