MqttMonitor/MqttClient.py

branch
eric7
changeset 99
420cb8adbf7e
parent 98
85d56e77e9df
child 100
9c29cfbd96c3
equal deleted inserted replaced
98:85d56e77e9df 99:420cb8adbf7e
12 from PyQt6.QtCore import ( 12 from PyQt6.QtCore import (
13 pyqtSignal, pyqtSlot, QObject, QCoreApplication, QTimer 13 pyqtSignal, pyqtSlot, QObject, QCoreApplication, QTimer
14 ) 14 )
15 15
16 import paho.mqtt.client as mqtt 16 import paho.mqtt.client as mqtt
17 from paho.mqtt.packettypes import PacketTypes
17 18
18 from Utilities.crypto import pwConvert 19 from Utilities.crypto import pwConvert
19 20
20 21
21 class MqttProtocols(enum.IntEnum): 22 class MqttProtocols(enum.IntEnum):
44 @signal onUnsubscribe(mid) emitted after the client has unsubscribed from 45 @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
45 some topics 46 some topics
46 @signal connectTimeout() emitted to indicate, that a connection attempt 47 @signal connectTimeout() emitted to indicate, that a connection attempt
47 timed out 48 timed out
48 """ 49 """
49 onConnect = pyqtSignal(dict, int) 50 onConnectV3 = pyqtSignal(dict, int)
50 onDisconnected = pyqtSignal(int) 51 onConnectV5 = pyqtSignal(dict, int, int)
52 onDisconnectedV3 = pyqtSignal(int)
53 onDisconnectedV5 = pyqtSignal(int, int)
51 onLog = pyqtSignal(int, str) 54 onLog = pyqtSignal(int, str)
52 onMessage = pyqtSignal(str, bytes, int, bool) 55 onMessage = pyqtSignal(str, bytes, int, bool)
53 onPublish = pyqtSignal(int) 56 onPublish = pyqtSignal(int)
54 onSubscribe = pyqtSignal(int, tuple) 57 onSubscribeV3 = pyqtSignal(int, tuple)
58 onSubscribeV5 = pyqtSignal(int, list)
55 onUnsubscribe = pyqtSignal(int) 59 onUnsubscribe = pyqtSignal(int)
56 60
57 connectTimeout = pyqtSignal() 61 connectTimeout = pyqtSignal()
58 62
59 DefaultConnectTimeout = 15 # connect timeout in seconds 63 DefaultConnectTimeout = 15 # connect timeout in seconds
98 self.__connectTimeoutTimer.setSingleShot(True) 102 self.__connectTimeoutTimer.setSingleShot(True)
99 self.__connectTimeoutTimer.setInterval( 103 self.__connectTimeoutTimer.setInterval(
100 MqttClient.DefaultConnectTimeout * 1000) 104 MqttClient.DefaultConnectTimeout * 1000)
101 self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout) 105 self.__connectTimeoutTimer.timeout.connect(self.__connectTimeout)
102 106
103 self.onConnect.connect(self.__connectTimeoutTimer.stop) 107 self.onConnectV3.connect(self.__connectTimeoutTimer.stop)
108 self.onConnectV5.connect(self.__connectTimeoutTimer.stop)
104 109
105 self.__cleanSession = cleanSession 110 self.__cleanSession = cleanSession
111 self.__protocol = protocol
112
106 if protocol == MqttProtocols.MQTTv5: 113 if protocol == MqttProtocols.MQTTv5:
107 cleanSession = None 114 cleanSession = None
108 115
109 self.__mqttClient = mqtt.Client( 116 self.__mqttClient = mqtt.Client(
110 client_id=clientId, clean_session=cleanSession, userdata=userdata, 117 client_id=clientId, clean_session=cleanSession, userdata=userdata,
111 protocol=int(protocol), transport=transport) 118 protocol=int(protocol), transport=transport)
112 119
113 self.__initCallbacks() 120 self.__initCallbacks(protocol)
114 121
115 def __initCallbacks(self): 122 def __initCallbacks(self, protocol):
116 """ 123 """
117 Private method to initialize the MQTT callback methods. 124 Private method to initialize the MQTT callback methods.
118 """ 125
119 # TODO: add properties to signal 126 @param protocol MQTT protocol version
127 @type MqttProtocols
128 """
129 if protocol in (MqttProtocols.MQTTv31, MqttProtocols.MQTTv311):
130 self.__mqttClient.on_connect = (
131 lambda client, userdata, flags, rc, properties=None:
132 self.onConnectV3.emit(flags, rc)
133 )
134 self.__mqttClient.on_disconnect = (
135 lambda client, userdata, rc:
136 self.onDisconnectedV3.emit(rc)
137 )
138 self.__mqttClient.on_subscribe = (
139 lambda client, userdata, mid, grantedQos, properties=None:
140 self.onSubscribeV3.emit(mid, grantedQos)
141 )
142 else:
143 # TODO: add properties to signal
144 self.__mqttClient.on_connect = (
145 lambda client, userdata, flags, rc, properties=None:
146 self.onConnectV5.emit(flags, rc.value, rc.packetType)
147 )
148 self.__mqttClient.on_disconnect = self.__onDisconnectedV5
149 # TODO: add properties to signal
150 self.__mqttClient.on_subscribe = (
151 lambda client, userdata, mid, reasonCodes, properties=None:
152 self.onSubscribeV5.emit(mid, reasonCodes)
153 )
120 # TODO: MQTTv5: add support for MQTTv5 signature 154 # TODO: MQTTv5: add support for MQTTv5 signature
121 self.__mqttClient.on_connect = (
122 lambda client, userdata, flags, rc, properties=None:
123 self.onConnect.emit(flags, rc))
124 # TODO: MQTTv5: add support for MQTTv5 signature
125 self.__mqttClient.on_disconnect = (
126 lambda client, userdata, rc: self.onDisconnected.emit(rc))
127 self.__mqttClient.on_log = ( 155 self.__mqttClient.on_log = (
128 lambda client, userdata, level, buf: self.onLog.emit(level, buf)) 156 lambda client, userdata, level, buf:
157 self.onLog.emit(level, buf)
158 )
129 self.__mqttClient.on_message = ( 159 self.__mqttClient.on_message = (
130 lambda client, userdata, message: self.onMessage.emit( 160 lambda client, userdata, message:
131 message.topic, message.payload, message.qos, message.retain)) 161 self.onMessage.emit(message.topic, message.payload,
162 message.qos, message.retain)
163 )
132 self.__mqttClient.on_publish = ( 164 self.__mqttClient.on_publish = (
133 lambda client, userdata, mid: self.onPublish.emit(mid)) 165 lambda client, userdata, mid:
134 # TODO: add properties to signal 166 self.onPublish.emit(mid)
135 # TODO: MQTTv5: add support for MQTTv5 signature 167 )
136 self.__mqttClient.on_subscribe = (
137 lambda client, userdata, mid, grantedQos, properties=None:
138 self.onSubscribe.emit(mid, grantedQos))
139 # TODO: MQTTv5: add support for MQTTv5 signature 168 # TODO: MQTTv5: add support for MQTTv5 signature
140 self.__mqttClient.on_unsubscribe = ( 169 self.__mqttClient.on_unsubscribe = (
141 lambda client, userdata, mid: self.onUnsubscribe.emit(mid)) 170 lambda client, userdata, mid, properties=None, reasoncodes=None:
171 self.onUnsubscribe.emit(mid)
172 )
173
174 def __onDisconnectedV5(self, client, userdata, rc, properties=None):
175 """
176 Private method to handle the disconnect from the broker.
177
178 @param client reference to the client object
179 @type paho.mqtt.Client
180 @param userdata user data
181 @type list
182 @param rc result code or reason code
183 @type int or ReasonCodes
184 @param properties optional properties (defaults to None)
185 @type dict (optional)
186 """
187 if isinstance(rc, int):
188 packetType = PacketTypes.DISCONNECT
189 resultCode = rc
190 else:
191 packetType = rc.packetType
192 resultCode = rc.value
193 self.onDisconnectedV5.emit(resultCode, packetType)
142 194
143 @pyqtSlot() 195 @pyqtSlot()
144 def __connectTimeout(self): 196 def __connectTimeout(self):
145 """ 197 """
146 Private slot handling a failed connection attempt. 198 Private slot handling a failed connection attempt.
258 return True, "" 310 return True, ""
259 except (ValueError, FileNotFoundError) as err: 311 except (ValueError, FileNotFoundError) as err:
260 return False, str(err) 312 return False, str(err)
261 313
262 return False, "unspecific error occurred" 314 return False, "unspecific error occurred"
315
316 def getProtocol(self):
317 """
318 Public method to get the MQTT protocol version.
319 """
320 return self.__protocol
263 321
264 def startLoop(self): 322 def startLoop(self):
265 """ 323 """
266 Public method to start the MQTT client loop. 324 Public method to start the MQTT client loop.
267 """ 325 """

eric ide

mercurial