--- a/MqttMonitor/MqttClient.py Tue Jul 20 18:10:55 2021 +0200 +++ b/MqttMonitor/MqttClient.py Wed Jul 21 20:10:36 2021 +0200 @@ -15,6 +15,7 @@ import paho.mqtt.client as mqtt from paho.mqtt.packettypes import PacketTypes +from paho.mqtt.properties import Properties from Utilities.crypto import pwConvert @@ -33,22 +34,32 @@ Class implementing a PyQt wrapper around the paho MQTT client. @signal onConnectV3(flags, rc) emitted after the client has connected to - the broker + the broker (MQTT v3) + @signal onConnectV5(flags, rc, packetType, properties emitted after the + client has connected to the broker (MQTT v5) @signal onDisconnectedV3(rc) emitted after the client has disconnected from - the broker + the broker (MQTT v3) + @signal onDisconnectedV5(rc, packetType) emitted after the client has + disconnected from the broker (MQTT v5) @signal onLog(level, message) emitted to send client log data @signal onMessageV3(topic, payload, qos, retain) emitted after a message - has been received by the client + has been received by the client (MQTT v3) + @signal onMessageV5(topic, payload, qos, retain, properties) emitted after + a message has been received by the client (MQTT v5) @signal onPublish(mid) emitted after a message has been published @signal onSubscribeV3(mid, grantedQos) emitted after the client has - subscribed to some topics + subscribed to some topics (MQTT v3) + @signal onSubscribeV5(mid, reasonCodes, properties) emitted after the + client has subscribed to some topics (MQTT v5) @signal onUnsubscribeV3(mid) emitted after the client has unsubscribed from - some topics + some topics (MQTT v3) + @signal onUnsubscribeV5(mid, rc, packetType, properties) emitted after the + client has unsubscribed from some topics (MQTT v5) @signal connectTimeout() emitted to indicate, that a connection attempt timed out """ onConnectV3 = pyqtSignal(dict, int) - onConnectV5 = pyqtSignal(dict, int, int) + onConnectV5 = pyqtSignal(dict, int, int, dict) onDisconnectedV3 = pyqtSignal(int) onDisconnectedV5 = pyqtSignal(int, int) onLog = pyqtSignal(int, str) @@ -56,9 +67,9 @@ onMessageV5 = pyqtSignal(str, bytes, int, bool, dict) onPublish = pyqtSignal(int) onSubscribeV3 = pyqtSignal(int, tuple) - onSubscribeV5 = pyqtSignal(int, list) + onSubscribeV5 = pyqtSignal(int, list, dict) onUnsubscribeV3 = pyqtSignal(int) - onUnsubscribeV5 = pyqtSignal(int, int, int) + onUnsubscribeV5 = pyqtSignal(int, int, int, dict) connectTimeout = pyqtSignal() @@ -151,25 +162,34 @@ message.qos, message.retain) ) else: - # TODO: add properties to signals self.__mqttClient.on_connect = ( lambda client, userdata, flags, rc, properties=None: - self.onConnectV5.emit(flags, rc.value, rc.packetType) + self.onConnectV5.emit( + flags, rc.value, rc.packetType, + properties.json() if properties is not None else {} + ) ) self.__mqttClient.on_disconnect = self.__onDisconnectedV5 self.__mqttClient.on_subscribe = ( lambda client, userdata, mid, reasonCodes, properties=None: - self.onSubscribeV5.emit(mid, reasonCodes) + self.onSubscribeV5.emit( + mid, reasonCodes, + properties.json() if properties is not None else {} + ) ) self.__mqttClient.on_unsubscribe = ( lambda client, userdata, mid, properties, rc: - self.onUnsubscribeV5.emit(mid, rc.value, rc.packetType) + self.onUnsubscribeV5.emit( + mid, rc.value, rc.packetType, + properties.json() if properties is not None else {} + ) ) self.__mqttClient.on_message = ( lambda client, userdata, message: - self.onMessageV5.emit(message.topic, message.payload, - message.qos, message.retain, - message.properties.json()) + self.onMessageV5.emit( + message.topic, message.payload, message.qos, + message.retain, message.properties.json() + ) ) self.__mqttClient.on_log = ( lambda client, userdata, level, buf: @@ -209,22 +229,6 @@ self.stopLoop() self.connectTimeout.emit() -## def reinitialise(self, clientId="", cleanSession=True, userdata=None): -## """ -## Public method to reinitialize the client with given data. -## -## @param clientId ID to be used for the client -## @type str -## @param cleanSession flag indicating to start a clean session -## @type bool -## @param userdata user data -## @type any -## """ -## self.__mqttClient.reinitialise( -## client_id=clientId, clean_session=cleanSession, userdata=userdata) -## -## self.__initCallbacks() -## def setConnectionTimeout(self, timeout): """ Public method to set the connection timeout value. @@ -365,8 +369,6 @@ trying to connect with the given parameters @type bool """ -## if reinit: -## self.reinitialise() # TODO: MQTTv5: add support for MQTTv5 properties self.__mqttClient.connect_async( host, port=port, keepalive=keepalive, bind_address=bindAddress, @@ -445,7 +447,6 @@ self.__cleanSession = parametersDict["CleanSession"] self.connectToServer(host, port=port, keepalive=parametersDict["Keepalive"]) -## reinit=False) else: keepalive = self.defaultConnectionOptions["Keepalive"] self.connectToServer(host, port=port, keepalive=keepalive, @@ -503,9 +504,7 @@ # TODO: MQTTv5: add support for reason code self.__mqttClient.disconnect() - # TODO: MQTTv5: add support for properties - # TODO: MQTTv5: add support for subscribe options - def subscribe(self, topic, qos=0): + def subscribe(self, topic, qos=0, properties=None): """ Public method to subscribe to topics with quality of service. @@ -514,22 +513,37 @@ @type str or tuple of (str, int) or list of tuple of (str, int) @param qos quality of service @type int, one of 0, 1 or 2 + @param properties list of user properties to be sent with the + subscription + @type list of tuple of (str, str) @return tuple containing the result code and the message ID @rtype tuple of (int, int) """ - return self.__mqttClient.subscribe(topic, qos=qos) + props = ( + self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties) + if properties else + None + ) + return self.__mqttClient.subscribe(topic, qos=qos, properties=props) - # TODO: MQTTv5: add support for properties (?) - def unsubscribe(self, topic): + def unsubscribe(self, topic, properties=None): """ Public method to unsubscribe topics. @param topic topic or list of topics to unsubscribe @type str or list of str + @param properties list of user properties to be sent with the + subscription + @type list of tuple of (str, str) @return tuple containing the result code and the message ID @rtype tuple of (int, int) """ - return self.__mqttClient.unsubscribe(topic) + props = ( + self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties) + if properties else + None + ) + return self.__mqttClient.unsubscribe(topic, properties=props) # TODO: MQTTv5: add support for properties def publish(self, topic, payload=None, qos=0, retain=False): @@ -550,6 +564,22 @@ """ return self.__mqttClient.publish(topic, payload=payload, qos=qos, retain=retain) + + def __createPropertiesObject(self, packetType, properties): + """ + Private method to assemble the MQTT v5 properties object. + + @param packetType type of the MQTT packet + @type PacketTypes (= int) + @param properties list of user properties + @type list of tuple of (str, str) + @return MQTT v5 properties object + @rtype Properties + """ + props = Properties(packetType) + for userProperty in properties: + props.UserProperty = tuple(userProperty) + return props def mqttConnackMessage(connackCode):