MqttMonitor/MqttClient.py

branch
eric7
changeset 102
70b8858199f5
parent 101
0eae5f616154
child 103
5fe4f179975f
equal deleted inserted replaced
101:0eae5f616154 102:70b8858199f5
13 pyqtSignal, pyqtSlot, QObject, QCoreApplication, QTimer 13 pyqtSignal, pyqtSlot, QObject, QCoreApplication, QTimer
14 ) 14 )
15 15
16 import paho.mqtt.client as mqtt 16 import paho.mqtt.client as mqtt
17 from paho.mqtt.packettypes import PacketTypes 17 from paho.mqtt.packettypes import PacketTypes
18 from paho.mqtt.properties import Properties
18 19
19 from Utilities.crypto import pwConvert 20 from Utilities.crypto import pwConvert
20 21
21 22
22 class MqttProtocols(enum.IntEnum): 23 class MqttProtocols(enum.IntEnum):
31 class MqttClient(QObject): 32 class MqttClient(QObject):
32 """ 33 """
33 Class implementing a PyQt wrapper around the paho MQTT client. 34 Class implementing a PyQt wrapper around the paho MQTT client.
34 35
35 @signal onConnectV3(flags, rc) emitted after the client has connected to 36 @signal onConnectV3(flags, rc) emitted after the client has connected to
36 the broker 37 the broker (MQTT v3)
38 @signal onConnectV5(flags, rc, packetType, properties) emitted after the
39 client has connected to the broker (MQTT v5)
37 @signal onDisconnectedV3(rc) emitted after the client has disconnected from 40 @signal onDisconnectedV3(rc) emitted after the client has disconnected from
38 the broker 41 the broker (MQTT v3)
42 @signal onDisconnectedV5(rc, packetType) emitted after the client has
43 disconnected from the broker (MQTT v5)
39 @signal onLog(level, message) emitted to send client log data 44 @signal onLog(level, message) emitted to send client log data
40 @signal onMessageV3(topic, payload, qos, retain) emitted after a message 45 @signal onMessageV3(topic, payload, qos, retain) emitted after a message
41 has been received by the client 46 has been received by the client (MQTT v3)
47 @signal onMessageV5(topic, payload, qos, retain, properties) emitted after
48 a message has been received by the client (MQTT v5)
42 @signal onPublish(mid) emitted after a message has been published 49 @signal onPublish(mid) emitted after a message has been published
43 @signal onSubscribeV3(mid, grantedQos) emitted after the client has 50 @signal onSubscribeV3(mid, grantedQos) emitted after the client has
44 subscribed to some topics 51 subscribed to some topics (MQTT v3)
52 @signal onSubscribeV5(mid, reasonCodes, properties) emitted after the
53 client has subscribed to some topics (MQTT v5)
45 @signal onUnsubscribeV3(mid) emitted after the client has unsubscribed from 54 @signal onUnsubscribeV3(mid) emitted after the client has unsubscribed from
46 some topics 55 some topics (MQTT v3)
56 @signal onUnsubscribeV5(mid, rc, packetType, properties) emitted after the
57 client has unsubscribed from some topics (MQTT v5)
47 @signal connectTimeout() emitted to indicate that a connection attempt 58 @signal connectTimeout() emitted to indicate that a connection attempt
48 timed out 59 timed out
49 """ 60 """
50 onConnectV3 = pyqtSignal(dict, int) 61 onConnectV3 = pyqtSignal(dict, int)
51 onConnectV5 = pyqtSignal(dict, int, int) 62 onConnectV5 = pyqtSignal(dict, int, int, dict)
52 onDisconnectedV3 = pyqtSignal(int) 63 onDisconnectedV3 = pyqtSignal(int)
53 onDisconnectedV5 = pyqtSignal(int, int) 64 onDisconnectedV5 = pyqtSignal(int, int)
54 onLog = pyqtSignal(int, str) 65 onLog = pyqtSignal(int, str)
55 onMessageV3 = pyqtSignal(str, bytes, int, bool) 66 onMessageV3 = pyqtSignal(str, bytes, int, bool)
56 onMessageV5 = pyqtSignal(str, bytes, int, bool, dict) 67 onMessageV5 = pyqtSignal(str, bytes, int, bool, dict)
57 onPublish = pyqtSignal(int) 68 onPublish = pyqtSignal(int)
58 onSubscribeV3 = pyqtSignal(int, tuple) 69 onSubscribeV3 = pyqtSignal(int, tuple)
59 onSubscribeV5 = pyqtSignal(int, list) 70 onSubscribeV5 = pyqtSignal(int, list, dict)
60 onUnsubscribeV3 = pyqtSignal(int) 71 onUnsubscribeV3 = pyqtSignal(int)
61 onUnsubscribeV5 = pyqtSignal(int, int, int) 72 onUnsubscribeV5 = pyqtSignal(int, int, int, dict)
62 73
63 connectTimeout = pyqtSignal() 74 connectTimeout = pyqtSignal()
64 75
65 DefaultConnectTimeout = 15 # connect timeout in seconds 76 DefaultConnectTimeout = 15 # connect timeout in seconds
66 77
149 lambda client, userdata, message: 160 lambda client, userdata, message:
150 self.onMessageV3.emit(message.topic, message.payload, 161 self.onMessageV3.emit(message.topic, message.payload,
151 message.qos, message.retain) 162 message.qos, message.retain)
152 ) 163 )
153 else: 164 else:
154 # TODO: add properties to signals
155 self.__mqttClient.on_connect = ( 165 self.__mqttClient.on_connect = (
156 lambda client, userdata, flags, rc, properties=None: 166 lambda client, userdata, flags, rc, properties=None:
157 self.onConnectV5.emit(flags, rc.value, rc.packetType) 167 self.onConnectV5.emit(
168 flags, rc.value, rc.packetType,
169 properties.json() if properties is not None else {}
170 )
158 ) 171 )
159 self.__mqttClient.on_disconnect = self.__onDisconnectedV5 172 self.__mqttClient.on_disconnect = self.__onDisconnectedV5
160 self.__mqttClient.on_subscribe = ( 173 self.__mqttClient.on_subscribe = (
161 lambda client, userdata, mid, reasonCodes, properties=None: 174 lambda client, userdata, mid, reasonCodes, properties=None:
162 self.onSubscribeV5.emit(mid, reasonCodes) 175 self.onSubscribeV5.emit(
176 mid, reasonCodes,
177 properties.json() if properties is not None else {}
178 )
163 ) 179 )
164 self.__mqttClient.on_unsubscribe = ( 180 self.__mqttClient.on_unsubscribe = (
165 lambda client, userdata, mid, properties, rc: 181 lambda client, userdata, mid, properties, rc:
166 self.onUnsubscribeV5.emit(mid, rc.value, rc.packetType) 182 self.onUnsubscribeV5.emit(
183 mid, rc.value, rc.packetType,
184 properties.json() if properties is not None else {}
185 )
167 ) 186 )
168 self.__mqttClient.on_message = ( 187 self.__mqttClient.on_message = (
169 lambda client, userdata, message: 188 lambda client, userdata, message:
170 self.onMessageV5.emit(message.topic, message.payload, 189 self.onMessageV5.emit(
171 message.qos, message.retain, 190 message.topic, message.payload, message.qos,
172 message.properties.json()) 191 message.retain, message.properties.json()
192 )
173 ) 193 )
174 self.__mqttClient.on_log = ( 194 self.__mqttClient.on_log = (
175 lambda client, userdata, level, buf: 195 lambda client, userdata, level, buf:
176 self.onLog.emit(level, buf) 196 self.onLog.emit(level, buf)
177 ) 197 )
207 Private slot handling a failed connection attempt. 227 Private slot handling a failed connection attempt.
208 """ 228 """
209 self.stopLoop() 229 self.stopLoop()
210 self.connectTimeout.emit() 230 self.connectTimeout.emit()
211 231
212 ## def reinitialise(self, clientId="", cleanSession=True, userdata=None):
213 ## """
214 ## Public method to reinitialize the client with given data.
215 ##
216 ## @param clientId ID to be used for the client
217 ## @type str
218 ## @param cleanSession flag indicating to start a clean session
219 ## @type bool
220 ## @param userdata user data
221 ## @type any
222 ## """
223 ## self.__mqttClient.reinitialise(
224 ## client_id=clientId, clean_session=cleanSession, userdata=userdata)
225 ##
226 ## self.__initCallbacks()
227 ##
228 def setConnectionTimeout(self, timeout): 232 def setConnectionTimeout(self, timeout):
229 """ 233 """
230 Public method to set the connection timeout value. 234 Public method to set the connection timeout value.
231 235
232 @param timeout timeout value to be set in seconds 236 @param timeout timeout value to be set in seconds
363 @type str 367 @type str
364 @param reinit flag indicating to reinitialize the MQTT client before 368 @param reinit flag indicating to reinitialize the MQTT client before
365 trying to connect with the given parameters 369 trying to connect with the given parameters
366 @type bool 370 @type bool
367 """ 371 """
368 ## if reinit:
369 ## self.reinitialise()
370 # TODO: MQTTv5: add support for MQTTv5 properties 372 # TODO: MQTTv5: add support for MQTTv5 properties
371 self.__mqttClient.connect_async( 373 self.__mqttClient.connect_async(
372 host, port=port, keepalive=keepalive, bind_address=bindAddress, 374 host, port=port, keepalive=keepalive, bind_address=bindAddress,
373 clean_start=self.__cleanSession) 375 clean_start=self.__cleanSession)
374 376
443 445
444 # step 4: connect to server 446 # step 4: connect to server
445 self.__cleanSession = parametersDict["CleanSession"] 447 self.__cleanSession = parametersDict["CleanSession"]
446 self.connectToServer(host, port=port, 448 self.connectToServer(host, port=port,
447 keepalive=parametersDict["Keepalive"]) 449 keepalive=parametersDict["Keepalive"])
448 ## reinit=False)
449 else: 450 else:
450 keepalive = self.defaultConnectionOptions["Keepalive"] 451 keepalive = self.defaultConnectionOptions["Keepalive"]
451 self.connectToServer(host, port=port, keepalive=keepalive, 452 self.connectToServer(host, port=port, keepalive=keepalive,
452 bindAddress=bindAddress) 453 bindAddress=bindAddress)
453 454
501 502
502 # TODO: MQTTv5: add support for properties (?) 503 # TODO: MQTTv5: add support for properties (?)
503 # TODO: MQTTv5: add support for reason code 504 # TODO: MQTTv5: add support for reason code
504 self.__mqttClient.disconnect() 505 self.__mqttClient.disconnect()
505 506
506 # TODO: MQTTv5: add support for properties 507 def subscribe(self, topic, qos=0, properties=None):
507 # TODO: MQTTv5: add support for subscribe options
508 def subscribe(self, topic, qos=0):
509 """ 508 """
510 Public method to subscribe to topics with quality of service. 509 Public method to subscribe to topics with quality of service.
511 510
512 @param topic single topic to subscribe to or a tuple with a topic 511 @param topic single topic to subscribe to or a tuple with a topic
513 and a QoS or a list of tuples with a topic and a QoS each 512 and a QoS or a list of tuples with a topic and a QoS each
514 @type str or tuple of (str, int) or list of tuple of (str, int) 513 @type str or tuple of (str, int) or list of tuple of (str, int)
515 @param qos quality of service 514 @param qos quality of service
516 @type int, one of 0, 1 or 2 515 @type int, one of 0, 1 or 2
516 @param properties list of user properties to be sent with the
517 subscription
518 @type list of tuple of (str, str)
517 @return tuple containing the result code and the message ID 519 @return tuple containing the result code and the message ID
518 @rtype tuple of (int, int) 520 @rtype tuple of (int, int)
519 """ 521 """
520 return self.__mqttClient.subscribe(topic, qos=qos) 522 props = (
521 523 self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties)
522 # TODO: MQTTv5: add support for properties (?) 524 if properties else
523 def unsubscribe(self, topic): 525 None
526 )
527 return self.__mqttClient.subscribe(topic, qos=qos, properties=props)
528
529 def unsubscribe(self, topic, properties=None):
524 """ 530 """
525 Public method to unsubscribe topics. 531 Public method to unsubscribe topics.
526 532
527 @param topic topic or list of topics to unsubscribe 533 @param topic topic or list of topics to unsubscribe
528 @type str or list of str 534 @type str or list of str
535 @param properties list of user properties to be sent with the
536 unsubscribe request
537 @type list of tuple of (str, str)
529 @return tuple containing the result code and the message ID 538 @return tuple containing the result code and the message ID
530 @rtype tuple of (int, int) 539 @rtype tuple of (int, int)
531 """ 540 """
532 return self.__mqttClient.unsubscribe(topic) 541 props = (
542 self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties)
543 if properties else
544 None
545 )
546 return self.__mqttClient.unsubscribe(topic, properties=props)
533 547
534 # TODO: MQTTv5: add support for properties 548 # TODO: MQTTv5: add support for properties
535 def publish(self, topic, payload=None, qos=0, retain=False): 549 def publish(self, topic, payload=None, qos=0, retain=False):
536 """ 550 """
537 Public method to publish to a topic. 551 Public method to publish to a topic.
548 @return message info object 562 @return message info object
549 @rtype mqtt.MQTTMessageInfo 563 @rtype mqtt.MQTTMessageInfo
550 """ 564 """
551 return self.__mqttClient.publish(topic, payload=payload, qos=qos, 565 return self.__mqttClient.publish(topic, payload=payload, qos=qos,
552 retain=retain) 566 retain=retain)
567
568 def __createPropertiesObject(self, packetType, properties):
569 """
570 Private method to assemble the MQTT v5 properties object.
571
572 @param packetType type of the MQTT packet
573 @type PacketTypes (= int)
574 @param properties list of user properties
575 @type list of tuple of (str, str)
576 @return MQTT v5 properties object
577 @rtype Properties
578 """
579 props = Properties(packetType)
580 for userProperty in properties:
581 props.UserProperty = tuple(userProperty)
582 return props
553 583
554 584
555 def mqttConnackMessage(connackCode): 585 def mqttConnackMessage(connackCode):
556 """ 586 """
557 Module function to get the string associated with a CONNACK result. 587 Module function to get the string associated with a CONNACK result.

eric ide

mercurial