Added MQTT V5 support for connect, disconnect and subscribe. eric7

Mon, 19 Jul 2021 20:00:17 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Mon, 19 Jul 2021 20:00:17 +0200
branch
eric7
changeset 99
420cb8adbf7e
parent 98
85d56e77e9df
child 100
9c29cfbd96c3

Added MQTT V5 support for connect, disconnect and subscribe.

MqttMonitor/MqttClient.py file | annotate | diff | comparison | revisions
MqttMonitor/MqttMonitorWidget.py file | annotate | diff | comparison | revisions
MqttMonitor/MqttReasonCodes.py file | annotate | diff | comparison | revisions
PluginMqttMonitor.epj file | annotate | diff | comparison | revisions
--- a/MqttMonitor/MqttClient.py	Sun Jul 18 19:32:16 2021 +0200
+++ b/MqttMonitor/MqttClient.py	Mon Jul 19 20:00:17 2021 +0200
@@ -14,6 +14,7 @@
 )
 
 import paho.mqtt.client as mqtt
+from paho.mqtt.packettypes import PacketTypes
 
 from Utilities.crypto import pwConvert
 
@@ -46,12 +47,15 @@
     @signal connectTimeout() emitted to indicate, that a connection attempt
         timed out
     """
-    onConnect = pyqtSignal(dict, int)
-    onDisconnected = pyqtSignal(int)
+    onConnectV3 = pyqtSignal(dict, int)
+    onConnectV5 = pyqtSignal(dict, int, int)
+    onDisconnectedV3 = pyqtSignal(int)
+    onDisconnectedV5 = pyqtSignal(int, int)
     onLog = pyqtSignal(int, str)
     onMessage = pyqtSignal(str, bytes, int, bool)
     onPublish = pyqtSignal(int)
-    onSubscribe = pyqtSignal(int, tuple)
+    onSubscribeV3 = pyqtSignal(int, tuple)
+    onSubscribeV5 = pyqtSignal(int, list)
     onUnsubscribe = pyqtSignal(int)
     
     connectTimeout = pyqtSignal()
@@ -100,9 +104,12 @@
             MqttClient.DefaultConnectTimeout * 1000)
         self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout)
         
-        self.onConnect.connect(self.__connectTimeoutTimer.stop)
+        self.onConnectV3.connect(self.__connectTimeoutTimer.stop)
+        self.onConnectV5.connect(self.__connectTimeoutTimer.stop)
         
         self.__cleanSession = cleanSession
+        self.__protocol = protocol
+        
         if protocol == MqttProtocols.MQTTv5:
             cleanSession = None
         
@@ -110,35 +117,80 @@
             client_id=clientId, clean_session=cleanSession, userdata=userdata,
             protocol=int(protocol), transport=transport)
         
-        self.__initCallbacks()
+        self.__initCallbacks(protocol)
 
-    def __initCallbacks(self):
+    def __initCallbacks(self, protocol):
         """
         Private method to initialize the MQTT callback methods.
