MqttMonitor/MqttClient.py

branch
eric7
changeset 103
5fe4f179975f
parent 102
70b8858199f5
child 104
9a4c9b7f078c
equal deleted inserted replaced
102:70b8858199f5 103:5fe4f179975f
120 self.onConnectV3.connect(self.__connectTimeoutTimer.stop) 120 self.onConnectV3.connect(self.__connectTimeoutTimer.stop)
121 self.onConnectV5.connect(self.__connectTimeoutTimer.stop) 121 self.onConnectV5.connect(self.__connectTimeoutTimer.stop)
122 122
123 self.__cleanSession = cleanSession 123 self.__cleanSession = cleanSession
124 self.__protocol = protocol 124 self.__protocol = protocol
125 self.__disconnectUserProperties = []
125 126
126 if protocol == MqttProtocols.MQTTv5: 127 if protocol == MqttProtocols.MQTTv5:
127 cleanSession = None 128 cleanSession = None
128 129
129 self.__mqttClient = mqtt.Client( 130 self.__mqttClient = mqtt.Client(
276 @param userdata user data 277 @param userdata user data
277 @type any 278 @type any
278 """ 279 """
279 self.__mqttClient.user_data_set(userdata) 280 self.__mqttClient.user_data_set(userdata)
280 281
281 # TODO: MQTTv5: add support for properties 282 # TODO: MQTTv5: add support for WILL properties
282 def setLastWill(self, topic, payload=None, qos=0, retain=False): 283 def setLastWill(self, topic, payload=None, qos=0, retain=False):
283 """ 284 """
284 Public method to set the last will of the client. 285 Public method to set the last will of the client.
285 286
286 @param topic topic the will message should be published on 287 @param topic topic the will message should be published on
348 """ 349 """
349 self.__mqttClient.loop_stop() 350 self.__mqttClient.loop_stop()
350 self.__loopStarted = False 351 self.__loopStarted = False
351 352
352 def connectToServer(self, host, port=1883, keepalive=60, bindAddress="", 353 def connectToServer(self, host, port=1883, keepalive=60, bindAddress="",
353 reinit=True): 354 properties=None):
354 """ 355 """
355 Public method to connect to a remote MQTT broker. 356 Public method to connect to a remote MQTT broker.
356 357
357 @param host host name or IP address of the remote broker 358 @param host host name or IP address of the remote broker
358 @type str 359 @type str
363 communications with the broker 364 communications with the broker
364 @type int 365 @type int
365 @param bindAddress IP address of a local network interface to bind 366 @param bindAddress IP address of a local network interface to bind
366 this client to 367 this client to
367 @type str 368 @type str
368 @param reinit flag indicating to reinitialize the MQTT client before 369 @param properties list of user properties to be sent with the
369 trying to connect with the given parameters 370 connect request
370 @type bool 371 @type list of tuple of (str, str)
371 """ 372 """
372 # TODO: MQTTv5: add support for MQTTv5 properties 373 props = (
374 self.__createPropertiesObject(PacketTypes.CONNECT, properties)
375 if properties else
376 None
377 )
373 self.__mqttClient.connect_async( 378 self.__mqttClient.connect_async(
374 host, port=port, keepalive=keepalive, bind_address=bindAddress, 379 host, port=port, keepalive=keepalive, bind_address=bindAddress,
375 clean_start=self.__cleanSession) 380 clean_start=self.__cleanSession, properties=props)
376 381
377 self.__connectTimeoutTimer.start() 382 self.__connectTimeoutTimer.start()
378 383
379 if not self.__loopStarted: 384 if not self.__loopStarted:
380 self.startLoop() 385 self.startLoop()
391 @type int 396 @type int
392 @param bindAddress IP address of a local network interface to bind 397 @param bindAddress IP address of a local network interface to bind
393 this client to 398 this client to
394 @type str 399 @type str
395 @param options dictionary containing the connection options. This 400 @param options dictionary containing the connection options. This
396 dictionary should contain the keys "ClientId", "Keepalive", 401 dictionary should contain the keys "ClientId", "ConnectionTimeout",
397 "CleanSession", "Username", "Password", "WillTopic", "WillMessage", 402 "Keepalive", "CleanSession", "Username", "Password", "WillTopic",
398 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert", 403 "WillMessage", "WillQos", "WillRetain", "TlsEnable", "TlsCaCert",
399 "TlsClientKey", "ConnectionTimeout" 404 "TlsClientCert", "TlsClientKey", "UserProperties".
400 @type dict 405 @type dict
401 """ 406 """
402 if options: 407 if options:
403 parametersDict = self.defaultConnectionOptions() 408 parametersDict = self.defaultConnectionOptions()
404 parametersDict.update(options) 409 parametersDict.update(options)
441 self.setTLS(caCerts=parametersDict["TlsCaCert"]) 446 self.setTLS(caCerts=parametersDict["TlsCaCert"])
442 else: 447 else:
443 # use default TLS configuration 448 # use default TLS configuration
444 self.setTLS() 449 self.setTLS()
445 450
451 # step 4: get the connect user properties
452 if self.__protocol == MqttProtocols.MQTTv5:
453 try:
454 userProperties = parametersDict["UserProperties"]
455 properties = userProperties["connect"][:]
456 self.__disconnectUserProperties = (
457 userProperties["connect"][:]
458 if userProperties["use_connect"] else
459 userProperties["disconnect"][:]
460 )
461 except KeyError:
462 properties = None
463 else:
464 properties = None
446 # step 4: connect to server 465 # step 5: connect to server
447 self.__cleanSession = parametersDict["CleanSession"] 466 self.__cleanSession = parametersDict["CleanSession"]
448 self.connectToServer(host, port=port, 467 self.connectToServer(host, port=port,
449 keepalive=parametersDict["Keepalive"]) 468 keepalive=parametersDict["Keepalive"],
469 properties=properties)
450 else: 470 else:
451 keepalive = self.defaultConnectionOptions["Keepalive"] 471 keepalive = self.defaultConnectionOptions["Keepalive"]
452 self.connectToServer(host, port=port, keepalive=keepalive, 472 self.connectToServer(host, port=port, keepalive=keepalive,
453 bindAddress=bindAddress) 473 bindAddress=bindAddress)
454 474
460 480
461 @return dictionary containing the default connection options. It has 481 @return dictionary containing the default connection options. It has
462 the keys "ClientId", "Protocol", "ConnectionTimeout", "Keepalive", 482 the keys "ClientId", "Protocol", "ConnectionTimeout", "Keepalive",
463 "CleanSession", "Username", "Password", "WillTopic", "WillMessage", 483 "CleanSession", "Username", "Password", "WillTopic", "WillMessage",
464 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert", 484 "WillQos", "WillRetain", "TlsEnable", "TlsCaCert", "TlsClientCert",
465 "TlsClientKey". 485 "TlsClientKey", "UserProperties".
466 @rtype dict 486 @rtype dict
467 """ 487 """
468 return { 488 return {
469 "ClientId": "ERIC7_MQTT_MONITOR_CLIENT", 489 "ClientId": "ERIC7_MQTT_MONITOR_CLIENT",
470 "Protocol": MqttProtocols.MQTTv311, 490 "Protocol": MqttProtocols.MQTTv311,
479 "WillRetain": False, 499 "WillRetain": False,
480 "TlsEnable": False, 500 "TlsEnable": False,
481 "TlsCaCert": "", 501 "TlsCaCert": "",
482 "TlsClientCert": "", 502 "TlsClientCert": "",
483 "TlsClientKey": "", 503 "TlsClientKey": "",
504 "UserProperties": {
505 "connect": [],
506 "disconnect": [],
507 "use_connect": True,
508 },
484 } 509 }
485 510
486 def reconnectToServer(self): 511 def reconnectToServer(self):
487 """ 512 """
488 Public method to reconnect the client with the same parameters. 513 Public method to reconnect the client with the same parameters.
498 """ 523 """
499 Public method to disconnect the client from the remote broker. 524 Public method to disconnect the client from the remote broker.
500 """ 525 """
501 self.__connectTimeoutTimer.stop() 526 self.__connectTimeoutTimer.stop()
502 527
503 # TODO: MQTTv5: add support for properties (?) 528 props = (
504 # TODO: MQTTv5: add support for reason code 529 self.__createPropertiesObject(
505 self.__mqttClient.disconnect() 530 PacketTypes.DISCONNECT, self.__disconnectUserProperties)
531 if self.__disconnectUserProperties else
532 None
533 )
534 self.__mqttClient.disconnect(properties=props)
506 535
507 def subscribe(self, topic, qos=0, properties=None): 536 def subscribe(self, topic, qos=0, properties=None):
508 """ 537 """
509 Public method to subscribe to topics with quality of service. 538 Public method to subscribe to topics with quality of service.
510 539
537 @type list of tuple of (str, str) 566 @type list of tuple of (str, str)
538 @return tuple containing the result code and the message ID 567 @return tuple containing the result code and the message ID
539 @rtype tuple of (int, int) 568 @rtype tuple of (int, int)
540 """ 569 """
541 props = ( 570 props = (
542 self.__createPropertiesObject(PacketTypes.SUBSCRIBE, properties) 571 self.__createPropertiesObject(PacketTypes.UNSUBSCRIBE, properties)
543 if properties else 572 if properties else
544 None 573 None
545 ) 574 )
546 return self.__mqttClient.unsubscribe(topic, properties=props) 575 return self.__mqttClient.unsubscribe(topic, properties=props)
547 576
548 # TODO: MQTTv5: add support for properties 577 def publish(self, topic, payload=None, qos=0, retain=False,
549 def publish(self, topic, payload=None, qos=0, retain=False): 578 properties=None):
550 """ 579 """
551 Public method to publish to a topic. 580 Public method to publish to a topic.
552 581
553 @param topic topic to publish to 582 @param topic topic to publish to
554 @type str 583 @type str
557 @param qos quality of service 586 @param qos quality of service
558 @type int, one of 0, 1 or 2 587 @type int, one of 0, 1 or 2
559 @param retain flag indicating to set as the "last known good"/retained 588 @param retain flag indicating to set as the "last known good"/retained
560 message for the topic 589 message for the topic
561 @type bool 590 @type bool
591 @param properties list of user properties to be sent with the
592 published message
593 @type list of tuple of (str, str)
562 @return message info object 594 @return message info object
563 @rtype mqtt.MQTTMessageInfo 595 @rtype mqtt.MQTTMessageInfo
564 """ 596 """
597 props = (
598 self.__createPropertiesObject(PacketTypes.PUBLISH, properties)
599 if properties else
600 None
601 )
565 return self.__mqttClient.publish(topic, payload=payload, qos=qos, 602 return self.__mqttClient.publish(topic, payload=payload, qos=qos,
566 retain=retain) 603 retain=retain, properties=props)
567 604
568 def __createPropertiesObject(self, packetType, properties): 605 def __createPropertiesObject(self, packetType, properties):
569 """ 606 """
570 Private method to assemble the MQTT v5 properties object. 607 Private method to assemble the MQTT v5 properties object.
571 608
575 @type list of tuple of (str, str) 612 @type list of tuple of (str, str)
576 @return MQTT v5 properties object 613 @return MQTT v5 properties object
577 @rtype Properties 614 @rtype Properties
578 """ 615 """
579 props = Properties(packetType) 616 props = Properties(packetType)
580 for userProperty in properties: 617 props.UserProperty = properties
581 props.UserProperty = tuple(userProperty)
582 return props 618 return props
583 619
584 620
585 def mqttConnackMessage(connackCode): 621 def mqttConnackMessage(connackCode):
586 """ 622 """

eric ide

mercurial