MqttMonitor/MqttClient.py

changeset 31
40582e448c4b
parent 30
17ef10819773
child 43
a0853f7a8b80
equal deleted inserted replaced
30:17ef10819773 31:40582e448c4b
7 Module implementing a PyQt wrapper around the paho MQTT client. 7 Module implementing a PyQt wrapper around the paho MQTT client.
8 """ 8 """
9 9
10 from __future__ import unicode_literals 10 from __future__ import unicode_literals
11 11
12 from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication 12 from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QCoreApplication, \
13 QTimer
13 14
14 import paho.mqtt.client as mqtt 15 import paho.mqtt.client as mqtt
15 16
16 from Utilities.crypto import pwConvert 17 from Utilities.crypto import pwConvert
17 18
22 23
23 @signal onConnect(flags, rc) emitted after the client has connected to the 24 @signal onConnect(flags, rc) emitted after the client has connected to the
24 broker 25 broker
25 @signal onDisconnected(rc) emitted after the client has disconnected from 26 @signal onDisconnected(rc) emitted after the client has disconnected from
26 the broker 27 the broker
28 @signal onLog(level, message) emitted to send client log data
27 @signal onMessage(topic, payload, qos, retain) emitted after a message has 29 @signal onMessage(topic, payload, qos, retain) emitted after a message has
28 been received by the client 30 been received by the client
29 @signal onPublish(mid) emitted after a message has been published 31 @signal onPublish(mid) emitted after a message has been published
30 @signal onSubscribe(mid, grantedQos) emitted after the client has 32 @signal onSubscribe(mid, grantedQos) emitted after the client has
31 subscribed to some topics 33 subscribed to some topics
32 @signal onUnsubscribe(mid) emitted after the client has unsubscribed from 34 @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
33 some topics 35 some topics
34 """ 36 """
35 onConnect = pyqtSignal(dict, int) 37 onConnect = pyqtSignal(dict, int)
36 onDisconnected = pyqtSignal(int) 38 onDisconnected = pyqtSignal(int)
39 onLog = pyqtSignal(int, str)
37 onMessage = pyqtSignal(str, bytes, int, bool) 40 onMessage = pyqtSignal(str, bytes, int, bool)
38 onPublish = pyqtSignal(int) 41 onPublish = pyqtSignal(int)
39 onSubscribe = pyqtSignal(int, tuple) 42 onSubscribe = pyqtSignal(int, tuple)
40 onUnsubscribe = pyqtSignal(int) 43 onUnsubscribe = pyqtSignal(int)
44
45 connectTimeout = pyqtSignal()
46
47 DefaultConnectTimeout = 15 # connect timeout in seconds
48
49 LogDebug = 0x01
50 LogInfo = 0x02
51 LogNotice = 0x04
52 LogWarning = 0x08
53 LogError = 0x10
54 LogLevelMap = {
55 mqtt.MQTT_LOG_DEBUG: LogDebug,
56 mqtt.MQTT_LOG_INFO: LogInfo,
57 mqtt.MQTT_LOG_NOTICE: LogNotice,
58 mqtt.MQTT_LOG_WARNING: LogWarning, # __NO-TASK__
59 mqtt.MQTT_LOG_ERR: LogError,
60 }
41 61
42 def __init__(self, clientId="", cleanSession=True, userdata=None, 62 def __init__(self, clientId="", cleanSession=True, userdata=None,
43 protocol=mqtt.MQTTv311, transport="tcp", parent=None): 63 protocol=mqtt.MQTTv311, transport="tcp", parent=None):
44 """ 64 """
45 Constructor 65 Constructor
59 """ 79 """
60 QObject.__init__(self, parent=parent) 80 QObject.__init__(self, parent=parent)
61 81
62 self.__loopStarted = False 82 self.__loopStarted = False
63 83
84 self.__connectTimeoutTimer = QTimer(self)
85 self.__connectTimeoutTimer.setSingleShot(True)
86 self.__connectTimeoutTimer.setInterval(
87 MqttClient.DefaultConnectTimeout * 1000)
88 self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout)
89
90 self.onConnect.connect(self.__connectTimeoutTimer.stop)
91
64 self.__mqttClient = mqtt.Client( 92 self.__mqttClient = mqtt.Client(
65 client_id=clientId, clean_session=cleanSession, userdata=None, 93 client_id=clientId, clean_session=cleanSession, userdata=None,
66 protocol=mqtt.MQTTv311, transport="tcp") 94 protocol=mqtt.MQTTv311, transport="tcp")
67 95
68 self.__initCallbacks() 96 self.__initCallbacks()
74 self.__mqttClient.on_connect = \ 102 self.__mqttClient.on_connect = \
75 lambda client, userdata, flags, rc: self.onConnect.emit( 103 lambda client, userdata, flags, rc: self.onConnect.emit(
76 flags, rc) 104 flags, rc)
77 self.__mqttClient.on_disconnect = \ 105 self.__mqttClient.on_disconnect = \
78 lambda client, userdata, rc: self.onDisconnected.emit(rc) 106 lambda client, userdata, rc: self.onDisconnected.emit(rc)
107 self.__mqttClient.on_log = \
108 lambda client, userdata, level, buf: self.onLog.emit(level, buf)
79 self.__mqttClient.on_message = \ 109 self.__mqttClient.on_message = \
80 lambda client, userdata, message: self.onMessage.emit( 110 lambda client, userdata, message: self.onMessage.emit(
81 message.topic, message.payload, message.qos, message.retain) 111 message.topic, message.payload, message.qos, message.retain)
82 self.__mqttClient.on_publish = \ 112 self.__mqttClient.on_publish = \
83 lambda client, userdata, mid: self.onPublish.emit(mid) 113 lambda client, userdata, mid: self.onPublish.emit(mid)
85 lambda client, userdata, mid, grantedQos: self.onSubscribe.emit( 115 lambda client, userdata, mid, grantedQos: self.onSubscribe.emit(
86 mid, grantedQos) 116 mid, grantedQos)
87 self.__mqttClient.on_unsubscribe = \ 117 self.__mqttClient.on_unsubscribe = \
88 lambda client, userdata, mid: self.onUnsubscribe.emit(mid) 118 lambda client, userdata, mid: self.onUnsubscribe.emit(mid)
89 119
@pyqtSlot()
def __connectTimeout(self):
    """
    Private slot handling a failed connection attempt.

    The client loop is stopped first and the connectTimeout signal is
    emitted afterwards.
    """
    # stop the loop before notifying any listeners
    self.stopLoop()
    self.connectTimeout.emit()
