MqttMonitor/MqttClient.py

Sat, 19 Oct 2024 16:04:24 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 19 Oct 2024 16:04:24 +0200
branch
eric7
changeset 151
f9830d91be9a
parent 143
51bc5bcc672a
child 154
b66cfd856e93
permissions
-rw-r--r--

Fixed a backward compatibility issue with eric-ide < 24.11.

# -*- coding: utf-8 -*-

# Copyright (c) 2018 - 2024 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a PyQt wrapper around the paho MQTT client.
"""

import paho.mqtt.client as mqtt

from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
from PyQt6.QtCore import QCoreApplication, QObject, QTimer, pyqtSignal, pyqtSlot

try:
    from eric7.EricUtilities.crypto import pwConvert
except ImportError:
    # backward compatibility for eric-ide < 24.11
    from eric7.Utilities.crypto import pwConvert

from .MqttProtocols import MqttProtocols


class MqttClient(QObject):
    """
    Class implementing a PyQt wrapper around the paho MQTT client.

    The paho callbacks are registered once in the constructor and are forwarded
    as Qt signals carrying plain Python values.

    @signal onConnect(sessionPresent, rc, packetType, properties) emitted after
        the client has connected to the broker
    @signal onDisconnected(rc, packetType) emitted after the client has
        disconnected from the broker
    @signal onLog(level, message) emitted to send client log data
    @signal onMessage(topic, payload, qos, retain, properties) emitted after
        a message has been received by the client
    @signal onPublish(mid) emitted after a message has been published
    @signal onSubscribe(mid, reasonCodes, properties) emitted after the
        client has subscribed to some topics
    @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
        some topics (MQTT v3)
    @signal onUnsubscribe(mid, rc, packetType, properties) emitted after the
        client has unsubscribed from some topics (MQTT v5)
    @signal connectTimeout() emitted to indicate, that a connection attempt
        timed out
    """

    onConnect = pyqtSignal(bool, int, int, dict)
    onDisconnected = pyqtSignal(int, int)
    onLog = pyqtSignal(int, str)
    onMessage = pyqtSignal(str, bytes, int, bool, dict)
    onPublish = pyqtSignal(int)
    onSubscribe = pyqtSignal(int, list, dict)
    onUnsubscribe = pyqtSignal((int,), (int, int, int, dict))

    connectTimeout = pyqtSignal()

    DefaultConnectTimeout = 15  # connect timeout in seconds

    # log levels of this wrapper (Debug being the lowest one)
    LogDebug = 0x01
    LogInfo = 0x02
    LogNotice = 0x04
    LogWarning = 0x08
    LogError = 0x10
    LogDisabled = 0xFF
    # mapping of the paho-mqtt log levels to the levels defined above
    LogLevelMap = {
        mqtt.MQTT_LOG_DEBUG: LogDebug,
        mqtt.MQTT_LOG_INFO: LogInfo,
        mqtt.MQTT_LOG_NOTICE: LogNotice,
        mqtt.MQTT_LOG_WARNING: LogWarning,  # __NO-TASK__
        mqtt.MQTT_LOG_ERR: LogError,
    }

    def __init__(
        self,
        clientId="",
        cleanSession=True,
        userdata=None,
        protocol=mqtt.MQTTv311,
        transport="tcp",
        parent=None,
    ):
        """
        Constructor

        @param clientId ID to be used for the client
        @type str
        @param cleanSession flag indicating to start a clean session
        @type bool
        @param userdata user data
        @type any
        @param protocol version of the MQTT protocol to use
        @type int, one of mqtt.MQTTv31, mqtt.MQTTv311 or mqtt.MQTTv5
        @param transport transport to be used
        @type str, one of "tcp" or "websockets"
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent=parent)

        self.__loopStarted = False

        self.__connectTimeoutTimer = QTimer(self)
        self.__connectTimeoutTimer.setSingleShot(True)
        self.__connectTimeoutTimer.setInterval(MqttClient.DefaultConnectTimeout * 1000)
        self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout)

        # a successful connect cancels the pending timeout
        self.onConnect.connect(self.__connectTimeoutTimer.stop)

        # remembered separately because it is handed to connect_async() as the
        # 'clean_start' value later on
        self.__cleanSession = cleanSession
        self.__protocol = protocol
        self.__disconnectUserProperties = []

        if protocol == MqttProtocols.MQTTv5:
            # no 'clean_session' value is given to the client for MQTT v5
            cleanSession = None

        self.__mqttClient = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            client_id=clientId,
            clean_session=cleanSession,
            userdata=userdata,
            protocol=int(protocol),
            transport=transport,
        )

        self.__initCallbacks()

    def __initCallbacks(self):
        """
        Private method to initialize the MQTT callback methods.
        """
        self.__mqttClient.on_connect = self.__onConnect
        self.__mqttClient.on_disconnect = self.__onDisconnected
        self.__mqttClient.on_log = self.__onLog
        self.__mqttClient.on_message = self.__onMessage
        self.__mqttClient.on_publish = self.__onPublish
        self.__mqttClient.on_subscribe = self.__onSubscribe
        self.__mqttClient.on_unsubscribe = self.__onUnsubscribe

    def __onConnect(
        self,
        _client,
        _userdata,
        connectFlags,
        rc,
        properties,
    ):
        """
        Private method to handle the connect to the broker.

        @param _client reference to the client object (unused)
        @type paho.mqtt.Client
        @param _userdata user data (unused)
        @type Any
        @param connectFlags response flags sent by the broker
        @type mqtt.ConnectFlags
        @param rc reason code
        @type paho.mqtt.ReasonCodes
        @param properties MQTT v5.0 properties received from the broker
        @type paho.mqtt.properties.Properties
        """
        self.onConnect.emit(
            connectFlags.session_present, rc.value, rc.packetType, properties.json()
        )

    def __onDisconnected(
        self,
        _client,
        _userdata,
        _flags,
        rc,
        _properties=None,
    ):
        """
        Private method to handle the disconnect from the broker.

        @param _client reference to the client object (unused)
        @type paho.mqtt.Client
        @param _userdata user data (unused)
        @type Any
        @param _flags dictionary containing the response flags sent by the broker
            (unused)
        @type dict
        @param rc result code or reason code
        @type int or paho.mqtt.ReasonCodes
        @param _properties MQTT v5.0 properties received from the broker
            (defaults to None) (unused)
        @type dict (optional)
        """
        if isinstance(rc, int):
            # a plain result code carries no packet type of its own
            packetType = PacketTypes.DISCONNECT
            resultCode = rc
        else:
            packetType = rc.packetType
            resultCode = rc.value
        self.onDisconnected.emit(resultCode, packetType)

    def __onSubscribe(
        self,
        _client,
        _userdata,
        mid,
        reasonCodes,
        properties,
    ):
        """
        Private method to handle a subscribe event.

        @param _client reference to the client object (unused)
        @type paho.mqtt.Client
        @param _userdata user data (unused)
        @type Any
        @param mid message ID
        @type int
        @param reasonCodes list of reason code for each subscribed topic
        @type list of paho.mqtt.ReasonCodes
        @param properties MQTT v5.0 properties received from the broker
        @type paho.mqtt.properties.Properties
        """
        self.onSubscribe.emit(mid, reasonCodes, properties.json())

    def __onUnsubscribe(
        self,
        _client,
        _userdata,
        mid,
        reasonCodes,
        properties,
    ):
        """
        Private method to handle an unsubscribe event.

        @param _client reference to the client object (unused)
        @type paho.mqtt.Client
        @param _userdata user data (unused)
        @type Any
        @param mid message ID
        @type int
        @param reasonCodes list of reason code for each unsubscribed topic
        @type list of paho.mqtt.ReasonCodes
        @param properties MQTT v5.0 properties received from the broker
        @type paho.mqtt.properties.Properties
        """
        if reasonCodes:
            # only the first reason code is reported via the long signal variant
            self.onUnsubscribe[int, int, int, dict].emit(
                mid, reasonCodes[0].value, reasonCodes[0].packetType, properties.json()
            )
        else:
            self.onUnsubscribe[int].emit(mid)

    def __onMessage(
        self,
        _client,
        _userdata,
        message,
    ):
        """
        Private method to handle a new message received from the broker.

        @param _client reference to the client object (unused)
        @type paho.mqtt.Client
        @param _userdata user data (unused)
        @type Any
        @param message received message object
        @type paho.mqtt.MQTTMessage
        """
        self.onMessage.emit(
            message.topic,
            message.payload,
            message.qos,
            message.retain,
            message.properties.json() if message.properties is not None else {},
        )

    def __onLog(
        self,
        _client,
        _userdata,
        level,
        buf,
    ):
        """
        Private method to handle a log event.

        The level is forwarded unchanged, i.e. it is not converted via
        LogLevelMap.

        @param _client reference to the client object (unused)
        @type paho.mqtt.Client
        @param _userdata user data (unused)
        @type Any
        @param level severity of the log message
        @type int
        @param buf log message
        @type str
        """
        self.onLog.emit(level, buf)

    def __onPublish(
        self,
        _client,
        _userdata,
        mid,
        _reasonCode,
        _properties,
    ):
        """
        Private method to handle the publishing of a message.

        @param _client reference to the client object (unused)
        @type paho.mqtt.Client
        @param _userdata user data (unused)
        @type Any
        @param mid message ID
        @type int
        @param _reasonCode reason code (unused)
        @type paho.mqtt.ReasonCodes
        @param _properties MQTT v5.0 properties received from the broker (unused)
        @type dict
        """
        self.onPublish.emit(mid)

    @pyqtSlot()
    def __connectTimeout(self):
        """
        Private slot handling a failed connection attempt.
        """
        self.stopLoop()
        self.connectTimeout.emit()

    def setConnectionTimeout(self, timeout):
        """
        Public method to set the connection timeout value.

        @param timeout timeout value to be set in seconds
        @type int
        """
        self.__connectTimeoutTimer.setInterval(timeout * 1000)

    def setMaxInflightMessages(self, inflight=20):
        """
        Public method to set the maximum number of messages with QoS > 0 that
        can be part way through their network flow at once.

        @param inflight maximum number of messages in flight
        @type int
        """
        self.__mqttClient.max_inflight_messages_set(inflight)

    def setMaxQueuedMessages(self, queueSize=0):
        """
        Public method to set the maximum number of messages with QoS > 0 that
        can be pending in the outgoing message queue.

        @param queueSize maximum number of queued messages (0 = unlimited)
        @type int
        """
        self.__mqttClient.max_queued_messages_set(queueSize)

    def setUserCredentials(self, username, password=None):
        """
        Public method to set the user name and optionally the password.

        @param username user name to be set
        @type str
        @param password optional password
        @type str
        """
        self.__mqttClient.username_pw_set(username, password=password)

    def setUserData(self, userdata):
        """
        Public method to set the user data.

        @param userdata user data
        @type any
        """
        self.__mqttClient.user_data_set(userdata)

    def setLastWill(self, topic, payload=None, qos=0, retain=False, properties=None):
        """
        Public method to set the last will of the client.

        @param topic topic the will message should be published on
        @type str
        @param payload message to send as a will
        @type str, bytes, int or float
        @param qos quality of service level to use for the will
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the will topic
        @type bool
        @param properties properties to be sent with the last will message
            (handed to the paho client unchanged)
        @type Properties
        """
        self.__mqttClient.will_set(
            topic, payload=payload, qos=qos, retain=retain, properties=properties
        )

    def clearLastWill(self):
        """
        Public method to remove a will that was previously configured with
        setLastWill().
        """
        self.__mqttClient.will_clear()

    def setTLS(self, caCerts=None, certFile=None, keyFile=None):
        """
        Public method to enable secure connections and set the TLS parameters.

        Errors raised while configuring TLS (missing or unreadable files,
        invalid certificates, invalid parameters) are reported through the
        returned tuple instead of being propagated.

        @param caCerts path to the Certificate Authority certificates file
        @type str
        @param certFile PEM encoded client certificate file
        @type str
        @param keyFile PEM encoded private key file
        @type str
        @return tuple containing a success flag and the error string of the
            paho-mqtt library
        @rtype tuple of (bool, str)
        """
        try:
            self.__mqttClient.tls_set(
                ca_certs=caCerts, certfile=certFile, keyfile=keyFile
            )
        except (OSError, ValueError) as err:
            # OSError includes FileNotFoundError, PermissionError and
            # ssl.SSLError
            return False, str(err)

        return True, ""

    def getProtocol(self):
        """
        Public method to get the MQTT protocol version.

        @return MQTT protocol version in use
        @rtype int
        """
        return self.__protocol

    def startLoop(self):
        """
        Public method to start the MQTT client loop.
        """
        self.__mqttClient.loop_start()
        self.__loopStarted = True

    def stopLoop(self):
        """
        Public method to stop the MQTT client loop.
        """
        self.__mqttClient.loop_stop()
        self.__loopStarted = False

    def connectToServer(
        self,
        host,
        port=1883,
        keepalive=60,
        bindAddress="",
        properties=None,
        clearWill=False,
    ):
        """
        Public method to connect to a remote MQTT broker.

        The connection is initiated asynchronously; the connect timeout timer
        is started and the client loop is started if it is not running yet.

        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param keepalive maximum period in seconds allowed between
            communications with the broker
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        @param properties list of user properties to be sent with the
            connect request
        @type list of tuple of (str, str)
        @param clearWill flag indicating to clear the last will previously set
        @type bool
        """
        if clearWill:
            self.clearLastWill()

        props = (
            self.__createPropertiesObject(PacketTypes.CONNECT, properties)
            if properties
            else None
        )
        self.__mqttClient.connect_async(
            host,
            port=port,
            keepalive=keepalive,
            bind_address=bindAddress,
            clean_start=self.__cleanSession,
            properties=props,
        )

        self.__connectTimeoutTimer.start()

        if not self.__loopStarted:
            self.startLoop()

    def connectToServerWithOptions(
        self, host, port=1883, bindAddress="", options=None, clearWill=False
    ):
        """
        Public method to connect to a remote MQTT broker.

        The given options are merged over the default connection options. If
        no options are given, only the default keepalive value is applied.

        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        @param options dictionary containing the connection options. This
            dictionary should contain the keys "ClientId", "ConnectionTimeout",
            "Keepalive", "CleanSession", "Username", "Password", "WillTopic",
            "WillMessage", "WillQos", "WillRetain", "WillProperties",
            "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey",
            "UserProperties".
        @type dict
        @param clearWill flag indicating to clear the last will previously set
        @type bool
        """
        if options:
            parametersDict = self.defaultConnectionOptions()
            parametersDict.update(options)

            self.setConnectionTimeout(parametersDict["ConnectionTimeout"])

            # step 1: set username and password
            if parametersDict["Username"]:
                if parametersDict["Password"]:
                    # the password is stored encoded and must be decoded first
                    self.setUserCredentials(
                        parametersDict["Username"],
                        pwConvert(parametersDict["Password"], encode=False),
                    )
                else:
                    self.setUserCredentials(parametersDict["Username"])

            # step 2: set last will data
            if not clearWill and parametersDict["WillTopic"]:
                if parametersDict["WillMessage"]:
                    willMessage = parametersDict["WillMessage"]
                else:
                    # empty message to clear the will
                    willMessage = None
                # will properties are only created for MQTT v5
                props = (
                    self.__createPropertiesObject(
                        PacketTypes.WILLMESSAGE, parametersDict["WillProperties"]
                    )
                    if (
                        parametersDict["WillProperties"]
                        and self.__protocol == MqttProtocols.MQTTv5
                    )
                    else None
                )
                self.setLastWill(
                    parametersDict["WillTopic"],
                    payload=willMessage,
                    qos=parametersDict["WillQos"],
                    retain=parametersDict["WillRetain"],
                    properties=props,
                )

            # step 3: set TLS parameters
            # NOTE(review): the (success, error) result of setTLS() is not
            # evaluated here; confirm that connecting after a failed TLS setup
            # is intended.
            if parametersDict["TlsEnable"]:
                if parametersDict["TlsCaCert"] and parametersDict["TlsClientCert"]:
                    # use self signed client certificate
                    self.setTLS(
                        caCerts=parametersDict["TlsCaCert"],
                        certFile=parametersDict["TlsClientCert"],
                        keyFile=parametersDict["TlsClientKey"],
                    )
                elif parametersDict["TlsCaCert"]:
                    # use CA certificate file
                    self.setTLS(caCerts=parametersDict["TlsCaCert"])
                else:
                    # use default TLS configuration
                    self.setTLS()

            # step 4: get the connect user properties
            if self.__protocol == MqttProtocols.MQTTv5:
                try:
                    userProperties = parametersDict["UserProperties"]
                    properties = userProperties["connect"][:]
                    # remember the properties to be sent when disconnecting
                    self.__disconnectUserProperties = (
                        userProperties["connect"][:]
                        if userProperties["use_connect"]
                        else userProperties["disconnect"][:]
                    )
                except KeyError:
                    properties = None
            else:
                properties = None

            # step 5: connect to server
            self.__cleanSession = parametersDict["CleanSession"]
            self.connectToServer(
                host,
                port=port,
                keepalive=parametersDict["Keepalive"],
                bindAddress=bindAddress,
                properties=properties,
                clearWill=clearWill,
            )
        else:
            keepalive = self.defaultConnectionOptions()["Keepalive"]
            self.connectToServer(
                host,
                port=port,
                keepalive=keepalive,
                bindAddress=bindAddress,
                clearWill=clearWill,
            )

    @classmethod
    def defaultConnectionOptions(cls):
        """
        Class method to get a connection options dictionary with default
        values.

        @return dictionary containing the default connection options. It has
            the keys "ClientId", "Protocol", "ConnectionTimeout", "Keepalive",
            "CleanSession", "Username", "Password", "WillTopic", "WillMessage",
            "WillQos", "WillRetain", "WillProperties", "TlsEnable",
            "TlsCaCert", "TlsClientCert", "TlsClientKey", "UserProperties".
        @rtype dict
        """
        from PluginMqttMonitor import mqttPluginObject  # noqa: I102

        return {
            "ClientId": "ERIC7_MQTT_MONITOR_CLIENT",
            "Protocol": mqttPluginObject.getPreferences("DefaultProtocol"),
            "ConnectionTimeout": MqttClient.DefaultConnectTimeout,
            "Keepalive": 60,
            "CleanSession": True,
            "Username": "",
            "Password": "",
            "WillTopic": "",
            "WillMessage": "",
            "WillQos": 0,
            "WillRetain": False,
            "WillProperties": [],
            "TlsEnable": False,
            "TlsCaCert": "",
            "TlsClientCert": "",
            "TlsClientKey": "",
            "UserProperties": {
                "connect": [],
                "disconnect": [],
                "use_connect": True,
            },
        }

    def reconnectToServer(self):
        """
        Public method to reconnect the client with the same parameters.
        """
        self.__connectTimeoutTimer.start()

        self.__mqttClient.reconnect()

        if not self.__loopStarted:
            self.startLoop()

    def disconnectFromServer(self):
        """
        Public method to disconnect the client from the remote broker.

        The user properties remembered by connectToServerWithOptions() are
        sent with the disconnect request, if there are any.
        """
        self.__connectTimeoutTimer.stop()

        props = (
            self.__createPropertiesObject(
                PacketTypes.DISCONNECT, self.__disconnectUserProperties
            )
            if self.__disconnectUserProperties
            else None
        )
        self.__mqttClient.disconnect(properties=props)

    def subscribe(self, topic, qos=0, properties=None):
        """
        Public method to subscribe to topics with quality of service.

        @param topic single topic to subscribe to or a tuple with a topic
            and a QoS or a list of tuples with a topic and a QoS each
        @type str or tuple of (str, int) or list of tuple of (str, int)
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @param properties list of user properties to be sent with the
            subscription
        @type list of tuple of (str, str)
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        props = (
            self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties)
            if properties
            else None
        )
        return self.__mqttClient.subscribe(topic, qos=qos, properties=props)

    def unsubscribe(self, topic, properties=None):
        """
        Public method to unsubscribe topics.

        @param topic topic or list of topics to unsubscribe
        @type str or list of str
        @param properties list of user properties to be sent with the
            unsubscribe request
        @type list of tuple of (str, str)
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        props = (
            self.__createPropertiesObject(PacketTypes.UNSUBSCRIBE, properties)
            if properties
            else None
        )
        return self.__mqttClient.unsubscribe(topic, properties=props)

    def publish(self, topic, payload=None, qos=0, retain=False, properties=None):
        """
        Public method to publish to a topic.

        @param topic topic to publish to
        @type str
        @param payload data to be published
        @type str, bytes, int or float
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the topic
        @type bool
        @param properties list of user properties to be sent with the
            message
        @type list of tuple of (str, str)
        @return message info object
        @rtype mqtt.MQTTMessageInfo
        """
        props = (
            self.__createPropertiesObject(PacketTypes.PUBLISH, properties)
            if properties
            else None
        )
        return self.__mqttClient.publish(
            topic, payload=payload, qos=qos, retain=retain, properties=props
        )

    def __createPropertiesObject(self, packetType, properties):
        """
        Private method to assemble the MQTT v5 properties object.

        @param packetType type of the MQTT packet
        @type PacketTypes (= int)
        @param properties list of user properties
        @type list of tuple of (str, str)
        @return MQTT v5 properties object
        @rtype Properties
        """
        props = Properties(packetType)
        props.UserProperty = properties
        return props