+        
+        @param protocol MQTT protocol version
+        @type MqttProtocols
         """
-        # TODO: add properties to signal
+        if protocol in (MqttProtocols.MQTTv31, MqttProtocols.MQTTv311):
+            self.__mqttClient.on_connect = (
+                lambda client, userdata, flags, rc, properties=None:
+                    self.onConnectV3.emit(flags, rc)
+            )
+            self.__mqttClient.on_disconnect = (
+                lambda client, userdata, rc:
+                    self.onDisconnectedV3.emit(rc)
+                )
+            self.__mqttClient.on_subscribe = (
+                lambda client, userdata, mid, grantedQos, properties=None:
+                    self.onSubscribeV3.emit(mid, grantedQos)
+            )
+        else:
+            # TODO: add properties to signal
+            self.__mqttClient.on_connect = (
+                lambda client, userdata, flags, rc, properties=None:
+                    self.onConnectV5.emit(flags, rc.value, rc.packetType)
+            )
+            self.__mqttClient.on_disconnect = self.__onDisconnectedV5
+            # TODO: add properties to signal
+            self.__mqttClient.on_subscribe = (
+                lambda client, userdata, mid, reasonCodes, properties=None:
+                    self.onSubscribeV5.emit(mid, reasonCodes)
+            )
         # TODO: MQTTv5: add support for MQTTv5 signature
-        self.__mqttClient.on_connect = (
-            lambda client, userdata, flags, rc, properties=None:
-                self.onConnect.emit(flags, rc))
-        # TODO: MQTTv5: add support for MQTTv5 signature
-        self.__mqttClient.on_disconnect = (
-            lambda client, userdata, rc: self.onDisconnected.emit(rc))
         self.__mqttClient.on_log = (
-            lambda client, userdata, level, buf: self.onLog.emit(level, buf))
+            lambda client, userdata, level, buf:
+                self.onLog.emit(level, buf)
+        )
         self.__mqttClient.on_message = (
-            lambda client, userdata, message: self.onMessage.emit(
-                message.topic, message.payload, message.qos, message.retain))
+            lambda client, userdata, message:
+                self.onMessage.emit(message.topic, message.payload,
+                                    message.qos, message.retain)
+        )
         self.__mqttClient.on_publish = (
-            lambda client, userdata, mid: self.onPublish.emit(mid))
-        # TODO: add properties to signal
-        # TODO: MQTTv5: add support for MQTTv5 signature
-        self.__mqttClient.on_subscribe = (
-            lambda client, userdata, mid, grantedQos, properties=None:
-                self.onSubscribe.emit(mid, grantedQos))
+            lambda client, userdata, mid:
+                self.onPublish.emit(mid)
+        )
         # TODO: MQTTv5: add support for MQTTv5 signature
         self.__mqttClient.on_unsubscribe = (
-            lambda client, userdata, mid: self.onUnsubscribe.emit(mid))
+            lambda client, userdata, mid, properties=None, reasoncodes=None:
+                self.onUnsubscribe.emit(mid)
+        )
+    
+    def __onDisconnectedV5(self, client, userdata, rc, properties=None):
+        """
+        Private method to handle the disconnect from the broker.
+        
+        @param client reference to the client object
+        @type paho.mqtt.Client
+        @param userdata user data
+        @type list
+        @param rc result code or reason code
+        @type int or ReasonCodes
+        @param properties optional properties (defaults to None)
+        @type dict (optional)
+        """
+        if isinstance(rc, int):
+            packetType = PacketTypes.DISCONNECT
+            resultCode = rc
+        else:
+            packetType = rc.packetType
+            resultCode = rc.value
+        self.onDisconnectedV5.emit(resultCode, packetType)
     
     @pyqtSlot()
     def __connectTimeout(self):
@@ -261,6 +313,12 @@
         
         return False, "unspecific error occurred"
     
+    def getProtocol(self):
+        """
+        Public method to get the MQTT protocol version.
+        """
+        return self.__protocol
+    
     def startLoop(self):
         """
         Public method to start the MQTT client loop.
--- a/MqttMonitor/MqttMonitorWidget.py	Sun Jul 18 19:32:16 2021 +0200
+++ b/MqttMonitor/MqttMonitorWidget.py	Mon Jul 19 20:00:17 2021 +0200
@@ -25,6 +25,7 @@
     MqttClient, MqttProtocols, mqttConnackMessage, mqttErrorMessage,
     mqttLogLevelString
 )
+from .MqttReasonCodes import mqttReasonCode
 
 import UI.PixmapCache
 import Utilities
@@ -211,12 +212,15 @@
                             protocol=protocol)
         
         # connect the MQTT client signals
-        client.onConnect.connect(self.__brokerConnected)
-        client.onDisconnected.connect(self.__brokerDisconnected)
+        client.onConnectV3.connect(self.__brokerConnected)
+        client.onConnectV5.connect(self.__brokerConnected)
+        client.onDisconnectedV3.connect(self.__brokerDisconnected)
+        client.onDisconnectedV5.connect(self.__brokerDisconnected)
         client.onLog.connect(self.__clientLog)
         client.onMessage.connect(self.__messageReceived)
         client.onPublish.connect(self.__messagePublished)
-        client.onSubscribe.connect(self.__topicSubscribed)
+        client.onSubscribeV3.connect(self.__topicSubscribed)
+        client.onSubscribeV5.connect(self.__topicSubscribedV5)
         client.onUnsubscribe.connect(self.__topicUnsubscribed)
         
         client.connectTimeout.connect(self.__connectTimeout)
@@ -229,14 +233,16 @@
     
     # TODO: change to accept ReasonCode for rc
     @pyqtSlot(dict, int)
-    def __brokerConnected(self, flags, rc):
+    @pyqtSlot(dict, int, int)
+    def __brokerConnected(self, flags, rc, packetType=None):
         """
         Private slot to handle being connected to a broker.
         
         @param flags flags set for the connection
         @type dict
-        @param rc CONNACK result code
-        @type int
+        @param rc CONNACK result code or tuple containing the result code and
+            the packet type of the MQTTv5 reason code
+        @type int or tuple of (int, int)
         """
         self.brokerStatusLabel.hide()
         
@@ -245,7 +251,10 @@
             self.__connectedToBroker = True
             self.__connectionOptions = None
         
