MqttMonitor/MqttClient.py

changeset 31
40582e448c4b
parent 30
17ef10819773
child 43
a0853f7a8b80
--- a/MqttMonitor/MqttClient.py	Sun Sep 09 12:21:19 2018 +0200
+++ b/MqttMonitor/MqttClient.py	Sun Sep 09 17:32:54 2018 +0200
@@ -9,7 +9,8 @@
 
 from __future__ import unicode_literals
 
-from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication
+from PyQt5.QtCore import pyqtSignal, pyqtSlot, QObject, QCoreApplication, \
+    QTimer
 
 import paho.mqtt.client as mqtt
 
@@ -24,6 +25,7 @@
         broker
     @signal onDisconnected(rc) emitted after the client has disconnected from
         the broker
+    @signal onLog(level, message) emitted to send client log data
     @signal onMessage(topic, payload, qos, retain) emitted after a message has
         been received by the client
     @signal onPublish(mid) emitted after a message has been published
@@ -34,11 +36,29 @@
     """
     onConnect = pyqtSignal(dict, int)
     onDisconnected = pyqtSignal(int)
+    onLog = pyqtSignal(int, str)
     onMessage = pyqtSignal(str, bytes, int, bool)
     onPublish = pyqtSignal(int)
     onSubscribe = pyqtSignal(int, tuple)
     onUnsubscribe = pyqtSignal(int)
     
+    connectTimeout = pyqtSignal()
+    
+    DefaultConnectTimeout = 15      # connect timeout in seconds
+    
+    LogDebug = 0x01
+    LogInfo = 0x02
+    LogNotice = 0x04
+    LogWarning = 0x08
+    LogError = 0x10
+    LogLevelMap = {
+        mqtt.MQTT_LOG_DEBUG: LogDebug,
+        mqtt.MQTT_LOG_INFO: LogInfo,
+        mqtt.MQTT_LOG_NOTICE: LogNotice,
+        mqtt.MQTT_LOG_WARNING: LogWarning,                        # __NO-TASK__
+        mqtt.MQTT_LOG_ERR: LogError,
+    }
+    
     def __init__(self, clientId="", cleanSession=True, userdata=None,
                  protocol=mqtt.MQTTv311, transport="tcp", parent=None):
         """
@@ -61,6 +81,14 @@
         
         self.__loopStarted = False
         
+        self.__connectTimeoutTimer = QTimer(self)
+        self.__connectTimeoutTimer.setSingleShot(True)
+        self.__connectTimeoutTimer.setInterval(
+            MqttClient.DefaultConnectTimeout * 1000)
+        self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout)
+        
+        self.onConnect.connect(self.__connectTimeoutTimer.stop)
+        
         self.__mqttClient = mqtt.Client(
             client_id=clientId, clean_session=cleanSession, userdata=None,
             protocol=mqtt.MQTTv311, transport="tcp")
@@ -76,6 +104,8 @@
                 flags, rc)
         self.__mqttClient.on_disconnect = \
             lambda client, userdata, rc: self.onDisconnected.emit(rc)
+        self.__mqttClient.on_log = \
+            lambda client, userdata, level, buf: self.onLog.emit(level, buf)
         self.__mqttClient.on_message = \
             lambda client, userdata, message: self.onMessage.emit(
                 message.topic, message.payload, message.qos, message.retain)
@@ -87,6 +117,14 @@
         self.__mqttClient.on_unsubscribe = \
             lambda client, userdata, mid: self.onUnsubscribe.emit(mid)
     
+    @pyqtSlot()
+    def __connectTimeout(self):
+        """
+        Privat slot handling a failed connection attempt.
+        """
+        self.stopLoop()
+        self.connectTimeout.emit()
+    
     def reinitialise(self, clientId="", cleanSession=True, userdata=None):
         """
         Public method to reinitialize the client with given data.
@@ -103,6 +141,15 @@
         
         self.__initCallbacks()
     
+    def setConnectionTimeout(self, timeout):
+        """
+        Public method to set the connection timeout value.
+        
+        @param timeout timeout value to be set in seconds
+        @type int
+        """
+        self.__connectTimeoutTimer.setInterval(timeout * 1000)
+    
     def setMaxInflightMessages(self, inflight=20):
         """
         Public method to set the maximum number of messages with QoS > 0 that
@@ -202,7 +249,8 @@
         self.__mqttClient.loop_stop()
         self.__loopStarted = False
     
-    def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""):
+    def connectToServer(self, host, port=1883, keepalive=60, bindAddress="",
+                        reinit=True):
         """
         Public method to connect to a remote MQTT broker.
         
@@ -218,11 +266,13 @@
             this client to
         @type str
         """