def mqttErrorMessage(mqttErrno):
    """
    Module function to translate an MQTT result code into a human readable
    message.

    @param mqttErrno result code of a MQTT request
    @type int
    @return textual representation of the result code ("Unknown error." for
        codes not contained in the table below)
    @rtype str
    """
    # table of the known result codes and their translated descriptions; the
    # entries are checked in the given order
    knownErrors = (
        (
            mqtt.MQTT_ERR_SUCCESS,
            QCoreApplication.translate("MqttErrorMessage", "No error."),
        ),
        (
            mqtt.MQTT_ERR_NOMEM,
            QCoreApplication.translate("MqttErrorMessage", "Out of memory."),
        ),
        (
            mqtt.MQTT_ERR_PROTOCOL,
            QCoreApplication.translate(
                "MqttErrorMessage",
                "A network protocol error occurred when communicating with the broker.",
            ),
        ),
        (
            mqtt.MQTT_ERR_INVAL,
            QCoreApplication.translate(
                "MqttErrorMessage", "Invalid function arguments provided."
            ),
        ),
        (
            mqtt.MQTT_ERR_NO_CONN,
            QCoreApplication.translate(
                "MqttErrorMessage", "The client is not currently connected."
            ),
        ),
        (
            mqtt.MQTT_ERR_CONN_REFUSED,
            QCoreApplication.translate(
                "MqttErrorMessage", "The connection was refused."
            ),
        ),
        (
            mqtt.MQTT_ERR_NOT_FOUND,
            QCoreApplication.translate(
                "MqttErrorMessage", "Message not found (internal error)."
            ),
        ),
        (
            mqtt.MQTT_ERR_CONN_LOST,
            QCoreApplication.translate(
                "MqttErrorMessage", "The connection was lost."
            ),
        ),
        (
            mqtt.MQTT_ERR_TLS,
            QCoreApplication.translate("MqttErrorMessage", "A TLS error occurred."),
        ),
        (
            mqtt.MQTT_ERR_PAYLOAD_SIZE,
            QCoreApplication.translate("MqttErrorMessage", "Payload too large."),
        ),
        (
            mqtt.MQTT_ERR_NOT_SUPPORTED,
            QCoreApplication.translate(
                "MqttErrorMessage", "This feature is not supported."
            ),
        ),
        (
            mqtt.MQTT_ERR_AUTH,
            QCoreApplication.translate("MqttErrorMessage", "Authorisation failed."),
        ),
        (
            mqtt.MQTT_ERR_ACL_DENIED,
            QCoreApplication.translate("MqttErrorMessage", "Access denied by ACL."),
        ),
        (
            mqtt.MQTT_ERR_UNKNOWN,
            QCoreApplication.translate("MqttErrorMessage", "Unknown error."),
        ),
        (
            mqtt.MQTT_ERR_ERRNO,
            QCoreApplication.translate("MqttErrorMessage", "Error defined by errno."),
        ),
        (
            mqtt.MQTT_ERR_QUEUE_SIZE,
            QCoreApplication.translate("MqttErrorMessage", "Message queue full."),
        ),
    )

    for errorCode, errorText in knownErrors:
        if mqttErrno == errorCode:
            return errorText

    # none of the known codes matched
    return QCoreApplication.translate("MqttErrorMessage", "Unknown error.")


