MqttMonitor/MqttClient.py

Mon, 27 Aug 2018 19:26:27 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Mon, 27 Aug 2018 19:26:27 +0200
changeset 3
82845c0fd550
parent 2
d439c5109829
child 9
f75a385e9127
permissions
-rw-r--r--

Fixed some code style issues and implemented the broker connect/disconnect group of the MqttMonitorWidget.

# -*- coding: utf-8 -*-

# Copyright (c) 2018 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a PyQt wrapper around the paho MQTT client.
"""

from __future__ import unicode_literals

from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication

import paho.mqtt.client as mqtt


class MqttClient(QObject):
    """
    Class implementing a PyQt wrapper around the paho MQTT client.

    The callbacks of the wrapped paho client are forwarded as Qt signals, so
    that interested parties can simply connect slots to them.

    @signal onConnect(flags, rc) emitted after the client has connected to the
        broker
    @signal onDisconnected(rc) emitted after the client has disconnected from
        the broker
    @signal onMessage(topic, payload, qos, retain) emitted after a message has
        been received by the client
    @signal onPublish(mid) emitted after a message has been published
    @signal onSubscribe(mid, grantedQos) emitted after the client has
        subscribed to some topics
    @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
        some topics
    """
    onConnect = pyqtSignal(dict, int)
    onDisconnected = pyqtSignal(int)
    onMessage = pyqtSignal(str, bytes, int, bool)
    onPublish = pyqtSignal(int)
    onSubscribe = pyqtSignal(int, tuple)
    onUnsubscribe = pyqtSignal(int)

    def __init__(self, clientId="", cleanSession=True, userdata=None,
                 protocol=mqtt.MQTTv311, transport="tcp", parent=None):
        """
        Constructor

        @param clientId ID to be used for the client
        @type str
        @param cleanSession flag indicating to start a clean session
        @type bool
        @param userdata user data
        @type any
        @param protocol version of the MQTT protocol to use
        @type int, one of mqtt.MQTTv31 or mqtt.MQTTv311
        @param transport transport to be used
        @type str, one of "tcp" or "websockets"
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent=parent)

        # tracks whether startLoop() has been called without a matching
        # stopLoop()
        self.__loopStarted = False

        # Pass the caller supplied settings on to paho. The values for
        # userdata, protocol and transport must not be hard coded here
        # because that would silently ignore the constructor arguments.
        self.__mqttClient = mqtt.Client(
            client_id=clientId, clean_session=cleanSession,
            userdata=userdata, protocol=protocol, transport=transport)

        # Forward each paho callback as the corresponding Qt signal. The
        # 'client' and 'userdata' callback arguments are not passed on.
        self.__mqttClient.on_connect = \
            lambda client, userdata, flags, rc: self.onConnect.emit(
                flags, rc)
        self.__mqttClient.on_disconnect = \
            lambda client, userdata, rc: self.onDisconnected.emit(rc)
        self.__mqttClient.on_message = \
            lambda client, userdata, message: self.onMessage.emit(
                message.topic, message.payload, message.qos, message.retain)
        self.__mqttClient.on_publish = \
            lambda client, userdata, mid: self.onPublish.emit(mid)
        self.__mqttClient.on_subscribe = \
            lambda client, userdata, mid, grantedQos: self.onSubscribe.emit(
                mid, grantedQos)
        self.__mqttClient.on_unsubscribe = \
            lambda client, userdata, mid: self.onUnsubscribe.emit(mid)

    def startLoop(self):
        """
        Public method to start the MQTT client loop.
        """
        self.__mqttClient.loop_start()
        self.__loopStarted = True

    def stopLoop(self):
        """
        Public method to stop the MQTT client loop.
        """
        self.__mqttClient.loop_stop()
        self.__loopStarted = False

    def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""):
        """
        Public method to connect to a remote MQTT broker.

        The connection is requested via the asynchronous connect call of the
        paho client and the client loop is started, if that was not done
        already.

        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param keepalive maximum period in seconds allowed between
            communications with the broker
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        """
        self.__mqttClient.connect_async(
            host, port=port, keepalive=keepalive, bind_address=bindAddress)

        if not self.__loopStarted:
            self.startLoop()

    def reconnectToServer(self):
        """
        Public method to reconnect the client with the same parameters.

        The client loop is started, if that was not done already.
        """
        self.__mqttClient.reconnect()

        if not self.__loopStarted:
            self.startLoop()

    def disconnectFromServer(self):
        """
        Public method to disconnect the client from the remote broker.

        Note: The client loop is not stopped by this method; use stopLoop()
        for that.
        """
        self.__mqttClient.disconnect()

    def subscribe(self, topic, qos=0):
        """
        Public method to subscribe to topics with quality of service.

        @param topic single topic to subscribe to or a tuple with a topic
            and a QoS or a list of tuples with a topic and a QoS each
        @type str or tuple of (str, int) or list of tuple of (str, int)
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.subscribe(topic, qos=qos)

    def unsubscribe(self, topic):
        """
        Public method to unsubscribe topics.

        @param topic topic or list of topics to unsubscribe
        @type str or list of str
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.unsubscribe(topic)

    def publish(self, topic, payload=None, qos=0, retain=False):
        """
        Public method to publish to a topic.

        @param topic topic to publish to
        @type str
        @param payload data to be published
        @type str, bytes, int or float
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the topic
        @type bool
        @return message info object
        @rtype mqtt.MQTTMessageInfo
        """
        return self.__mqttClient.publish(topic, payload=payload, qos=qos,
                                         retain=retain)


