7 Module implementing a PyQt wrapper around the paho MQTT client. |
7 Module implementing a PyQt wrapper around the paho MQTT client. |
8 """ |
8 """ |
9 |
9 |
10 from __future__ import unicode_literals |
10 from __future__ import unicode_literals |
11 |
11 |
12 from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication |
12 from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QCoreApplication, \ |
|
13 QTimer |
13 |
14 |
14 import paho.mqtt.client as mqtt |
15 import paho.mqtt.client as mqtt |
15 |
16 |
16 from Utilities.crypto import pwConvert |
17 from Utilities.crypto import pwConvert |
17 |
18 |
22 |
23 |
23 @signal onConnect(flags, rc) emitted after the client has connected to the |
24 @signal onConnect(flags, rc) emitted after the client has connected to the |
24 broker |
25 broker |
25 @signal onDisconnected(rc) emitted after the client has disconnected from |
26 @signal onDisconnected(rc) emitted after the client has disconnected from |
26 the broker |
27 the broker |
|
28 @signal onLog(level, message) emitted to send client log data |
27 @signal onMessage(topic, payload, qos, retain) emitted after a message has |
29 @signal onMessage(topic, payload, qos, retain) emitted after a message has |
28 been received by the client |
30 been received by the client |
29 @signal onPublish(mid) emitted after a message has been published |
31 @signal onPublish(mid) emitted after a message has been published |
30 @signal onSubscribe(mid, grantedQos) emitted after the client has |
32 @signal onSubscribe(mid, grantedQos) emitted after the client has |
31 subscribed to some topics |
33 subscribed to some topics |
32 @signal onUnsubscribe(mid) emitted after the client has unsubscribed from |
34 @signal onUnsubscribe(mid) emitted after the client has unsubscribed from |
33 some topics |
35 some topics |
34 """ |
36 """ |
35 onConnect = pyqtSignal(dict, int) |
37 onConnect = pyqtSignal(dict, int) |
36 onDisconnected = pyqtSignal(int) |
38 onDisconnected = pyqtSignal(int) |
|
39 onLog = pyqtSignal(int, str) |
37 onMessage = pyqtSignal(str, bytes, int, bool) |
40 onMessage = pyqtSignal(str, bytes, int, bool) |
38 onPublish = pyqtSignal(int) |
41 onPublish = pyqtSignal(int) |
39 onSubscribe = pyqtSignal(int, tuple) |
42 onSubscribe = pyqtSignal(int, tuple) |
40 onUnsubscribe = pyqtSignal(int) |
43 onUnsubscribe = pyqtSignal(int) |
|
44 |
|
45 connectTimeout = pyqtSignal() |
|
46 |
|
47 DefaultConnectTimeout = 15 # connect timeout in seconds |
|
48 |
|
49 LogDebug = 0x01 |
|
50 LogInfo = 0x02 |
|
51 LogNotice = 0x04 |
|
52 LogWarning = 0x08 |
|
53 LogError = 0x10 |
|
54 LogLevelMap = { |
|
55 mqtt.MQTT_LOG_DEBUG: LogDebug, |
|
56 mqtt.MQTT_LOG_INFO: LogInfo, |
|
57 mqtt.MQTT_LOG_NOTICE: LogNotice, |
|
58 mqtt.MQTT_LOG_WARNING: LogWarning, # __NO-TASK__ |
|
59 mqtt.MQTT_LOG_ERR: LogError, |
|
60 } |
41 |
61 |
42 def __init__(self, clientId="", cleanSession=True, userdata=None, |
62 def __init__(self, clientId="", cleanSession=True, userdata=None, |
43 protocol=mqtt.MQTTv311, transport="tcp", parent=None): |
63 protocol=mqtt.MQTTv311, transport="tcp", parent=None): |
44 """ |
64 """ |
45 Constructor |
65 Constructor |
59 """ |
79 """ |
60 QObject.__init__(self, parent=parent) |
80 QObject.__init__(self, parent=parent) |
61 |
81 |
62 self.__loopStarted = False |
82 self.__loopStarted = False |
63 |
83 |
|
84 self.__connectTimeoutTimer = QTimer(self) |
|
85 self.__connectTimeoutTimer.setSingleShot(True) |
|
86 self.__connectTimeoutTimer.setInterval( |
|
87 MqttClient.DefaultConnectTimeout * 1000) |
|
88 self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout) |
|
89 |
|
90 self.onConnect.connect(self.__connectTimeoutTimer.stop) |
|
91 |
64 self.__mqttClient = mqtt.Client( |
92 self.__mqttClient = mqtt.Client( |
65 client_id=clientId, clean_session=cleanSession, userdata=None, |
93 client_id=clientId, clean_session=cleanSession, userdata=None, |
66 protocol=mqtt.MQTTv311, transport="tcp") |
94 protocol=mqtt.MQTTv311, transport="tcp") |
67 |
95 |
68 self.__initCallbacks() |
96 self.__initCallbacks() |
74 self.__mqttClient.on_connect = \ |
102 self.__mqttClient.on_connect = \ |
75 lambda client, userdata, flags, rc: self.onConnect.emit( |
103 lambda client, userdata, flags, rc: self.onConnect.emit( |
76 flags, rc) |
104 flags, rc) |
77 self.__mqttClient.on_disconnect = \ |
105 self.__mqttClient.on_disconnect = \ |
78 lambda client, userdata, rc: self.onDisconnected.emit(rc) |
106 lambda client, userdata, rc: self.onDisconnected.emit(rc) |
|
107 self.__mqttClient.on_log = \ |
|
108 lambda client, userdata, level, buf: self.onLog.emit(level, buf) |
79 self.__mqttClient.on_message = \ |
109 self.__mqttClient.on_message = \ |
80 lambda client, userdata, message: self.onMessage.emit( |
110 lambda client, userdata, message: self.onMessage.emit( |
81 message.topic, message.payload, message.qos, message.retain) |
111 message.topic, message.payload, message.qos, message.retain) |
82 self.__mqttClient.on_publish = \ |
112 self.__mqttClient.on_publish = \ |
83 lambda client, userdata, mid: self.onPublish.emit(mid) |
113 lambda client, userdata, mid: self.onPublish.emit(mid) |
85 lambda client, userdata, mid, grantedQos: self.onSubscribe.emit( |
115 lambda client, userdata, mid, grantedQos: self.onSubscribe.emit( |
86 mid, grantedQos) |
116 mid, grantedQos) |
87 self.__mqttClient.on_unsubscribe = \ |
117 self.__mqttClient.on_unsubscribe = \ |
88 lambda client, userdata, mid: self.onUnsubscribe.emit(mid) |
118 lambda client, userdata, mid: self.onUnsubscribe.emit(mid) |
89 |
119 |
|
120 @pyqtSlot() |
|
121 def __connectTimeout(self): |
|
122 """ |
|
123 Privat slot handling a failed connection attempt. |
|
124 """ |
|
125 self.stopLoop() |
|
126 self.connectTimeout.emit() |
|
127 |
90 def reinitialise(self, clientId="", cleanSession=True, userdata=None): |
128 def reinitialise(self, clientId="", cleanSession=True, userdata=None): |
91 """ |
129 """ |
92 Public method to reinitialize the client with given data. |
130 Public method to reinitialize the client with given data. |
93 |
131 |
94 @param clientId ID to be used for the client |
132 @param clientId ID to be used for the client |
100 """ |
138 """ |
101 self.__mqttClient.reinitialise( |
139 self.__mqttClient.reinitialise( |
102 client_id=clientId, clean_session=cleanSession, userdata=userdata) |
140 client_id=clientId, clean_session=cleanSession, userdata=userdata) |
103 |
141 |
104 self.__initCallbacks() |
142 self.__initCallbacks() |
|
143 |
|
144 def setConnectionTimeout(self, timeout): |
|
145 """ |
|
146 Public method to set the connection timeout value. |
|
147 |
|
148 @param timeout timeout value to be set in seconds |
|
149 @type int |
|
150 """ |
|
151 self.__connectTimeoutTimer.setInterval(timeout * 1000) |
105 |
152 |
106 def setMaxInflightMessages(self, inflight=20): |
153 def setMaxInflightMessages(self, inflight=20): |
107 """ |
154 """ |
108 Public method to set the maximum number of messages with QoS > 0 that |
155 Public method to set the maximum number of messages with QoS > 0 that |
109 can be part way through their network flow at once. |
156 can be part way through their network flow at once. |
200 Public method to stop the MQTT client loop. |
247 Public method to stop the MQTT client loop. |
201 """ |
248 """ |
202 self.__mqttClient.loop_stop() |
249 self.__mqttClient.loop_stop() |
203 self.__loopStarted = False |
250 self.__loopStarted = False |
204 |
251 |
205 def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""): |
252 def connectToServer(self, host, port=1883, keepalive=60, bindAddress="", |
|
253 reinit=True): |
206 """ |
254 """ |
207 Public method to connect to a remote MQTT broker. |
255 Public method to connect to a remote MQTT broker. |
208 |
256 |
209 @param host host name or IP address of the remote broker |
257 @param host host name or IP address of the remote broker |
210 @type str |
258 @type str |
216 @type int |
264 @type int |
217 @param bindAddress IP address of a local network interface to bind |
265 @param bindAddress IP address of a local network interface to bind |
218 this client to |
266 this client to |
219 @type str |
267 @type str |
220 """ |
268 """ |
221 # TODO: get this fixed or allow to interrupt |
269 if reinit: |
222 self.__mqttClient.reconnect_delay_set(max_delay=16) |
270 self.reinitialise() |
223 self.__mqttClient.connect_async( |
271 self.__mqttClient.connect_async( |
224 host, port=port, keepalive=keepalive, bind_address=bindAddress) |
272 host, port=port, keepalive=keepalive, bind_address=bindAddress) |
|
273 |
|
274 self.__connectTimeoutTimer.start() |
225 |
275 |
226 if not self.__loopStarted: |
276 if not self.__loopStarted: |
227 self.startLoop() |
277 self.startLoop() |
228 |
278 |
229 def connectToServerWithOptions(self, host, port=1883, bindAddress="", |
279 def connectToServerWithOptions(self, host, port=1883, bindAddress="", |
241 @type str |
291 @type str |
242 @param options dictionary containing the connection options. This |
292 @param options dictionary containing the connection options. This |
243 dictionary should contain the keys "ClientId", "Keepalive", |
293 dictionary should contain the keys "ClientId", "Keepalive", |
244 "CleanSession", "Username", "Password", "WillTopic", "WillMessage", |
294 "CleanSession", "Username", "Password", "WillTopic", "WillMessage", |
245 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert", |
295 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert", |
246 "TlsClientKey" |
296 "TlsClientKey", "ConnectionTimeout" |
247 @type dict |
297 @type dict |
248 """ |
298 """ |
249 if options: |
299 if options: |
250 parametersDict = self.defaultConnectionOptions() |
300 parametersDict = self.defaultConnectionOptions() |
251 parametersDict.update(options) |
301 parametersDict.update(options) |
253 # step 1: reinitialize to set the client ID and clean session flag |
303 # step 1: reinitialize to set the client ID and clean session flag |
254 self.reinitialise( |
304 self.reinitialise( |
255 clientId=parametersDict["ClientId"], |
305 clientId=parametersDict["ClientId"], |
256 cleanSession=parametersDict["CleanSession"] |
306 cleanSession=parametersDict["CleanSession"] |
257 ) |
307 ) |
|
308 self.setConnectionTimeout(parametersDict["ConnectionTimeout"]) |
258 |
309 |
259 # step 2: set username and password |
310 # step 2: set username and password |
260 if parametersDict["Username"]: |
311 if parametersDict["Username"]: |
261 if parametersDict["Password"]: |
312 if parametersDict["Password"]: |
262 self.setUserCredentials( |
313 self.setUserCredentials( |
292 # use default TLS configuration |
343 # use default TLS configuration |
293 self.setTLS() |
344 self.setTLS() |
294 |
345 |
295 # step 5: connect to server |
346 # step 5: connect to server |
296 self.connectToServer(host, port=port, |
347 self.connectToServer(host, port=port, |
297 keepalive=parametersDict["Keepalive"]) |
348 keepalive=parametersDict["Keepalive"], |
|
349 reinit=False) |
298 else: |
350 else: |
299 keepalive = self.defaultConnectionOptions["Keepalive"] |
351 keepalive = self.defaultConnectionOptions["Keepalive"] |
300 self.connectToServer(host, port=port, keepalive=keepalive, |
352 self.connectToServer(host, port=port, keepalive=keepalive, |
301 bindAddress=bindAddress) |
353 bindAddress=bindAddress) |
302 |
354 |
306 values. |
358 values. |
307 |
359 |
308 @return dictionary containing the default connection options. It has |
360 @return dictionary containing the default connection options. It has |
309 the keys "ClientId", "Keepalive", "CleanSession", "Username", |
361 the keys "ClientId", "Keepalive", "CleanSession", "Username", |
310 "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain", |
362 "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain", |
311 "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey". |
363 "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey", |
|
364 "ConnectionTimeout". |
312 @rtype dict |
365 @rtype dict |
313 """ |
366 """ |
314 return { |
367 return { |
315 "ClientId": "ERIC6_MQTT_MONITOR_CLIENT", |
368 "ClientId": "ERIC6_MQTT_MONITOR_CLIENT", |
|
369 "ConnectionTimeout": MqttClient.DefaultConnectTimeout, |
316 "Keepalive": 60, |
370 "Keepalive": 60, |
317 "CleanSession": True, |
371 "CleanSession": True, |
318 "Username": "", |
372 "Username": "", |
319 "Password": "", |
373 "Password": "", |
320 "WillTopic": "", |
374 "WillTopic": "", |
329 |
383 |
330 def reconnectToServer(self): |
384 def reconnectToServer(self): |
331 """ |
385 """ |
332 Public method to reconnect the client with the same parameters. |
386 Public method to reconnect the client with the same parameters. |
333 """ |
387 """ |
|
388 self.__connectTimeoutTimer.start() |
|
389 |
334 self.__mqttClient.reconnect() |
390 self.__mqttClient.reconnect() |
335 |
391 |
336 if not self.__loopStarted: |
392 if not self.__loopStarted: |
337 self.startLoop() |
393 self.startLoop() |
338 |
394 |
339 def disconnectFromServer(self): |
395 def disconnectFromServer(self): |
340 """ |
396 """ |
341 Public method to disconnect the client from the remote broker. |
397 Public method to disconnect the client from the remote broker. |
342 """ |
398 """ |
|
399 self.__connectTimeoutTimer.stop() |
|
400 |
343 self.__mqttClient.disconnect() |
401 self.__mqttClient.disconnect() |
344 |
402 |
345 def subscribe(self, topic, qos=0): |
403 def subscribe(self, topic, qos=0): |
346 """ |
404 """ |
347 Public method to subscribe to topics with quality of service. |
405 Public method to subscribe to topics with quality of service. |
499 "Error defined by errno.") |
557 "Error defined by errno.") |
500 else: |
558 else: |
501 return QCoreApplication.translate( |
559 return QCoreApplication.translate( |
502 "MqttErrorMessage", |
560 "MqttErrorMessage", |
503 "Unknown error.") |
561 "Unknown error.") |
|
562 |
|
563 |
|
564 def mqttLogLevelString(mqttLogLevel, isMqttLogLevel=True): |
|
565 """ |
|
566 Module function to get the log level string associated with a log level. |
|
567 |
|
568 @param mqttLogLevel log level of the paho-mqtt client |
|
569 @type int |
|
570 @param isMqttLogLevel flag indicating a MQTT log level is given (if |
|
571 False it is the MqttClient variant, i.e. Debug being lowest) |
|
572 @type bool |
|
573 @return textual representation of the log level |
|
574 @rtype str |
|
575 """ |
|
576 if isMqttLogLevel: |
|
577 try: |
|
578 logLevel = MqttClient.LogLevelMap[mqttLogLevel] |
|
579 except KeyError: |
|
580 return QCoreApplication.translate("MqttLogLevelString", "Unknown") |
|
581 else: |
|
582 logLevel = mqttLogLevel |
|
583 |
|
584 if logLevel == MqttClient.LogInfo: |
|
585 return QCoreApplication.translate("MqttLogLevelString", "Info") |
|
586 elif logLevel == MqttClient.LogNotice: |
|
587 return QCoreApplication.translate("MqttLogLevelString", "Notice") |
|
588 elif logLevel == MqttClient.LogWarning: |
|
589 return QCoreApplication.translate("MqttLogLevelString", "Warning") |
|
590 elif logLevel == MqttClient.LogError: |
|
591 return QCoreApplication.translate("MqttLogLevelString", "Error") |
|
592 elif logLevel == MqttClient.LogDebug: |
|
593 return QCoreApplication.translate("MqttLogLevelString", "Debug") |
|
594 else: |
|
595 return QCoreApplication.translate("MqttLogLevelString", "Unknown") |