127
90 def reinitialise(self, clientId="", cleanSession=True, userdata=None): 128 def reinitialise(self, clientId="", cleanSession=True, userdata=None):
91 """ 129 """
92 Public method to reinitialize the client with given data. 130 Public method to reinitialize the client with given data.
93 131
94 @param clientId ID to be used for the client 132 @param clientId ID to be used for the client
100 """ 138 """
101 self.__mqttClient.reinitialise( 139 self.__mqttClient.reinitialise(
102 client_id=clientId, clean_session=cleanSession, userdata=userdata) 140 client_id=clientId, clean_session=cleanSession, userdata=userdata)
103 141
104 self.__initCallbacks() 142 self.__initCallbacks()
143
def setConnectionTimeout(self, timeout):
    """
    Public method to set the connection timeout value.

    The value is converted to whole milliseconds before it is handed to
    the connect timeout timer, because the timer interval has to be an
    integer. This allows fractional second values (e.g. 2.5) as well.

    @param timeout timeout value to be set in seconds
    @type int or float
    """
    # QTimer.setInterval() expects an integer number of milliseconds
    self.__connectTimeoutTimer.setInterval(int(timeout * 1000))
105 152
106 def setMaxInflightMessages(self, inflight=20): 153 def setMaxInflightMessages(self, inflight=20):
107 """ 154 """
108 Public method to set the maximum number of messages with QoS > 0 that 155 Public method to set the maximum number of messages with QoS > 0 that
109 can be part way through their network flow at once. 156 can be part way through their network flow at once.
200 Public method to stop the MQTT client loop. 247 Public method to stop the MQTT client loop.
201 """ 248 """
202 self.__mqttClient.loop_stop() 249 self.__mqttClient.loop_stop()
203 self.__loopStarted = False 250 self.__loopStarted = False
204 251
205 def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""): 252 def connectToServer(self, host, port=1883, keepalive=60, bindAddress="",
253 reinit=True):
206 """ 254 """
207 Public method to connect to a remote MQTT broker. 255 Public method to connect to a remote MQTT broker.
208 256
209 @param host host name or IP address of the remote broker 257 @param host host name or IP address of the remote broker
210 @type str 258 @type str
216 @type int 264 @type int
217 @param bindAddress IP address of a local network interface to bind 265 @param bindAddress IP address of a local network interface to bind
218 this client to 266 this client to
219 @type str 267 @type str
220 """ 268 """
221 # TODO: get this fixed or allow to interrupt 269 if reinit:
222 self.__mqttClient.reconnect_delay_set(max_delay=16) 270 self.reinitialise()
223 self.__mqttClient.connect_async( 271 self.__mqttClient.connect_async(
224 host, port=port, keepalive=keepalive, bind_address=bindAddress) 272 host, port=port, keepalive=keepalive, bind_address=bindAddress)
273
274 self.__connectTimeoutTimer.start()
225 275
226 if not self.__loopStarted: 276 if not self.__loopStarted:
227 self.startLoop() 277 self.startLoop()
228 278
229 def connectToServerWithOptions(self, host, port=1883, bindAddress="", 279 def connectToServerWithOptions(self, host, port=1883, bindAddress="",
241 @type str 291 @type str
242 @param options dictionary containing the connection options. This 292 @param options dictionary containing the connection options. This
243 dictionary should contain the keys "ClientId", "Keepalive", 293 dictionary should contain the keys "ClientId", "Keepalive",
244 "CleanSession", "Username", "Password", "WillTopic", "WillMessage", 294 "CleanSession", "Username", "Password", "WillTopic", "WillMessage",
245 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert", 295 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert",
246 "TlsClientKey" 296 "TlsClientKey", "ConnectionTimeout"
247 @type dict 297 @type dict
248 """ 298 """
249 if options: 299 if options:
250 parametersDict = self.defaultConnectionOptions() 300 parametersDict = self.defaultConnectionOptions()
251 parametersDict.update(options) 301 parametersDict.update(options)
253 # step 1: reinitialize to set the client ID and clean session flag 303 # step 1: reinitialize to set the client ID and clean session flag
254 self.reinitialise( 304 self.reinitialise(
255 clientId=parametersDict["ClientId"], 305 clientId=parametersDict["ClientId"],
256 cleanSession=parametersDict["CleanSession"] 306 cleanSession=parametersDict["CleanSession"]
257 ) 307 )
308 self.setConnectionTimeout(parametersDict["ConnectionTimeout"])
258 309
259 # step 2: set username and password 310 # step 2: set username and password
260 if parametersDict["Username"]: 311 if parametersDict["Username"]:
261 if parametersDict["Password"]: 312 if parametersDict["Password"]:
262 self.setUserCredentials( 313 self.setUserCredentials(
292 # use default TLS configuration 343 # use default TLS configuration
293 self.setTLS() 344 self.setTLS()
294 345
295 # step 5: connect to server 346 # step 5: connect to server
296 self.connectToServer(host, port=port, 347 self.connectToServer(host, port=port,
297 keepalive=parametersDict["Keepalive"]) 348 keepalive=parametersDict["Keepalive"],
349 reinit=False)
298 else: 350 else:
299 keepalive = self.defaultConnectionOptions["Keepalive"] 351 keepalive = self.defaultConnectionOptions["Keepalive"]
300 self.connectToServer(host, port=port, keepalive=keepalive, 352 self.connectToServer(host, port=port, keepalive=keepalive,
301 bindAddress=bindAddress) 353 bindAddress=bindAddress)
302 354
306 values. 358 values.
307 359
308 @return dictionary containing the default connection options. It has 360 @return dictionary containing the default connection options. It has
309 the keys "ClientId", "Keepalive", "CleanSession", "Username", 361 the keys "ClientId", "Keepalive", "CleanSession", "Username",
310 "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain", 362 "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain",
311 "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey". 363 "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey",
364 "ConnectionTimeout".
312 @rtype dict 365 @rtype dict
313 """ 366 """
314 return { 367 return {
315 "ClientId": "ERIC6_MQTT_MONITOR_CLIENT", 368 "ClientId": "ERIC6_MQTT_MONITOR_CLIENT",
369 "ConnectionTimeout": MqttClient.DefaultConnectTimeout,
316 "Keepalive": 60, 370 "Keepalive": 60,
317 "CleanSession": True, 371 "CleanSession": True,
318 "Username": "", 372 "Username": "",
319 "Password": "", 373 "Password": "",
320 "WillTopic": "", 374 "WillTopic": "",
329 383
330 def reconnectToServer(self): 384 def reconnectToServer(self):
331 """ 385 """
332 Public method to reconnect the client with the same parameters. 386 Public method to reconnect the client with the same parameters.
333 """ 387 """
388 self.__connectTimeoutTimer.start()
389
334 self.__mqttClient.reconnect() 390 self.__mqttClient.reconnect()
335 391
336 if not self.__loopStarted: 392 if not self.__loopStarted:
337 self.startLoop() 393 self.startLoop()
338 394
339 def disconnectFromServer(self): 395 def disconnectFromServer(self):
340 """ 396 """
341 Public method to disconnect the client from the remote broker. 397 Public method to disconnect the client from the remote broker.
342 """ 398 """
399 self.__connectTimeoutTimer.stop()
400
343 self.__mqttClient.disconnect() 401 self.__mqttClient.disconnect()
344 402
345 def subscribe(self, topic, qos=0): 403 def subscribe(self, topic, qos=0):
346 """ 404 """
347 Public method to subscribe to topics with quality of service. 405 Public method to subscribe to topics with quality of service.
387 retain=retain) 445 retain=retain)
388 446
389 447
390 def mqttConnackMessage(connackCode): 448 def mqttConnackMessage(connackCode):
391 """ 449 """
392 Public method to get the string associated with a CONNACK result. 450 Module function to get the string associated with a CONNACK result.
393 451
394 @param connackCode result code of the connection request 452 @param connackCode result code of the connection request
395 @type int 453 @type int
396 @return textual representation for the result code 454 @return textual representation for the result code
397 @rtype str 455 @rtype str
426 "Connection Refused: unknown reason.") 484 "Connection Refused: unknown reason.")
427 485
428 486
429 def mqttErrorMessage(mqttErrno): 487 def mqttErrorMessage(mqttErrno):
430 """ 488 """
431 Public method to get the error string associated with an MQTT error 489 Module function to get the error string associated with an MQTT error
432 number. 490 number.
433 491
434 @param mqttErrno result code of a MQTT request 492 @param mqttErrno result code of a MQTT request
435 @type int 493 @type int
436 @return textual representation of the result code 494 @return textual representation of the result code
499 "Error defined by errno.") 557 "Error defined by errno.")
500 else: 558 else:
501 return QCoreApplication.translate( 559 return QCoreApplication.translate(
502 "MqttErrorMessage", 560 "MqttErrorMessage",
503 "Unknown error.") 561 "Unknown error.")
562
563
def mqttLogLevelString(mqttLogLevel, isMqttLogLevel=True):
    """
    Module function to get the log level string associated with a log level.

    @param mqttLogLevel log level of the paho-mqtt client
    @type int
    @param isMqttLogLevel flag indicating a MQTT log level is given (if
        False it is the MqttClient variant, i.e. Debug being lowest)
    @type bool
    @return textual representation of the log level
    @rtype str
    """
    if isMqttLogLevel:
        # translate the paho-mqtt level to the MqttClient variant; a level
        # missing from the map yields None, which matches none of the
        # checks below and ends up as "Unknown"
        level = MqttClient.LogLevelMap.get(mqttLogLevel)
    else:
        level = mqttLogLevel

    if level == MqttClient.LogDebug:
        return QCoreApplication.translate("MqttLogLevelString", "Debug")
    if level == MqttClient.LogInfo:
        return QCoreApplication.translate("MqttLogLevelString", "Info")
    if level == MqttClient.LogNotice:
        return QCoreApplication.translate("MqttLogLevelString", "Notice")
    if level == MqttClient.LogWarning:
        return QCoreApplication.translate("MqttLogLevelString", "Warning")
    if level == MqttClient.LogError:
        return QCoreApplication.translate("MqttLogLevelString", "Error")

    return QCoreApplication.translate("MqttLogLevelString", "Unknown")

eric ide

mercurial