MqttMonitor/MqttClient.py

branch
eric7
changeset 102
70b8858199f5
parent 101
0eae5f616154
child 103
5fe4f179975f
--- a/MqttMonitor/MqttClient.py	Tue Jul 20 18:10:55 2021 +0200
+++ b/MqttMonitor/MqttClient.py	Wed Jul 21 20:10:36 2021 +0200
@@ -15,6 +15,7 @@
 
 import paho.mqtt.client as mqtt
 from paho.mqtt.packettypes import PacketTypes
+from paho.mqtt.properties import Properties
 
 from Utilities.crypto import pwConvert
 
@@ -33,22 +34,32 @@
     Class implementing a PyQt wrapper around the paho MQTT client.
     
     @signal onConnectV3(flags, rc) emitted after the client has connected to
-        the broker
+        the broker (MQTT v3)
+    @signal onConnectV5(flags, rc, packetType, properties emitted after the
+        client has connected to the broker (MQTT v5)
     @signal onDisconnectedV3(rc) emitted after the client has disconnected from
-        the broker
+        the broker (MQTT v3)
+    @signal onDisconnectedV5(rc, packetType) emitted after the client has
+        disconnected from the broker (MQTT v5)
     @signal onLog(level, message) emitted to send client log data
     @signal onMessageV3(topic, payload, qos, retain) emitted after a message
-        has been received by the client
+        has been received by the client (MQTT v3)
+    @signal onMessageV5(topic, payload, qos, retain, properties) emitted after
+        a message has been received by the client (MQTT v5)
     @signal onPublish(mid) emitted after a message has been published
     @signal onSubscribeV3(mid, grantedQos) emitted after the client has
-        subscribed to some topics
+        subscribed to some topics (MQTT v3)
+    @signal onSubscribeV5(mid, reasonCodes, properties) emitted after the
+        client has subscribed to some topics (MQTT v5)
     @signal onUnsubscribeV3(mid) emitted after the client has unsubscribed from
-        some topics
+        some topics (MQTT v3)
+    @signal onUnsubscribeV5(mid, rc, packetType, properties) emitted after the
+        client has unsubscribed from some topics (MQTT v5)
     @signal connectTimeout() emitted to indicate, that a connection attempt
         timed out
     """
     onConnectV3 = pyqtSignal(dict, int)
-    onConnectV5 = pyqtSignal(dict, int, int)
+    onConnectV5 = pyqtSignal(dict, int, int, dict)
     onDisconnectedV3 = pyqtSignal(int)
     onDisconnectedV5 = pyqtSignal(int, int)
     onLog = pyqtSignal(int, str)
@@ -56,9 +67,9 @@
     onMessageV5 = pyqtSignal(str, bytes, int, bool, dict)
     onPublish = pyqtSignal(int)
     onSubscribeV3 = pyqtSignal(int, tuple)
-    onSubscribeV5 = pyqtSignal(int, list)
+    onSubscribeV5 = pyqtSignal(int, list, dict)
     onUnsubscribeV3 = pyqtSignal(int)
-    onUnsubscribeV5 = pyqtSignal(int, int, int)
+    onUnsubscribeV5 = pyqtSignal(int, int, int, dict)
     
     connectTimeout = pyqtSignal()
     
@@ -151,25 +162,34 @@
                                           message.qos, message.retain)
             )
         else:
-            # TODO: add properties to signals
             self.__mqttClient.on_connect = (
                 lambda client, userdata, flags, rc, properties=None:
-                    self.onConnectV5.emit(flags, rc.value, rc.packetType)
+                    self.onConnectV5.emit(
+                        flags, rc.value, rc.packetType,
+                        properties.json() if properties is not None else {}
+                    )
             )
             self.__mqttClient.on_disconnect = self.__onDisconnectedV5
             self.__mqttClient.on_subscribe = (
                 lambda client, userdata, mid, reasonCodes, properties=None:
-                    self.onSubscribeV5.emit(mid, reasonCodes)
+                    self.onSubscribeV5.emit(
+                        mid, reasonCodes,
+                        properties.json() if properties is not None else {}
+                    )
             )
             self.__mqttClient.on_unsubscribe = (
                 lambda client, userdata, mid, properties, rc:
-                    self.onUnsubscribeV5.emit(mid, rc.value, rc.packetType)
+                    self.onUnsubscribeV5.emit(
+                        mid, rc.value, rc.packetType,
+                        properties.json() if properties is not None else {}
+                    )
             )
             self.__mqttClient.on_message = (
                 lambda client, userdata, message:
-                    self.onMessageV5.emit(message.topic, message.payload,
-                                          message.qos, message.retain,
-                                          message.properties.json())
+                    self.onMessageV5.emit(
+                        message.topic, message.payload, message.qos,
+                        message.retain, message.properties.json()
+                    )
             )
         self.__mqttClient.on_log = (
             lambda client, userdata, level, buf:
@@ -209,22 +229,6 @@
         self.stopLoop()
         self.connectTimeout.emit()
     
-##    def reinitialise(self, clientId="", cleanSession=True, userdata=None):
-##        """
-##        Public method to reinitialize the client with given data.
-##        
-##        @param clientId ID to be used for the client
-##        @type str
-##        @param cleanSession flag indicating to start a clean session
-##        @type bool
-##        @param userdata user data
-##        @type any
-##        """
-##        self.__mqttClient.reinitialise(
-##            client_id=clientId, clean_session=cleanSession, userdata=userdata)
-##        
-##        self.__initCallbacks()
-##    
     def setConnectionTimeout(self, timeout):
         """
         Public method to set the connection timeout value.
@@ -365,8 +369,6 @@
             trying to connect with the given parameters
         @type bool
         """
-##        if reinit:
-##            self.reinitialise()
         # TODO: MQTTv5: add support for MQTTv5 properties
         self.__mqttClient.connect_async(
             host, port=port, keepalive=keepalive, bind_address=bindAddress,
@@ -445,7 +447,6 @@
             self.__cleanSession = parametersDict["CleanSession"]
             self.connectToServer(host, port=port,
                                  keepalive=parametersDict["Keepalive"])
-##                                 reinit=False)
         else:
             keepalive = self.defaultConnectionOptions["Keepalive"]
             self.connectToServer(host, port=port, keepalive=keepalive,
@@ -503,9 +504,7 @@
         # TODO: MQTTv5: add support for reason code
         self.__mqttClient.disconnect()
     
-    # TODO: MQTTv5: add support for properties
-    # TODO: MQTTv5: add support for subscribe options
-    def subscribe(self, topic, qos=0):
+    def subscribe(self, topic, qos=0, properties=None):
         """
         Public method to subscribe to topics with quality of service.
         
@@ -514,22 +513,37 @@
         @type str or tuple of (str, int) or list of tuple of (str, int)
         @param qos quality of service
         @type int, one of 0, 1 or 2
+        @param properties list of user properties to be sent with the
+            subscription
+        @type list of tuple of (str, str)
         @return tuple containing the result code and the message ID
         @rtype tuple of (int, int)
         """
-        return self.__mqttClient.subscribe(topic, qos=qos)
+        props = (
+            self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties)
+            if properties else
+            None
+        )
+        return self.__mqttClient.subscribe(topic, qos=qos, properties=props)
     
-    # TODO: MQTTv5: add support for properties (?)
-    def unsubscribe(self, topic):
+    def unsubscribe(self, topic, properties=None):
         """
         Public method to unsubscribe topics.
         
         @param topic topic or list of topics to unsubscribe
         @type str or list of str
+        @param properties list of user properties to be sent with the
+            subscription
+        @type list of tuple of (str, str)
         @return tuple containing the result code and the message ID
         @rtype tuple of (int, int)
         """
-        return self.__mqttClient.unsubscribe(topic)
+        props = (
+            self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties)
+            if properties else
+            None
+        )
+        return self.__mqttClient.unsubscribe(topic, properties=props)
     
     # TODO: MQTTv5: add support for properties
     def publish(self, topic, payload=None, qos=0, retain=False):
@@ -550,6 +564,22 @@
         """
         return self.__mqttClient.publish(topic, payload=payload, qos=qos,
                                          retain=retain)
+    
+    def __createPropertiesObject(self, packetType, properties):
+        """
+        Private method to assemble the MQTT v5 properties object.
+        
+        @param packetType type of the MQTT packet
+        @type PacketTypes (= int)
+        @param properties list of user properties
+        @type list of tuple of (str, str)
+        @return MQTT v5 properties object
+        @rtype Properties
+        """
+        props = Properties(packetType)
+        for userProperty in properties:
+            props.UserProperty = tuple(userProperty)
+        return props
 
 
 def mqttConnackMessage(connackCode):

eric ide

mercurial