-        msg = mqttConnackMessage(rc)
+        if packetType is not None:
+            msg = mqttReasonCode(rc, packetType)
+        else:
+            msg = mqttConnackMessage(rc)
         self.__flashBrokerStatusLabel(msg)
         
         self.connectButton.setEnabled(True)
@@ -276,7 +285,8 @@
         self.__setConnectButtonState()
     
     @pyqtSlot(int)
-    def __brokerDisconnected(self, rc):
+    @pyqtSlot(int, int)
+    def __brokerDisconnected(self, rc, packetType=None):
         """
         Private slot to handle a disconnection from the broker.
         
@@ -288,11 +298,16 @@
         # ensure, the client loop is stopped
         self.__client.stopLoop()
         
-        msg = (
-            mqttErrorMessage(rc)
-            if rc > 0 else
-            self.tr("Connection to Broker shut down cleanly.")
-        )
+        if packetType is not None:
+            # MQTT v5
+            msg = mqttReasonCode(rc, packetType)
+        else:
+            # MQTT v3
+            msg = (
+                mqttErrorMessage(rc)
+                if rc > 0 else
+                self.tr("Connection to Broker shut down cleanly.")
+            )
         self.__flashBrokerStatusLabel(msg)
         
         self.connectButton.setIcon(UI.PixmapCache.getIcon("ircConnect"))
@@ -383,10 +398,11 @@
         # TODO: check this 'pass' statement
         pass
     
-    @pyqtSlot(int, tuple)
-    def __topicSubscribed(self, mid, grantedQos):
+    @pyqtSlot(int)
+    def __topicSubscribed(self, mid):
         """
-        Private slot to handle being subscribed to topics.
+        Private slot to handle being subscribed to topics (MQTT v3.1,
+        MQTT v3.1.1).
         
         @param mid ID of the subscribe request
         @type int
@@ -401,6 +417,20 @@
             self.__updateUnsubscribeTopicComboBox()
             self.__updatePublishTopicComboBox()
     
