MqttMonitor/MqttClient.py

Sat, 01 Sep 2018 20:18:11 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 01 Sep 2018 20:18:11 +0200
changeset 10
7e0e921dc7ea
parent 9
f75a385e9127
child 11
90d3ebed4cc0
permissions
-rw-r--r--

Started to implement the connection options dialog and the methods to specify these connection options when connecting to the server.

# -*- coding: utf-8 -*-

# Copyright (c) 2018 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a PyQt wrapper around the paho MQTT client.
"""

from __future__ import unicode_literals

from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication

import paho.mqtt.client as mqtt


class MqttClient(QObject):
    """
    Class implementing a PyQt wrapper around the paho MQTT client.
    
    The paho callbacks are translated into Qt signals so that users of this
    class can connect slots instead of installing callback functions.
    
    @signal onConnect(flags, rc) emitted after the client has connected to the
        broker
    @signal onDisconnected(rc) emitted after the client has disconnected from
        the broker
    @signal onMessage(topic, payload, qos, retain) emitted after a message has
        been received by the client
    @signal onPublish(mid) emitted after a message has been published
    @signal onSubscribe(mid, grantedQos) emitted after the client has
        subscribed to some topics
    @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
        some topics
    """
    onConnect = pyqtSignal(dict, int)
    onDisconnected = pyqtSignal(int)
    onMessage = pyqtSignal(str, bytes, int, bool)
    onPublish = pyqtSignal(int)
    onSubscribe = pyqtSignal(int, tuple)
    onUnsubscribe = pyqtSignal(int)
    
    def __init__(self, clientId="", cleanSession=True, userdata=None,
                 protocol=mqtt.MQTTv311, transport="tcp", parent=None):
        """
        Constructor
        
        @param clientId ID to be used for the client
        @type str
        @param cleanSession flag indicating to start a clean session
        @type bool
        @param userdata user data
        @type any
        @param protocol version of the MQTT protocol to use
        @type int, one of mqtt.MQTTv31 or mqtt.MQTTv311
        @param transport transport to be used
        @type str, one of "tcp" or "websockets"
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent=parent)
        
        self.__loopStarted = False
        
        # forward all caller supplied settings to the wrapped paho client
        self.__mqttClient = mqtt.Client(
            client_id=clientId, clean_session=cleanSession, userdata=userdata,
            protocol=protocol, transport=transport)
        
        self.__initCallbacks()
    
    def __initCallbacks(self):
        """
        Private method to connect the paho client callbacks to the signals
        of this object.
        """
        self.__mqttClient.on_connect = \
            lambda client, userdata, flags, rc: self.onConnect.emit(
                flags, rc)
        self.__mqttClient.on_disconnect = \
            lambda client, userdata, rc: self.onDisconnected.emit(rc)
        self.__mqttClient.on_message = \
            lambda client, userdata, message: self.onMessage.emit(
                message.topic, message.payload, message.qos, message.retain)
        self.__mqttClient.on_publish = \
            lambda client, userdata, mid: self.onPublish.emit(mid)
        self.__mqttClient.on_subscribe = \
            lambda client, userdata, mid, grantedQos: self.onSubscribe.emit(
                mid, grantedQos)
        self.__mqttClient.on_unsubscribe = \
            lambda client, userdata, mid: self.onUnsubscribe.emit(mid)
    
    def reinitialise(self, clientId="", cleanSession=True, userdata=None):
        """
        Public method to reinitialize the client with given data.
        
        @param clientId ID to be used for the client
        @type str
        @param cleanSession flag indicating to start a clean session
        @type bool
        @param userdata user data
        @type any
        """
        self.__mqttClient.reinitialise(
            client_id=clientId, clean_session=cleanSession, userdata=userdata)
        
        # The paho client runs its constructor again when reinitialising,
        # which resets all callbacks. They must be installed again or no
        # signal would be emitted anymore.
        self.__initCallbacks()
    
    def setMaxInflightMessages(self, inflight=20):
        """
        Public method to set the maximum number of messages with QoS > 0 that
        can be part way through their network flow at once.
        
        @param inflight maximum number of messages in flight
        @type int
        """
        self.__mqttClient.max_inflight_messages_set(inflight)
    
    def setMaxQueuedMessages(self, queueSize=0):
        """
        Public method to set the maximum number of messages with QoS > 0 that
        can be pending in the outgoing message queue.
        
        @param queueSize maximum number of queued messages (0 = unlimited)
        @type int
        """
        self.__mqttClient.max_queued_messages_set(queueSize)
    
    def setUserCredentials(self, username, password=None):
        """
        Public method to set the user name and optionally the password.
        
        @param username user name to be set
        @type str
        @param password optional password
        @type str
        """
        self.__mqttClient.username_pw_set(username, password=password)
    
    def setUserData(self, userdata):
        """
        Public method to set the user data.
        
        @param userdata user data
        @type any
        """
        self.__mqttClient.user_data_set(userdata)
    
    def setLastWill(self, topic, payload=None, qos=0, retain=False):
        """
        Public method to set the last will of the client.
        
        @param topic topic the will message should be published on
        @type str
        @param payload message to send as a will
        @type str, bytes, int or float
        @param qos quality of service level to use for the will
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the will topic
        @type bool
        """
        self.__mqttClient.will_set(topic, payload=payload, qos=qos,
                                   retain=retain)
    
    def startLoop(self):
        """
        Public method to start the MQTT client loop.
        """
        self.__mqttClient.loop_start()
        self.__loopStarted = True
    
    def stopLoop(self):
        """
        Public method to stop the MQTT client loop.
        """
        self.__mqttClient.loop_stop()
        self.__loopStarted = False
    
    def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""):
        """
        Public method to connect to a remote MQTT broker.
        
        The connection is initiated asynchronously and the client loop is
        started, if that has not been done already.
        
        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param keepalive maximum period in seconds allowed between
            communications with the broker
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        """
        self.__mqttClient.connect_async(
            host, port=port, keepalive=keepalive, bind_address=bindAddress)
        
        if not self.__loopStarted:
            self.startLoop()
    
    def connectToServerWithOptions(self, host, port=1883, bindAddress="",
                                   options=None):
        """
        Public method to connect to a remote MQTT broker using the given
        connection options.
        
        If no options are given, the client is connected as is using the
        default keepalive value only.
        
        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        @param options dictionary containing the connection options. This
            dictionary should contain the keys "ClientId", "Keepalive",
            "CleanSession", "Username", "Password", "WillTopic", "WillMessage",
            "WillQos", "WillRetain". Missing keys are filled with the values
            given by defaultConnectionOptions().
        @type dict
        """
        if options:
            parametersDict = self.defaultConnectionOptions()
            parametersDict.update(options)
            
            # step 1: reinitialize to set the client ID and clean session flag
            self.reinitialise(
                clientId=parametersDict["ClientId"],
                cleanSession=parametersDict["CleanSession"]
            )
            
            # step 2: set username and password
            if parametersDict["Username"]:
                if parametersDict["Password"]:
                    self.setUserCredentials(parametersDict["Username"],
                                            parametersDict["Password"])
                else:
                    self.setUserCredentials(parametersDict["Username"])
            
            # step 3: set last will data
            if parametersDict["WillTopic"]:
                if parametersDict["WillMessage"]:
                    willMessage = parametersDict["WillMessage"]
                else:
                    # empty message to clear the will
                    willMessage = None
                self.setLastWill(parametersDict["WillTopic"],
                                 willMessage,
                                 parametersDict["WillQos"],
                                 parametersDict["WillRetain"])
            
            # step 4: connect to server (honouring the requested local
            # interface as well)
            self.connectToServer(host, port=port,
                                 keepalive=parametersDict["Keepalive"],
                                 bindAddress=bindAddress)
        else:
            # defaultConnectionOptions is a method; it has to be called to
            # get at the dictionary of default values
            keepalive = self.defaultConnectionOptions()["Keepalive"]
            self.connectToServer(host, port=port, keepalive=keepalive,
                                 bindAddress=bindAddress)
    
    def defaultConnectionOptions(self):
        """
        Public method to get a connection options dictionary with default
        values.
        
        A new dictionary is created on each call, so callers may modify the
        returned object freely.
        
        @return dictionary containing the default connection options. It has
            the keys "ClientId", "Keepalive", "CleanSession", "Username",
            "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain"
        @rtype dict
        """
        return {
            "ClientId": "ERIC6_MQTT_MONITOR_CLIENT",
            "Keepalive": 60,
            "CleanSession": True,
            "Username": "",
            "Password": "",
            "WillTopic": "",
            "WillMessage": "",
            "WillQos": 0,
            "WillRetain": False,
        }
    
    def reconnectToServer(self):
        """
        Public method to reconnect the client with the same parameters.
        """
        self.__mqttClient.reconnect()
        
        if not self.__loopStarted:
            self.startLoop()
    
    def disconnectFromServer(self):
        """
        Public method to disconnect the client from the remote broker.
        """
        self.__mqttClient.disconnect()
    
    def subscribe(self, topic, qos=0):
        """
        Public method to subscribe to topics with quality of service.
        
        @param topic single topic to subscribe to or a tuple with a topic
            and a QoS or a list of tuples with a topic and a QoS each
        @type str or tuple of (str, int) or list of tuple of (str, int)
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.subscribe(topic, qos=qos)
    
    def unsubscribe(self, topic):
        """
        Public method to unsubscribe topics.
        
        @param topic topic or list of topics to unsubscribe
        @type str or list of str
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.unsubscribe(topic)
    
    def publish(self, topic, payload=None, qos=0, retain=False):
        """
        Public method to publish to a topic.
        
        @param topic topic to publish to
        @type str
        @param payload data to be published
        @type str, bytes, int or float
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the topic
        @type bool
        @return message info object
        @rtype mqtt.MQTTMessageInfo
        """
        return self.__mqttClient.publish(topic, payload=payload, qos=qos,
                                         retain=retain)


