120 self.onConnectV3.connect(self.__connectTimeoutTimer.stop) |
120 self.onConnectV3.connect(self.__connectTimeoutTimer.stop) |
121 self.onConnectV5.connect(self.__connectTimeoutTimer.stop) |
121 self.onConnectV5.connect(self.__connectTimeoutTimer.stop) |
122 |
122 |
123 self.__cleanSession = cleanSession |
123 self.__cleanSession = cleanSession |
124 self.__protocol = protocol |
124 self.__protocol = protocol |
|
125 self.__disconnectUserProperties = [] |
125 |
126 |
126 if protocol == MqttProtocols.MQTTv5: |
127 if protocol == MqttProtocols.MQTTv5: |
127 cleanSession = None |
128 cleanSession = None |
128 |
129 |
129 self.__mqttClient = mqtt.Client( |
130 self.__mqttClient = mqtt.Client( |
276 @param userdata user data |
277 @param userdata user data |
277 @type any |
278 @type any |
278 """ |
279 """ |
279 self.__mqttClient.user_data_set(userdata) |
280 self.__mqttClient.user_data_set(userdata) |
280 |
281 |
281 # TODO: MQTTv5: add support for properties |
282 # TODO: MQTTv5: add support for WILL properties |
282 def setLastWill(self, topic, payload=None, qos=0, retain=False): |
283 def setLastWill(self, topic, payload=None, qos=0, retain=False): |
283 """ |
284 """ |
284 Public method to set the last will of the client. |
285 Public method to set the last will of the client. |
285 |
286 |
286 @param topic topic the will message should be published on |
287 @param topic topic the will message should be published on |
348 """ |
349 """ |
349 self.__mqttClient.loop_stop() |
350 self.__mqttClient.loop_stop() |
350 self.__loopStarted = False |
351 self.__loopStarted = False |
351 |
352 |
352 def connectToServer(self, host, port=1883, keepalive=60, bindAddress="", |
353 def connectToServer(self, host, port=1883, keepalive=60, bindAddress="", |
353 reinit=True): |
354 properties=None): |
354 """ |
355 """ |
355 Public method to connect to a remote MQTT broker. |
356 Public method to connect to a remote MQTT broker. |
356 |
357 |
357 @param host host name or IP address of the remote broker |
358 @param host host name or IP address of the remote broker |
358 @type str |
359 @type str |
363 communications with the broker |
364 communications with the broker |
364 @type int |
365 @type int |
365 @param bindAddress IP address of a local network interface to bind |
366 @param bindAddress IP address of a local network interface to bind |
366 this client to |
367 this client to |
367 @type str |
368 @type str |
368 @param reinit flag indicating to reinitialize the MQTT client before |
369 @param properties list of user properties to be sent with the |
369 trying to connect with the given parameters |
370 connect request |
370 @type bool |
371 @type list of tuple of (str, str) |
371 """ |
372 """ |
372 # TODO: MQTTv5: add support for MQTTv5 properties |
373 props = ( |
|
374 self.__createPropertiesObject(PacketTypes.CONNECT, properties) |
|
375 if properties else |
|
376 None |
|
377 ) |
373 self.__mqttClient.connect_async( |
378 self.__mqttClient.connect_async( |
374 host, port=port, keepalive=keepalive, bind_address=bindAddress, |
379 host, port=port, keepalive=keepalive, bind_address=bindAddress, |
375 clean_start=self.__cleanSession) |
380 clean_start=self.__cleanSession, properties=props) |
376 |
381 |
377 self.__connectTimeoutTimer.start() |
382 self.__connectTimeoutTimer.start() |
378 |
383 |
379 if not self.__loopStarted: |
384 if not self.__loopStarted: |
380 self.startLoop() |
385 self.startLoop() |
391 @type int |
396 @type int |
392 @param bindAddress IP address of a local network interface to bind |
397 @param bindAddress IP address of a local network interface to bind |
393 this client to |
398 this client to |
394 @type str |
399 @type str |
395 @param options dictionary containing the connection options. This |
400 @param options dictionary containing the connection options. This |
396 dictionary should contain the keys "ClientId", "Keepalive", |
401 dictionary should contain the keys "ClientId", "ConnectionTimeout", |
397 "CleanSession", "Username", "Password", "WillTopic", "WillMessage", |
402 "Keepalive", "CleanSession", "Username", "Password", "WillTopic", |
398 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert", |
403 "WillMessage", "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", |
399 "TlsClientKey", "ConnectionTimeout" |
404 "TlsClientCert", "TlsClientKey", "UserProperties". |
400 @type dict |
405 @type dict |
401 """ |
406 """ |
402 if options: |
407 if options: |
403 parametersDict = self.defaultConnectionOptions() |
408 parametersDict = self.defaultConnectionOptions() |
404 parametersDict.update(options) |
409 parametersDict.update(options) |
441 self.setTLS(caCerts=parametersDict["TlsCaCert"]) |
446 self.setTLS(caCerts=parametersDict["TlsCaCert"]) |
442 else: |
447 else: |
443 # use default TLS configuration |
448 # use default TLS configuration |
444 self.setTLS() |
449 self.setTLS() |
445 |
450 |
|
451 # step 4: get the connect user properties |
|
452 if self.__protocol == MqttProtocols.MQTTv5: |
|
453 try: |
|
454 userProperties = parametersDict["UserProperties"] |
|
455 properties = userProperties["connect"][:] |
|
456 self.__disconnectUserProperties = ( |
|
457 userProperties["connect"][:] |
|
458 if userProperties["use_connect"] else |
|
459 userProperties["disconnect"][:] |
|
460 ) |
|
461 except KeyError: |
|
462 properties = None |
|
463 else: |
|
464 properties = None |
446 # step 4: connect to server |
465 # step 5: connect to server |
447 self.__cleanSession = parametersDict["CleanSession"] |
466 self.__cleanSession = parametersDict["CleanSession"] |
448 self.connectToServer(host, port=port, |
467 self.connectToServer(host, port=port, |
449 keepalive=parametersDict["Keepalive"]) |
468 keepalive=parametersDict["Keepalive"], |
|
469 properties=properties) |
450 else: |
470 else: |
451 keepalive = self.defaultConnectionOptions["Keepalive"] |
471 keepalive = self.defaultConnectionOptions["Keepalive"] |
452 self.connectToServer(host, port=port, keepalive=keepalive, |
472 self.connectToServer(host, port=port, keepalive=keepalive, |
453 bindAddress=bindAddress) |
473 bindAddress=bindAddress) |
454 |
474 |
460 |
480 |
461 @return dictionary containing the default connection options. It has |
481 @return dictionary containing the default connection options. It has |
462 the keys "ClientId", "Protocol", "ConnectionTimeout", "Keepalive", |
482 the keys "ClientId", "Protocol", "ConnectionTimeout", "Keepalive", |
463 "CleanSession", "Username", "Password", "WillTopic", "WillMessage", |
483 "CleanSession", "Username", "Password", "WillTopic", "WillMessage", |
464 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert", |
484 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert", |
465 "TlsClientKey". |
485 "TlsClientKey", "UserProperties". |
466 @rtype dict |
486 @rtype dict |
467 """ |
487 """ |
468 return { |
488 return { |
469 "ClientId": "ERIC7_MQTT_MONITOR_CLIENT", |
489 "ClientId": "ERIC7_MQTT_MONITOR_CLIENT", |
470 "Protocol": MqttProtocols.MQTTv311, |
490 "Protocol": MqttProtocols.MQTTv311, |
498 """ |
523 """ |
499 Public method to disconnect the client from the remote broker. |
524 Public method to disconnect the client from the remote broker. |
500 """ |
525 """ |
501 self.__connectTimeoutTimer.stop() |
526 self.__connectTimeoutTimer.stop() |
502 |
527 |
503 # TODO: MQTTv5: add support for properties (?) |
528 props = ( |
504 # TODO: MQTTv5: add support for reason code |
529 self.__createPropertiesObject( |
505 self.__mqttClient.disconnect() |
530 PacketTypes.DISCONNECT, self.__disconnectUserProperties) |
|
531 if self.__disconnectUserProperties else |
|
532 None |
|
533 ) |
|
534 self.__mqttClient.disconnect(properties=props) |
506 |
535 |
507 def subscribe(self, topic, qos=0, properties=None): |
536 def subscribe(self, topic, qos=0, properties=None): |
508 """ |
537 """ |
509 Public method to subscribe to topics with quality of service. |
538 Public method to subscribe to topics with quality of service. |
510 |
539 |
537 @type list of tuple of (str, str) |
566 @type list of tuple of (str, str) |
538 @return tuple containing the result code and the message ID |
567 @return tuple containing the result code and the message ID |
539 @rtype tuple of (int, int) |
568 @rtype tuple of (int, int) |
540 """ |
569 """ |
541 props = ( |
570 props = ( |
542 self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties) |
571 self.__createPropertiesObject(PacketTypes.UNSUBSCRIBE, properties) |
543 if properties else |
572 if properties else |
544 None |
573 None |
545 ) |
574 ) |
546 return self.__mqttClient.unsubscribe(topic, properties=props) |
575 return self.__mqttClient.unsubscribe(topic, properties=props) |
547 |
576 |
548 # TODO: MQTTv5: add support for properties |
577 def publish(self, topic, payload=None, qos=0, retain=False, |
549 def publish(self, topic, payload=None, qos=0, retain=False): |
578 properties=None): |
550 """ |
579 """ |
551 Public method to publish to a topic. |
580 Public method to publish to a topic. |
552 |
581 |
553 @param topic topic to publish to |
582 @param topic topic to publish to |
554 @type str |
583 @type str |
557 @param qos quality of service |
586 @param qos quality of service |
558 @type int, one of 0, 1 or 2 |
587 @type int, one of 0, 1 or 2 |
559 @param retain flag indicating to set as the "last known good"/retained |
588 @param retain flag indicating to set as the "last known good"/retained |
560 message for the topic |
589 message for the topic |
561 @type bool |
590 @type bool |
|
591 @param properties list of user properties to be sent with the |
|
592 published message |
|
593 @type list of tuple of (str, str) |
562 @return message info object |
594 @return message info object |
563 @rtype mqtt.MQTTMessageInfo |
595 @rtype mqtt.MQTTMessageInfo |
564 """ |
596 """ |
|
597 props = ( |
|
598 self.__createPropertiesObject(PacketTypes.PUBLISH, properties) |
|
599 if properties else |
|
600 None |
|
601 ) |
565 return self.__mqttClient.publish(topic, payload=payload, qos=qos, |
602 return self.__mqttClient.publish(topic, payload=payload, qos=qos, |
566 retain=retain) |
603 retain=retain, properties=props) |
567 |
604 |
568 def __createPropertiesObject(self, packetType, properties): |
605 def __createPropertiesObject(self, packetType, properties): |
569 """ |
606 """ |
570 Private method to assemble the MQTT v5 properties object. |
607 Private method to assemble the MQTT v5 properties object. |
571 |
608 |
575 @type list of tuple of (str, str) |
612 @type list of tuple of (str, str) |
576 @return MQTT v5 properties object |
613 @return MQTT v5 properties object |
577 @rtype Properties |
614 @rtype Properties |
578 """ |
615 """ |
579 props = Properties(packetType) |
616 props = Properties(packetType) |
580 for userProperty in properties: |
617 props.UserProperty = properties |
581 props.UserProperty = tuple(userProperty) |
|
582 return props |
618 return props |
583 |
619 |
584 |
620 |
585 def mqttConnackMessage(connackCode): |
621 def mqttConnackMessage(connackCode): |
586 """ |
622 """ |