def mqttLogLevelString(mqttLogLevel, isMqttLogLevel=True):
    """
    Module function to get the translated name of a log level.

    @param mqttLogLevel log level of the paho-mqtt client
    @type int
    @param isMqttLogLevel flag indicating a MQTT log level is given (if
        False it is the MqttClient variant, i.e. Debug being lowest)
    @type bool
    @return textual representation of the log level ("Unknown" for levels
        that cannot be mapped)
    @rtype str
    """
    level = mqttLogLevel
    if isMqttLogLevel:
        # convert the paho-mqtt level to the MqttClient variant first
        level = MqttClient.LogLevelMap.get(mqttLogLevel)
        if level is None:
            return QCoreApplication.translate("MqttLogLevelString", "Unknown")

    if level == MqttClient.LogDebug:
        return QCoreApplication.translate("MqttLogLevelString", "Debug")
    if level == MqttClient.LogInfo:
        return QCoreApplication.translate("MqttLogLevelString", "Info")
    if level == MqttClient.LogNotice:
        return QCoreApplication.translate("MqttLogLevelString", "Notice")
    if level == MqttClient.LogWarning:
        return QCoreApplication.translate("MqttLogLevelString", "Warning")
    if level == MqttClient.LogError:
        return QCoreApplication.translate("MqttLogLevelString", "Error")
    if level == MqttClient.LogDisabled:
        return QCoreApplication.translate("MqttLogLevelString", "Logging Disabled")

    return QCoreApplication.translate("MqttLogLevelString", "Unknown")

eric ide

mercurial