MqttMonitor/MqttClient.py

branch
eric7
changeset 143
51bc5bcc672a
parent 140
853ffd248dda
child 151
f9830d91be9a
equal deleted inserted replaced
142:cf158867ed89 143:51bc5bcc672a
20 20
21 class MqttClient(QObject): 21 class MqttClient(QObject):
22 """ 22 """
23 Class implementing a PyQt wrapper around the paho MQTT client. 23 Class implementing a PyQt wrapper around the paho MQTT client.
24 24
25 @signal onConnectV3(flags, rc) emitted after the client has connected to 25 @signal onConnect(connectFlags, rc, packetType, properties) emitted after the
26 the broker (MQTT v3) 26 client has connected to the broker
27 @signal onConnectV5(flags, rc, packetType, properties) emitted after the 27 @signal onDisconnected(rc, packetType) emitted after the client has
28 client has connected to the broker (MQTT v5) 28 disconnected from the broker
29 @signal onDisconnectedV3(rc) emitted after the client has disconnected from
30 the broker (MQTT v3)
31 @signal onDisconnectedV5(rc, packetType) emitted after the client has
32 disconnected from the broker (MQTT v5)
33 @signal onLog(level, message) emitted to send client log data 29 @signal onLog(level, message) emitted to send client log data
34 @signal onMessageV3(topic, payload, qos, retain) emitted after a message 30 @signal onMessage(topic, payload, qos, retain, properties) emitted after
35 has been received by the client (MQTT v3) 31 a message has been received by the client
36 @signal onMessageV5(topic, payload, qos, retain, properties) emitted after
37 a message has been received by the client (MQTT v5)
38 @signal onPublish(mid) emitted after a message has been published 32 @signal onPublish(mid) emitted after a message has been published
39 @signal onSubscribeV3(mid, grantedQos) emitted after the client has 33 @signal onSubscribe(mid, reasonCodes, properties) emitted after the
40 subscribed to some topics (MQTT v3) 34 client has subscribed to some topics
41 @signal onSubscribeV5(mid, reasonCodes, properties) emitted after the 35 @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
42 client has subscribed to some topics (MQTT v5)
43 @signal onUnsubscribeV3(mid) emitted after the client has unsubscribed from
44 some topics (MQTT v3) 36 some topics (MQTT v3)
45 @signal onUnsubscribeV5(mid, rc, packetType, properties) emitted after the 37 @signal onUnsubscribe(mid, rc, packetType, properties) emitted after the
46 client has unsubscribed from some topics (MQTT v5) 38 client has unsubscribed from some topics (MQTT v5)
47 @signal connectTimeout() emitted to indicate that a connection attempt 39 @signal connectTimeout() emitted to indicate that a connection attempt
48 timed out 40 timed out
49 """ 41 """
50 42
51 onConnectV3 = pyqtSignal(dict, int) 43 onConnect = pyqtSignal(bool, int, int, dict)
52 onConnectV5 = pyqtSignal(dict, int, int, dict) 44 onDisconnected = pyqtSignal(int, int)
53 onDisconnectedV3 = pyqtSignal(int)
54 onDisconnectedV5 = pyqtSignal(int, int)
55 onLog = pyqtSignal(int, str) 45 onLog = pyqtSignal(int, str)
56 onMessageV3 = pyqtSignal(str, bytes, int, bool) 46 onMessage = pyqtSignal(str, bytes, int, bool, dict)
57 onMessageV5 = pyqtSignal(str, bytes, int, bool, dict)
58 onPublish = pyqtSignal(int) 47 onPublish = pyqtSignal(int)
59 onSubscribeV3 = pyqtSignal(int, tuple) 48 onSubscribe = pyqtSignal(int, list, dict)
60 onSubscribeV5 = pyqtSignal(int, list, dict) 49 onUnsubscribe = pyqtSignal((int,), (int, int, int, dict))
61 onUnsubscribeV3 = pyqtSignal(int)
62 onUnsubscribeV5 = pyqtSignal(int, int, int, dict)
63 50
64 connectTimeout = pyqtSignal() 51 connectTimeout = pyqtSignal()
65 52
66 DefaultConnectTimeout = 15 # connect timeout in seconds 53 DefaultConnectTimeout = 15 # connect timeout in seconds
67 54
111 self.__connectTimeoutTimer = QTimer(self) 98 self.__connectTimeoutTimer = QTimer(self)
112 self.__connectTimeoutTimer.setSingleShot(True) 99 self.__connectTimeoutTimer.setSingleShot(True)
113 self.__connectTimeoutTimer.setInterval(MqttClient.DefaultConnectTimeout * 1000) 100 self.__connectTimeoutTimer.setInterval(MqttClient.DefaultConnectTimeout * 1000)
114 self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout) 101 self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout)
115 102
116 self.onConnectV3.connect(self.__connectTimeoutTimer.stop) 103 self.onConnect.connect(self.__connectTimeoutTimer.stop)
117 self.onConnectV5.connect(self.__connectTimeoutTimer.stop)
118 104
119 self.__cleanSession = cleanSession 105 self.__cleanSession = cleanSession
120 self.__protocol = protocol 106 self.__protocol = protocol
121 self.__disconnectUserProperties = [] 107 self.__disconnectUserProperties = []
122 108
123 if protocol == MqttProtocols.MQTTv5: 109 if protocol == MqttProtocols.MQTTv5:
124 cleanSession = None 110 cleanSession = None
125 111
126 self.__mqttClient = mqtt.Client( 112 self.__mqttClient = mqtt.Client(
113 mqtt.CallbackAPIVersion.VERSION2,
127 client_id=clientId, 114 client_id=clientId,
128 clean_session=cleanSession, 115 clean_session=cleanSession,
129 userdata=userdata, 116 userdata=userdata,
130 protocol=int(protocol), 117 protocol=int(protocol),
131 transport=transport, 118 transport=transport,
132 ) 119 )
133 120
134 self.__initCallbacks(protocol) 121 self.__initCallbacks()
135 122
136 def __initCallbacks(self, protocol): 123 def __initCallbacks(self):
137 """ 124 """
138 Private method to initialize the MQTT callback methods. 125 Private method to initialize the MQTT callback methods.
139 126 """
140 @param protocol MQTT protocol version 127 self.__mqttClient.on_connect = self.__onConnect
141 @type MqttProtocols 128 self.__mqttClient.on_disconnect = self.__onDisconnected
142 """
143 if protocol in (MqttProtocols.MQTTv31, MqttProtocols.MQTTv311):
144 self.__mqttClient.on_connect = self.__onConnectV3
145 self.__mqttClient.on_disconnect = self.__onDisconnectedV3
146 self.__mqttClient.on_subscribe = self.__onSubscribeV3
147 self.__mqttClient.on_unsubscribe = self.__onUnsubscribeV3
148 self.__mqttClient.on_message = self.__onMessageV3
149 else:
150 self.__mqttClient.on_connect = self.__onConnectV5
151 self.__mqttClient.on_disconnect = self.__onDisconnectedV5
152 self.__mqttClient.on_subscribe = self.__onSubscribeV5
153 self.__mqttClient.on_unsubscribe = self.__onUnsubscribeV5
154 self.__mqttClient.on_message = self.__onMessageV5
155
156 self.__mqttClient.on_log = self.__onLog 129 self.__mqttClient.on_log = self.__onLog
130 self.__mqttClient.on_message = self.__onMessage
157 self.__mqttClient.on_publish = self.__onPublish 131 self.__mqttClient.on_publish = self.__onPublish
158 132 self.__mqttClient.on_subscribe = self.__onSubscribe
159 def __onConnectV3( 133 self.__mqttClient.on_unsubscribe = self.__onUnsubscribe
134
135 def __onConnect(
160 self, 136 self,
161 client, # noqa: U100 137 _client,
162 userdata, # noqa: U100 138 _userdata,
163 flags, 139 connectFlags,
164 rc, 140 rc,
165 properties=None, # noqa: U100 141 properties,
166 ): 142 ):
167 """ 143 """
168 Private method to handle the connect to the broker (MQTT v3.1 and v3.1.1). 144 Private method to handle the connect to the broker.
169 145
170 @param client reference to the client object 146 @param _client reference to the client object (unused)
171 @type paho.mqtt.Client 147 @type paho.mqtt.Client
172 @param userdata user data 148 @param _userdata user data (unused)
173 @type Any 149 @type Any
174 @param flags dictionary containing the response flags sent by the broker 150 @param connectFlags response flags sent by the broker
175 @type dict 151 @type mqtt.ConnectFlags
176 @param rc result code
177 @type int
178 @param properties optional properties (defaults to None)
179 @type dict (optional)
180 """
181 self.onConnectV3.emit(flags, rc)
182
183 def __onDisconnectedV3(
184 self,
185 client, # noqa: U100
186 userdata, # noqa: U100
187 rc,
188 ):
189 """
190 Private method to handle the disconnect from the broker (MQTT v3.1 and v3.1.1).
191
192 @param client reference to the client object
193 @type paho.mqtt.Client
194 @param userdata user data
195 @type Any
196 @param rc result code
197 @type int
198 """
199 self.onDisconnectedV3.emit(rc)
200
201 def __onSubscribeV3(
202 self,
203 client, # noqa: U100
204 userdata, # noqa: U100
205 mid,
206 grantedQos,
207 ):
208 """
209 Private method to handle a subscribe event (MQTT v3.1 and v3.1.1).
210
211 @param client reference to the client object
212 @type paho.mqtt.Client
213 @param userdata user data
214 @type Any
215 @param mid message ID
216 @type int
217 @param grantedQos list of granted QoS for each subscription request
218 @type list of int
219 """
220 self.onSubscribeV3.emit(mid, grantedQos)
221
222 def __onUnsubscribeV3(
223 self,
224 client, # noqa: U100
225 userdata, # noqa: U100
226 mid,
227 ):
228 """
229 Private method to handle an unsubscribe event (MQTT v3.1 and v3.1.1).
230
231 @param client reference to the client object
232 @type paho.mqtt.Client
233 @param userdata user data
234 @type Any
235 @param mid message ID
236 @type int
237 """
238 self.onUnsubscribeV3.emit(mid)
239
240 def __onMessageV3(
241 self,
242 client, # noqa: U100
243 userdata, # noqa: U100
244 message,
245 ):
246 """
247 Private method to handle a new message received from the broker (MQTT v3.1
248 and v3.1.1).
249
250 @param client reference to the client object
251 @type paho.mqtt.Client
252 @param userdata user data
253 @type Any
254 @param message received message object
255 @type paho.mqtt.MQTTMessage
256 """
257 self.onMessageV3.emit(
258 message.topic, message.payload, message.qos, message.retain
259 )
260
261 def __onConnectV5(
262 self,
263 client, # noqa: U100
264 userdata, # noqa: U100
265 flags,
266 rc,
267 properties=None,
268 ):
269 """
270 Private method to handle the connect to the broker (MQTT v5.0).
271
272 @param client reference to the client object
273 @type paho.mqtt.Client
274 @param userdata user data
275 @type Any
276 @param flags dictionary containing the response flags sent by the broker
277 @type dict
278 @param rc reason code 152 @param rc reason code
279 @type paho.mqtt.ReasonCodes 153 @type paho.mqtt.ReasonCodes
280 @param properties optional properties (defaults to None) 154 @param properties MQTT v5.0 properties received from the broker
281 @type dict (optional) 155 @type dict
282 """ 156 """
283 self.onConnectV5.emit( 157 self.onConnect.emit(
284 flags, 158 connectFlags.session_present, rc.value, rc.packetType, properties.json()
285 rc.value, 159 )
286 rc.packetType, 160
287 properties.json() if properties is not None else {}, 161 def __onDisconnected(
288 )
289
290 def __onDisconnectedV5(
291 self, 162 self,
292 client, # noqa: U100 163 _client,
293 userdata, # noqa: U100 164 _userdata,
165 _flags,
294 rc, 166 rc,
295 properties=None, # noqa: U100 167 _properties=None,
296 ): 168 ):
297 """ 169 """
298 Private method to handle the disconnect from the broker (MQTT v5.0). 170 Private method to handle the disconnect from the broker.
299 171
300 @param client reference to the client object 172 @param _client reference to the client object (unused)
301 @type paho.mqtt.Client 173 @type paho.mqtt.Client
302 @param userdata user data 174 @param _userdata user data (unused)
303 @type Any 175 @type Any
176 @param _flags dictionary containing the response flags sent by the broker
177 (unused)
178 @type dict
304 @param rc result code or reason code 179 @param rc result code or reason code
305 @type int or paho.mqtt.ReasonCodes 180 @type int or paho.mqtt.ReasonCodes
306 @param properties optional properties (defaults to None) 181 @param _properties MQTT v5.0 properties received from the broker
182 (defaults to None) (unused)
307 @type dict (optional) 183 @type dict (optional)
308 """ 184 """
309 if isinstance(rc, int): 185 if isinstance(rc, int):
310 packetType = PacketTypes.DISCONNECT 186 packetType = PacketTypes.DISCONNECT
311 resultCode = rc 187 resultCode = rc
312 else: 188 else:
313 packetType = rc.packetType 189 packetType = rc.packetType
314 resultCode = rc.value 190 resultCode = rc.value
315 self.onDisconnectedV5.emit(resultCode, packetType) 191 self.onDisconnected.emit(resultCode, packetType)
316 192
317 def __onSubscribeV5( 193 def __onSubscribe(
318 self, 194 self,
319 client, # noqa: U100 195 _client,
320 userdata, # noqa: U100 196 _userdata,
321 mid, 197 mid,
322 reasonCodes, 198 reasonCodes,
323 properties=None, 199 properties,
324 ): 200 ):
325 """ 201 """
326 Private method to handle a subscribe event (MQTT v5.0). 202 Private method to handle a subscribe event.
327 203
328 @param client reference to the client object 204 @param _client reference to the client object (unused)
329 @type paho.mqtt.Client 205 @type paho.mqtt.Client
330 @param userdata user data 206 @param _userdata user data (unused)
331 @type Any 207 @type Any
332 @param mid message ID 208 @param mid message ID
333 @type int 209 @type int
334 @param reasonCodes list of reason codes for each subscribed topic 210 @param reasonCodes list of reason codes for each subscribed topic
335 @type list of paho.mqtt.ReasonCodes 211 @type list of paho.mqtt.ReasonCodes
336 @param properties optional properties (defaults to None) 212 @param properties MQTT v5.0 properties received from the broker
337 @type dict (optional) 213 @type dict
338 """ 214 """
339 self.onSubscribeV5.emit( 215 self.onSubscribe.emit(mid, reasonCodes, properties.json())
340 mid, 216
341 reasonCodes, 217 def __onUnsubscribe(
342 properties.json() if properties is not None else {},
343 )
344
345 def __onUnsubscribeV5(
346 self, 218 self,
347 client, # noqa: U100 219 _client,
348 userdata, # noqa: U100 220 _userdata,
349 mid, 221 mid,
222 reasonCodes,
350 properties, 223 properties,
351 reasonCodes, 224 ):
352 ): 225 """
353 """ 226 Private method to handle an unsubscribe event.
354 Private method to handle an unsubscribe event (MQTT v5.0). 227
355 228 @param _client reference to the client object (unused)
356 @param client reference to the client object
357 @type paho.mqtt.Client 229 @type paho.mqtt.Client
358 @param userdata user data 230 @param _userdata user data (unused)
359 @type Any 231 @type Any
360 @param mid message ID 232 @param mid message ID
361 @type int 233 @type int
362 @param properties optional properties (defaults to None)
363 @type dict (optional)
364 @param reasonCodes list of reason codes for each unsubscribed topic 234 @param reasonCodes list of reason codes for each unsubscribed topic
365 @type list of paho.mqtt.ReasonCodes 235 @type list of paho.mqtt.ReasonCodes
366 """ 236 @param properties MQTT v5.0 properties received from the broker
367 self.onUnsubscribeV5.emit( 237 @type dict
368 mid, 238 """
369 reasonCodes.value, 239 if reasonCodes:
370 reasonCodes.packetType, 240 self.onUnsubscribe[int, int, int, dict].emit(
371 properties.json() if properties is not None else {}, 241 mid, reasonCodes[0].value, reasonCodes[0].packetType, properties.json()
372 ) 242 )
373 243 else:
374 def __onMessageV5( 244 self.onUnsubscribe[int].emit(mid)
245
246 def __onMessage(
375 self, 247 self,
376 client, # noqa: U100 248 _client,
377 userdata, # noqa: U100 249 _userdata,
378 message, 250 message,
379 ): 251 ):
380 """ 252 """
381 Private method to handle a new message received from the broker (MQTT v5.0). 253 Private method to handle a new message received from the broker.
382 254
383 @param client reference to the client object 255 @param _client reference to the client object (unused)
384 @type paho.mqtt.Client 256 @type paho.mqtt.Client
385 @param userdata user data 257 @param _userdata user data (unused)
386 @type Any 258 @type Any
387 @param message received message object 259 @param message received message object
388 @type paho.mqtt.MQTTMessage 260 @type paho.mqtt.MQTTMessage
389 """ 261 """
390 self.onMessageV5.emit( 262 self.onMessage.emit(
391 message.topic, 263 message.topic,
392 message.payload, 264 message.payload,
393 message.qos, 265 message.qos,
394 message.retain, 266 message.retain,
395 message.properties.json(), 267 message.properties.json() if message.properties is not None else {},
396 ) 268 )
397 269
398 def __onLog( 270 def __onLog(
399 self, 271 self,
400 client, # noqa: U100 272 _client,
401 userdata, # noqa: U100 273 _userdata,
402 level, 274 level,
403 buf, 275 buf,
404 ): 276 ):
405 """ 277 """
406 Private method to handle a log event (MQTT v3.1, v3.1.1 and v5.0). 278 Private method to handle a log event.
407 279
408 @param client reference to the client object 280 @param _client reference to the client object (unused)
409 @type paho.mqtt.Client 281 @type paho.mqtt.Client
410 @param userdata user data 282 @param _userdata user data (unused)
411 @type Any 283 @type Any
412 @param level severity of the log message 284 @param level severity of the log message
413 @type int 285 @type int
414 @param buf log message 286 @param buf log message
415 @type str 287 @type str
416 """ 288 """
417 self.onLog.emit(level, buf) 289 self.onLog.emit(level, buf)
418 290
419 def __onPublish( 291 def __onPublish(
420 self, 292 self,
421 client, # noqa: U100 293 _client,
422 userdata, # noqa: U100 294 _userdata,
423 mid, 295 mid,
424 ): 296 _reasonCode,
425 """ 297 _properties,
426 Private method to handle the publishing of a message (MQTT v3.1, v3.1.1 298 ):
427 and v5.0). 299 """
428 300 Private method to handle the publishing of a message.
429 @param client reference to the client object 301
302 @param _client reference to the client object (unused)
430 @type paho.mqtt.Client 303 @type paho.mqtt.Client
431 @param userdata user data 304 @param _userdata user data (unused)
432 @type Any 305 @type Any
433 @param mid message ID 306 @param mid message ID
434 @type int 307 @type int
308 @param _reasonCode reason code (unused)
309 @type paho.mqtt.ReasonCodes
310 @param _properties MQTT v5.0 properties received from the broker (unused)
311 @type dict
435 """ 312 """
436 self.onPublish.emit(mid) 313 self.onPublish.emit(mid)
437 314
438 @pyqtSlot() 315 @pyqtSlot()
439 def __connectTimeout(self): 316 def __connectTimeout(self):
743 "CleanSession", "Username", "Password", "WillTopic", "WillMessage", 620 "CleanSession", "Username", "Password", "WillTopic", "WillMessage",
744 "WillQos", "WillRetain", "WillProperties", "TlsEnable", 621 "WillQos", "WillRetain", "WillProperties", "TlsEnable",
745 "TlsCaCert", "TlsClientCert", "TlsClientKey", "UserProperties". 622 "TlsCaCert", "TlsClientCert", "TlsClientKey", "UserProperties".
746 @rtype dict 623 @rtype dict
747 """ 624 """
748 from PluginMqttMonitor import mqttPluginObject 625 from PluginMqttMonitor import mqttPluginObject # noqa: I102
749 626
750 return { 627 return {
751 "ClientId": "ERIC7_MQTT_MONITOR_CLIENT", 628 "ClientId": "ERIC7_MQTT_MONITOR_CLIENT",
752 "Protocol": mqttPluginObject.getPreferences("DefaultProtocol"), 629 "Protocol": mqttPluginObject.getPreferences("DefaultProtocol"),
753 "ConnectionTimeout": MqttClient.DefaultConnectTimeout, 630 "ConnectionTimeout": MqttClient.DefaultConnectTimeout,
880 props = Properties(packetType) 757 props = Properties(packetType)
881 props.UserProperty = properties 758 props.UserProperty = properties
882 return props 759 return props
883 760
884 761
885 def mqttConnackMessage(connackCode):
886 """
887 Module function to get the string associated with a CONNACK result.
888
889 @param connackCode result code of the connection request
890 @type int
891 @return textual representation for the result code
892 @rtype str
893 """
894 if connackCode == mqtt.CONNACK_ACCEPTED:
895 return QCoreApplication.translate("MqttConnackMessage", "Connection Accepted.")
896 elif connackCode == mqtt.CONNACK_REFUSED_PROTOCOL_VERSION:
897 return QCoreApplication.translate(
898 "MqttConnackMessage", "Connection Refused: unacceptable protocol version."
899 )
900 elif connackCode == mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED:
901 return QCoreApplication.translate(
902 "MqttConnackMessage", "Connection Refused: identifier rejected."
903 )
904 elif connackCode == mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE:
905 return QCoreApplication.translate(
906 "MqttConnackMessage", "Connection Refused: broker unavailable."
907 )
908 elif connackCode == mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
909 return QCoreApplication.translate(
910 "MqttConnackMessage", "Connection Refused: bad user name or password."
911 )
912 elif connackCode == mqtt.CONNACK_REFUSED_NOT_AUTHORIZED:
913 return QCoreApplication.translate(
914 "MqttConnackMessage", "Connection Refused: not authorised."
915 )
916 else:
917 return QCoreApplication.translate(
918 "MqttConnackMessage", "Connection Refused: unknown reason."
919 )
920
921
922 def mqttErrorMessage(mqttErrno): 762 def mqttErrorMessage(mqttErrno):
923 """ 763 """
924 Module function to get the error string associated with an MQTT error 764 Module function to get the error string associated with an MQTT error
925 number. 765 number.
926 766

eric ide

mercurial