MqttMonitor/MqttClient.py

Sun, 30 May 2021 18:21:40 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sun, 30 May 2021 18:21:40 +0200
branch
eric7
changeset 92
2fb5c08019fd
parent 84
044df16e55aa
child 95
d830314cca87
permissions
-rw-r--r--

Ported the plug-in to PyQt6 for eric7.

# -*- coding: utf-8 -*-

# Copyright (c) 2018 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a PyQt wrapper around the paho MQTT client.
"""

from PyQt6.QtCore import (
    pyqtSignal, pyqtSlot, QObject, QCoreApplication, QTimer
)

import paho.mqtt.client as mqtt

from Utilities.crypto import pwConvert


class MqttClient(QObject):
    """
    Class implementing a PyQt wrapper around the paho MQTT client.

    The callbacks of the wrapped paho client are forwarded as Qt signals.

    @signal onConnect(flags, rc) emitted after the client has connected to the
        broker
    @signal onDisconnected(rc) emitted after the client has disconnected from
        the broker
    @signal onLog(level, message) emitted to send client log data
    @signal onMessage(topic, payload, qos, retain) emitted after a message has
        been received by the client
    @signal onPublish(mid) emitted after a message has been published
    @signal onSubscribe(mid, grantedQos) emitted after the client has
        subscribed to some topics
    @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
        some topics
    @signal connectTimeout() emitted to indicate, that a connection attempt
        timed out
    """
    onConnect = pyqtSignal(dict, int)
    onDisconnected = pyqtSignal(int)
    onLog = pyqtSignal(int, str)
    onMessage = pyqtSignal(str, bytes, int, bool)
    onPublish = pyqtSignal(int)
    onSubscribe = pyqtSignal(int, tuple)
    onUnsubscribe = pyqtSignal(int)

    connectTimeout = pyqtSignal()

    DefaultConnectTimeout = 15      # connect timeout in seconds

    # log levels of this wrapper (Debug being the lowest one)
    LogDebug = 0x01
    LogInfo = 0x02
    LogNotice = 0x04
    LogWarning = 0x08
    LogError = 0x10
    LogDisabled = 0xff
    # mapping of the paho log levels to the log levels of this wrapper
    LogLevelMap = {
        mqtt.MQTT_LOG_DEBUG: LogDebug,
        mqtt.MQTT_LOG_INFO: LogInfo,
        mqtt.MQTT_LOG_NOTICE: LogNotice,
        mqtt.MQTT_LOG_WARNING: LogWarning,                        # __NO-TASK__
        mqtt.MQTT_LOG_ERR: LogError,
    }

    def __init__(self, clientId="", cleanSession=True, userdata=None,
                 protocol=mqtt.MQTTv311, transport="tcp", parent=None):
        """
        Constructor

        @param clientId ID to be used for the client
        @type str
        @param cleanSession flag indicating to start a clean session
        @type bool
        @param userdata user data
        @type any
        @param protocol version of the MQTT protocol to use (currently not
            evaluated; the client always uses mqtt.MQTTv311)
        @type int, one of mqtt.MQTTv3, mqtt.MQTTv311 or mqtt.MQTTv5
        @param transport transport to be used (currently not evaluated; the
            client always uses "tcp")
        @type str, one of "tcp" or "websockets"
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent=parent)

        self.__loopStarted = False

        # single shot timer guarding a connection attempt
        self.__connectTimeoutTimer = QTimer(self)
        self.__connectTimeoutTimer.setSingleShot(True)
        self.__connectTimeoutTimer.setInterval(
            MqttClient.DefaultConnectTimeout * 1000)
        self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout)

        # a successful connection cancels the pending timeout
        self.onConnect.connect(self.__connectTimeoutTimer.stop)

        # TODO: MQTTv5: set clean_session to None and remember cleanSession
        # NOTE(review): 'protocol' and 'transport' stay pinned to MQTT v3.1.1
        # over TCP because the signal signatures and the clean session
        # handling of this class are v3.1.1 only (see the TODO markers);
        # presumably reinitialise() of paho resets them anyway - confirm
        # before passing them through.
        self.__mqttClient = mqtt.Client(
            client_id=clientId, clean_session=cleanSession,
            userdata=userdata, protocol=mqtt.MQTTv311, transport="tcp")

        self.__initCallbacks()

    def __initCallbacks(self):
        """
        Private method to initialize the MQTT callback methods.

        Each paho callback is routed to the corresponding signal of this
        class.
        """
        # 'properties' is optional because it is passed by the MQTTv5
        # callback invocation only; the v3.1.1 invocation omits it.
        # TODO: add properties to signal
        # TODO: MQTTv5: add support for MQTTv5 signature
        self.__mqttClient.on_connect = (
            lambda client, userdata, flags, rc, properties=None:
                self.onConnect.emit(flags, rc))
        # TODO: MQTTv5: add support for MQTTv5 signature
        self.__mqttClient.on_disconnect = (
            lambda client, userdata, rc: self.onDisconnected.emit(rc))
        self.__mqttClient.on_log = (
            lambda client, userdata, level, buf: self.onLog.emit(level, buf))
        self.__mqttClient.on_message = (
            lambda client, userdata, message: self.onMessage.emit(
                message.topic, message.payload, message.qos, message.retain))
        self.__mqttClient.on_publish = (
            lambda client, userdata, mid: self.onPublish.emit(mid))
        # TODO: add properties to signal
        # TODO: MQTTv5: add support for MQTTv5 signature
        self.__mqttClient.on_subscribe = (
            lambda client, userdata, mid, grantedQos, properties=None:
                self.onSubscribe.emit(mid, grantedQos))
        # TODO: MQTTv5: add support for MQTTv5 signature
        self.__mqttClient.on_unsubscribe = (
            lambda client, userdata, mid: self.onUnsubscribe.emit(mid))

    @pyqtSlot()
    def __connectTimeout(self):
        """
        Private slot handling a failed connection attempt.

        It stops the client loop and emits the connectTimeout signal.
        """
        self.stopLoop()
        self.connectTimeout.emit()

    def reinitialise(self, clientId="", cleanSession=True, userdata=None):
        """
        Public method to reinitialize the client with given data.

        @param clientId ID to be used for the client
        @type str
        @param cleanSession flag indicating to start a clean session
        @type bool
        @param userdata user data
        @type any
        """
        self.__mqttClient.reinitialise(
            client_id=clientId, clean_session=cleanSession, userdata=userdata)

        # the callbacks are assigned anew after each reinitialization
        self.__initCallbacks()

    def setConnectionTimeout(self, timeout):
        """
        Public method to set the connection timeout value.

        @param timeout timeout value to be set in seconds
        @type int or float
        """
        # QTimer.setInterval() needs an integer number of milliseconds
        self.__connectTimeoutTimer.setInterval(int(timeout * 1000))

    def setMaxInflightMessages(self, inflight=20):
        """
        Public method to set the maximum number of messages with QoS > 0 that
        can be part way through their network flow at once.

        @param inflight maximum number of messages in flight
        @type int
        """
        self.__mqttClient.max_inflight_messages_set(inflight)

    def setMaxQueuedMessages(self, queueSize=0):
        """
        Public method to set the maximum number of messages with QoS > 0 that
        can be pending in the outgoing message queue.

        @param queueSize maximum number of queued messages (0 = unlimited)
        @type int
        """
        self.__mqttClient.max_queued_messages_set(queueSize)

    def setUserCredentials(self, username, password=None):
        """
        Public method to set the user name and optionally the password.

        @param username user name to be set
        @type str
        @param password optional password
        @type str
        """
        self.__mqttClient.username_pw_set(username, password=password)

    def setUserData(self, userdata):
        """
        Public method to set the user data.

        @param userdata user data
        @type any
        """
        self.__mqttClient.user_data_set(userdata)

    # TODO: MQTTv5: add support for properties
    def setLastWill(self, topic, payload=None, qos=0, retain=False):
        """
        Public method to set the last will of the client.

        @param topic topic the will message should be published on
        @type str
        @param payload message to send as a will
        @type str, bytes, int or float
        @param qos quality of service level to use for the will
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the will topic
        @type bool
        """
        self.__mqttClient.will_set(topic, payload=payload, qos=qos,
                                   retain=retain)

    def clearLastWill(self):
        """
        Public method to remove a will that was previously configured with
        setLastWill().
        """
        self.__mqttClient.will_clear()

    def setTLS(self, caCerts=None, certFile=None, keyFile=None):
        """
        Public method to enable secure connections and set the TLS parameters.

        @param caCerts path to the Certificate Authority certificates file
        @type str
        @param certFile PEM encoded client certificate file
        @type str
        @param keyFile PEM encoded private key file
        @type str
        @return tuple containing a success flag and the error string of the
            paho-mqtt library
        @rtype tuple of (bool, str)
        """
        try:
            self.__mqttClient.tls_set(ca_certs=caCerts, certfile=certFile,
                                      keyfile=keyFile)
        except (ValueError, FileNotFoundError) as err:
            return False, str(err)
        else:
            return True, ""

    def startLoop(self):
        """
        Public method to start the MQTT client loop.
        """
        self.__mqttClient.loop_start()
        self.__loopStarted = True

    def stopLoop(self):
        """
        Public method to stop the MQTT client loop.
        """
        self.__mqttClient.loop_stop()
        self.__loopStarted = False

    def connectToServer(self, host, port=1883, keepalive=60, bindAddress="",
                        reinit=True):
        """
        Public method to connect to a remote MQTT broker.

        The connection timeout timer is started and the client loop is
        started, if it is not running yet.

        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param keepalive maximum period in seconds allowed between
            communications with the broker
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        @param reinit flag indicating to reinitialize the MQTT client before
            trying to connect with the given parameters
        @type bool
        """
        if reinit:
            self.reinitialise()
        # TODO: MQTTv5: use 'clean_start' set to the remembered 'cleanSession'
        # TODO: MQTTv5: add support for MQTTv5 properties
        self.__mqttClient.connect_async(
            host, port=port, keepalive=keepalive, bind_address=bindAddress)

        self.__connectTimeoutTimer.start()

        if not self.__loopStarted:
            self.startLoop()

    def connectToServerWithOptions(self, host, port=1883, bindAddress="",
                                   options=None):
        """
        Public method to connect to a remote MQTT broker.

        Missing entries of the options dictionary are taken from the default
        connection options. Without options the connection is attempted with
        the default keepalive value.

        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        @param options dictionary containing the connection options. This
            dictionary should contain the keys "ClientId", "Keepalive",
            "CleanSession", "Username", "Password", "WillTopic", "WillMessage",
            "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert",
            "TlsClientKey", "ConnectionTimeout"
        @type dict
        """
        if options:
            parametersDict = self.defaultConnectionOptions()
            parametersDict.update(options)

            # step 1: reinitialize to set the client ID and clean session flag
            self.reinitialise(
                clientId=parametersDict["ClientId"],
                cleanSession=parametersDict["CleanSession"]
            )
            self.setConnectionTimeout(parametersDict["ConnectionTimeout"])

            # step 2: set username and password
            if parametersDict["Username"]:
                if parametersDict["Password"]:
                    # the password is stored encoded and decoded for use
                    self.setUserCredentials(
                        parametersDict["Username"],
                        pwConvert(parametersDict["Password"], encode=False))
                else:
                    self.setUserCredentials(parametersDict["Username"])

            # step 3: set last will data
            if parametersDict["WillTopic"]:
                if parametersDict["WillMessage"]:
                    willMessage = parametersDict["WillMessage"]
                else:
                    # empty message to clear the will
                    willMessage = None
                self.setLastWill(parametersDict["WillTopic"],
                                 willMessage,
                                 parametersDict["WillQos"],
                                 parametersDict["WillRetain"])

            # step 4: set TLS parameters
            # NOTE(review): the result of setTLS() is not evaluated here, so
            # a failed TLS setup does not stop the connection attempt.
            if parametersDict["TlsEnable"]:
                if (
                    parametersDict["TlsCaCert"] and
                    parametersDict["TlsClientCert"]
                ):
                    # use self signed client certificate
                    self.setTLS(caCerts=parametersDict["TlsCaCert"],
                                certFile=parametersDict["TlsClientCert"],
                                keyFile=parametersDict["TlsClientKey"])
                elif parametersDict["TlsCaCert"]:
                    # use CA certificate file
                    self.setTLS(caCerts=parametersDict["TlsCaCert"])
                else:
                    # use default TLS configuration
                    self.setTLS()

            # step 5: connect to server (the client was reinitialized in
            # step 1 already, doing it again would drop the settings made)
            self.connectToServer(host, port=port,
                                 keepalive=parametersDict["Keepalive"],
                                 bindAddress=bindAddress,
                                 reinit=False)
        else:
            keepalive = self.defaultConnectionOptions()["Keepalive"]
            self.connectToServer(host, port=port, keepalive=keepalive,
                                 bindAddress=bindAddress)

    def defaultConnectionOptions(self):
        """
        Public method to get a connection options dictionary with default
        values.

        @return dictionary containing the default connection options. It has
            the keys "ClientId", "Keepalive", "CleanSession", "Username",
            "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain",
            "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey",
            "ConnectionTimeout".
        @rtype dict
        """
        return {
            "ClientId": "ERIC7_MQTT_MONITOR_CLIENT",
            "ConnectionTimeout": MqttClient.DefaultConnectTimeout,
            "Keepalive": 60,
            "CleanSession": True,
            "Username": "",
            "Password": "",
            "WillTopic": "",
            "WillMessage": "",
            "WillQos": 0,
            "WillRetain": False,
            "TlsEnable": False,
            "TlsCaCert": "",
            "TlsClientCert": "",
            "TlsClientKey": "",
        }

    def reconnectToServer(self):
        """
        Public method to reconnect the client with the same parameters.
        """
        self.__connectTimeoutTimer.start()

        self.__mqttClient.reconnect()

        if not self.__loopStarted:
            self.startLoop()

    def disconnectFromServer(self):
        """
        Public method to disconnect the client from the remote broker.
        """
        # a pending connection attempt must not time out anymore
        self.__connectTimeoutTimer.stop()

        # TODO: MQTTv5: add support for properties (?)
        # TODO: MQTTv5: add support for reason code
        self.__mqttClient.disconnect()

    # TODO: MQTTv5: add support for properties
    # TODO: MQTTv5: add support for subscribe options
    def subscribe(self, topic, qos=0):
        """
        Public method to subscribe to topics with quality of service.

        @param topic single topic to subscribe to or a tuple with a topic
            and a QoS or a list of tuples with a topic and a QoS each
        @type str or tuple of (str, int) or list of tuple of (str, int)
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.subscribe(topic, qos=qos)

    # TODO: MQTTv5: add support for properties (?)
    def unsubscribe(self, topic):
        """
        Public method to unsubscribe topics.

        @param topic topic or list of topics to unsubscribe
        @type str or list of str
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.unsubscribe(topic)

    # TODO: MQTTv5: add support for properties
    def publish(self, topic, payload=None, qos=0, retain=False):
        """
        Public method to publish to a topic.

        @param topic topic to publish to
        @type str
        @param payload data to be published
        @type str, bytes, int or float
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the topic
        @type bool
        @return message info object
        @rtype mqtt.MQTTMessageInfo
        """
        return self.__mqttClient.publish(topic, payload=payload, qos=qos,
                                         retain=retain)


