Sun, 09 Sep 2018 12:21:19 +0200
Some smaller improvements and added some TODOs.
# -*- coding: utf-8 -*-

# Copyright (c) 2018 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a PyQt wrapper around the paho MQTT client.
"""

from __future__ import unicode_literals

from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication

import paho.mqtt.client as mqtt

from Utilities.crypto import pwConvert


class MqttClient(QObject):
    """
    Class implementing a PyQt wrapper around the paho MQTT client.

    The paho callbacks are forwarded as Qt signals, so consumers can
    connect slots instead of installing callback functions.

    @signal onConnect(flags, rc) emitted after the client has connected to
        the broker
    @signal onDisconnected(rc) emitted after the client has disconnected from
        the broker
    @signal onMessage(topic, payload, qos, retain) emitted after a message
        has been received by the client
    @signal onPublish(mid) emitted after a message has been published
    @signal onSubscribe(mid, grantedQos) emitted after the client has
        subscribed to some topics
    @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
        some topics
    """
    onConnect = pyqtSignal(dict, int)
    onDisconnected = pyqtSignal(int)
    onMessage = pyqtSignal(str, bytes, int, bool)
    onPublish = pyqtSignal(int)
    onSubscribe = pyqtSignal(int, tuple)
    onUnsubscribe = pyqtSignal(int)

    def __init__(self, clientId="", cleanSession=True, userdata=None,
                 protocol=mqtt.MQTTv311, transport="tcp", parent=None):
        """
        Constructor

        @param clientId ID to be used for the client
        @type str
        @param cleanSession flag indicating to start a clean session
        @type bool
        @param userdata user data
        @type any
        @param protocol version of the MQTT protocol to use
        @type int, one of mqtt.MQTTv31 or mqtt.MQTTv311
        @param transport transport to be used
        @type str, one of "tcp" or "websockets"
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent=parent)

        self.__loopStarted = False

        # Forward the caller supplied values; they were previously replaced
        # by hard coded defaults, which silently ignored the arguments.
        self.__mqttClient = mqtt.Client(
            client_id=clientId, clean_session=cleanSession,
            userdata=userdata, protocol=protocol, transport=transport)

        self.__initCallbacks()

    def __initCallbacks(self):
        """
        Private method to initialize the MQTT callback methods.

        Each paho callback is mapped to the emission of the corresponding
        Qt signal; the client and userdata arguments are dropped.
        """
        self.__mqttClient.on_connect = \
            lambda client, userdata, flags, rc: self.onConnect.emit(
                flags, rc)
        self.__mqttClient.on_disconnect = \
            lambda client, userdata, rc: self.onDisconnected.emit(rc)
        self.__mqttClient.on_message = \
            lambda client, userdata, message: self.onMessage.emit(
                message.topic, message.payload, message.qos, message.retain)
        self.__mqttClient.on_publish = \
            lambda client, userdata, mid: self.onPublish.emit(mid)
        self.__mqttClient.on_subscribe = \
            lambda client, userdata, mid, grantedQos: self.onSubscribe.emit(
                mid, grantedQos)
        self.__mqttClient.on_unsubscribe = \
            lambda client, userdata, mid: self.onUnsubscribe.emit(mid)

    def reinitialise(self, clientId="", cleanSession=True, userdata=None):
        """
        Public method to reinitialize the client with given data.

        @param clientId ID to be used for the client
        @type str
        @param cleanSession flag indicating to start a clean session
        @type bool
        @param userdata user data
        @type any
        """
        self.__mqttClient.reinitialise(
            client_id=clientId, clean_session=cleanSession, userdata=userdata)

        # the callbacks are installed again after the reinitialisation
        self.__initCallbacks()

    def setMaxInflightMessages(self, inflight=20):
        """
        Public method to set the maximum number of messages with QoS > 0 that
        can be part way through their network flow at once.

        @param inflight maximum number of messages in flight
        @type int
        """
        self.__mqttClient.max_inflight_messages_set(inflight)

    def setMaxQueuedMessages(self, queueSize=0):
        """
        Public method to set the maximum number of messages with QoS > 0 that
        can be pending in the outgoing message queue.

        @param queueSize maximum number of queued messages (0 = unlimited)
        @type int
        """
        self.__mqttClient.max_queued_messages_set(queueSize)

    def setUserCredentials(self, username, password=None):
        """
        Public method to set the user name and optionally the password.

        @param username user name to be set
        @type str
        @param password optional password
        @type str
        """
        self.__mqttClient.username_pw_set(username, password=password)

    def setUserData(self, userdata):
        """
        Public method to set the user data.

        @param userdata user data
        @type any
        """
        self.__mqttClient.user_data_set(userdata)

    def setLastWill(self, topic, payload=None, qos=0, retain=False):
        """
        Public method to set the last will of the client.

        @param topic topic the will message should be published on
        @type str
        @param payload message to send as a will
        @type str, bytes, int or float
        @param qos quality of service level to use for the will
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the will topic
        @type bool
        """
        self.__mqttClient.will_set(topic, payload=payload, qos=qos,
                                   retain=retain)

    def clearLastWill(self):
        """
        Public method to remove a will that was previously configured with
        setLastWill().
        """
        self.__mqttClient.will_clear()

    def setTLS(self, caCerts=None, certFile=None, keyFile=None):
        """
        Public method to enable secure connections and set the TLS parameters.

        @param caCerts path to the Certificate Authority certificates file
        @type str
        @param certFile PEM encoded client certificate file
        @type str
        @param keyFile PEM encoded private key file
        @type str
        @return tuple containing a success flag and the error string of the
            paho-mqtt library
        @rtype tuple of (bool, str)
        """
        try:
            self.__mqttClient.tls_set(ca_certs=caCerts, certfile=certFile,
                                      keyfile=keyFile)
            return True, ""
        except ValueError as err:
            return False, str(err)

    def startLoop(self):
        """
        Public method to start the MQTT client loop.
        """
        self.__mqttClient.loop_start()
        self.__loopStarted = True

    def stopLoop(self):
        """
        Public method to stop the MQTT client loop.
        """
        self.__mqttClient.loop_stop()
        self.__loopStarted = False

    def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""):
        """
        Public method to connect to a remote MQTT broker.

        The connection is requested asynchronously and the client loop is
        started, if that has not been done already.

        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param keepalive maximum period in seconds allowed between
            communications with the broker
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        """
        # TODO: get this fixed or allow to interrupt
        self.__mqttClient.reconnect_delay_set(max_delay=16)

        self.__mqttClient.connect_async(
            host, port=port, keepalive=keepalive, bind_address=bindAddress)

        if not self.__loopStarted:
            self.startLoop()

    def connectToServerWithOptions(self, host, port=1883, bindAddress="",
                                   options=None):
        """
        Public method to connect to a remote MQTT broker.

        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        @param options dictionary containing the connection options. This
            dictionary should contain the keys "ClientId", "Keepalive",
            "CleanSession", "Username", "Password", "WillTopic",
            "WillMessage", "WillQos", "WillRetain", "TlsEnable",
            "TlsCaCert", "TlsClientCert", "TlsClientKey"
        @type dict
        """
        if options:
            # missing keys are filled in from the default options
            parametersDict = self.defaultConnectionOptions()
            parametersDict.update(options)

            # step 1: reinitialize to set the client ID and clean session flag
            self.reinitialise(
                clientId=parametersDict["ClientId"],
                cleanSession=parametersDict["CleanSession"]
            )

            # step 2: set username and password
            if parametersDict["Username"]:
                if parametersDict["Password"]:
                    self.setUserCredentials(
                        parametersDict["Username"],
                        pwConvert(parametersDict["Password"], encode=False))
                else:
                    self.setUserCredentials(parametersDict["Username"])

            # step 3: set last will data
            if parametersDict["WillTopic"]:
                if parametersDict["WillMessage"]:
                    willMessage = parametersDict["WillMessage"]
                else:
                    # empty message to clear the will
                    willMessage = None
                self.setLastWill(parametersDict["WillTopic"],
                                 willMessage,
                                 parametersDict["WillQos"],
                                 parametersDict["WillRetain"])

            # step 4: set TLS parameters
            if parametersDict["TlsEnable"]:
                if parametersDict["TlsCaCert"] and \
                        parametersDict["TlsClientCert"]:
                    # use self signed client certificate
                    self.setTLS(caCerts=parametersDict["TlsCaCert"],
                                certFile=parametersDict["TlsClientCert"],
                                keyFile=parametersDict["TlsClientKey"])
                elif parametersDict["TlsCaCert"]:
                    # use CA certificate file
                    self.setTLS(caCerts=parametersDict["TlsCaCert"])
                else:
                    # use default TLS configuration
                    self.setTLS()

            # step 5: connect to server
            # The bind address is forwarded here as well; it was dropped
            # before although the branch without options honoured it.
            self.connectToServer(host, port=port,
                                 keepalive=parametersDict["Keepalive"],
                                 bindAddress=bindAddress)
        else:
            # defaultConnectionOptions is a method and must be called;
            # subscripting the bound method raised a TypeError.
            keepalive = self.defaultConnectionOptions()["Keepalive"]
            self.connectToServer(host, port=port, keepalive=keepalive,
                                 bindAddress=bindAddress)

    def defaultConnectionOptions(self):
        """
        Public method to get a connection options dictionary with default
        values.

        @return dictionary containing the default connection options. It has
            the keys "ClientId", "Keepalive", "CleanSession", "Username",
            "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain",
            "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey".
        @rtype dict
        """
        return {
            "ClientId": "ERIC6_MQTT_MONITOR_CLIENT",
            "Keepalive": 60,
            "CleanSession": True,
            "Username": "",
            "Password": "",
            "WillTopic": "",
            "WillMessage": "",
            "WillQos": 0,
            "WillRetain": False,
            "TlsEnable": False,
            "TlsCaCert": "",
            "TlsClientCert": "",
            "TlsClientKey": "",
        }

    def reconnectToServer(self):
        """
        Public method to reconnect the client with the same parameters.
        """
        self.__mqttClient.reconnect()

        if not self.__loopStarted:
            self.startLoop()

    def disconnectFromServer(self):
        """
        Public method to disconnect the client from the remote broker.
        """
        self.__mqttClient.disconnect()

    def subscribe(self, topic, qos=0):
        """
        Public method to subscribe to topics with quality of service.

        @param topic single topic to subscribe to or a tuple with a topic
            and a QoS or a list of tuples with a topic and a QoS each
        @type str or tuple of (str, int) or list of tuple of (str, int)
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.subscribe(topic, qos=qos)

    def unsubscribe(self, topic):
        """
        Public method to unsubscribe topics.

        @param topic topic or list of topics to unsubscribe
        @type str or list of str
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.unsubscribe(topic)

    def publish(self, topic, payload=None, qos=0, retain=False):
        """
        Public method to publish to a topic.

        @param topic topic to publish to
        @type str
        @param payload data to be published
        @type str, bytes, int or float
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the topic
        @type bool
        @return message info object
        @rtype mqtt.MQTTMessageInfo
        """
        return self.__mqttClient.publish(topic, payload=payload, qos=qos,
                                         retain=retain)


