Sat, 19 Oct 2024 16:04:24 +0200
Fixed a backward compatibility issue with eric-ide < 24.11.
# -*- coding: utf-8 -*- # Copyright (c) 2018 - 2024 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing a PyQt wrapper around the paho MQTT client. """ import paho.mqtt.client as mqtt from paho.mqtt.packettypes import PacketTypes from paho.mqtt.properties import Properties from PyQt6.QtCore import QCoreApplication, QObject, QTimer, pyqtSignal, pyqtSlot try: from eric7.EricUtilities.crypto import pwConvert except ImportError: # backward compatibility for eric-ide < 24.11 from eric7.Utilities.crypto import pwConvert from .MqttProtocols import MqttProtocols class MqttClient(QObject): """ Class implementing a PyQt wrapper around the paho MQTT client. @signal onConnect(connectFlags, rc, packetType, properties) emitted after the client has connected to the broker @signal onDisconnected(rc, packetType) emitted after the client has disconnected from the broker @signal onLog(level, message) emitted to send client log data @signal onMessage(topic, payload, qos, retain, properties) emitted after a message has been received by the client @signal onPublish(mid) emitted after a message has been published @signal onSubscribe(mid, reasonCodes, properties) emitted after the client has subscribed to some topics @signal onUnsubscribe(mid) emitted after the client has unsubscribed from some topics (MQTT v3) @signal onUnsubscribe(mid, rc, packetType, properties) emitted after the client has unsubscribed from some topics (MQTT v5) @signal connectTimeout() emitted to indicate, that a connection attempt timed out """ onConnect = pyqtSignal(bool, int, int, dict) onDisconnected = pyqtSignal(int, int) onLog = pyqtSignal(int, str) onMessage = pyqtSignal(str, bytes, int, bool, dict) onPublish = pyqtSignal(int) onSubscribe = pyqtSignal(int, list, dict) onUnsubscribe = pyqtSignal((int,), (int, int, int, dict)) connectTimeout = pyqtSignal() DefaultConnectTimeout = 15 # connect timeout in seconds LogDebug = 0x01 LogInfo = 0x02 LogNotice = 0x04 LogWarning = 0x08 LogError = 0x10 LogDisabled = 0xFF LogLevelMap = { mqtt.MQTT_LOG_DEBUG: LogDebug, mqtt.MQTT_LOG_INFO: LogInfo, mqtt.MQTT_LOG_NOTICE: LogNotice, mqtt.MQTT_LOG_WARNING: LogWarning, # __NO-TASK__ mqtt.MQTT_LOG_ERR: LogError, } def __init__( self, clientId="", cleanSession=True, userdata=None, protocol=mqtt.MQTTv311, transport="tcp", parent=None, ): """ Constructor @param clientId ID to be used for the client @type str @param cleanSession flag indicating to start a clean session @type bool @param userdata user data @type any @param protocol version of the MQTT protocol to use @type int, one of mqtt.MQTTv31, mqtt.MQTTv311 or mqtt.MQTTv5 @param transport transport to be used @type str, one of "tcp" or "websockets" @param parent reference to the parent object @type QObject """ QObject.__init__(self, parent=parent) self.__loopStarted = False self.__connectTimeoutTimer = QTimer(self) self.__connectTimeoutTimer.setSingleShot(True) self.__connectTimeoutTimer.setInterval(MqttClient.DefaultConnectTimeout * 1000) self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout) self.onConnect.connect(self.__connectTimeoutTimer.stop) self.__cleanSession = cleanSession self.__protocol = protocol self.__disconnectUserProperties = [] if protocol == MqttProtocols.MQTTv5: cleanSession = None self.__mqttClient = mqtt.Client( mqtt.CallbackAPIVersion.VERSION2, client_id=clientId, clean_session=cleanSession, userdata=userdata, protocol=int(protocol), transport=transport, ) self.__initCallbacks() def __initCallbacks(self): """ Private method to initialize the MQTT callback methods. """ self.__mqttClient.on_connect = self.__onConnect self.__mqttClient.on_disconnect = self.__onDisconnected self.__mqttClient.on_log = self.__onLog self.__mqttClient.on_message = self.__onMessage self.__mqttClient.on_publish = self.__onPublish self.__mqttClient.on_subscribe = self.__onSubscribe self.__mqttClient.on_unsubscribe = self.__onUnsubscribe def __onConnect( self, _client, _userdata, connectFlags, rc, properties, ): """ Private method to handle the connect to the broker. @param _client reference to the client object (unused) @type paho.mqtt.Client @param _userdata user data (unused) @type Any @param connectFlags response flags sent by the broker @type mqtt.ConnectFlags @param rc reason code @type paho.mqtt.ReasonCodes @param properties MQTT v5.0 properties received from the broker @type dict """ self.onConnect.emit( connectFlags.session_present, rc.value, rc.packetType, properties.json() ) def __onDisconnected( self, _client, _userdata, _flags, rc, _properties=None, ): """ Private method to handle the disconnect from the broker. @param _client reference to the client object (unused) @type paho.mqtt.Client @param _userdata user data (unused) @type Any @param _flags dictionary containing the response flags sent by the broker (unused) @type dict @param rc result code or reason code @type int or paho.mqtt.ReasonCodes @param _properties MQTT v5.0 properties received from the broker (defaults to None) (unused) @type dict (optional) """ if isinstance(rc, int): packetType = PacketTypes.DISCONNECT resultCode = rc else: packetType = rc.packetType resultCode = rc.value self.onDisconnected.emit(resultCode, packetType) def __onSubscribe( self, _client, _userdata, mid, reasonCodes, properties, ): """ Private method to handle a subscribe event. @param _client reference to the client object (unused) @type paho.mqtt.Client @param _userdata user data (unused) @type Any @param mid message ID @type int @param reasonCodes list of reason code for each subscribed topic @type list of paho.mqtt.ReasonCodes @param properties MQTT v5.0 properties received from the broker @type dict """ self.onSubscribe.emit(mid, reasonCodes, properties.json()) def __onUnsubscribe( self, _client, _userdata, mid, reasonCodes, properties, ): """ Private method to handle an unsubscribe event. @param _client reference to the client object (unused) @type paho.mqtt.Client @param _userdata user data (unused) @type Any @param mid message ID @type int @param reasonCodes list of reason code for each unsubscribed topic @type list of paho.mqtt.ReasonCodes @param properties MQTT v5.0 properties received from the broker @type dict """ if reasonCodes: self.onUnsubscribe[int, int, int, dict].emit( mid, reasonCodes[0].value, reasonCodes[0].packetType, properties.json() ) else: self.onUnsubscribe[int].emit(mid) def __onMessage( self, _client, _userdata, message, ): """ Private method to handle a new message received from the broker. @param _client reference to the client object (unused) @type paho.mqtt.Client @param _userdata user data (unused) @type Any @param message received message object @type paho.mqtt.MQTTMessage """ self.onMessage.emit( message.topic, message.payload, message.qos, message.retain, message.properties.json() if message.properties is not None else {}, ) def __onLog( self, _client, _userdata, level, buf, ): """ Private method to handle a log event. @param _client reference to the client object (unused) @type paho.mqtt.Client @param _userdata user data (unused) @type Any @param level severity of the log message @type int @param buf log message @type str """ self.onLog.emit(level, buf) def __onPublish( self, _client, _userdata, mid, _reasonCode, _properties, ): """ Private method to handle the publishing of a message. @param _client reference to the client object (unused) @type paho.mqtt.Client @param _userdata user data (unused) @type Any @param mid message ID @type int @param _reasonCode reason code (unused) @type paho.mqtt.ReasonCodes @param _properties MQTT v5.0 properties received from the broker (unused) @type dict """ self.onPublish.emit(mid) @pyqtSlot() def __connectTimeout(self): """ Private slot handling a failed connection attempt. """ self.stopLoop() self.connectTimeout.emit() def setConnectionTimeout(self, timeout): """ Public method to set the connection timeout value. @param timeout timeout value to be set in seconds @type int """ self.__connectTimeoutTimer.setInterval(timeout * 1000) def setMaxInflightMessages(self, inflight=20): """ Public method to set the maximum number of messages with QoS > 0 that can be part way through their network flow at once. @param inflight maximum number of messages in flight @type int """ self.__mqttClient.max_inflight_messages_set(inflight) def setMaxQueuedMessages(self, queueSize=0): """ Public method to set the maximum number of messages with QoS > 0 that can be pending in the outgoing message queue. @param queueSize maximum number of queued messages (0 = unlimited) @type int """ self.__mqttClient.max_queued_messages_set(queueSize) def setUserCredentials(self, username, password=None): """ Public method to set the user name and optionally the password. @param username user name to be set @type str @param password optional password @type str """ self.__mqttClient.username_pw_set(username, password=password) def setUserData(self, userdata): """ Public method to set the user data. @param userdata user data @type any """ self.__mqttClient.user_data_set(userdata) def setLastWill(self, topic, payload=None, qos=0, retain=False, properties=None): """ Public method to set the last will of the client. @param topic topic the will message should be published on @type str @param payload message to send as a will @type str, bytes, int or float @param qos quality of service level to use for the will @type int, one of 0, 1 or 2 @param retain flag indicating to set as the "last known good"/retained message for the will topic @type bool @param properties list of user properties to be sent with the last will message @type list of tuple of (str, str) """ self.__mqttClient.will_set( topic, payload=payload, qos=qos, retain=retain, properties=properties ) def clearLastWill(self): """ Public method to remove a will that was previously configured with setLastWill(). """ self.__mqttClient.will_clear() def setTLS(self, caCerts=None, certFile=None, keyFile=None): """ Public method to enable secure connections and set the TLS parameters. @param caCerts path to the Certificate Authority certificates file @type str @param certFile PEM encoded client certificate file @type str @param keyFile PEM encoded private key file @type str @return tuple containing a success flag and the error string of the paho-mqtt library @rtype tuple of (bool, str) """ try: self.__mqttClient.tls_set( ca_certs=caCerts, certfile=certFile, keyfile=keyFile ) return True, "" except (FileNotFoundError, ValueError) as err: return False, str(err) return False, "unspecific error occurred" def getProtocol(self): """ Public method to get the MQTT protocol version. @return MQTT protocol version in use @rtype int """ return self.__protocol def startLoop(self): """ Public method to start the MQTT client loop. """ self.__mqttClient.loop_start() self.__loopStarted = True def stopLoop(self): """ Public method to stop the MQTT client loop. """ self.__mqttClient.loop_stop() self.__loopStarted = False def connectToServer( self, host, port=1883, keepalive=60, bindAddress="", properties=None, clearWill=False, ): """ Public method to connect to a remote MQTT broker. @param host host name or IP address of the remote broker @type str @param port network port of the server host to connect to (default: 1883, using TLS: 8883) @type int @param keepalive maximum period in seconds allowed between communications with the broker @type int @param bindAddress IP address of a local network interface to bind this client to @type str @param properties list of user properties to be sent with the subscription @type list of tuple of (str, str) @param clearWill flag indicating to clear the last will previously set @type bool """ if clearWill: self.clearLastWill() props = ( self.__createPropertiesObject(PacketTypes.CONNECT, properties) if properties else None ) self.__mqttClient.connect_async( host, port=port, keepalive=keepalive, bind_address=bindAddress, clean_start=self.__cleanSession, properties=props, ) self.__connectTimeoutTimer.start() if not self.__loopStarted: self.startLoop() def connectToServerWithOptions( self, host, port=1883, bindAddress="", options=None, clearWill=False ): """ Public method to connect to a remote MQTT broker. @param host host name or IP address of the remote broker @type str @param port network port of the server host to connect to (default: 1883, using TLS: 8883) @type int @param bindAddress IP address of a local network interface to bind this client to @type str @param options dictionary containing the connection options. This dictionary should contain the keys "ClientId", "ConnectionTimeout", "Keepalive", "CleanSession", "Username", "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain", "WillProperties", "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey", "UserProperties". @type dict @param clearWill flag indicating to clear the last will previously set @type bool """ if options: parametersDict = self.defaultConnectionOptions() parametersDict.update(options) self.setConnectionTimeout(parametersDict["ConnectionTimeout"]) # step 1: set username and password if parametersDict["Username"]: if parametersDict["Password"]: self.setUserCredentials( parametersDict["Username"], pwConvert(parametersDict["Password"], encode=False), ) else: self.setUserCredentials(parametersDict["Username"]) # step 2: set last will data if not clearWill and parametersDict["WillTopic"]: if parametersDict["WillMessage"]: willMessage = parametersDict["WillMessage"] else: # empty message to clear the will willMessage = None props = ( self.__createPropertiesObject( PacketTypes.WILLMESSAGE, parametersDict["WillProperties"] ) if ( parametersDict["WillProperties"] and self.__protocol == MqttProtocols.MQTTv5 ) else None ) self.setLastWill( parametersDict["WillTopic"], payload=willMessage, qos=parametersDict["WillQos"], retain=parametersDict["WillRetain"], properties=props, ) # step 3: set TLS parameters if parametersDict["TlsEnable"]: if parametersDict["TlsCaCert"] and parametersDict["TlsClientCert"]: # use self signed client certificate self.setTLS( caCerts=parametersDict["TlsCaCert"], certFile=parametersDict["TlsClientCert"], keyFile=parametersDict["TlsClientKey"], ) elif parametersDict["TlsCaCert"]: # use CA certificate file self.setTLS(caCerts=parametersDict["TlsCaCert"]) else: # use default TLS configuration self.setTLS() # step 4: get the connect user properties if self.__protocol == MqttProtocols.MQTTv5: try: userProperties = parametersDict["UserProperties"] properties = userProperties["connect"][:] self.__disconnectUserProperties = ( userProperties["connect"][:] if userProperties["use_connect"] else userProperties["disconnect"][:] ) except KeyError: properties = None else: properties = None # step 4: connect to server self.__cleanSession = parametersDict["CleanSession"] self.connectToServer( host, port=port, keepalive=parametersDict["Keepalive"], properties=properties, clearWill=clearWill, ) else: keepalive = self.defaultConnectionOptions()["Keepalive"] self.connectToServer( host, port=port, keepalive=keepalive, bindAddress=bindAddress, clearWill=clearWill, ) @classmethod def defaultConnectionOptions(cls): """ Class method to get a connection options dictionary with default values. @return dictionary containing the default connection options. It has the keys "ClientId", "Protocol", "ConnectionTimeout", "Keepalive", "CleanSession", "Username", "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain", "WillProperties", "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey", "UserProperties". @rtype dict """ from PluginMqttMonitor import mqttPluginObject # noqa: I102 return { "ClientId": "ERIC7_MQTT_MONITOR_CLIENT", "Protocol": mqttPluginObject.getPreferences("DefaultProtocol"), "ConnectionTimeout": MqttClient.DefaultConnectTimeout, "Keepalive": 60, "CleanSession": True, "Username": "", "Password": "", "WillTopic": "", "WillMessage": "", "WillQos": 0, "WillRetain": False, "WillProperties": [], "TlsEnable": False, "TlsCaCert": "", "TlsClientCert": "", "TlsClientKey": "", "UserProperties": { "connect": [], "disconnect": [], "use_connect": True, }, } def reconnectToServer(self): """ Public method to reconnect the client with the same parameters. """ self.__connectTimeoutTimer.start() self.__mqttClient.reconnect() if not self.__loopStarted: self.startLoop() def disconnectFromServer(self): """ Public method to disconnect the client from the remote broker. """ self.__connectTimeoutTimer.stop() props = ( self.__createPropertiesObject( PacketTypes.DISCONNECT, self.__disconnectUserProperties ) if self.__disconnectUserProperties else None ) self.__mqttClient.disconnect(properties=props) def subscribe(self, topic, qos=0, properties=None): """ Public method to subscribe to topics with quality of service. @param topic single topic to subscribe to or a tuple with a topic and a QoS or a list of tuples with a topic and a QoS each @type str or tuple of (str, int) or list of tuple of (str, int) @param qos quality of service @type int, one of 0, 1 or 2 @param properties list of user properties to be sent with the subscription @type list of tuple of (str, str) @return tuple containing the result code and the message ID @rtype tuple of (int, int) """ props = ( self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties) if properties else None ) return self.__mqttClient.subscribe(topic, qos=qos, properties=props) def unsubscribe(self, topic, properties=None): """ Public method to unsubscribe topics. @param topic topic or list of topics to unsubscribe @type str or list of str @param properties list of user properties to be sent with the subscription @type list of tuple of (str, str) @return tuple containing the result code and the message ID @rtype tuple of (int, int) """ props = ( self.__createPropertiesObject(PacketTypes.UNSUBSCRIBE, properties) if properties else None ) return self.__mqttClient.unsubscribe(topic, properties=props) def publish(self, topic, payload=None, qos=0, retain=False, properties=None): """ Public method to publish to a topic. @param topic topic to publish to @type str @param payload data to be published @type str, bytes, int or float @param qos quality of service @type int, one of 0, 1 or 2 @param retain flag indicating to set as the "last known good"/retained message for the topic @type bool @param properties list of user properties to be sent with the subscription @type list of tuple of (str, str) @return message info object @rtype mqtt.MQTTMessageInfo """ props = ( self.__createPropertiesObject(PacketTypes.PUBLISH, properties) if properties else None ) return self.__mqttClient.publish( topic, payload=payload, qos=qos, retain=retain, properties=props ) def __createPropertiesObject(self, packetType, properties): """ Private method to assemble the MQTT v5 properties object. @param packetType type of the MQTT packet @type PacketTypes (= int) @param properties list of user properties @type list of tuple of (str, str) @return MQTT v5 properties object @rtype Properties """ props = Properties(packetType) props.UserProperty = properties return props def mqttErrorMessage(mqttErrno): """ Module function to get the error string associated with an MQTT error number. @param mqttErrno result code of a MQTT request @type int @return textual representation of the result code @rtype str """ if mqttErrno == mqtt.MQTT_ERR_SUCCESS: return QCoreApplication.translate("MqttErrorMessage", "No error.") elif mqttErrno == mqtt.MQTT_ERR_NOMEM: return QCoreApplication.translate("MqttErrorMessage", "Out of memory.") elif mqttErrno == mqtt.MQTT_ERR_PROTOCOL: return QCoreApplication.translate( "MqttErrorMessage", "A network protocol error occurred when communicating with the broker.", ) elif mqttErrno == mqtt.MQTT_ERR_INVAL: return QCoreApplication.translate( "MqttErrorMessage", "Invalid function arguments provided." ) elif mqttErrno == mqtt.MQTT_ERR_NO_CONN: return QCoreApplication.translate( "MqttErrorMessage", "The client is not currently connected." ) elif mqttErrno == mqtt.MQTT_ERR_CONN_REFUSED: return QCoreApplication.translate( "MqttErrorMessage", "The connection was refused." ) elif mqttErrno == mqtt.MQTT_ERR_NOT_FOUND: return QCoreApplication.translate( "MqttErrorMessage", "Message not found (internal error)." ) elif mqttErrno == mqtt.MQTT_ERR_CONN_LOST: return QCoreApplication.translate( "MqttErrorMessage", "The connection was lost." ) elif mqttErrno == mqtt.MQTT_ERR_TLS: return QCoreApplication.translate("MqttErrorMessage", "A TLS error occurred.") elif mqttErrno == mqtt.MQTT_ERR_PAYLOAD_SIZE: return QCoreApplication.translate("MqttErrorMessage", "Payload too large.") elif mqttErrno == mqtt.MQTT_ERR_NOT_SUPPORTED: return QCoreApplication.translate( "MqttErrorMessage", "This feature is not supported." ) elif mqttErrno == mqtt.MQTT_ERR_AUTH: return QCoreApplication.translate("MqttErrorMessage", "Authorisation failed.") elif mqttErrno == mqtt.MQTT_ERR_ACL_DENIED: return QCoreApplication.translate("MqttErrorMessage", "Access denied by ACL.") elif mqttErrno == mqtt.MQTT_ERR_UNKNOWN: return QCoreApplication.translate("MqttErrorMessage", "Unknown error.") elif mqttErrno == mqtt.MQTT_ERR_ERRNO: return QCoreApplication.translate("MqttErrorMessage", "Error defined by errno.") elif mqttErrno == mqtt.MQTT_ERR_QUEUE_SIZE: return QCoreApplication.translate("MqttErrorMessage", "Message queue full.") else: return QCoreApplication.translate("MqttErrorMessage", "Unknown error.") def mqttLogLevelString(mqttLogLevel, isMqttLogLevel=True): """ Module function to get the log level string associated with a log level. @param mqttLogLevel log level of the paho-mqtt client @type int @param isMqttLogLevel flag indicating a MQTT log level is given (if False it is the MqttClient variant, i.e. Debug being lowest) @type bool @return textual representation of the log level @rtype str """ if isMqttLogLevel: try: logLevel = MqttClient.LogLevelMap[mqttLogLevel] except KeyError: return QCoreApplication.translate("MqttLogLevelString", "Unknown") else: logLevel = mqttLogLevel if logLevel == MqttClient.LogInfo: return QCoreApplication.translate("MqttLogLevelString", "Info") elif logLevel == MqttClient.LogNotice: return QCoreApplication.translate("MqttLogLevelString", "Notice") elif logLevel == MqttClient.LogWarning: return QCoreApplication.translate("MqttLogLevelString", "Warning") elif logLevel == MqttClient.LogError: return QCoreApplication.translate("MqttLogLevelString", "Error") elif logLevel == MqttClient.LogDebug: return QCoreApplication.translate("MqttLogLevelString", "Debug") elif logLevel == MqttClient.LogDisabled: return QCoreApplication.translate("MqttLogLevelString", "Logging Disabled") else: return QCoreApplication.translate("MqttLogLevelString", "Unknown")