def mqttConnackMessage(connackCode):
    """
    Module function mapping a CONNACK result code to a readable message.

    @param connackCode result code of the connection request
    @type int
    @return translated text describing the result code
    @rtype str
    """
    # Every known code returns right away; anything else drops through to
    # the generic refusal message at the end.
    if connackCode == mqtt.CONNACK_ACCEPTED:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Accepted.")

    if connackCode == mqtt.CONNACK_REFUSED_PROTOCOL_VERSION:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: unacceptable protocol version.")

    if connackCode == mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: identifier rejected.")

    if connackCode == mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: broker unavailable.")

    if connackCode == mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: bad user name or password.")

    if connackCode == mqtt.CONNACK_REFUSED_NOT_AUTHORIZED:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: not authorised.")

    return QCoreApplication.translate(
        "MqttConnackMessage",
        "Connection Refused: unknown reason.")


def mqttErrorMessage(self=None, mqttErrno=None):
    """
    Module function to get the error string associated with an MQTT error
    number.

    Note: This is a module level function. The leading 'self' parameter is
    a leftover of the original signature and is kept for backward
    compatibility only. The function may be called as
    mqttErrorMessage(errno), mqttErrorMessage(mqttErrno=errno) or, in the
    legacy form, as mqttErrorMessage(anything, errno).

    @param self legacy placeholder; it is used as the error number, if
        mqttErrno is not given
    @type any
    @param mqttErrno result code of a MQTT request
    @type int
    @return textual representation of the result code
    @rtype str
    """
    if mqttErrno is None:
        # called with a single positional argument: that one is the error
        # number
        mqttErrno = self

    if mqttErrno == mqtt.MQTT_ERR_SUCCESS:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "No error.")
    elif mqttErrno == mqtt.MQTT_ERR_NOMEM:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Out of memory.")
    elif mqttErrno == mqtt.MQTT_ERR_PROTOCOL:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "A network protocol error occurred when communicating with"
            " the broker.")
    elif mqttErrno == mqtt.MQTT_ERR_INVAL:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Invalid function arguments provided.")
    elif mqttErrno == mqtt.MQTT_ERR_NO_CONN:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "The client is not currently connected.")
    elif mqttErrno == mqtt.MQTT_ERR_CONN_REFUSED:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "The connection was refused.")
    elif mqttErrno == mqtt.MQTT_ERR_NOT_FOUND:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Message not found (internal error).")
    elif mqttErrno == mqtt.MQTT_ERR_CONN_LOST:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "The connection was lost.")
    elif mqttErrno == mqtt.MQTT_ERR_TLS:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "A TLS error occurred.")
    elif mqttErrno == mqtt.MQTT_ERR_PAYLOAD_SIZE:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Payload too large.")
    elif mqttErrno == mqtt.MQTT_ERR_NOT_SUPPORTED:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "This feature is not supported.")
    elif mqttErrno == mqtt.MQTT_ERR_AUTH:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Authorisation failed.")
    elif mqttErrno == mqtt.MQTT_ERR_ACL_DENIED:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Access denied by ACL.")
    elif mqttErrno == mqtt.MQTT_ERR_UNKNOWN:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Unknown error.")
    elif mqttErrno == mqtt.MQTT_ERR_ERRNO:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Error defined by errno.")
    else:
        # any code not listed above is reported as unknown as well
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Unknown error.")

eric ide

mercurial