MqttMonitor/MqttClient.py

branch
eric7
changeset 92
2fb5c08019fd
parent 84
044df16e55aa
child 95
d830314cca87
equal deleted inserted replaced
91:3cb08e5db764 92:2fb5c08019fd
5 5
6 """ 6 """
7 Module implementing a PyQt wrapper around the paho MQTT client. 7 Module implementing a PyQt wrapper around the paho MQTT client.
8 """ 8 """
9 9
10 from PyQt5.QtCore import ( 10 from PyQt6.QtCore import (
11 pyqtSignal, pyqtSlot, QObject, QCoreApplication, QTimer 11 pyqtSignal, pyqtSlot, QObject, QCoreApplication, QTimer
12 ) 12 )
13 13
14 import paho.mqtt.client as mqtt 14 import paho.mqtt.client as mqtt
15 15
71 @param cleanSession flag indicating to start a clean session 71 @param cleanSession flag indicating to start a clean session
72 @type bool 72 @type bool
73 @param userdata user data 73 @param userdata user data
74 @type any 74 @type any
75 @param protocol version of the MQTT protocol to use 75 @param protocol version of the MQTT protocol to use
76 @type int, one of mqtt.MQTTv31 or mqtt.MQTTv311 76 @type int, one of mqtt.MQTTv3, mqtt.MQTTv311 or mqtt.MQTTv5
77 @param transport transport to be used 77 @param transport transport to be used
78 @type str, one of "tcp" or "websockets" 78 @type str, one of "tcp" or "websockets"
79 @param parent reference to the parent object 79 @param parent reference to the parent object
80 @type QObject 80 @type QObject
81 """ 81 """
89 MqttClient.DefaultConnectTimeout * 1000) 89 MqttClient.DefaultConnectTimeout * 1000)
90 self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout) 90 self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout)
91 91
92 self.onConnect.connect(self.__connectTimeoutTimer.stop) 92 self.onConnect.connect(self.__connectTimeoutTimer.stop)
93 93
94 # TODO: MQTTv5: set clean_session to None and remember cleanSession
94 self.__mqttClient = mqtt.Client( 95 self.__mqttClient = mqtt.Client(
95 client_id=clientId, clean_session=cleanSession, userdata=None, 96 client_id=clientId, clean_session=cleanSession, userdata=None,
96 protocol=mqtt.MQTTv311, transport="tcp") 97 protocol=mqtt.MQTTv311, transport="tcp")
97 98
98 self.__initCallbacks() 99 self.__initCallbacks()
99 100
100 def __initCallbacks(self): 101 def __initCallbacks(self):
101 """ 102 """
102 Private method to initialize the MQTT callback methods. 103 Private method to initialize the MQTT callback methods.
103 """ 104 """
105 # TODO: add properties to signal
106 # TODO: MQTTv5: add support for MQTTv5 signature
104 self.__mqttClient.on_connect = ( 107 self.__mqttClient.on_connect = (
105 lambda client, userdata, flags, rc: self.onConnect.emit( 108 lambda client, userdata, flags, rc, properties:
106 flags, rc)) 109 self.onConnect.emit(flags, rc))
110 # TODO: MQTTv5: add support for MQTTv5 signature
107 self.__mqttClient.on_disconnect = ( 111 self.__mqttClient.on_disconnect = (
108 lambda client, userdata, rc: self.onDisconnected.emit(rc)) 112 lambda client, userdata, rc: self.onDisconnected.emit(rc))
109 self.__mqttClient.on_log = ( 113 self.__mqttClient.on_log = (
110 lambda client, userdata, level, buf: self.onLog.emit(level, buf)) 114 lambda client, userdata, level, buf: self.onLog.emit(level, buf))
111 self.__mqttClient.on_message = ( 115 self.__mqttClient.on_message = (
112 lambda client, userdata, message: self.onMessage.emit( 116 lambda client, userdata, message: self.onMessage.emit(
113 message.topic, message.payload, message.qos, message.retain)) 117 message.topic, message.payload, message.qos, message.retain))
114 self.__mqttClient.on_publish = ( 118 self.__mqttClient.on_publish = (
115 lambda client, userdata, mid: self.onPublish.emit(mid)) 119 lambda client, userdata, mid: self.onPublish.emit(mid))
120 # TODO: add properties to signal
121 # TODO: MQTTv5: add support for MQTTv5 signature
116 self.__mqttClient.on_subscribe = ( 122 self.__mqttClient.on_subscribe = (
117 lambda client, userdata, mid, grantedQos: self.onSubscribe.emit( 123 lambda client, userdata, mid, grantedQos, properties:
118 mid, grantedQos)) 124 self.onSubscribe.emit(mid, grantedQos))
125 # TODO: MQTTv5: add support for MQTTv5 signature
119 self.__mqttClient.on_unsubscribe = ( 126 self.__mqttClient.on_unsubscribe = (
120 lambda client, userdata, mid: self.onUnsubscribe.emit(mid)) 127 lambda client, userdata, mid: self.onUnsubscribe.emit(mid))
121 128
122 @pyqtSlot() 129 @pyqtSlot()
123 def __connectTimeout(self): 130 def __connectTimeout(self):
190 @param userdata user data 197 @param userdata user data
191 @type any 198 @type any
192 """ 199 """
193 self.__mqttClient.user_data_set(userdata) 200 self.__mqttClient.user_data_set(userdata)
194 201
202 # TODO: MQTTv5: add support for properties
195 def setLastWill(self, topic, payload=None, qos=0, retain=False): 203 def setLastWill(self, topic, payload=None, qos=0, retain=False):
196 """ 204 """
197 Public method to set the last will of the client. 205 Public method to set the last will of the client.
198 206
199 @param topic topic the will message should be published on 207 @param topic topic the will message should be published on
273 trying to connect with the given parameters 281 trying to connect with the given parameters
274 @type bool 282 @type bool
275 """ 283 """
276 if reinit: 284 if reinit:
277 self.reinitialise() 285 self.reinitialise()
286 # TODO: MQTTv5: use 'clean_start' set to the remembered 'cleanSession'
287 # TODO: MQTTv5: add support for MQTTv5 properties
278 self.__mqttClient.connect_async( 288 self.__mqttClient.connect_async(
279 host, port=port, keepalive=keepalive, bind_address=bindAddress) 289 host, port=port, keepalive=keepalive, bind_address=bindAddress)
280 290
281 self.__connectTimeoutTimer.start() 291 self.__connectTimeoutTimer.start()
282 292
372 "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey", 382 "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey",
373 "ConnectionTimeout". 383 "ConnectionTimeout".
374 @rtype dict 384 @rtype dict
375 """ 385 """
376 return { 386 return {
377 "ClientId": "ERIC6_MQTT_MONITOR_CLIENT", 387 "ClientId": "ERIC7_MQTT_MONITOR_CLIENT",
378 "ConnectionTimeout": MqttClient.DefaultConnectTimeout, 388 "ConnectionTimeout": MqttClient.DefaultConnectTimeout,
379 "Keepalive": 60, 389 "Keepalive": 60,
380 "CleanSession": True, 390 "CleanSession": True,
381 "Username": "", 391 "Username": "",
382 "Password": "", 392 "Password": "",
405 """ 415 """
406 Public method to disconnect the client from the remote broker. 416 Public method to disconnect the client from the remote broker.
407 """ 417 """
408 self.__connectTimeoutTimer.stop() 418 self.__connectTimeoutTimer.stop()
409 419
420 # TODO: MQTTv5: add support for properties (?)
421 # TODO: MQTTv5: add support for reason code
410 self.__mqttClient.disconnect() 422 self.__mqttClient.disconnect()
411 423
424 # TODO: MQTTv5: add support for properties
425 # TODO: MQTTv5: add support for subscribe options
412 def subscribe(self, topic, qos=0): 426 def subscribe(self, topic, qos=0):
413 """ 427 """
414 Public method to subscribe to topics with quality of service. 428 Public method to subscribe to topics with quality of service.
415 429
416 @param topic single topic to subscribe to or a tuple with a topic 430 @param topic single topic to subscribe to or a tuple with a topic
421 @return tuple containing the result code and the message ID 435 @return tuple containing the result code and the message ID
422 @rtype tuple of (int, int) 436 @rtype tuple of (int, int)
423 """ 437 """
424 return self.__mqttClient.subscribe(topic, qos=qos) 438 return self.__mqttClient.subscribe(topic, qos=qos)
425 439
440 # TODO: MQTTv5: add support for properties (?)
426 def unsubscribe(self, topic): 441 def unsubscribe(self, topic):
427 """ 442 """
428 Public method to unsubscribe topics. 443 Public method to unsubscribe topics.
429 444
430 @param topic topic or list of topics to unsubscribe 445 @param topic topic or list of topics to unsubscribe
432 @return tuple containing the result code and the message ID 447 @return tuple containing the result code and the message ID
433 @rtype tuple of (int, int) 448 @rtype tuple of (int, int)
434 """ 449 """
435 return self.__mqttClient.unsubscribe(topic) 450 return self.__mqttClient.unsubscribe(topic)
436 451
452 # TODO: MQTTv5: add support for properties
437 def publish(self, topic, payload=None, qos=0, retain=False): 453 def publish(self, topic, payload=None, qos=0, retain=False):
438 """ 454 """
439 Public method to publish to a topic. 455 Public method to publish to a topic.
440 456
441 @param topic topic to publish to 457 @param topic topic to publish to
562 "Unknown error.") 578 "Unknown error.")
563 elif mqttErrno == mqtt.MQTT_ERR_ERRNO: 579 elif mqttErrno == mqtt.MQTT_ERR_ERRNO:
564 return QCoreApplication.translate( 580 return QCoreApplication.translate(
565 "MqttErrorMessage", 581 "MqttErrorMessage",
566 "Error defined by errno.") 582 "Error defined by errno.")
583 elif mqttErrno == mqtt.MQTT_ERR_QUEUE_SIZE:
584 return QCoreApplication.translate(
585 "MqttErrorMessage",
586 "Message queue full.")
567 else: 587 else:
568 return QCoreApplication.translate( 588 return QCoreApplication.translate(
569 "MqttErrorMessage", 589 "MqttErrorMessage",
570 "Unknown error.") 590 "Unknown error.")
571 591

eric ide

mercurial