src/eric7/MicroPython/MicroPythonWebreplSocket.py

branch
mpy_network
changeset 10008
c5bcafe3485c
child 10017
6d5ba2c97a8a
equal deleted inserted replaced
9990:54c614d91eff 10008:c5bcafe3485c
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing a websocket class to be connected to the MicroPython webrepl
8 interface.
9 """
10
11 from PyQt6.QtCore import (
12 QCoreApplication, QEventLoop, QMutex, QTime, QTimer, QUrl, pyqtSignal, pyqtSlot
13 )
14 from PyQt6.QtNetwork import QAbstractSocket
15 from PyQt6.QtWebSockets import QWebSocket
16
17 from eric7.EricUtilities.EricMutexLocker import EricMutexLocker
18
19
class MicroPythonWebreplSocket(QWebSocket):
    """
    Class implementing a websocket client to be connected to the MicroPython webrepl
    interface.

    Received text messages are appended (UTF-8 encoded) to an internal buffer
    that is consumed by 'readAll()' and 'readUntil()'.

    @signal readyRead() emitted to signal the availability of data
    """

    readyRead = pyqtSignal()

    def __init__(self, timeout=10000, parent=None):
        """
        Constructor

        @param timeout timeout in milliseconds to be set
        @type int
        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent=parent)

        self.__connected = False
        self.__timeout = timeout  # 10s default timeout
        self.__timedOut = False

        # The mutex guards all accesses to the receive buffer.
        self.__mutex = QMutex()
        self.__buffer = b""
        self.textMessageReceived.connect(self.__textDataReceived)

    @pyqtSlot(str)
    def __textDataReceived(self, strMessage):
        """
        Private slot handling a received text message.

        The message is UTF-8 encoded and appended to the receive buffer before
        'readyRead' is emitted.

        @param strMessage received text message
        @type str
        """
        with EricMutexLocker(self.__mutex):
            self.__buffer += strMessage.encode("utf-8")

        self.readyRead.emit()

    def setTimeout(self, timeout):
        """
        Public method to set the socket timeout value.

        @param timeout timeout in milliseconds to be set
        @type int
        """
        self.__timeout = timeout

    def waitForConnected(self):
        """
        Public method to wait for the websocket being connected.

        A local event loop is run until the socket signals 'connected' or
        'errorOccurred' or until the configured timeout has elapsed.

        @return flag indicating the connect result
        @rtype bool
        """
        loop = QEventLoop()
        self.connected.connect(loop.quit)
        self.errorOccurred.connect(loop.quit)

        def timeout():
            loop.quit()
            self.__timedOut = True

        self.__timedOut = False
        timer = QTimer()
        timer.setSingleShot(True)
        timer.timeout.connect(timeout)
        timer.start(self.__timeout)

        loop.exec()
        timer.stop()

        # The socket state decides, regardless of what terminated the loop.
        self.__connected = (
            self.state() == QAbstractSocket.SocketState.ConnectedState
        )
        return self.__connected

    def connectToDevice(self, host, port):
        """
        Public method to connect to the given host and port.

        An already established connection is closed first.

        @param host host name or IP address
        @type str
        @param port port number
        @type int
        @return flag indicating success
        @rtype bool
        """
        if self.__connected:
            # This class defines no 'disconnectFromDevice()'; the method to
            # close the connection is 'disconnect()'.
            self.disconnect()

        url = QUrl(f"ws://{host}:{port}")
        self.open(url)

        # waitForConnected() records the connected state itself.
        return self.waitForConnected()

    def disconnect(self):
        """
        Public method to disconnect the websocket.

        Note: this name shadows the 'disconnect()' method inherited from
        QObject; it is kept for interface compatibility.
        """
        if self.__connected:
            self.close()
            self.__connected = False

    def isConnected(self):
        """
        Public method to check the connected state of the websocket.

        @return flag indicating the connected state
        @rtype bool
        """
        return self.__connected

    def hasTimedOut(self):
        """
        Public method to check, if the last 'readUntil()' or
        'waitForConnected()' has timed out.

        @return flag indicating a timeout
        @rtype bool
        """
        return self.__timedOut

    def login(self, password):
        """
        Public method to login to the webrepl console of the device.

        @param password password
        @type str
        @return flag indicating a successful login
        @rtype bool
        """
        # wait for the prompt ending in ': ' before sending the password
        self.readUntil(expected=b": ")
        self.writeTextMessage(password.encode("utf-8") + b"\r")
        data = self.readUntil([b">>> ", b"denied\r\n"])

        # A timeout means neither the prompt nor the rejection was received;
        # that must not be reported as a successful login.
        return not self.__timedOut and not data.endswith(b"denied\r\n")

    def writeTextMessage(self, data):
        """
        Public method write some text data to the webrepl server of the connected
        device.

        @param data text data to be sent (UTF-8 encoded)
        @type bytes
        """
        self.sendTextMessage(data.decode("utf-8"))
        self.flush()

    def readAll(self, timeout=0):
        """
        Public method to read all available data.

        @param timeout timeout in milliseconds (0 for no timeout)
            (defaults to 0)
        @type int (optional)
        @return received data
        @rtype bytes
        """
        QCoreApplication.processEvents(
            QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        )
        if timeout > 0:
            # receive data for 'timeout' milliseconds
            loop = QEventLoop()
            QTimer.singleShot(timeout, loop.quit)
            loop.exec()

        # return all buffered data
        with EricMutexLocker(self.__mutex):
            data = self.__buffer
            self.__buffer = b""

        return data

    def readUntil(self, expected=b"\n", size=None, timeout=0):
        r"""
        Public method to read data until an expected sequence is found
        (default: \n) or a specific size is exceeded.

        If several sequences are given, the one ending first in the received
        data terminates the read. In case of a timeout all data buffered so
        far is returned and 'hasTimedOut()' reports True.

        @param expected expected bytes sequence or list of alternative
            sequences
        @type bytes or list of bytes
        @param size maximum data to be read (defaults to None)
        @type int (optional)
        @param timeout timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return bytes read from the device including the expected sequence
        @rtype bytes
        """
        # A monotonic clock is used because wall clock differences become
        # negative across midnight or a clock change, stalling the timeout.
        import time

        data = b""
        self.__timedOut = False

        if timeout == 0:
            timeout = self.__timeout

        if not isinstance(expected, (list, tuple)):
            expected = [expected]

        start = time.monotonic()
        while True:
            QCoreApplication.processEvents(
                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents, 500
            )
            with EricMutexLocker(self.__mutex):
                # determine the terminator that ends first in the buffer
                endIndex = -1
                for sequence in expected:
                    index = self.__buffer.find(sequence)
                    if index < 0:
                        continue
                    candidate = index + len(sequence)
                    if endIndex < 0 or candidate < endIndex:
                        endIndex = candidate

                if endIndex >= 0:
                    data = self.__buffer[:endIndex]
                    self.__buffer = self.__buffer[endIndex:]
                    break
                if size is not None and len(self.__buffer) >= size:
                    data = self.__buffer[:size]
                    self.__buffer = self.__buffer[size:]
                    break
                if (time.monotonic() - start) * 1000 > timeout:
                    self.__timedOut = True
                    data = self.__buffer
                    self.__buffer = b""
                    break

        return data

eric ide

mercurial