Thu, 30 Aug 2018 18:57:57 +0200
MqttMonitorWidget: implemented part 2 of the status tab.
# -*- coding: utf-8 -*- # Copyright (c) 2018 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the MQTT Monitor widget. """ from __future__ import unicode_literals try: str = unicode # __IGNORE_EXCEPTION__ except NameError: pass import os import collections from PyQt5.QtCore import pyqtSlot, QTimer from PyQt5.QtGui import QTextCursor from PyQt5.QtWidgets import QWidget from E5Gui import E5MessageBox from .Ui_MqttMonitorWidget import Ui_MqttMonitorWidget from .MqttClient import MqttClient, mqttConnackMessage, mqttErrorMessage import UI.PixmapCache import Utilities class MqttMonitorWidget(QWidget, Ui_MqttMonitorWidget): """ Class implementing the MQTT Monitor widget. """ BrokerStatusTopicPrefix = "$SYS/broker/" BrokerStatusTopic = "$SYS/broker/#" BrokerStatusTopicLoadPrefix = "$SYS/broker/load/" def __init__(self, plugin, parent=None): """ Constructor @param plugin reference to the plug-in object @type MqttMonitorPlugin @param parent reference to the parent widget @type QWidget """ super(MqttMonitorWidget, self).__init__(parent) self.setupUi(self) self.__plugin = plugin self.__connectedToBroker = False self.__brokerStatusTopicSubscribed = False self.pixmapLabel.setPixmap(UI.PixmapCache.getPixmap( os.path.join("MqttMonitor", "icons", "mqtt48.png"))) self.brokerWidget.setCurrentIndex(0) self.connectButton.setIcon(UI.PixmapCache.getIcon("ircConnect.png")) self.brokerComboBox.addItems( self.__plugin.getPreferences("RecentBrokers")) self.brokerStatusLabel.hide() self.subscribeButton.setIcon(UI.PixmapCache.getIcon("plus.png")) self.subscribeButton.setEnabled(False) self.unsubscribeButton.setIcon(UI.PixmapCache.getIcon("minus.png")) self.__subscribedTopics = [] self.__topicQueue = {} self.__updateUnsubscribeTopicComboBox() prefix = MqttMonitorWidget.BrokerStatusTopicPrefix self.__statusLabelMapping = { # broker prefix + "version": self.versionLabel, prefix + "timestamp": self.timestampLabel, prefix + "uptime": self.uptimeLabel, prefix + "subscriptions/count": self.subscriptionsLabel, # clients prefix + "clients/connected": self.clientsConnectedLabel, prefix + "clients/disconnected": self.clientsDisconnectedLabel, prefix + "clients/expired": self.clientsExpiredLabel, prefix + "clients/maximum": self.clientsMaximumLabel, prefix + "clients/total": self.clientsTotalLabel, # messages prefix + "messages/sent": self.messagesSentLabel, prefix + "messages/received": self.messagesReceivedLabel, prefix + "messages/stored": self.messagesStoredLabel, prefix + "store/messages/count": self.messagesStoredLabel, prefix + "messages/inflight": self.messagesInflightLabel, prefix + "retained messages/count": self.messagesRetainedLabel, # publish messages prefix + "publish/messages/sent": self.publishMessagesSentLabel, prefix + "publish/messages/received": self.publishMessagesReceivedLabel, prefix + "publish/messages/dropped": self.publishMessagesDroppedLabel, # traffic prefix + "bytes/sent": self.bytesSentLabel, prefix + "bytes/received": self.bytesReceivedLabel, # load prefix + "load/bytes/sent": self.loadBytesSentLabel, prefix + "load/bytes/received": self.loadBytesReceivedLabel, prefix + "load/messages/sent": self.loadMessagesSentLabel, prefix + "load/messages/received": self.loadMessagesReceivedLabel, prefix + "load/publish/sent": self.loadPublishSentLabel, prefix + "load/publish/received": self.loadPublishReceivedLabel, prefix + "load/publish/dropped": self.loadPublishDroppedLabel, prefix + "load/connections": self.loadConnectionsLabel, prefix + "load/sockets": self.loadSocketsLabel, } self.__statusLoadValues = collections.defaultdict( self.__loadDefaultDictFactory) self.__client = MqttClient() # connect the MQTT client signals self.__client.onConnect.connect(self.__brokerConnected) self.__client.onDisconnected.connect(self.__brokerDisconnected) self.__client.onMessage.connect(self.__messageReceived) self.__client.onPublish.connect(self.__messagePublished) self.__client.onSubscribe.connect(self.__topicSubscribed) self.__client.onUnsubscribe.connect(self.__topicUnsubscribed) ####################################################################### ## Slots handling MQTT related signals ####################################################################### @pyqtSlot(dict, int) def __brokerConnected(self, flags, rc): """ Private slot to handle being connected to a broker. @param flags flags set for the connection @type dict @param rc CONNACK result code @type int """ if rc == 0: self.__connectedToBroker = True msg = mqttConnackMessage(rc) self.__flashBrokerStatusLabel(msg) self.connectButton.setIcon(UI.PixmapCache.getIcon("ircDisconnect.png")) self.subscribeGroup.setEnabled(True) self.unsubscribeGroup.setEnabled(True) self.brokerStatusButton.setEnabled(True) self.__statusLoadValues.clear() @pyqtSlot(int) def __brokerDisconnected(self, rc): """ Private slot to handle a disconnection from the broker. @param rc MQTT error result code @type int """ self.__connectedToBroker = False # ensure, the client loop is stopped self.__client.stopLoop() if rc != 0: msg = mqttErrorMessage(rc) else: msg = self.tr("Connection to Broker shut down cleanly.") self.__flashBrokerStatusLabel(msg) self.connectButton.setIcon(UI.PixmapCache.getIcon("ircConnect.png")) self.__subscribedTopics = [] self.__topicQueue = {} self.__updateUnsubscribeTopicComboBox() self.subscribeGroup.setEnabled(False) self.unsubscribeGroup.setEnabled(False) self.brokerStatusButton.setEnabled(False) self.__statusLoadValues.clear() @pyqtSlot(str, bytes, int, bool) def __messageReceived(self, topic, payload, qos, retain): """ Private slot to handle the receipt of a message. @param topic topic of the message @type str @param payload payload (i.e. data) of the message @type bytes @param qos quality of service indicator @type int @param retain flag indicating a retained message @type bool """ if topic.startswith(MqttMonitorWidget.BrokerStatusTopicPrefix): # handle broker status messages self.__handleBrokerStatusMessage(topic, payload) else: self.__appendMessage(topic, payload) @pyqtSlot(int) def __messagePublished(self, mid): """ Private slot to handle a message being published. @param mid ID of the published message @type int """ pass @pyqtSlot(int, tuple) def __topicSubscribed(self, mid, grantedQos): """ Private slot to handle being subscribed to topics. @param mid ID of the subscribe request @type int @param grantedQos tuple of granted quality of service @type tuple of int """ if mid in self.__topicQueue: topic = self.__topicQueue.pop(mid) self.__subscribedTopics.append(topic) self.subscribeTopicEdit.clear() self.__updateUnsubscribeTopicComboBox() @pyqtSlot(int) def __topicUnsubscribed(self, mid): """ Private slot to handle being unsubcribed from a topic. @param mid ID of the unsubscribe request @type int """ if mid in self.__topicQueue: topic = self.__topicQueue.pop(mid) try: self.__subscribedTopics.remove(topic) self.__updateUnsubscribeTopicComboBox() except ValueError: # ignore it pass ####################################################################### ## Slots handling UI interactions ####################################################################### @pyqtSlot() def __flashBrokerStatusLabel(self, message): """ Private slot to show the broker status label with some text for 5 seconds. @param message message to be shown @type str """ self.brokerStatusLabel.setText(message) self.brokerStatusLabel.show() QTimer.singleShot(5000, self.brokerStatusLabel.hide) @pyqtSlot(str) def on_brokerComboBox_editTextChanged(self, host): """ Private slot to handling entering or selecting a broker host name. @param host host name of the broker @type str """ if not self.__connectedToBroker and not host: self.connectButton.setEnabled(False) else: self.connectButton.setEnabled(True) @pyqtSlot() def on_connectButton_clicked(self): """ Private slot to handle a connect or disconnect request. """ if self.__connectedToBroker: self.__client.disconnectFromServer() else: host = self.brokerComboBox.currentText() if host: self.__addBrokerToRecent(host) self.__client.connectToServer(host) # use standard port at 1883 @pyqtSlot(str) def on_subscribeTopicEdit_textChanged(self, topic): """ Private slot to handle a change of the entered topic. @param topic entered topic text @type str """ self.subscribeButton.setEnabled(bool(topic)) @pyqtSlot() def on_subscribeButton_clicked(self): """ Private slot to subscribe to the entered topic. """ topic = self.subscribeTopicEdit.text() qos = self.subscribeQosSpinBox.value() if topic: if topic.startswith(MqttMonitorWidget.BrokerStatusTopicPrefix): E5MessageBox.warning( self, self.tr("Subscribe to Topic"), self.tr("Subscriptions to the Status topic '$SYS' shall" " be done on the 'Status' tab.")) else: self.__topicQueue[ self.__client.subscribe(topic, qos)[1]] = topic @pyqtSlot(str) def on_unsubscribeTopicComboBox_currentIndexChanged(self, topic): """ Private slot to handle the selection of a topic to unsubscribe from. @param topic topic text @type str """ self.unsubscribeButton.setEnabled(bool(topic)) @pyqtSlot() def on_unsubscribeButton_clicked(self): """ Private slot to unsubscribe from the selected topic. """ topic = self.unsubscribeTopicComboBox.currentText() if topic: self.__topicQueue[ self.__client.unsubscribe(topic)[1]] = topic @pyqtSlot() def on_brokerStatusButton_clicked(self): """ Private slot to subscribe or unsubscribe the broker status topic. """ if self.__brokerStatusTopicSubscribed: # unsubscribe status topic rc, _ = self.__client.unsubscribe( MqttMonitorWidget.BrokerStatusTopic) if rc == 0: # successfully sent self.__brokerStatusTopicSubscribed = False self.brokerStatusButton.setText(self.tr("Subscribe")) self.brokerStatusButton.setToolTip( self.tr("Press to activate the status display")) else: # subscribe status topic rc, _ = self.__client.subscribe( MqttMonitorWidget.BrokerStatusTopic) if rc == 0: # successfully sent self.__brokerStatusTopicSubscribed = True self.brokerStatusButton.setText(self.tr("Unsubscribe")) self.brokerStatusButton.setToolTip( self.tr("Press to deactivate the status display")) ####################################################################### ## Utility methods ####################################################################### def __addBrokerToRecent(self, host): """ Private method to add a host name to the list of recently connected brokers. @param host host name of broker @type str """ brokerList = self.__plugin.getPreferences("RecentBrokers") if host in brokerList: brokerList.remove(host) brokerList.insert(0, host) self.__plugin.setPreferences("RecentBrokers", brokerList) self.brokerComboBox.clear() self.brokerComboBox.addItems(brokerList) def __updateUnsubscribeTopicComboBox(self): """ Private method to update the unsubcribe topic combo box. """ self.unsubscribeTopicComboBox.clear() self.unsubscribeTopicComboBox.addItems(sorted(self.__subscribedTopics)) self.unsubscribeButton.setEnabled(len(self.__subscribedTopics) > 0) def __appendMessage(self, topic, payload): """ Private method to append a received message to the output. @param topic topic of the received message @type str @param payload payload of the received message @type bytes """ payloadStr = str(payload, encoding="utf-8", errors="replace") txt = "{0} {1}".format(topic, payloadStr) if not txt.endswith(("\r\n", "\r", "\n")): txt += "\n" tc = self.messagesEdit.textCursor() tc.movePosition(QTextCursor.End) self.messagesEdit.setTextCursor(tc) self.messagesEdit.insertPlainText(Utilities.filterAnsiSequences(txt)) self.messagesEdit.ensureCursorVisible() def __handleBrokerStatusMessage(self, topic, payload): """ Private method to append a received message to the output. @param topic topic of the received message @type str @param payload payload of the received message @type bytes """ payloadStr = str(payload, encoding="utf-8", errors="replace").strip() topic = topic.strip() if topic.startswith(MqttMonitorWidget.BrokerStatusTopicLoadPrefix): self.__handleBrokerLoadStatusMessage(topic, payloadStr) else: try: label = self.__statusLabelMapping[topic] label.setText(payloadStr) except KeyError: # ignore topics not shown in display pass def __handleBrokerLoadStatusMessage(self, topic, payloadStr): """ Private method to append a received message to the output. @param topic topic of the received message @type str @param payloadStr string representation of the payload of the received message @type str """ subtopic, topicElement = topic.rsplit("/", 1) self.__statusLoadValues[subtopic][topicElement] = payloadStr try: label = self.__statusLabelMapping[subtopic] label.setText("{0} / {1} / {2}".format( self.__statusLoadValues[subtopic]["1min"], self.__statusLoadValues[subtopic]["5min"], self.__statusLoadValues[subtopic]["15min"], )) except KeyError: # ignore topics not shown in display pass def __loadDefaultDictFactory(self): """ Private method to populate non-existing load items. @return default dictionary entry @rtype dict """ return { "1min": "-", "5min": "-", "15min": "-", }