def mqttConnackMessage(connackCode):
    """
    Module function to translate a CONNACK result code into a readable text.

    @param connackCode result code of the connection request
    @type int
    @return textual representation for the result code
    @rtype str
    """
    # select the message for the given code; codes not known to this
    # function are reported as a refusal for an unknown reason
    if connackCode == mqtt.CONNACK_ACCEPTED:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Accepted.")
    elif connackCode == mqtt.CONNACK_REFUSED_PROTOCOL_VERSION:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: unacceptable protocol version.")
    elif connackCode == mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: identifier rejected.")
    elif connackCode == mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: broker unavailable.")
    elif connackCode == mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: bad user name or password.")
    elif connackCode == mqtt.CONNACK_REFUSED_NOT_AUTHORIZED:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: not authorised.")
    else:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: unknown reason.")

    return message


def mqttErrorMessage(mqttErrno):
    """
    Module function to translate an MQTT error number into a readable error
    text.

    @param mqttErrno result code of a MQTT request
    @type int
    @return textual representation of the result code
    @rtype str
    """
    # each known error number returns its text right away; everything not
    # handled below ends up at the generic message at the end
    if mqttErrno == mqtt.MQTT_ERR_SUCCESS:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "No error.")

    if mqttErrno == mqtt.MQTT_ERR_NOMEM:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Out of memory.")

    if mqttErrno == mqtt.MQTT_ERR_PROTOCOL:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "A network protocol error occurred when communicating with"
            " the broker.")

    if mqttErrno == mqtt.MQTT_ERR_INVAL:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Invalid function arguments provided.")

    if mqttErrno == mqtt.MQTT_ERR_NO_CONN:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "The client is not currently connected.")

    if mqttErrno == mqtt.MQTT_ERR_CONN_REFUSED:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "The connection was refused.")

    if mqttErrno == mqtt.MQTT_ERR_NOT_FOUND:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Message not found (internal error).")

    if mqttErrno == mqtt.MQTT_ERR_CONN_LOST:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "The connection was lost.")

    if mqttErrno == mqtt.MQTT_ERR_TLS:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "A TLS error occurred.")

    if mqttErrno == mqtt.MQTT_ERR_PAYLOAD_SIZE:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Payload too large.")

    if mqttErrno == mqtt.MQTT_ERR_NOT_SUPPORTED:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "This feature is not supported.")

    if mqttErrno == mqtt.MQTT_ERR_AUTH:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Authorisation failed.")

    if mqttErrno == mqtt.MQTT_ERR_ACL_DENIED:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Access denied by ACL.")

    if mqttErrno == mqtt.MQTT_ERR_UNKNOWN:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Unknown error.")

    if mqttErrno == mqtt.MQTT_ERR_ERRNO:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Error defined by errno.")

    if mqttErrno == mqtt.MQTT_ERR_QUEUE_SIZE:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Message queue full.")

    # error number without a dedicated message
    return QCoreApplication.translate(
        "MqttErrorMessage",
        "Unknown error.")


