Sat, 08 Sep 2018 15:28:48 +0200
MqttConnectionOptionsDialog: added support for TLS.
# -*- coding: utf-8 -*- # Copyright (c) 2018 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the MQTT Monitor widget. """ from __future__ import unicode_literals try: str = unicode # __IGNORE_EXCEPTION__ except NameError: pass import os import collections import copy from PyQt5.QtCore import pyqtSlot, QTimer from PyQt5.QtGui import QTextCursor from PyQt5.QtWidgets import QWidget, QDialog from E5Gui import E5MessageBox from .Ui_MqttMonitorWidget import Ui_MqttMonitorWidget from .MqttClient import MqttClient, mqttConnackMessage, mqttErrorMessage import UI.PixmapCache import Utilities class MqttMonitorWidget(QWidget, Ui_MqttMonitorWidget): """ Class implementing the MQTT Monitor widget. """ BrokerStatusTopicPrefix = "$SYS/broker/" BrokerStatusTopic = "$SYS/broker/#" BrokerStatusTopicLoadPrefix = "$SYS/broker/load/" def __init__(self, plugin, parent=None): """ Constructor @param plugin reference to the plug-in object @type MqttMonitorPlugin @param parent reference to the parent widget @type QWidget """ super(MqttMonitorWidget, self).__init__(parent) self.setupUi(self) self.__plugin = plugin self.__connectedToBroker = False self.__brokerStatusTopicSubscribed = False self.pixmapLabel.setPixmap(UI.PixmapCache.getPixmap( os.path.join("MqttMonitor", "icons", "mqtt48.png"))) self.brokerWidget.setCurrentIndex(0) self.__connectionModeProfile = True self.__setConnectionMode(True) # initial mode is 'profile connection' self.__populateProfileComboBox() self.connectButton.setIcon(UI.PixmapCache.getIcon("ircConnect.png")) self.brokerConnectionOptionsButton.setIcon(UI.PixmapCache.getIcon( os.path.join("MqttMonitor", "icons", "connectionOptions.png"))) self.__populateBrokerComboBoxes() self.brokerStatusLabel.hide() self.subscribeButton.setIcon(UI.PixmapCache.getIcon("plus.png")) self.subscribeButton.setEnabled(False) self.unsubscribeButton.setIcon(UI.PixmapCache.getIcon("minus.png")) self.__subscribedTopics = [] self.__topicQueue = {} self.__updateUnsubscribeTopicComboBox() self.__publishedTopics = [] self.__updatePublishTopicComboBox() self.publishButton.setEnabled(False) self.__connectionOptions = None prefix = MqttMonitorWidget.BrokerStatusTopicPrefix self.__statusLabelMapping = { # broker prefix + "version": self.versionLabel, prefix + "timestamp": self.timestampLabel, prefix + "uptime": self.uptimeLabel, prefix + "subscriptions/count": self.subscriptionsLabel, # clients prefix + "clients/connected": self.clientsConnectedLabel, prefix + "clients/disconnected": self.clientsDisconnectedLabel, prefix + "clients/expired": self.clientsExpiredLabel, prefix + "clients/maximum": self.clientsMaximumLabel, prefix + "clients/total": self.clientsTotalLabel, # messages prefix + "messages/sent": self.messagesSentLabel, prefix + "messages/received": self.messagesReceivedLabel, prefix + "messages/stored": self.messagesStoredLabel, prefix + "store/messages/count": self.messagesStoredLabel, prefix + "messages/inflight": self.messagesInflightLabel, prefix + "retained messages/count": self.messagesRetainedLabel, # publish messages prefix + "publish/messages/sent": self.publishMessagesSentLabel, prefix + "publish/messages/received": self.publishMessagesReceivedLabel, prefix + "publish/messages/dropped": self.publishMessagesDroppedLabel, # traffic prefix + "bytes/sent": self.bytesSentLabel, prefix + "bytes/received": self.bytesReceivedLabel, # load prefix + "load/bytes/sent": self.loadBytesSentLabel, prefix + "load/bytes/received": self.loadBytesReceivedLabel, prefix + "load/messages/sent": self.loadMessagesSentLabel, prefix + "load/messages/received": self.loadMessagesReceivedLabel, prefix + "load/publish/sent": self.loadPublishSentLabel, prefix + "load/publish/received": self.loadPublishReceivedLabel, prefix + "load/publish/dropped": self.loadPublishDroppedLabel, prefix + "load/connections": self.loadConnectionsLabel, prefix + "load/sockets": self.loadSocketsLabel, } self.__statusLoadValues = collections.defaultdict( self.__loadDefaultDictFactory) self.__client = MqttClient() # connect the MQTT client signals self.__client.onConnect.connect(self.__brokerConnected) self.__client.onDisconnected.connect(self.__brokerDisconnected) self.__client.onMessage.connect(self.__messageReceived) self.__client.onPublish.connect(self.__messagePublished) self.__client.onSubscribe.connect(self.__topicSubscribed) self.__client.onUnsubscribe.connect(self.__topicUnsubscribed) ####################################################################### ## Slots handling MQTT related signals ####################################################################### @pyqtSlot(dict, int) def __brokerConnected(self, flags, rc): """ Private slot to handle being connected to a broker. @param flags flags set for the connection @type dict @param rc CONNACK result code @type int """ if rc == 0: self.__connectedToBroker = True self.__connectionOptions = None msg = mqttConnackMessage(rc) self.__flashBrokerStatusLabel(msg) self.connectButton.setIcon(UI.PixmapCache.getIcon("ircDisconnect.png")) self.subscribeGroup.setEnabled(True) self.unsubscribeGroup.setEnabled(True) self.publishGroup.setEnabled(True) self.brokerStatusButton.setEnabled(True) self.__statusLoadValues.clear() self.__clearBrokerStatusLabels() self.__setBrokerStatusSubscribed(False) @pyqtSlot(int) def __brokerDisconnected(self, rc): """ Private slot to handle a disconnection from the broker. @param rc MQTT error result code @type int """ self.__connectedToBroker = False # ensure, the client loop is stopped self.__client.stopLoop() if rc != 0: msg = mqttErrorMessage(rc) else: msg = self.tr("Connection to Broker shut down cleanly.") self.__flashBrokerStatusLabel(msg) self.connectButton.setIcon(UI.PixmapCache.getIcon("ircConnect.png")) self.__subscribedTopics = [] self.__topicQueue = {} self.__updateUnsubscribeTopicComboBox() self.__updatePublishTopicComboBox() self.subscribeGroup.setEnabled(False) self.unsubscribeGroup.setEnabled(False) self.publishGroup.setEnabled(False) self.brokerStatusButton.setEnabled(False) self.__statusLoadValues.clear() @pyqtSlot(str, bytes, int, bool) def __messageReceived(self, topic, payload, qos, retain): """ Private slot to handle the receipt of a message. @param topic topic of the message @type str @param payload payload (i.e. data) of the message @type bytes @param qos quality of service indicator @type int @param retain flag indicating a retained message @type bool """ if topic.startswith(MqttMonitorWidget.BrokerStatusTopicPrefix): # handle broker status messages self.__handleBrokerStatusMessage(topic, payload) else: self.__appendMessage(topic, payload) @pyqtSlot(int) def __messagePublished(self, mid): """ Private slot to handle a message being published. @param mid ID of the published message @type int """ pass @pyqtSlot(int, tuple) def __topicSubscribed(self, mid, grantedQos): """ Private slot to handle being subscribed to topics. @param mid ID of the subscribe request @type int @param grantedQos tuple of granted quality of service @type tuple of int """ if mid in self.__topicQueue: topic = self.__topicQueue.pop(mid) self.__subscribedTopics.append(topic) self.subscribeTopicEdit.clear() self.__updateUnsubscribeTopicComboBox() self.__updatePublishTopicComboBox() @pyqtSlot(int) def __topicUnsubscribed(self, mid): """ Private slot to handle being unsubcribed from a topic. @param mid ID of the unsubscribe request @type int """ if mid in self.__topicQueue: topic = self.__topicQueue.pop(mid) try: self.__subscribedTopics.remove(topic) self.__updateUnsubscribeTopicComboBox() self.__updatePublishTopicComboBox() except ValueError: # ignore it pass ####################################################################### ## Slots handling UI interactions ####################################################################### @pyqtSlot() def __flashBrokerStatusLabel(self, message): """ Private slot to show the broker status label with some text for 5 seconds. @param message message to be shown @type str """ self.brokerStatusLabel.setText(message) self.brokerStatusLabel.show() QTimer.singleShot(5000, self.brokerStatusLabel.hide) @pyqtSlot() def on_modeButton_clicked(self): """ Private slot to switch between connection profiles and direct connection mode. """ self.__setConnectionMode(not self.__connectionModeProfile) @pyqtSlot(str) def on_brokerComboBox_editTextChanged(self, host): """ Private slot to handling entering or selecting a broker host name. @param host host name of the broker @type str """ self.__setConnectButtonState() @pyqtSlot() def on_brokerConnectionOptionsButton_clicked(self): """ Private slot to show a dialog to modify connection options or a dialog to edit connection profiles. """ if self.__connectionModeProfile: from .MqttConnectionProfilesDialog import \ MqttConnectionProfilesDialog dlg = MqttConnectionProfilesDialog( self.__client, self.__plugin.getPreferences("BrokerProfiles"), parent=self) if dlg.exec_() == QDialog.Accepted: profilesDict = dlg.getProfiles() self.__plugin.setPreferences("BrokerProfiles", profilesDict) self.__populateProfileComboBox() else: from .MqttConnectionOptionsDialog import \ MqttConnectionOptionsDialog dlg = MqttConnectionOptionsDialog( self.__client, self.__connectionOptions, parent=self) if dlg.exec_() == QDialog.Accepted: self.__connectionOptions = dlg.getConnectionOptions() if self.__connectionOptions["TlsEnable"]: port = self.brokerPortComboBox.currentText().strip() if port == "1883": # it is default non-encrypted port => set to TLS port self.brokerPortComboBox.setEditText("8883") else: port = self.brokerPortComboBox.currentText().strip() if port == "8883": # it is default TLS port => set to non-encrypted port self.brokerPortComboBox.setEditText("1883") @pyqtSlot() def on_connectButton_clicked(self): """ Private slot to handle a connect or disconnect request. """ if self.__connectedToBroker: self.__client.disconnectFromServer() else: if self.__connectionModeProfile: self.__profileConnectToBroker() else: self.__directConnectToBroker() @pyqtSlot(str) def on_subscribeTopicEdit_textChanged(self, topic): """ Private slot to handle a change of the entered topic. @param topic entered topic text @type str """ self.subscribeButton.setEnabled(bool(topic)) @pyqtSlot() def on_subscribeButton_clicked(self): """ Private slot to subscribe to the entered topic. """ topic = self.subscribeTopicEdit.text() qos = self.subscribeQosSpinBox.value() if topic: if topic.startswith(MqttMonitorWidget.BrokerStatusTopicPrefix): E5MessageBox.warning( self, self.tr("Subscribe to Topic"), self.tr("Subscriptions to the Status topic '$SYS' shall" " be done on the 'Status' tab.")) else: self.__topicQueue[ self.__client.subscribe(topic, qos)[1]] = topic @pyqtSlot(str) def on_unsubscribeTopicComboBox_currentIndexChanged(self, topic): """ Private slot to handle the selection of a topic to unsubscribe from. @param topic topic text @type str """ self.unsubscribeButton.setEnabled(bool(topic)) @pyqtSlot() def on_unsubscribeButton_clicked(self): """ Private slot to unsubscribe from the selected topic. """ topic = self.unsubscribeTopicComboBox.currentText() if topic: self.__topicQueue[ self.__client.unsubscribe(topic)[1]] = topic @pyqtSlot(str) def on_publishTopicComboBox_editTextChanged(self, topic): """ Private slot to handle changes of the publish topic name. @param topic topic text @type str """ self.publishButton.setEnabled(bool(topic)) @pyqtSlot() def on_publishButton_clicked(self): """ Private slot to publish the entered message. """ topic = self.publishTopicComboBox.currentText() qos = self.publishQosSpinBox.value() retain = self.publishRetainCheckBox.isChecked() payloadStr = self.publishPayloadEdit.toPlainText() if not payloadStr: # use empty string together with the retain flag to clean # a retained message by sending None instead payloadStr = None msgInfo = self.__client.publish(topic, payloadStr, qos, retain) if msgInfo.rc == 0: if topic not in self.__publishedTopics: self.__publishedTopics.append(topic) self.__updatePublishTopicComboBox() @pyqtSlot() def on_publishClearButton_clicked(self): """ Private slot to clear the publish data fields. """ self.publishTopicComboBox.clearEditText() self.publishPayloadEdit.clear() self.publishQosSpinBox.setValue(0) self.publishRetainCheckBox.setChecked(False) @pyqtSlot() def on_brokerStatusButton_clicked(self): """ Private slot to subscribe or unsubscribe the broker status topic. """ if self.__brokerStatusTopicSubscribed: # unsubscribe status topic rc, _ = self.__client.unsubscribe( MqttMonitorWidget.BrokerStatusTopic) if rc == 0: # successfully sent self.__setBrokerStatusSubscribed(False) else: # subscribe status topic rc, _ = self.__client.subscribe( MqttMonitorWidget.BrokerStatusTopic) if rc == 0: # successfully sent self.__setBrokerStatusSubscribed(True) def __setBrokerStatusSubscribed(self, subscribed): """ Private method to set the subscription status for the broker status topics. @param subscribed subscription status for the broker status topics @type bool """ self.__brokerStatusTopicSubscribed = subscribed if subscribed: self.brokerStatusButton.setText(self.tr("Unsubscribe")) self.brokerStatusButton.setToolTip( self.tr("Press to deactivate the status display")) else: self.brokerStatusButton.setText(self.tr("Subscribe")) self.brokerStatusButton.setToolTip( self.tr("Press to activate the status display")) ####################################################################### ## Utility methods ####################################################################### def __addBrokerToRecent(self, host, port): """ Private method to add a host name to the list of recently connected brokers. @param host host name of broker @type str @param port port number of the connection @type int """ brokerList = self.__plugin.getPreferences("RecentBrokersWithPort") hostAndPort = [host, port] if hostAndPort in brokerList: brokerList.remove(hostAndPort) brokerList.insert(0, hostAndPort) self.__plugin.setPreferences("RecentBrokersWithPort", brokerList) self.__populateBrokerComboBoxes() def __populateBrokerComboBoxes(self): """ Private method to populate the broker name and port combo boxes. """ brokerList = self.__plugin.getPreferences("RecentBrokersWithPort") # step 1: clear combo boxes self.brokerComboBox.clear() self.brokerPortComboBox.clear() # step 2a: populate the broker name list self.brokerComboBox.addItems([b[0].strip() for b in brokerList]) self.__setConnectButtonState() # step 2b: populate the broker ports list if brokerList: currentPort = brokerList[0][1] else: currentPort = 1883 currentPortStr = "{0:5}".format(currentPort) portsSet = {b[1] for b in brokerList} portsSet.update({1883, 8883}) self.brokerPortComboBox.addItems( sorted("{0:5}".format(p) for p in portsSet)) index = self.brokerPortComboBox.findText(currentPortStr) self.brokerPortComboBox.setCurrentIndex(index) def __populateProfileComboBox(self): """ Private method to populate the profiles selection box. """ profilesDict = self.__plugin.getPreferences("BrokerProfiles") self.profileComboBox.clear() self.profileComboBox.addItems(sorted(profilesDict.keys())) self.__setConnectButtonState() def __updateUnsubscribeTopicComboBox(self): """ Private method to update the unsubcribe topic combo box. """ self.unsubscribeTopicComboBox.clear() self.unsubscribeTopicComboBox.addItems(sorted(self.__subscribedTopics)) self.unsubscribeButton.setEnabled(len(self.__subscribedTopics) > 0) def __updatePublishTopicComboBox(self): """ Private method to update the publish topic combo box. """ self.publishTopicComboBox.clear() self.publishTopicComboBox.addItems( [""] + list(set(self.__publishedTopics + self.__subscribedTopics))) def __appendMessage(self, topic, payload): """ Private method to append a received message to the output. @param topic topic of the received message @type str @param payload payload of the received message @type bytes """ payloadStr = str(payload, encoding="utf-8", errors="replace") txt = self.tr("{0} -> {1}").format(topic, payloadStr) if not txt.endswith(("\r\n", "\r", "\n")): txt += "\n" tc = self.messagesEdit.textCursor() tc.movePosition(QTextCursor.End) self.messagesEdit.setTextCursor(tc) self.messagesEdit.insertPlainText(Utilities.filterAnsiSequences(txt)) self.messagesEdit.ensureCursorVisible() def __handleBrokerStatusMessage(self, topic, payload): """ Private method to append a received message to the output. @param topic topic of the received message @type str @param payload payload of the received message @type bytes """ payloadStr = str(payload, encoding="utf-8", errors="replace").strip() topic = topic.strip() if topic.startswith(MqttMonitorWidget.BrokerStatusTopicLoadPrefix): self.__handleBrokerLoadStatusMessage(topic, payloadStr) else: try: label = self.__statusLabelMapping[topic] label.setText(payloadStr) except KeyError: # ignore topics not shown in display pass def __handleBrokerLoadStatusMessage(self, topic, payloadStr): """ Private method to append a received message to the output. @param topic topic of the received message @type str @param payloadStr string representation of the payload of the received message @type str """ subtopic, topicElement = topic.rsplit("/", 1) self.__statusLoadValues[subtopic][topicElement] = payloadStr try: label = self.__statusLabelMapping[subtopic] label.setText("{0} / {1} / {2}".format( self.__statusLoadValues[subtopic]["1min"], self.__statusLoadValues[subtopic]["5min"], self.__statusLoadValues[subtopic]["15min"], )) except KeyError: # ignore topics not shown in display pass def __clearBrokerStatusLabels(self): """ Private method to clear the broker status labels. """ for statusLabelKey in self.__statusLabelMapping: if statusLabelKey.startswith( MqttMonitorWidget.BrokerStatusTopicLoadPrefix): label = "- / - / -" else: label = "-" self.__statusLabelMapping[statusLabelKey].setText(label) def __loadDefaultDictFactory(self): """ Private method to populate non-existing load items. @return default dictionary entry @rtype dict """ return { "1min": "-", "5min": "-", "15min": "-", } def __setConnectionMode(self, profileMode): """ Private method to set the connection mode. @param profileMode flag indicating the profile connection mode @type bool """ self.__connectionModeProfile = profileMode if profileMode: self.modeButton.setIcon(UI.PixmapCache.getIcon( os.path.join("MqttMonitor", "icons", "profiles.png"))) else: self.modeButton.setIcon(UI.PixmapCache.getIcon( os.path.join("MqttMonitor", "icons", "quickopen.png"))) self.profileComboBox.setVisible(profileMode) self.brokerConnectionWidget.setVisible(not profileMode) self.__setConnectButtonState() def __setConnectButtonState(self): """ Private method to set the enabled state of the connect button. """ if self.__connectionModeProfile: self.connectButton.setEnabled( bool(self.profileComboBox.currentText())) else: self.connectButton.setEnabled( bool(self.brokerComboBox.currentText())) def __directConnectToBroker(self): """ Private method to connect to the broker with entered data. """ host = self.brokerComboBox.currentText() port = self.brokerPortComboBox.currentText().strip() try: port = int(port) except ValueError: # use standard port at 1883 port = 1883 if host: self.__addBrokerToRecent(host, port) if self.__connectionOptions is None: self.__client.connectToServer(host, port=port) else: self.__client.connectToServerWithOptions( host, port=port, options=self.__connectionOptions) def __profileConnectToBroker(self): """ Private method to connect to the broker with selected profile. """ profileName = self.profileComboBox.currentText() if profileName: profilesDict = self.__plugin.getPreferences("BrokerProfiles") profile = copy.copy(profilesDict[profileName]) # play it save host = profile["BrokerAddress"] port = profile["BrokerPort"] self.__client.connectToServerWithOptions(host, port=port, options=profile)