+    @pyqtSlot(int, list)
+    def __topicSubscribedV5(self, mid, reasonCodes):
+        """
+        Private slot to handle being subscribed to topics (MQTT v5).
+        
+        @param mid ID of the subscribe request
+        @type int
+        @param grantedQos tuple of granted quality of service
+        @type tuple of int
+        """
+        msg = mqttReasonCode(reasonCodes[0].value, reasonCodes[0].packetType)
+        self.__flashBrokerStatusLabel(msg)
+        self.__topicSubscribed(mid)
+    
     @pyqtSlot(int)
     def __topicUnsubscribed(self, mid):
         """
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MqttMonitor/MqttReasonCodes.py	Mon Jul 19 20:00:17 2021 +0200
@@ -0,0 +1,231 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2021 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing the translated MQTT v5 reason codes.
+"""
+
+from PyQt6.QtCore import QCoreApplication
+
+from paho.mqtt.packettypes import PacketTypes
+
+MqttReasonCodeNames = {
+    0: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Success"
+        ): [PacketTypes.CONNACK, PacketTypes.PUBACK, PacketTypes.PUBREC,
+            PacketTypes.PUBREL, PacketTypes.PUBCOMP, PacketTypes.UNSUBACK,
+            PacketTypes.AUTH],
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Normal disconnection"
+        ): [PacketTypes.DISCONNECT],
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Granted QoS 0"
+        ): [PacketTypes.SUBACK]},
+    1: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Granted QoS 1"
+        ): [PacketTypes.SUBACK]},
+    2: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Granted QoS 2"
+        ): [PacketTypes.SUBACK]},
+    4: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Disconnect with will message"
+        ): [PacketTypes.DISCONNECT]},
+    16: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "No matching subscribers"
+        ): [PacketTypes.PUBACK, PacketTypes.PUBREC]},
+    17: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "No subscription found"
+        ): [PacketTypes.UNSUBACK]},
+    24: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Continue authentication"
+        ): [PacketTypes.AUTH]},
+    25: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Re-authenticate"
+        ): [PacketTypes.AUTH]},
+    128: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Unspecified error"
+        ): [PacketTypes.CONNACK, PacketTypes.PUBACK,
+            PacketTypes.PUBREC, PacketTypes.SUBACK,
+            PacketTypes.UNSUBACK, PacketTypes.DISCONNECT]},
+    129: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Malformed packet"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    130: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Protocol error"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    131: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Implementation specific error"
+        ): [PacketTypes.CONNACK, PacketTypes.PUBACK,
+            PacketTypes.PUBREC, PacketTypes.SUBACK,
+            PacketTypes.UNSUBACK, PacketTypes.DISCONNECT]},
+    132: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Unsupported protocol version"
+        ): [PacketTypes.CONNACK]},
+    133: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Client identifier not valid"
+        ): [PacketTypes.CONNACK]},
+    134: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Bad user name or password"
+        ): [PacketTypes.CONNACK]},
+    135: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Not authorized"
+        ): [PacketTypes.CONNACK, PacketTypes.PUBACK,
+            PacketTypes.PUBREC, PacketTypes.SUBACK,
+            PacketTypes.UNSUBACK, PacketTypes.DISCONNECT]},
+    136: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Server unavailable"
+        ): [PacketTypes.CONNACK]},
+    137: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Server busy"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    138: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Banned"
+        ): [PacketTypes.CONNACK]},
+    139: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Server shutting down"
+        ): [PacketTypes.DISCONNECT]},
+    140: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Bad authentication method"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    141: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Keep alive timeout"
+        ): [PacketTypes.DISCONNECT]},
+    142: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Session taken over"
+        ): [PacketTypes.DISCONNECT]},
+    143: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Topic filter invalid"
+        ): [PacketTypes.SUBACK, PacketTypes.UNSUBACK,
+            PacketTypes.DISCONNECT]},
+    144: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Topic name invalid"
+        ): [PacketTypes.CONNACK, PacketTypes.PUBACK,
+            PacketTypes.PUBREC, PacketTypes.DISCONNECT]},
+    145: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Packet identifier in use"
+        ): [PacketTypes.PUBACK, PacketTypes.PUBREC,
+            PacketTypes.SUBACK, PacketTypes.UNSUBACK]},
+    146: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Packet identifier not found"
+        ): [PacketTypes.PUBREL, PacketTypes.PUBCOMP]},
+    147: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Receive maximum exceeded"
+        ): [PacketTypes.DISCONNECT]},
+    148: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Topic alias invalid"
+        ): [PacketTypes.DISCONNECT]},
+    149: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Packet too large"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    150: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Message rate too high"
+        ): [PacketTypes.DISCONNECT]},
+    151: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Quota exceeded"
+        ): [PacketTypes.CONNACK, PacketTypes.PUBACK,
+            PacketTypes.PUBREC, PacketTypes.SUBACK,
+            PacketTypes.DISCONNECT], },
+    152: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Administrative action"
+        ): [PacketTypes.DISCONNECT]},
+    153: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Payload format invalid"
+        ): [PacketTypes.PUBACK, PacketTypes.PUBREC, PacketTypes.DISCONNECT]},
+    154: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Retain not supported"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    155: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "QoS not supported"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    156: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Use another server"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    157: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Server moved"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    158: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Shared subscription not supported"
+        ): [PacketTypes.SUBACK, PacketTypes.DISCONNECT]},
+    159: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Connection rate exceeded"
+        ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
+    160: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Maximum connect time"
+        ): [PacketTypes.DISCONNECT]},
+    161: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Subscription identifiers not supported"
+        ): [PacketTypes.SUBACK, PacketTypes.DISCONNECT]},
+    162: {
+        QCoreApplication.translate(
+            "MqttReasonCodeNames", "Wildcard subscription not supported"
+        ): [PacketTypes.SUBACK, PacketTypes.DISCONNECT]},
+}
+
+def mqttReasonCode(rc, packetType):
+    """
+    Function to get the readable reason code string given the result code and
+    the packet type.
+    
+    @param rc result code
+    @type int
+    @param packetType packet type
+    @type PacketTypes (= int)
+    """
+    if rc not in MqttReasonCodeNames:
+        return QCoreApplication.translate(
+            "MqttReasonCodeNames", "Unknown result code ({0})").format(rc)
+    
+    messages = MqttReasonCodeNames[rc]
+    messagesList = [message for message in messages.keys()
+                    if packetType in messages[message]]
+    if len(messagesList) == 0:
+        return QCoreApplication.translate(
+            "MqttReasonCodeNames",
+            "Unknown result code ({0}) for packet type '{1}'"
+        ).format(rc, packetType)
+    
+    return messagesList[0]
--- a/PluginMqttMonitor.epj	Sun Jul 18 19:32:16 2021 +0200
+++ b/PluginMqttMonitor.epj	Mon Jul 19 20:00:17 2021 +0200
@@ -213,7 +213,8 @@
       "MqttMonitor/MqttMonitorWidget.py",
       "MqttMonitor/__init__.py",
       "PluginMqttMonitor.py",
-      "__init__.py"
+      "__init__.py",
+      "MqttMonitor/MqttReasonCodes.py"
     ],
     "SPELLEXCLUDES": "",
     "SPELLLANGUAGE": "en",

eric ide

mercurial