def mqttConnackMessage(connackCode):
    """
    Module function to get the string associated with a CONNACK result.

    @param connackCode result code of the connection request
    @type int
    @return textual representation for the result code
    @rtype str
    """
    if connackCode == mqtt.CONNACK_ACCEPTED:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Accepted.")
    elif connackCode == mqtt.CONNACK_REFUSED_PROTOCOL_VERSION:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: unacceptable protocol version.")
    elif connackCode == mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: identifier rejected.")
    elif connackCode == mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: broker unavailable.")
    elif connackCode == mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: bad user name or password.")
    elif connackCode == mqtt.CONNACK_REFUSED_NOT_AUTHORIZED:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: not authorised.")
    else:
        return QCoreApplication.translate(
            "MqttConnackMessage",
            "Connection Refused: unknown reason.")


def mqttErrorMessage(mqttErrno):
    """
    Module function to get the error string associated with an MQTT error
    number.

    @param mqttErrno result code of a MQTT request
    @type int
    @return textual representation of the result code
    @rtype str
    """
    if mqttErrno == mqtt.MQTT_ERR_SUCCESS:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "No error.")
    elif mqttErrno == mqtt.MQTT_ERR_NOMEM:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Out of memory.")
    elif mqttErrno == mqtt.MQTT_ERR_PROTOCOL:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "A network protocol error occurred when communicating with"
            " the broker.")
    elif mqttErrno == mqtt.MQTT_ERR_INVAL:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Invalid function arguments provided.")
    elif mqttErrno == mqtt.MQTT_ERR_NO_CONN:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "The client is not currently connected.")
    elif mqttErrno == mqtt.MQTT_ERR_CONN_REFUSED:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "The connection was refused.")
    elif mqttErrno == mqtt.MQTT_ERR_NOT_FOUND:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Message not found (internal error).")
    elif mqttErrno == mqtt.MQTT_ERR_CONN_LOST:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "The connection was lost.")
    elif mqttErrno == mqtt.MQTT_ERR_TLS:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "A TLS error occurred.")
    elif mqttErrno == mqtt.MQTT_ERR_PAYLOAD_SIZE:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Payload too large.")
    elif mqttErrno == mqtt.MQTT_ERR_NOT_SUPPORTED:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "This feature is not supported.")
    elif mqttErrno == mqtt.MQTT_ERR_AUTH:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Authorisation failed.")
    elif mqttErrno == mqtt.MQTT_ERR_ACL_DENIED:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Access denied by ACL.")
    elif mqttErrno == mqtt.MQTT_ERR_UNKNOWN:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Unknown error.")
    elif mqttErrno == mqtt.MQTT_ERR_ERRNO:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Error defined by errno.")
    else:
        return QCoreApplication.translate(
            "MqttErrorMessage",
            "Unknown error.")