Mon, 19 Jul 2021 20:00:17 +0200
Added MQTT V5 support for connect, disconnect and subscribe.
--- a/MqttMonitor/MqttClient.py Sun Jul 18 19:32:16 2021 +0200 +++ b/MqttMonitor/MqttClient.py Mon Jul 19 20:00:17 2021 +0200 @@ -14,6 +14,7 @@ ) import paho.mqtt.client as mqtt +from paho.mqtt.packettypes import PacketTypes from Utilities.crypto import pwConvert @@ -46,12 +47,15 @@ @signal connectTimeout() emitted to indicate, that a connection attempt timed out """ - onConnect = pyqtSignal(dict, int) - onDisconnected = pyqtSignal(int) + onConnectV3 = pyqtSignal(dict, int) + onConnectV5 = pyqtSignal(dict, int, int) + onDisconnectedV3 = pyqtSignal(int) + onDisconnectedV5 = pyqtSignal(int, int) onLog = pyqtSignal(int, str) onMessage = pyqtSignal(str, bytes, int, bool) onPublish = pyqtSignal(int) - onSubscribe = pyqtSignal(int, tuple) + onSubscribeV3 = pyqtSignal(int, tuple) + onSubscribeV5 = pyqtSignal(int, list) onUnsubscribe = pyqtSignal(int) connectTimeout = pyqtSignal() @@ -100,9 +104,12 @@ MqttClient.DefaultConnectTimeout * 1000) self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout) - self.onConnect.connect(self.__connectTimeoutTimer.stop) + self.onConnectV3.connect(self.__connectTimeoutTimer.stop) + self.onConnectV5.connect(self.__connectTimeoutTimer.stop) self.__cleanSession = cleanSession + self.__protocol = protocol + if protocol == MqttProtocols.MQTTv5: cleanSession = None @@ -110,35 +117,80 @@ client_id=clientId, clean_session=cleanSession, userdata=userdata, protocol=int(protocol), transport=transport) - self.__initCallbacks() + self.__initCallbacks(protocol) - def __initCallbacks(self): + def __initCallbacks(self, protocol): """ Private method to initialize the MQTT callback methods. + + @param protocol MQTT protocol version + @type MqttProtocols """ - # TODO: add properties to signal + if protocol in (MqttProtocols.MQTTv31, MqttProtocols.MQTTv311): + self.__mqttClient.on_connect = ( + lambda client, userdata, flags, rc, properties=None: + self.onConnectV3.emit(flags, rc) + ) + self.__mqttClient.on_disconnect = ( + lambda client, userdata, rc: + self.onDisconnectedV3.emit(rc) + ) + self.__mqttClient.on_subscribe = ( + lambda client, userdata, mid, grantedQos, properties=None: + self.onSubscribeV3.emit(mid, grantedQos) + ) + else: + # TODO: add properties to signal + self.__mqttClient.on_connect = ( + lambda client, userdata, flags, rc, properties=None: + self.onConnectV5.emit(flags, rc.value, rc.packetType) + ) + self.__mqttClient.on_disconnect = self.__onDisconnectedV5 + # TODO: add properties to signal + self.__mqttClient.on_subscribe = ( + lambda client, userdata, mid, reasonCodes, properties=None: + self.onSubscribeV5.emit(mid, reasonCodes) + ) # TODO: MQTTv5: add support for MQTTv5 signature - self.__mqttClient.on_connect = ( - lambda client, userdata, flags, rc, properties=None: - self.onConnect.emit(flags, rc)) - # TODO: MQTTv5: add support for MQTTv5 signature - self.__mqttClient.on_disconnect = ( - lambda client, userdata, rc: self.onDisconnected.emit(rc)) self.__mqttClient.on_log = ( - lambda client, userdata, level, buf: self.onLog.emit(level, buf)) + lambda client, userdata, level, buf: + self.onLog.emit(level, buf) + ) self.__mqttClient.on_message = ( - lambda client, userdata, message: self.onMessage.emit( - message.topic, message.payload, message.qos, message.retain)) + lambda client, userdata, message: + self.onMessage.emit(message.topic, message.payload, + message.qos, message.retain) + ) self.__mqttClient.on_publish = ( - lambda client, userdata, mid: self.onPublish.emit(mid)) - # TODO: add properties to signal - # TODO: MQTTv5: add support for MQTTv5 signature - self.__mqttClient.on_subscribe = ( - lambda client, userdata, mid, grantedQos, properties=None: - self.onSubscribe.emit(mid, grantedQos)) + lambda client, userdata, mid: + self.onPublish.emit(mid) + ) # TODO: MQTTv5: add support for MQTTv5 signature self.__mqttClient.on_unsubscribe = ( - lambda client, userdata, mid: self.onUnsubscribe.emit(mid)) + lambda client, userdata, mid, properties=None, reasoncodes=None: + self.onUnsubscribe.emit(mid) + ) + + def __onDisconnectedV5(self, client, userdata, rc, properties=None): + """ + Private method to handle the disconnect from the broker. + + @param client reference to the client object + @type paho.mqtt.Client + @param userdata user data + @type list + @param rc result code or reason code + @type int or ReasonCodes + @param properties optional properties (defaults to None) + @type dict (optional) + """ + if isinstance(rc, int): + packetType = PacketTypes.DISCONNECT + resultCode = rc + else: + packetType = rc.packetType + resultCode = rc.value + self.onDisconnectedV5.emit(resultCode, packetType) @pyqtSlot() def __connectTimeout(self): @@ -261,6 +313,12 @@ return False, "unspecific error occurred" + def getProtocol(self): + """ + Public method to get the MQTT protocol version. + """ + return self.__protocol + def startLoop(self): """ Public method to start the MQTT client loop.
--- a/MqttMonitor/MqttMonitorWidget.py Sun Jul 18 19:32:16 2021 +0200 +++ b/MqttMonitor/MqttMonitorWidget.py Mon Jul 19 20:00:17 2021 +0200 @@ -25,6 +25,7 @@ MqttClient, MqttProtocols, mqttConnackMessage, mqttErrorMessage, mqttLogLevelString ) +from .MqttReasonCodes import mqttReasonCode import UI.PixmapCache import Utilities @@ -211,12 +212,15 @@ protocol=protocol) # connect the MQTT client signals - client.onConnect.connect(self.__brokerConnected) - client.onDisconnected.connect(self.__brokerDisconnected) + client.onConnectV3.connect(self.__brokerConnected) + client.onConnectV5.connect(self.__brokerConnected) + client.onDisconnectedV3.connect(self.__brokerDisconnected) + client.onDisconnectedV5.connect(self.__brokerDisconnected) client.onLog.connect(self.__clientLog) client.onMessage.connect(self.__messageReceived) client.onPublish.connect(self.__messagePublished) - client.onSubscribe.connect(self.__topicSubscribed) + client.onSubscribeV3.connect(self.__topicSubscribed) + client.onSubscribeV5.connect(self.__topicSubscribedV5) client.onUnsubscribe.connect(self.__topicUnsubscribed) client.connectTimeout.connect(self.__connectTimeout) @@ -229,14 +233,16 @@ # TODO: change to accept ReasonCode for rc @pyqtSlot(dict, int) - def __brokerConnected(self, flags, rc): + @pyqtSlot(dict, int, int) + def __brokerConnected(self, flags, rc, packetType=None): """ Private slot to handle being connected to a broker. @param flags flags set for the connection @type dict - @param rc CONNACK result code - @type int + @param rc CONNACK result code or tuple containing the result code and + the packet type of the MQTTv5 reason code + @type int or tuple of (int, int) """ self.brokerStatusLabel.hide() @@ -245,7 +251,10 @@ self.__connectedToBroker = True self.__connectionOptions = None - msg = mqttConnackMessage(rc) + if packetType is not None: + msg = mqttReasonCode(rc, packetType) + else: + msg = mqttConnackMessage(rc) self.__flashBrokerStatusLabel(msg) self.connectButton.setEnabled(True) @@ -276,7 +285,8 @@ self.__setConnectButtonState() @pyqtSlot(int) - def __brokerDisconnected(self, rc): + @pyqtSlot(int, int) + def __brokerDisconnected(self, rc, packetType=None): """ Private slot to handle a disconnection from the broker. @@ -288,11 +298,16 @@ # ensure, the client loop is stopped self.__client.stopLoop() - msg = ( - mqttErrorMessage(rc) - if rc > 0 else - self.tr("Connection to Broker shut down cleanly.") - ) + if packetType is not None: + # MQTT v5 + msg = mqttReasonCode(rc, packetType) + else: + # MQTT v3 + msg = ( + mqttErrorMessage(rc) + if rc > 0 else + self.tr("Connection to Broker shut down cleanly.") + ) self.__flashBrokerStatusLabel(msg) self.connectButton.setIcon(UI.PixmapCache.getIcon("ircConnect")) @@ -383,10 +398,11 @@ # TODO: check this 'pass' statement pass - @pyqtSlot(int, tuple) - def __topicSubscribed(self, mid, grantedQos): + @pyqtSlot(int) + def __topicSubscribed(self, mid): """ - Private slot to handle being subscribed to topics. + Private slot to handle being subscribed to topics (MQTT v3.1, + MQTT v3.1.1). @param mid ID of the subscribe request @type int @@ -401,6 +417,20 @@ self.__updateUnsubscribeTopicComboBox() self.__updatePublishTopicComboBox() + @pyqtSlot(int, list) + def __topicSubscribedV5(self, mid, reasonCodes): + """ + Private slot to handle being subscribed to topics (MQTT v5). + + @param mid ID of the subscribe request + @type int + @param grantedQos tuple of granted quality of service + @type tuple of int + """ + msg = mqttReasonCode(reasonCodes[0].value, reasonCodes[0].packetType) + self.__flashBrokerStatusLabel(msg) + self.__topicSubscribed(mid) + @pyqtSlot(int) def __topicUnsubscribed(self, mid): """
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MqttMonitor/MqttReasonCodes.py Mon Jul 19 20:00:17 2021 +0200 @@ -0,0 +1,231 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2021 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing the translated MQTT v5 reason codes. +""" + +from PyQt6.QtCore import QCoreApplication + +from paho.mqtt.packettypes import PacketTypes + +MqttReasonCodeNames = { + 0: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Success" + ): [PacketTypes.CONNACK, PacketTypes.PUBACK, PacketTypes.PUBREC, + PacketTypes.PUBREL, PacketTypes.PUBCOMP, PacketTypes.UNSUBACK, + PacketTypes.AUTH], + QCoreApplication.translate( + "MqttReasonCodeNames", "Normal disconnection" + ): [PacketTypes.DISCONNECT], + QCoreApplication.translate( + "MqttReasonCodeNames", "Granted QoS 0" + ): [PacketTypes.SUBACK]}, + 1: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Granted QoS 1" + ): [PacketTypes.SUBACK]}, + 2: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Granted QoS 2" + ): [PacketTypes.SUBACK]}, + 4: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Disconnect with will message" + ): [PacketTypes.DISCONNECT]}, + 16: { + QCoreApplication.translate( + "MqttReasonCodeNames", "No matching subscribers" + ): [PacketTypes.PUBACK, PacketTypes.PUBREC]}, + 17: { + QCoreApplication.translate( + "MqttReasonCodeNames", "No subscription found" + ): [PacketTypes.UNSUBACK]}, + 24: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Continue authentication" + ): [PacketTypes.AUTH]}, + 25: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Re-authenticate" + ): [PacketTypes.AUTH]}, + 128: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Unspecified error" + ): [PacketTypes.CONNACK, PacketTypes.PUBACK, + PacketTypes.PUBREC, PacketTypes.SUBACK, + PacketTypes.UNSUBACK, PacketTypes.DISCONNECT]}, + 129: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Malformed packet" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 130: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Protocol error" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 131: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Implementation specific error" + ): [PacketTypes.CONNACK, PacketTypes.PUBACK, + PacketTypes.PUBREC, PacketTypes.SUBACK, + PacketTypes.UNSUBACK, PacketTypes.DISCONNECT]}, + 132: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Unsupported protocol version" + ): [PacketTypes.CONNACK]}, + 133: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Client identifier not valid" + ): [PacketTypes.CONNACK]}, + 134: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Bad user name or password" + ): [PacketTypes.CONNACK]}, + 135: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Not authorized" + ): [PacketTypes.CONNACK, PacketTypes.PUBACK, + PacketTypes.PUBREC, PacketTypes.SUBACK, + PacketTypes.UNSUBACK, PacketTypes.DISCONNECT]}, + 136: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Server unavailable" + ): [PacketTypes.CONNACK]}, + 137: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Server busy" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 138: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Banned" + ): [PacketTypes.CONNACK]}, + 139: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Server shutting down" + ): [PacketTypes.DISCONNECT]}, + 140: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Bad authentication method" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 141: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Keep alive timeout" + ): [PacketTypes.DISCONNECT]}, + 142: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Session taken over" + ): [PacketTypes.DISCONNECT]}, + 143: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Topic filter invalid" + ): [PacketTypes.SUBACK, PacketTypes.UNSUBACK, + PacketTypes.DISCONNECT]}, + 144: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Topic name invalid" + ): [PacketTypes.CONNACK, PacketTypes.PUBACK, + PacketTypes.PUBREC, PacketTypes.DISCONNECT]}, + 145: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Packet identifier in use" + ): [PacketTypes.PUBACK, PacketTypes.PUBREC, + PacketTypes.SUBACK, PacketTypes.UNSUBACK]}, + 146: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Packet identifier not found" + ): [PacketTypes.PUBREL, PacketTypes.PUBCOMP]}, + 147: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Receive maximum exceeded" + ): [PacketTypes.DISCONNECT]}, + 148: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Topic alias invalid" + ): [PacketTypes.DISCONNECT]}, + 149: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Packet too large" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 150: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Message rate too high" + ): [PacketTypes.DISCONNECT]}, + 151: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Quota exceeded" + ): [PacketTypes.CONNACK, PacketTypes.PUBACK, + PacketTypes.PUBREC, PacketTypes.SUBACK, + PacketTypes.DISCONNECT], }, + 152: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Administrative action" + ): [PacketTypes.DISCONNECT]}, + 153: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Payload format invalid" + ): [PacketTypes.PUBACK, PacketTypes.PUBREC, PacketTypes.DISCONNECT]}, + 154: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Retain not supported" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 155: { + QCoreApplication.translate( + "MqttReasonCodeNames", "QoS not supported" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 156: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Use another server" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 157: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Server moved" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 158: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Shared subscription not supported" + ): [PacketTypes.SUBACK, PacketTypes.DISCONNECT]}, + 159: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Connection rate exceeded" + ): [PacketTypes.CONNACK, PacketTypes.DISCONNECT]}, + 160: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Maximum connect time" + ): [PacketTypes.DISCONNECT]}, + 161: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Subscription identifiers not supported" + ): [PacketTypes.SUBACK, PacketTypes.DISCONNECT]}, + 162: { + QCoreApplication.translate( + "MqttReasonCodeNames", "Wildcard subscription not supported" + ): [PacketTypes.SUBACK, PacketTypes.DISCONNECT]}, +} + +def mqttReasonCode(rc, packetType): + """ + Function to get the readable reason code string given the result code and + the packet type. + + @param rc result code + @type int + @param packetType packet type + @type PacketTypes (= int) + """ + if rc not in MqttReasonCodeNames: + return QCoreApplication.translate( + "MqttReasonCodeNames", "Unknown result code ({0})").format(rc) + + messages = MqttReasonCodeNames[rc] + messagesList = [message for message in messages.keys() + if packetType in messages[message]] + if len(messagesList) == 0: + return QCoreApplication.translate( + "MqttReasonCodeNames", + "Unknown result code ({0}) for packet type '{1}'" + ).format(rc, packetType) + + return messagesList[0]
--- a/PluginMqttMonitor.epj Sun Jul 18 19:32:16 2021 +0200 +++ b/PluginMqttMonitor.epj Mon Jul 19 20:00:17 2021 +0200 @@ -213,7 +213,8 @@ "MqttMonitor/MqttMonitorWidget.py", "MqttMonitor/__init__.py", "PluginMqttMonitor.py", - "__init__.py" + "__init__.py", + "MqttMonitor/MqttReasonCodes.py" ], "SPELLEXCLUDES": "", "SPELLLANGUAGE": "en",