def mqttConnackMessage(connackCode):
    """
    Module function to get the string associated with a CONNACK result.
    
    Any code not matching one of the known paho CONNACK constants yields the
    "unknown reason" text.
    
    @param connackCode result code of the connection request
    @type int
    @return textual representation for the result code
    @rtype str
    """
    # The translate() calls keep their literal arguments so that the
    # translation tools can extract the message texts.
    if connackCode == mqtt.CONNACK_ACCEPTED:
        message = QCoreApplication.translate(
            "MqttConnackMessage", "Connection Accepted.")
    elif connackCode == mqtt.CONNACK_REFUSED_PROTOCOL_VERSION:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: unacceptable protocol version.")
    elif connackCode == mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: identifier rejected.")
    elif connackCode == mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: broker unavailable.")
    elif connackCode == mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: bad user name or password.")
    elif connackCode == mqtt.CONNACK_REFUSED_NOT_AUTHORIZED:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: not authorised.")
    else:
        message = QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: unknown reason.")
    
    return message


def mqttErrorMessage(mqttErrno):
    """
    Module function to get the error string associated with an MQTT error
    number.
    
    Any number not matching one of the checked paho MQTT_ERR_* constants
    yields the "Unknown error." text.
    
    @param mqttErrno result code of a MQTT request
    @type int
    @return textual representation of the result code
    @rtype str
    """
    # The translate() calls keep their literal arguments so that the
    # translation tools can extract the message texts.
    if mqttErrno == mqtt.MQTT_ERR_SUCCESS:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "No error.")
    elif mqttErrno == mqtt.MQTT_ERR_NOMEM:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "Out of memory.")
    elif mqttErrno == mqtt.MQTT_ERR_PROTOCOL:
        message = QCoreApplication.translate(
            "MqttErrorMessage",
            "A network protocol error occurred when communicating with"
            " the broker.")
    elif mqttErrno == mqtt.MQTT_ERR_INVAL:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "Invalid function arguments provided.")
    elif mqttErrno == mqtt.MQTT_ERR_NO_CONN:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "The client is not currently connected.")
    elif mqttErrno == mqtt.MQTT_ERR_CONN_REFUSED:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "The connection was refused.")
    elif mqttErrno == mqtt.MQTT_ERR_NOT_FOUND:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "Message not found (internal error).")
    elif mqttErrno == mqtt.MQTT_ERR_CONN_LOST:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "The connection was lost.")
    elif mqttErrno == mqtt.MQTT_ERR_TLS:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "A TLS error occurred.")
    elif mqttErrno == mqtt.MQTT_ERR_PAYLOAD_SIZE:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "Payload too large.")
    elif mqttErrno == mqtt.MQTT_ERR_NOT_SUPPORTED:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "This feature is not supported.")
    elif mqttErrno == mqtt.MQTT_ERR_AUTH:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "Authorisation failed.")
    elif mqttErrno == mqtt.MQTT_ERR_ACL_DENIED:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "Access denied by ACL.")
    elif mqttErrno == mqtt.MQTT_ERR_UNKNOWN:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "Unknown error.")
    elif mqttErrno == mqtt.MQTT_ERR_ERRNO:
        message = QCoreApplication.translate(
            "MqttErrorMessage", "Error defined by errno.")
    else:
        # fallback for codes without a dedicated text
        message = QCoreApplication.translate(
            "MqttErrorMessage", "Unknown error.")
    
    return message

eric ide

mercurial