def mqttLogLevelString(mqttLogLevel, isMqttLogLevel=True):
    """
    Module function to get the readable name of a log level.

    @param mqttLogLevel log level of the paho-mqtt client
    @type int
    @param isMqttLogLevel flag indicating a MQTT log level is given (if
        False it is the MqttClient variant, i.e. Debug being lowest)
    @type bool
    @return textual representation of the log level
    @rtype str
    """
    # assume a MqttClient log level first and convert a paho one if needed
    level = mqttLogLevel
    if isMqttLogLevel:
        try:
            level = MqttClient.LogLevelMap[mqttLogLevel]
        except KeyError:
            # paho log level without a MqttClient counterpart
            return QCoreApplication.translate("MqttLogLevelString", "Unknown")

    if level == MqttClient.LogInfo:
        return QCoreApplication.translate("MqttLogLevelString", "Info")

    if level == MqttClient.LogNotice:
        return QCoreApplication.translate("MqttLogLevelString", "Notice")

    if level == MqttClient.LogWarning:
        return QCoreApplication.translate("MqttLogLevelString", "Warning")

    if level == MqttClient.LogError:
        return QCoreApplication.translate("MqttLogLevelString", "Error")

    if level == MqttClient.LogDebug:
        return QCoreApplication.translate("MqttLogLevelString", "Debug")

    if level == MqttClient.LogDisabled:
        return QCoreApplication.translate("MqttLogLevelString",
                                          "Logging Disabled")

    return QCoreApplication.translate("MqttLogLevelString", "Unknown")

eric ide

mercurial