--- a/MqttMonitor/MqttClient.py Sun Sep 09 12:21:19 2018 +0200 +++ b/MqttMonitor/MqttClient.py Sun Sep 09 17:32:54 2018 +0200 @@ -9,7 +9,8 @@ from __future__ import unicode_literals -from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication +from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QCoreApplication, \ + QTimer import paho.mqtt.client as mqtt @@ -24,6 +25,7 @@ broker @signal onDisconnected(rc) emitted after the client has disconnected from the broker + @signal onLog(level, message) emitted to send client log data @signal onMessage(topic, payload, qos, retain) emitted after a message has been received by the client @signal onPublish(mid) emitted after a message has been published @@ -34,11 +36,29 @@ """ onConnect = pyqtSignal(dict, int) onDisconnected = pyqtSignal(int) + onLog = pyqtSignal(int, str) onMessage = pyqtSignal(str, bytes, int, bool) onPublish = pyqtSignal(int) onSubscribe = pyqtSignal(int, tuple) onUnsubscribe = pyqtSignal(int) + connectTimeout = pyqtSignal() + + DefaultConnectTimeout = 15 # connect timeout in seconds + + LogDebug = 0x01 + LogInfo = 0x02 + LogNotice = 0x04 + LogWarning = 0x08 + LogError = 0x10 + LogLevelMap = { + mqtt.MQTT_LOG_DEBUG: LogDebug, + mqtt.MQTT_LOG_INFO: LogInfo, + mqtt.MQTT_LOG_NOTICE: LogNotice, + mqtt.MQTT_LOG_WARNING: LogWarning, # __NO-TASK__ + mqtt.MQTT_LOG_ERR: LogError, + } + def __init__(self, clientId="", cleanSession=True, userdata=None, protocol=mqtt.MQTTv311, transport="tcp", parent=None): """ @@ -61,6 +81,14 @@ self.__loopStarted = False + self.__connectTimeoutTimer = QTimer(self) + self.__connectTimeoutTimer.setSingleShot(True) + self.__connectTimeoutTimer.setInterval( + MqttClient.DefaultConnectTimeout * 1000) + self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout) + + self.onConnect.connect(self.__connectTimeoutTimer.stop) + self.__mqttClient = mqtt.Client( client_id=clientId, clean_session=cleanSession, userdata=None, protocol=mqtt.MQTTv311, transport="tcp") @@ -76,6 +104,8 @@ flags, rc) self.__mqttClient.on_disconnect = \ lambda client, userdata, rc: self.onDisconnected.emit(rc) + self.__mqttClient.on_log = \ + lambda client, userdata, level, buf: self.onLog.emit(level, buf) self.__mqttClient.on_message = \ lambda client, userdata, message: self.onMessage.emit( message.topic, message.payload, message.qos, message.retain) @@ -87,6 +117,14 @@ self.__mqttClient.on_unsubscribe = \ lambda client, userdata, mid: self.onUnsubscribe.emit(mid) + @pyqtSlot() + def __connectTimeout(self): + """ + Privat slot handling a failed connection attempt. + """ + self.stopLoop() + self.connectTimeout.emit() + def reinitialise(self, clientId="", cleanSession=True, userdata=None): """ Public method to reinitialize the client with given data. @@ -103,6 +141,15 @@ self.__initCallbacks() + def setConnectionTimeout(self, timeout): + """ + Public method to set the connection timeout value. + + @param timeout timeout value to be set in seconds + @type int + """ + self.__connectTimeoutTimer.setInterval(timeout * 1000) + def setMaxInflightMessages(self, inflight=20): """ Public method to set the maximum number of messages with QoS > 0 that @@ -202,7 +249,8 @@ self.__mqttClient.loop_stop() self.__loopStarted = False - def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""): + def connectToServer(self, host, port=1883, keepalive=60, bindAddress="", + reinit=True): """ Public method to connect to a remote MQTT broker. @@ -218,11 +266,13 @@ this client to @type str """ - # TODO: get this fixed or allow to interrupt - self.__mqttClient.reconnect_delay_set(max_delay=16) + if reinit: + self.reinitialise() self.__mqttClient.connect_async( host, port=port, keepalive=keepalive, bind_address=bindAddress) + self.__connectTimeoutTimer.start() + if not self.__loopStarted: self.startLoop() @@ -243,7 +293,7 @@ dictionary should contain the keys "ClientId", "Keepalive", "CleanSession", "Username", "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert", - "TlsClientKey" + "TlsClientKey", "ConnectionTimeout" @type dict """ if options: @@ -255,6 +305,7 @@ clientId=parametersDict["ClientId"], cleanSession=parametersDict["CleanSession"] ) + self.setConnectionTimeout(parametersDict["ConnectionTimeout"]) # step 2: set username and password if parametersDict["Username"]: @@ -294,7 +345,8 @@ # step 5: connect to server self.connectToServer(host, port=port, - keepalive=parametersDict["Keepalive"]) + keepalive=parametersDict["Keepalive"], + reinit=False) else: keepalive = self.defaultConnectionOptions["Keepalive"] self.connectToServer(host, port=port, keepalive=keepalive, @@ -308,11 +360,13 @@ @return dictionary containing the default connection options. It has the keys "ClientId", "Keepalive", "CleanSession", "Username", "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain", - "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey". + "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey", + "ConnectionTimeout". @rtype dict """ return { "ClientId": "ERIC6_MQTT_MONITOR_CLIENT", + "ConnectionTimeout": MqttClient.DefaultConnectTimeout, "Keepalive": 60, "CleanSession": True, "Username": "", @@ -331,6 +385,8 @@ """ Public method to reconnect the client with the same parameters. """ + self.__connectTimeoutTimer.start() + self.__mqttClient.reconnect() if not self.__loopStarted: @@ -340,6 +396,8 @@ """ Public method to disconnect the client from the remote broker. """ + self.__connectTimeoutTimer.stop() + self.__mqttClient.disconnect() def subscribe(self, topic, qos=0): @@ -389,7 +447,7 @@ def mqttConnackMessage(connackCode): """ - Public method to get the string associated with a CONNACK result. + Module function to get the string associated with a CONNACK result. @param connackCode result code of the connection request @type int @@ -428,7 +486,7 @@ def mqttErrorMessage(mqttErrno): """ - Public method to get the error string associated with an MQTT error + Module function to get the error string associated with an MQTT error number. @param mqttErrno result code of a MQTT request @@ -501,3 +559,37 @@ return QCoreApplication.translate( "MqttErrorMessage", "Unknown error.") + + +def mqttLogLevelString(mqttLogLevel, isMqttLogLevel=True): + """ + Module function to get the log level string associated with a log level. + + @param mqttLogLevel log level of the paho-mqtt client + @type int + @param isMqttLogLevel flag indicating a MQTT log level is given (if + False it is the MqttClient variant, i.e. Debug being lowest) + @type bool + @return textual representation of the log level + @rtype str + """ + if isMqttLogLevel: + try: + logLevel = MqttClient.LogLevelMap[mqttLogLevel] + except KeyError: + return QCoreApplication.translate("MqttLogLevelString", "Unknown") + else: + logLevel = mqttLogLevel + + if logLevel == MqttClient.LogInfo: + return QCoreApplication.translate("MqttLogLevelString", "Info") + elif logLevel == MqttClient.LogNotice: + return QCoreApplication.translate("MqttLogLevelString", "Notice") + elif logLevel == MqttClient.LogWarning: + return QCoreApplication.translate("MqttLogLevelString", "Warning") + elif logLevel == MqttClient.LogError: + return QCoreApplication.translate("MqttLogLevelString", "Error") + elif logLevel == MqttClient.LogDebug: + return QCoreApplication.translate("MqttLogLevelString", "Debug") + else: + return QCoreApplication.translate("MqttLogLevelString", "Unknown")