-        # TODO: get this fixed or allow to interrupt
-        self.__mqttClient.reconnect_delay_set(max_delay=16)
+        if reinit:
+            self.reinitialise()
         self.__mqttClient.connect_async(
             host, port=port, keepalive=keepalive, bind_address=bindAddress)
         
+        self.__connectTimeoutTimer.start()
+        
         if not self.__loopStarted:
             self.startLoop()
     
@@ -243,7 +293,7 @@
             dictionary should contain the keys "ClientId", "Keepalive",
             "CleanSession", "Username", "Password", "WillTopic", "WillMessage",
             "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert",
-            "TlsClientKey"
+            "TlsClientKey", "ConnectionTimeout"
         @type dict
         """
         if options:
@@ -255,6 +305,7 @@
                 clientId=parametersDict["ClientId"],
                 cleanSession=parametersDict["CleanSession"]
             )
+            self.setConnectionTimeout(parametersDict["ConnectionTimeout"])
             
             # step 2: set username and password
             if parametersDict["Username"]:
@@ -294,7 +345,8 @@
             
             # step 5: connect to server
             self.connectToServer(host, port=port,
-                                 keepalive=parametersDict["Keepalive"])
+                                 keepalive=parametersDict["Keepalive"],
+                                 reinit=False)
         else:
             keepalive = self.defaultConnectionOptions["Keepalive"]
             self.connectToServer(host, port=port, keepalive=keepalive,
@@ -308,11 +360,13 @@
         @return dictionary containing the default connection options. It has
             the keys "ClientId", "Keepalive", "CleanSession", "Username",
             "Password", "WillTopic", "WillMessage", "WillQos", "WillRetain",
-            "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey".
+            "TlsEnable", "TlsCaCert", "TlsClientCert", "TlsClientKey",
+            "ConnectionTimeout".
         @rtype dict
         """
         return {
             "ClientId": "ERIC6_MQTT_MONITOR_CLIENT",
+            "ConnectionTimeout": MqttClient.DefaultConnectTimeout,
             "Keepalive": 60,
             "CleanSession": True,
             "Username": "",
@@ -331,6 +385,8 @@
         """
         Public method to reconnect the client with the same parameters.
         """
+        self.__connectTimeoutTimer.start()
+        
         self.__mqttClient.reconnect()
         
         if not self.__loopStarted:
@@ -340,6 +396,8 @@
         """
         Public method to disconnect the client from the remote broker.
         """
+        self.__connectTimeoutTimer.stop()
+        
         self.__mqttClient.disconnect()
     
     def subscribe(self, topic, qos=0):
@@ -389,7 +447,7 @@
 
 def mqttConnackMessage(connackCode):
     """
-    Public method to get the string associated with a CONNACK result.
+    Module function to get the string associated with a CONNACK result.
     
     @param connackCode result code of the connection request
     @type int
@@ -428,7 +486,7 @@
 
 def mqttErrorMessage(mqttErrno):
     """
-    Public method to get the error string associated with an MQTT error
+    Module function to get the error string associated with an MQTT error
     number.
     
     @param mqttErrno result code of a MQTT request
@@ -501,3 +559,37 @@
         return QCoreApplication.translate(
             "MqttErrorMessage",
             "Unknown error.")
+
+
+def mqttLogLevelString(mqttLogLevel, isMqttLogLevel=True):
+    """
+    Module function to get the log level string associated with a log level.
+    
+    @param mqttLogLevel log level of the paho-mqtt client
+    @type int
+    @param isMqttLogLevel flag indicating a MQTT log level is given (if
+        False it is the MqttClient variant, i.e. Debug being lowest)
+    @type bool
+    @return textual representation of the log level
+    @rtype str
+    """
+    if isMqttLogLevel:
+        try:
+            logLevel = MqttClient.LogLevelMap[mqttLogLevel]
+        except KeyError:
+            return QCoreApplication.translate("MqttLogLevelString", "Unknown")
+    else:
+        logLevel = mqttLogLevel
+    
+    if logLevel == MqttClient.LogInfo:
+        return QCoreApplication.translate("MqttLogLevelString", "Info")
+    elif logLevel == MqttClient.LogNotice:
+        return QCoreApplication.translate("MqttLogLevelString", "Notice")
+    elif logLevel == MqttClient.LogWarning:
+        return QCoreApplication.translate("MqttLogLevelString", "Warning")
+    elif logLevel == MqttClient.LogError:
+        return QCoreApplication.translate("MqttLogLevelString", "Error")
+    elif logLevel == MqttClient.LogDebug:
+        return QCoreApplication.translate("MqttLogLevelString", "Debug")
+    else:
+        return QCoreApplication.translate("MqttLogLevelString", "Unknown")

eric ide

mercurial