--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MqttMonitor/MqttClient.py Sun Aug 26 19:40:15 2018 +0200 @@ -0,0 +1,279 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2018 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing a PyQt wrapper around the paho MQTT client. +""" + +from __future__ import unicode_literals + +from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication + +import paho.mqtt.client as mqtt + + +class MqttClient(QObject): + """ + Class implementing a PyQt wrapper around the paho MQTT client. + + @signal onConnect(flags, rc) emitted after the client has connected to the + broker + @signal onDisconnected(rc) emitted after the client has disconnected from + the broker + @signal onMessage(topic, payload, qos, retain) emitted after a message has + been received by the client + @signal onPublish(mid) emitted after a message has been published + @signal onSubscribe(mid, grantedQos) emitted after the client has + subscribed to some topics + @signal onUnsubscribe(mid) emitted after the client has unsubscribed from + some topics + """ + onConnect = pyqtSignal(dict, int) + onDisconnected = pyqtSignal(int) + onMessage = pyqtSignal(str, bytes, int, bool) + onPublish = pyqtSignal(int) + onSubscribe = pyqtSignal(int, tuple) + onUnsubscribe = pyqtSignal(int) + + def __init__(self, clientId="", cleanSession=True, userdata=None, + protocol=mqtt.MQTTv311, transport="tcp", parent=None): + """ + Constructor + + @param clientId ID to be used for the client + @type str + @param cleanSession flag indicating to start a clean session + @type bool + @param userdata user data + @type any + @param protocol version of the MQTT protocol to use + @type int, one of mqtt.MQTTv31 or mqtt.MQTTv311 + @param transport transport to be used + @type str, one of "tcp" or "websockets" + @param parent reference to the parent object + @type QObject + """ + QObject.__init__(self, parent=parent) + + self.__loopStarted = False + + self.__mqttClient = mqtt.Client( + client_id=clientId, clean_session=cleanSession, userdata=None, + protocol=mqtt.MQTTv311, transport="tcp") + + self.__mqttClient.on_connect = \ + lambda client, userdata, flags, rc: self.onConnect.emit( + flags, rc) + self.__mqttClient.on_disconnect = \ + lambda client, userdata, rc: self.onDisconnected.emit(rc) + self.__mqttClient.on_message = \ + lambda client, userdata, message: self.onMessage.emit( + message.topic, message.payload, message.qos, message.retain) + self.__mqttClient.on_publish = \ + lambda client, userdata, mid: self.onPublish.emit(mid) + self.__mqttClient.on_subscribe = \ + lambda client, userdata, mid, grantedQos: self.onSubscribe.emit( + mid, grantedQos) + self.__mqttClient.on_unsubscribe = \ + lambda client, userdata, mid: self.onUnsubscribe.emit(mid) + + def startLoop(self): + """ + Public method to start the MQTT client loop. + """ + self.__mqttClient.loop_start() + self.__loopStarted = True + + def stopLoop(self): + """ + Public method to stop the MQTT client loop. + """ + self.__mqttClient.loop_stop() + self.__loopStarted = False + + def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""): + """ + Public method to connect to a remote MQTT broker. + + @param host host name or IP address of the remote broker + @type str + @param port network port of the server host to connect to (default: + 1883, using TLS: 8883) + @type int + @param keepalive maximum period in seconds allowed between + communications with the broker + @type int + @param bindAddress IP address of a local network interface to bind + this client to + @type str + """ + self.__mqttClient.connect_async( + host, port=port, keepalive=keepalive, bind_address=bindAddress) + + if not self.__loopStarted: + self.startLoop() + + def reconnectToServer(self): + """ + Public method to reconnect the client with the same parameters. + """ + self.__mqttClient.reconnect() + + if not self.__loopStarted: + self.startLoop() + + def disconnectFromServer(self): + """ + Public method to disconnect the client from the remote broker. + """ + self.__mqttClient.disconnect() + + def subscribe(self, topic, qos=0): + """ + Public method to subscribe to topics with quality of service. + + @param topic single topic to subscribe to or a tuple with a topic + and a QoS or a list of tuples with a topic and a QoS each + @type str or tuple of (str, int) or list of tuple of (str, int) + @param qos quality of service + @type int, one of 0, 1 or 2 + @return tuple containing the result code and the message ID + @rtype tuple of (int, int) + """ + return self.__mqttClient.subscribe(topic, qos=qos) + + def unsubscribe(self, topic): + """ + Public method to unsubscribe topics. + + @param topic topic or list of topics to unsubscribe + @type str or list of str + @return tuple containing the result code and the message ID + @rtype tuple of (int, int) + """ + return self.__mqttClient.unsubscribe(topic) + + def publish(self, topic, payload=None, qos=0, retain=False): + """ + Public method to publish to a topic. + + @param topic topic to publish to + @type str + @param payload data to be published + @type str, bytes, int or float + @param qos quality of service + @type int, one of 0, 1 or 2 + @param retain flag indicating to set as the "last known good"/retained + message for the topic + @type bool + @return message info object + @rtype mqtt.MQTTMessageInfo + """ + return self.__mqttClient.publish(topic, payload=payload, qos=qos, + retain=retain) + +def mqttConnackMessage(connackCode): + """ + Public method to get the string associated with a CONNACK result. + """ + if connackCode == mqtt.CONNACK_ACCEPTED: + return QCoreApplication.translate( + "MqttConnackMessage", + "Connection Accepted.") + elif connackCode == mqtt.CONNACK_REFUSED_PROTOCOL_VERSION: + return QCoreApplication.translate( + "MqttConnackMessage", + "Connection Refused: unacceptable protocol version.") + elif connackCode == mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED: + return QCoreApplication.translate( + "MqttConnackMessage", + "Connection Refused: identifier rejected.") + elif connackCode == mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE: + return QCoreApplication.translate( + "MqttConnackMessage", + "Connection Refused: broker unavailable.") + elif connackCode == mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD: + return QCoreApplication.translate( + "MqttConnackMessage", + "Connection Refused: bad user name or password.") + elif connackCode == mqtt.CONNACK_REFUSED_NOT_AUTHORIZED: + return QCoreApplication.translate( + "MqttConnackMessage", + "Connection Refused: not authorised.") + else: + return QCoreApplication.translate( + "MqttConnackMessage", + "Connection Refused: unknown reason.") + +def mqttErrorMessage(self, mqttErrno): + """ + Public method to get the error string associated with an MQTT error + number. + """ + if mqttErrno == mqtt.MQTT_ERR_SUCCESS: + return QCoreApplication.translate( + "MqttErrorMessage", + "No error.") + elif mqttErrno == mqtt.MQTT_ERR_NOMEM: + return QCoreApplication.translate( + "MqttErrorMessage", + "Out of memory.") + elif mqttErrno == mqtt.MQTT_ERR_PROTOCOL: + return QCoreApplication.translate( + "MqttErrorMessage", + "A network protocol error occurred when communicating with" + " the broker.") + elif mqttErrno == mqtt.MQTT_ERR_INVAL: + return QCoreApplication.translate( + "MqttErrorMessage", + "Invalid function arguments provided.") + elif mqttErrno == mqtt.MQTT_ERR_NO_CONN: + return QCoreApplication.translate( + "MqttErrorMessage", + "The client is not currently connected.") + elif mqttErrno == mqtt.MQTT_ERR_CONN_REFUSED: + return QCoreApplication.translate( + "MqttErrorMessage", + "The connection was refused.") + elif mqttErrno == mqtt.MQTT_ERR_NOT_FOUND: + return QCoreApplication.translate( + "MqttErrorMessage", + "Message not found (internal error).") + elif mqttErrno == mqtt.MQTT_ERR_CONN_LOST: + return QCoreApplication.translate( + "MqttErrorMessage", + "The connection was lost.") + elif mqttErrno == mqtt.MQTT_ERR_TLS: + return QCoreApplication.translate( + "MqttErrorMessage", + "A TLS error occurred.") + elif mqttErrno == mqtt.MQTT_ERR_PAYLOAD_SIZE: + return QCoreApplication.translate( + "MqttErrorMessage", + "Payload too large.") + elif mqttErrno == mqtt.MQTT_ERR_NOT_SUPPORTED: + return QCoreApplication.translate( + "MqttErrorMessage", + "This feature is not supported.") + elif mqttErrno == mqtt.MQTT_ERR_AUTH: + return QCoreApplication.translate( + "MqttErrorMessage", + "Authorisation failed.") + elif mqttErrno == mqtt.MQTT_ERR_ACL_DENIED: + return QCoreApplication.translate( + "MqttErrorMessage", + "Access denied by ACL.") + elif mqttErrno == mqtt.MQTT_ERR_UNKNOWN: + return QCoreApplication.translate( + "MqttErrorMessage", + "Unknown error.") + elif mqttErrno == mqtt.MQTT_ERR_ERRNO: + return QCoreApplication.translate( + "MqttErrorMessage", + "Error defined by errno.") + else: + return QCoreApplication.translate( + "MqttErrorMessage", + "Unknown error.")