MqttMonitor/MqttClient.py

branch
eric7
changeset 99
420cb8adbf7e
parent 98
85d56e77e9df
child 100
9c29cfbd96c3
--- a/MqttMonitor/MqttClient.py	Sun Jul 18 19:32:16 2021 +0200
+++ b/MqttMonitor/MqttClient.py	Mon Jul 19 20:00:17 2021 +0200
@@ -14,6 +14,7 @@
 )
 
 import paho.mqtt.client as mqtt
+from paho.mqtt.packettypes import PacketTypes
 
 from Utilities.crypto import pwConvert
 
@@ -46,12 +47,15 @@
     @signal connectTimeout() emitted to indicate, that a connection attempt
         timed out
     """
-    onConnect = pyqtSignal(dict, int)
-    onDisconnected = pyqtSignal(int)
+    onConnectV3 = pyqtSignal(dict, int)
+    onConnectV5 = pyqtSignal(dict, int, int)
+    onDisconnectedV3 = pyqtSignal(int)
+    onDisconnectedV5 = pyqtSignal(int, int)
     onLog = pyqtSignal(int, str)
     onMessage = pyqtSignal(str, bytes, int, bool)
     onPublish = pyqtSignal(int)
-    onSubscribe = pyqtSignal(int, tuple)
+    onSubscribeV3 = pyqtSignal(int, tuple)
+    onSubscribeV5 = pyqtSignal(int, list)
     onUnsubscribe = pyqtSignal(int)
     
     connectTimeout = pyqtSignal()
@@ -100,9 +104,12 @@
             MqttClient.DefaultConnectTimeout * 1000)
         self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout)
         
-        self.onConnect.connect(self.__connectTimeoutTimer.stop)
+        self.onConnectV3.connect(self.__connectTimeoutTimer.stop)
+        self.onConnectV5.connect(self.__connectTimeoutTimer.stop)
         
         self.__cleanSession = cleanSession
+        self.__protocol = protocol
+        
         if protocol == MqttProtocols.MQTTv5:
             cleanSession = None
         
@@ -110,35 +117,80 @@
             client_id=clientId, clean_session=cleanSession, userdata=userdata,
             protocol=int(protocol), transport=transport)
         
-        self.__initCallbacks()
+        self.__initCallbacks(protocol)
 
-    def __initCallbacks(self):
+    def __initCallbacks(self, protocol):
         """
         Private method to initialize the MQTT callback methods.
+        
+        @param protocol MQTT protocol version
+        @type MqttProtocols
         """
-        # TODO: add properties to signal
+        if protocol in (MqttProtocols.MQTTv31, MqttProtocols.MQTTv311):
+            self.__mqttClient.on_connect = (
+                lambda client, userdata, flags, rc, properties=None:
+                    self.onConnectV3.emit(flags, rc)
+            )
+            self.__mqttClient.on_disconnect = (
+                lambda client, userdata, rc:
+                    self.onDisconnectedV3.emit(rc)
+                )
+            self.__mqttClient.on_subscribe = (
+                lambda client, userdata, mid, grantedQos, properties=None:
+                    self.onSubscribeV3.emit(mid, grantedQos)
+            )
+        else:
+            # TODO: add properties to signal
+            self.__mqttClient.on_connect = (
+                lambda client, userdata, flags, rc, properties=None:
+                    self.onConnectV5.emit(flags, rc.value, rc.packetType)
+            )
+            self.__mqttClient.on_disconnect = self.__onDisconnectedV5
+            # TODO: add properties to signal
+            self.__mqttClient.on_subscribe = (
+                lambda client, userdata, mid, reasonCodes, properties=None:
+                    self.onSubscribeV5.emit(mid, reasonCodes)
+            )
         # TODO: MQTTv5: add support for MQTTv5 signature
-        self.__mqttClient.on_connect = (
-            lambda client, userdata, flags, rc, properties=None:
-                self.onConnect.emit(flags, rc))
-        # TODO: MQTTv5: add support for MQTTv5 signature
-        self.__mqttClient.on_disconnect = (
-            lambda client, userdata, rc: self.onDisconnected.emit(rc))
         self.__mqttClient.on_log = (
-            lambda client, userdata, level, buf: self.onLog.emit(level, buf))
+            lambda client, userdata, level, buf:
+                self.onLog.emit(level, buf)
+        )
         self.__mqttClient.on_message = (
-            lambda client, userdata, message: self.onMessage.emit(
-                message.topic, message.payload, message.qos, message.retain))
+            lambda client, userdata, message:
+                self.onMessage.emit(message.topic, message.payload,
+                                    message.qos, message.retain)
+        )
         self.__mqttClient.on_publish = (
-            lambda client, userdata, mid: self.onPublish.emit(mid))
-        # TODO: add properties to signal
-        # TODO: MQTTv5: add support for MQTTv5 signature
-        self.__mqttClient.on_subscribe = (
-            lambda client, userdata, mid, grantedQos, properties=None:
-                self.onSubscribe.emit(mid, grantedQos))
+            lambda client, userdata, mid:
+                self.onPublish.emit(mid)
+        )
         # TODO: MQTTv5: add support for MQTTv5 signature
         self.__mqttClient.on_unsubscribe = (
-            lambda client, userdata, mid: self.onUnsubscribe.emit(mid))
+            lambda client, userdata, mid, properties=None, reasoncodes=None:
+                self.onUnsubscribe.emit(mid)
+        )
+    
+    def __onDisconnectedV5(self, client, userdata, rc, properties=None):
+        """
+        Private method to handle the disconnect from the broker.
+        
+        @param client reference to the client object
+        @type paho.mqtt.Client
+        @param userdata user data
+        @type list
+        @param rc result code or reason code
+        @type int or ReasonCodes
+        @param properties optional properties (defaults to None)
+        @type dict (optional)
+        """
+        if isinstance(rc, int):
+            packetType = PacketTypes.DISCONNECT
+            resultCode = rc
+        else:
+            packetType = rc.packetType
+            resultCode = rc.value
+        self.onDisconnectedV5.emit(resultCode, packetType)
     
     @pyqtSlot()
     def __connectTimeout(self):
@@ -261,6 +313,12 @@
         
         return False, "unspecific error occurred"
     
+    def getProtocol(self):
+        """
+        Public method to get the MQTT protocol version.
+        """
+        return self.__protocol
+    
     def startLoop(self):
         """
         Public method to start the MQTT client loop.

eric ide

mercurial