MqttMonitor/MqttClient.py

changeset 2
d439c5109829
child 3
82845c0fd550
equal deleted inserted replaced
1:bf1a17419d44 2:d439c5109829
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2018 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing a PyQt wrapper around the paho MQTT client.
8 """
9
10 from __future__ import unicode_literals
11
12 from PyQt5.QtCore import pyqtSignal, QObject, QCoreApplication
13
14 import paho.mqtt.client as mqtt
15
16
class MqttClient(QObject):
    """
    Class implementing a PyQt wrapper around the paho MQTT client.

    The paho callbacks are forwarded as Qt signals, so that interested
    objects can simply connect to them.

    @signal onConnect(flags, rc) emitted after the client has connected to the
        broker
    @signal onDisconnected(rc) emitted after the client has disconnected from
        the broker
    @signal onMessage(topic, payload, qos, retain) emitted after a message has
        been received by the client
    @signal onPublish(mid) emitted after a message has been published
    @signal onSubscribe(mid, grantedQos) emitted after the client has
        subscribed to some topics
    @signal onUnsubscribe(mid) emitted after the client has unsubscribed from
        some topics
    """
    onConnect = pyqtSignal(dict, int)
    onDisconnected = pyqtSignal(int)
    onMessage = pyqtSignal(str, bytes, int, bool)
    onPublish = pyqtSignal(int)
    onSubscribe = pyqtSignal(int, tuple)
    onUnsubscribe = pyqtSignal(int)

    def __init__(self, clientId="", cleanSession=True, userdata=None,
                 protocol=mqtt.MQTTv311, transport="tcp", parent=None):
        """
        Constructor

        @param clientId ID to be used for the client
        @type str
        @param cleanSession flag indicating to start a clean session
        @type bool
        @param userdata user data
        @type any
        @param protocol version of the MQTT protocol to use
        @type int, one of mqtt.MQTTv31 or mqtt.MQTTv311
        @param transport transport to be used
        @type str, one of "tcp" or "websockets"
        @param parent reference to the parent object
        @type QObject
        """
        QObject.__init__(self, parent=parent)

        self.__loopStarted = False

        # Hand the caller supplied settings through to paho; using fixed
        # values here would silently ignore the constructor arguments.
        self.__mqttClient = mqtt.Client(
            client_id=clientId, clean_session=cleanSession,
            userdata=userdata, protocol=protocol, transport=transport)

        # Forward each paho callback to the corresponding Qt signal. The
        # 'client' and 'userdata' callback arguments are not passed on.
        self.__mqttClient.on_connect = \
            lambda client, userdata, flags, rc: self.onConnect.emit(
                flags, rc)
        self.__mqttClient.on_disconnect = \
            lambda client, userdata, rc: self.onDisconnected.emit(rc)
        self.__mqttClient.on_message = \
            lambda client, userdata, message: self.onMessage.emit(
                message.topic, message.payload, message.qos, message.retain)
        self.__mqttClient.on_publish = \
            lambda client, userdata, mid: self.onPublish.emit(mid)
        self.__mqttClient.on_subscribe = \
            lambda client, userdata, mid, grantedQos: self.onSubscribe.emit(
                mid, grantedQos)
        self.__mqttClient.on_unsubscribe = \
            lambda client, userdata, mid: self.onUnsubscribe.emit(mid)

    def startLoop(self):
        """
        Public method to start the MQTT client loop.
        """
        self.__mqttClient.loop_start()
        self.__loopStarted = True

    def stopLoop(self):
        """
        Public method to stop the MQTT client loop.
        """
        self.__mqttClient.loop_stop()
        self.__loopStarted = False

    def connectToServer(self, host, port=1883, keepalive=60, bindAddress=""):
        """
        Public method to connect to a remote MQTT broker.

        The client loop is started, if it is not running already.

        @param host host name or IP address of the remote broker
        @type str
        @param port network port of the server host to connect to (default:
            1883, using TLS: 8883)
        @type int
        @param keepalive maximum period in seconds allowed between
            communications with the broker
        @type int
        @param bindAddress IP address of a local network interface to bind
            this client to
        @type str
        """
        self.__mqttClient.connect_async(
            host, port=port, keepalive=keepalive, bind_address=bindAddress)

        if not self.__loopStarted:
            self.startLoop()

    def reconnectToServer(self):
        """
        Public method to reconnect the client with the same parameters.

        The client loop is started, if it is not running already.
        """
        self.__mqttClient.reconnect()

        if not self.__loopStarted:
            self.startLoop()

    def disconnectFromServer(self):
        """
        Public method to disconnect the client from the remote broker.
        """
        self.__mqttClient.disconnect()

    def subscribe(self, topic, qos=0):
        """
        Public method to subscribe to topics with quality of service.

        @param topic single topic to subscribe to or a tuple with a topic
            and a QoS or a list of tuples with a topic and a QoS each
        @type str or tuple of (str, int) or list of tuple of (str, int)
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.subscribe(topic, qos=qos)

    def unsubscribe(self, topic):
        """
        Public method to unsubscribe topics.

        @param topic topic or list of topics to unsubscribe
        @type str or list of str
        @return tuple containing the result code and the message ID
        @rtype tuple of (int, int)
        """
        return self.__mqttClient.unsubscribe(topic)

    def publish(self, topic, payload=None, qos=0, retain=False):
        """
        Public method to publish to a topic.

        @param topic topic to publish to
        @type str
        @param payload data to be published
        @type str, bytes, int or float
        @param qos quality of service
        @type int, one of 0, 1 or 2
        @param retain flag indicating to set as the "last known good"/retained
            message for the topic
        @type bool
        @return message info object
        @rtype mqtt.MQTTMessageInfo
        """
        return self.__mqttClient.publish(topic, payload=payload, qos=qos,
                                         retain=retain)

    # Declared static because the function takes no instance argument; this
    # way it may be called through the class as well as through an instance.
    @staticmethod
    def mqttConnackMessage(connackCode):
        """
        Static method to get the string associated with a CONNACK result.

        @param connackCode result code of the connection request
        @type int
        @return translated text for the given result code; codes not known
            here are reported as refused for an unknown reason
        @rtype str
        """
        if connackCode == mqtt.CONNACK_ACCEPTED:
            return QCoreApplication.translate(
                "MqttConnackMessage",
                "Connection Accepted.")
        elif connackCode == mqtt.CONNACK_REFUSED_PROTOCOL_VERSION:
            return QCoreApplication.translate(
                "MqttConnackMessage",
                "Connection Refused: unacceptable protocol version.")
        elif connackCode == mqtt.CONNACK_REFUSED_IDENTIFIER_REJECTED:
            return QCoreApplication.translate(
                "MqttConnackMessage",
                "Connection Refused: identifier rejected.")
        elif connackCode == mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE:
            return QCoreApplication.translate(
                "MqttConnackMessage",
                "Connection Refused: broker unavailable.")
        elif connackCode == mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
            return QCoreApplication.translate(
                "MqttConnackMessage",
                "Connection Refused: bad user name or password.")
        elif connackCode == mqtt.CONNACK_REFUSED_NOT_AUTHORIZED:
            return QCoreApplication.translate(
                "MqttConnackMessage",
                "Connection Refused: not authorised.")
        else:
            return QCoreApplication.translate(
                "MqttConnackMessage",
                "Connection Refused: unknown reason.")

    def mqttErrorMessage(self, mqttErrno):
        """
        Public method to get the error string associated with an MQTT error
        number.

        @param mqttErrno result code of a MQTT request
        @type int
        @return translated text for the given error number; numbers not
            known here are reported as an unknown error
        @rtype str
        """
        if mqttErrno == mqtt.MQTT_ERR_SUCCESS:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "No error.")
        elif mqttErrno == mqtt.MQTT_ERR_NOMEM:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "Out of memory.")
        elif mqttErrno == mqtt.MQTT_ERR_PROTOCOL:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "A network protocol error occurred when communicating with"
                " the broker.")
        elif mqttErrno == mqtt.MQTT_ERR_INVAL:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "Invalid function arguments provided.")
        elif mqttErrno == mqtt.MQTT_ERR_NO_CONN:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "The client is not currently connected.")
        elif mqttErrno == mqtt.MQTT_ERR_CONN_REFUSED:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "The connection was refused.")
        elif mqttErrno == mqtt.MQTT_ERR_NOT_FOUND:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "Message not found (internal error).")
        elif mqttErrno == mqtt.MQTT_ERR_CONN_LOST:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "The connection was lost.")
        elif mqttErrno == mqtt.MQTT_ERR_TLS:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "A TLS error occurred.")
        elif mqttErrno == mqtt.MQTT_ERR_PAYLOAD_SIZE:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "Payload too large.")
        elif mqttErrno == mqtt.MQTT_ERR_NOT_SUPPORTED:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "This feature is not supported.")
        elif mqttErrno == mqtt.MQTT_ERR_AUTH:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "Authorisation failed.")
        elif mqttErrno == mqtt.MQTT_ERR_ACL_DENIED:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "Access denied by ACL.")
        elif mqttErrno == mqtt.MQTT_ERR_UNKNOWN:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "Unknown error.")
        elif mqttErrno == mqtt.MQTT_ERR_ERRNO:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "Error defined by errno.")
        else:
            return QCoreApplication.translate(
                "MqttErrorMessage",
                "Unknown error.")

eric ide

mercurial