MqttMonitor/MqttClient.py

changeset 2
d439c5109829
child 3
82845c0fd550
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MqttMonitor/MqttClient.py	Sun Aug 26 19:40:15 2018 +0200
@@ -0,0 +1,279 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2018 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing a PyQt wrapper around the paho MQTT client.
+"""
+
+from __future__ import unicode_literals
+
+from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication
+
+import paho.mqtt.client as mqtt
+
+
+class MqttClient(QObject):
+    """
+    Class implementing a PyQt wrapper around the paho MQTT client.
+    
+    @signal onConnect(flags, rc) emitted after the client has connected to the
+        broker
+    @signal onDisconnected(rc) emitted after the client has disconnected from
+        the broker
+    @signal onMessage(topic, payload, qos, retain) emitted after a message has
+        been received by the client
+    @signal onPublish(mid) emitted after a message has been published
+    @signal onSubscribe(mid, grantedQos) emitted after the client has
+        subscribed to some topics
+    @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
+        some topics
+    """
+    onConnect = pyqtSignal(dict, int)
+    onDisconnected = pyqtSignal(int)
+    onMessage = pyqtSignal(str, bytes, int, bool)
+    onPublish = pyqtSignal(int)
+    onSubscribe = pyqtSignal(int, tuple)
+    onUnsubscribe = pyqtSignal(int)
+    
+    def __init__(self, clientId="", cleanSession=True, userdata=None,
+                 protocol=mqtt.MQTTv311, transport="tcp", parent=None):
+        """
+        Constructor
+        
+        @param clientId ID to be used for the client
+        @type str
+        @param cleanSession flag indicating to start a clean session
+        @type bool
+        @param userdata user data
+        @type any
+        @param protocol version of the MQTT protocol to use
+        @type int, one of mqtt.MQTTv31 or mqtt.MQTTv311
+        @param transport transport to be used
+        @type str, one of "tcp" or "websockets"
+        @param parent reference to the parent object
+        @type QObject
+        """
+        QObject.__init__(self, parent=parent)
+        
+        self.__loopStarted = False
+        
+        self.__mqttClient = mqtt.Client(
+            client_id=clientId, clean_session=cleanSession, userdata=None,
+            protocol=mqtt.MQTTv311, transport="tcp")
+        
+        self.__mqttClient.on_connect = \
+            lambda client, userdata, flags, rc: self.onConnect.emit(
+                flags, rc)
+        self.__mqttClient.on_disconnect = \
+            lambda client, userdata, rc: self.onDisconnected.emit(rc)
+        self.__mqttClient.on_message = \
+            lambda client, userdata, message: self.onMessage.emit(
+                message.topic, message.payload, message.qos, message.retain)
+        self.__mqttClient.on_publish = \
+            lambda client, userdata, mid: self.onPublish.emit(mid)
+        self.__mqttClient.on_subscribe = \
+            lambda client, userdata, mid, grantedQos: self.onSubscribe.emit(
+                mid, grantedQos)
+        self.__mqttClient.on_unsubscribe = \
+            lambda client, userdata, mid: self.onUnsubscribe.emit(mid)
+    
+    def startLoop(self):
+        """
+        Public method to start the MQTT client loop.
+        """
+        self.__mqttClient.loop_start()
+        self.__loopStarted = True
+    
+    def stopLoop(self):
+        """
+        Public method to stop the MQTT client loop.
+        """
+        self.__mqttClient.loop_stop()
+        self.__loopStarted = False
+    
+    def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""):
+        """
+        Public method to connect to a remote MQTT broker.
+        
+        @param host host name or IP address of the remote broker
+        @type str
+        @param port network port of the server host to connect to (default:
+            1883, using TLS: 8883)
+        @type int
+        @param keepalive maximum period in seconds allowed between
+            communications with the broker
+        @type int
+        @param bindAddress IP address of a local network interface to bind
+            this client to
+        @type str
+        """
+        self.__mqttClient.connect_async(
+            host, port=port, keepalive=keepalive, bind_address=bindAddress)
+        
+        if not self.__loopStarted:
+            self.startLoop()
+    
+    def reconnectToServer(self):
+        """
+        Public method to reconnect the client with the same parameters.
+        """
+        self.__mqttClient.reconnect()
+        
+        if not self.__loopStarted:
+            self.startLoop()
+    
+    def disconnectFromServer(self):
+        """
+        Public method to disconnect the client from the remote broker.
+        """
+        self.__mqttClient.disconnect()
+    
+    def subscribe(self, topic, qos=0):
+        """
+        Public method to subscribe to topics with quality of service.
+        
+        @param topic single topic to subscribe to or a tuple with a topic
+            and a QoS or a list of tuples with a topic and a QoS each
+        @type str or tuple of (str, int) or list of tuple of (str, int)
+        @param qos quality of service
+        @type int, one of 0, 1 or 2
+        @return tuple containing the result code and the message ID
+        @rtype tuple of (int, int)
+        """
+        return self.__mqttClient.subscribe(topic, qos=qos)
+    
+    def unsubscribe(self, topic):
+        """
+        Public method to unsubscribe topics.
+        
+        @param topic topic or list of topics to unsubscribe
+        @type str or list of str
+        @return tuple containing the result code and the message ID
+        @rtype tuple of (int, int)
+        """
+        return self.__mqttClient.unsubscribe(topic)
+    
+    def publish(self, topic, payload=None, qos=0, retain=False):
+        """
+        Public method to publish to a topic.
+        
+        @param topic topic to publish to
+        @type str
+        @param payload data to be published
+        @type str, bytes, int or float
+        @param qos quality of service
+        @type int, one of 0, 1 or 2
+        @param retain flag indicating to set as the "last known good"/retained
+            message for the topic
+        @type bool
+        @return message info object
+        @rtype mqtt.MQTTMessageInfo
+        """
+        return self.__mqttClient.publish(topic, payload=payload, qos=qos,
+                                         retain=retain)
+
+def mqttConnackMessage(connackCode):
+    """
+    Public method to get the string associated with a CONNACK result.
+    """
+    if connackCode == mqtt.CONNACK_ACCEPTED:
+        return QCoreApplication.translate(
+            "MqttConnackMessage", 
+            "Connection Accepted.")
+    elif connackCode == mqtt.CONNACK_REFUSED_PROTOCOL_VERSION:
+        return QCoreApplication.translate(
+            "MqttConnackMessage", 
+            "Connection Refused: unacceptable protocol version.")
+    elif connackCode == mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED:
+        return QCoreApplication.translate(
+            "MqttConnackMessage", 
+            "Connection Refused: identifier rejected.")
+    elif connackCode == mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE:
+        return QCoreApplication.translate(
+            "MqttConnackMessage", 
+            "Connection Refused: broker unavailable.")
+    elif connackCode == mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
+        return QCoreApplication.translate(
+            "MqttConnackMessage", 
+            "Connection Refused: bad user name or password.")
+    elif connackCode == mqtt.CONNACK_REFUSED_NOT_AUTHORIZED:
+        return QCoreApplication.translate(
+            "MqttConnackMessage", 
+            "Connection Refused: not authorised.")
+    else:
+        return QCoreApplication.translate(
+            "MqttConnackMessage", 
+            "Connection Refused: unknown reason.")
+
+def mqttErrorMessage(self, mqttErrno):
+    """
+    Public method to get the error string associated with an MQTT error
+    number.
+    """
+    if mqttErrno == mqtt.MQTT_ERR_SUCCESS:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "No error.")
+    elif mqttErrno == mqtt.MQTT_ERR_NOMEM:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "Out of memory.")
+    elif mqttErrno == mqtt.MQTT_ERR_PROTOCOL:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "A network protocol error occurred when communicating with"
+            " the broker.")
+    elif mqttErrno == mqtt.MQTT_ERR_INVAL:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "Invalid function arguments provided.")
+    elif mqttErrno == mqtt.MQTT_ERR_NO_CONN:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "The client is not currently connected.")
+    elif mqttErrno == mqtt.MQTT_ERR_CONN_REFUSED:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "The connection was refused.")
+    elif mqttErrno == mqtt.MQTT_ERR_NOT_FOUND:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "Message not found (internal error).")
+    elif mqttErrno == mqtt.MQTT_ERR_CONN_LOST:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "The connection was lost.")
+    elif mqttErrno == mqtt.MQTT_ERR_TLS:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "A TLS error occurred.")
+    elif mqttErrno == mqtt.MQTT_ERR_PAYLOAD_SIZE:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "Payload too large.")
+    elif mqttErrno == mqtt.MQTT_ERR_NOT_SUPPORTED:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "This feature is not supported.")
+    elif mqttErrno == mqtt.MQTT_ERR_AUTH:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "Authorisation failed.")
+    elif mqttErrno == mqtt.MQTT_ERR_ACL_DENIED:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "Access denied by ACL.")
+    elif mqttErrno == mqtt.MQTT_ERR_UNKNOWN:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "Unknown error.")
+    elif mqttErrno == mqtt.MQTT_ERR_ERRNO:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "Error defined by errno.")
+    else:
+        return QCoreApplication.translate(
+            "MqttErrorMessage", 
+            "Unknown error.")

eric ide

mercurial