Sun, 30 May 2021 18:21:40 +0200
Ported the plug-in to PyQt6 for eric7.
# -*- coding: utf-8 -*-

# Copyright (c) 2018 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the MQTT Monitor widget.
"""

import os
import collections
import copy
import contextlib

from PyQt6.QtCore import pyqtSlot, Qt, QTimer, QFileInfo
from PyQt6.QtGui import QFont, QTextCursor, QBrush, QColor
from PyQt6.QtWidgets import QWidget, QDialog

from EricWidgets import EricMessageBox, EricFileDialog
from EricWidgets.EricPathPicker import EricPathPickerModes

from .Ui_MqttMonitorWidget import Ui_MqttMonitorWidget

from .MqttClient import (
    MqttClient, mqttConnackMessage, mqttErrorMessage, mqttLogLevelString
)

import UI.PixmapCache
import Utilities


class MqttMonitorWidget(QWidget, Ui_MqttMonitorWidget):
    """
    Class implementing the MQTT Monitor widget.

    The widget allows to connect to a broker (directly or via a connection
    profile), to subscribe to and unsubscribe from topics, to publish
    messages, to show received messages and client log messages and to
    display the broker status published below the '$SYS/broker/' topic.
    """
    BrokerStatusTopicPrefix = "$SYS/broker/"
    BrokerStatusTopic = "$SYS/broker/#"
    BrokerStatusTopicLoadPrefix = "$SYS/broker/load/"

    def __init__(self, plugin, usesDarkPalette, parent=None):
        """
        Constructor

        @param plugin reference to the plug-in object
        @type MqttMonitorPlugin
        @param usesDarkPalette flag indicating the use of a dark application
            palette
        @type bool
        @param parent reference to the parent widget
        @type QWidget
        """
        super().__init__(parent)
        self.setupUi(self)

        self.__plugin = plugin
        self.__iconSuffix = "dark" if usesDarkPalette else "light"

        self.__connectedToBroker = False
        self.__brokerStatusTopicSubscribed = False

        self.pixmapLabel.setPixmap(UI.PixmapCache.getPixmap(
            os.path.join("MqttMonitor", "icons",
                         "mqtt48-{0}".format(self.__iconSuffix))
        ))

        self.publishPayloadFilePicker.setMode(
            EricPathPickerModes.OPEN_FILE_MODE)
        self.publishPayloadFilePicker.setFilters(self.tr("All Files (*)"))

        # character formats used to render the parts of a received message
        self.__messagesFormat = self.messagesEdit.currentCharFormat()
        self.__messagesTopicFormat = self.messagesEdit.currentCharFormat()
        self.__messagesTopicFormat.setFontWeight(QFont.Weight.Bold)
        self.__messagesQosFormat = self.messagesEdit.currentCharFormat()
        self.__messagesQosFormat.setFontItalic(True)

        self.messagesSearchWidget.attachTextEdit(self.messagesEdit)
        self.messagesSearchWidget.setWidthForHeight(False)

        # toggled after each appended message to alternate the background
        self.__isAlternate = False

        for logLevel in (MqttClient.LogDisabled,
                         MqttClient.LogDebug,
                         MqttClient.LogInfo,
                         MqttClient.LogNotice,
                         MqttClient.LogWarning,
                         MqttClient.LogError):
            self.logLevelComboBox.addItem(mqttLogLevelString(
                logLevel, isMqttLogLevel=False), logLevel)
        # select the last entry added above (i.e. MqttClient.LogError)
        self.logLevelComboBox.setCurrentIndex(
            self.logLevelComboBox.count() - 1)

        if usesDarkPalette:
            self.__logMessagesBackgrounds = {
                MqttClient.LogDebug: QBrush(QColor("#2f2f2f")),
                MqttClient.LogInfo: QBrush(QColor("#868686")),
                MqttClient.LogNotice: QBrush(QColor("#009900")),
                MqttClient.LogWarning: QBrush(QColor("#999900")),
                MqttClient.LogError: QBrush(QColor("#990000")),
                MqttClient.LogDisabled: QBrush(QColor("#990099")),
                # reuse LogDisabled for unknown log levels
            }
        else:
            self.__logMessagesBackgrounds = {
                MqttClient.LogDebug: QBrush(Qt.GlobalColor.white),
                MqttClient.LogInfo: QBrush(Qt.GlobalColor.lightGray),
                MqttClient.LogNotice: QBrush(Qt.GlobalColor.green),
                MqttClient.LogWarning: QBrush(Qt.GlobalColor.yellow),
                MqttClient.LogError: QBrush(Qt.GlobalColor.red),
                MqttClient.LogDisabled: QBrush(Qt.GlobalColor.magenta)
                # reuse LogDisabled for unknown log levels
            }

        self.logSearchWidget.attachTextEdit(self.logEdit)
        self.logSearchWidget.setWidthForHeight(False)

        self.brokerWidget.setCurrentIndex(0)

        self.__connectionModeProfile = True
        self.__setConnectionMode(True)  # initial mode is 'profile connection'
        self.__populateProfileComboBox()

        self.connectButton.setIcon(UI.PixmapCache.getIcon("ircConnect"))
        self.brokerConnectionOptionsButton.setIcon(UI.PixmapCache.getIcon(
            os.path.join("MqttMonitor", "icons",
                         "connectionOptions-{0}".format(self.__iconSuffix))
        ))
        self.__populateBrokerComboBoxes()
        self.brokerStatusLabel.hide()

        self.subscribeButton.setIcon(UI.PixmapCache.getIcon("plus"))
        self.subscribeButton.setEnabled(False)
        self.unsubscribeButton.setIcon(UI.PixmapCache.getIcon("minus"))

        self.__subscribedTopics = []
        # maps the message ID of a pending (un)subscribe request to its topic
        self.__topicQueue = {}
        self.__updateUnsubscribeTopicComboBox()

        self.__publishedTopics = []
        self.__updatePublishTopicComboBox()
        self.publishButton.setEnabled(False)

        self.__connectionOptions = None

        prefix = MqttMonitorWidget.BrokerStatusTopicPrefix
        self.__statusLabelMapping = {
            # broker
            prefix + "version": self.versionLabel,
            prefix + "timestamp": self.timestampLabel,
            prefix + "uptime": self.uptimeLabel,
            prefix + "subscriptions/count": self.subscriptionsLabel,
            # clients
            prefix + "clients/connected": self.clientsConnectedLabel,
            prefix + "clients/disconnected": self.clientsDisconnectedLabel,
            prefix + "clients/expired": self.clientsExpiredLabel,
            prefix + "clients/maximum": self.clientsMaximumLabel,
            prefix + "clients/total": self.clientsTotalLabel,
            # messages
            prefix + "messages/sent": self.messagesSentLabel,
            prefix + "messages/received": self.messagesReceivedLabel,
            prefix + "messages/stored": self.messagesStoredLabel,
            prefix + "store/messages/count": self.messagesStoredLabel,
            prefix + "messages/inflight": self.messagesInflightLabel,
            prefix + "retained messages/count": self.messagesRetainedLabel,
            # publish messages
            prefix + "publish/messages/sent": self.publishMessagesSentLabel,
            prefix + "publish/messages/received":
                self.publishMessagesReceivedLabel,
            prefix + "publish/messages/dropped":
                self.publishMessagesDroppedLabel,
            # traffic
            prefix + "bytes/sent": self.bytesSentLabel,
            prefix + "bytes/received": self.bytesReceivedLabel,
            # load
            prefix + "load/bytes/sent": self.loadBytesSentLabel,
            prefix + "load/bytes/received": self.loadBytesReceivedLabel,
            prefix + "load/messages/sent": self.loadMessagesSentLabel,
            prefix + "load/messages/received": self.loadMessagesReceivedLabel,
            prefix + "load/publish/sent": self.loadPublishSentLabel,
            prefix + "load/publish/received": self.loadPublishReceivedLabel,
            prefix + "load/publish/dropped": self.loadPublishDroppedLabel,
            prefix + "load/connections": self.loadConnectionsLabel,
            prefix + "load/sockets": self.loadSocketsLabel,
        }

        # per load topic dictionary of the '1min', '5min' and '15min' values
        self.__statusLoadValues = collections.defaultdict(
            self.__loadDefaultDictFactory)

        self.__client = MqttClient()

        # connect the MQTT client signals
        self.__client.onConnect.connect(self.__brokerConnected)
        self.__client.onDisconnected.connect(self.__brokerDisconnected)
        self.__client.onLog.connect(self.__clientLog)
        self.__client.onMessage.connect(self.__messageReceived)
        self.__client.onPublish.connect(self.__messagePublished)
        self.__client.onSubscribe.connect(self.__topicSubscribed)
        self.__client.onUnsubscribe.connect(self.__topicUnsubscribed)

        self.__client.connectTimeout.connect(self.__connectTimeout)

    #######################################################################
    ## Slots handling MQTT related signals
    #######################################################################

    @pyqtSlot(dict, int)
    def __brokerConnected(self, flags, rc):
        """
        Private slot to handle being connected to a broker.

        @param flags flags set for the connection
        @type dict
        @param rc CONNACK result code
        @type int
        """
        self.brokerStatusLabel.hide()

        # TODO: add support for flags['session present']
        connected = rc == 0
        if connected:
            self.__connectedToBroker = True
            self.__connectionOptions = None

        msg = mqttConnackMessage(rc)
        self.__flashBrokerStatusLabel(msg)

        self.connectButton.setEnabled(True)
        if connected:
            self.connectButton.setIcon(
                UI.PixmapCache.getIcon("ircDisconnect"))

            self.subscribeGroup.setEnabled(True)
            self.unsubscribeGroup.setEnabled(True)
            self.publishGroup.setEnabled(True)
            self.brokerStatusButton.setEnabled(True)

            # start with a clean broker status display
            self.__statusLoadValues.clear()
            self.__clearBrokerStatusLabels()
            self.__setBrokerStatusSubscribed(False)
        else:
            # connection was refused => stop the client loop
            self.__client.stopLoop()

    @pyqtSlot()
    def __connectTimeout(self):
        """
        Private slot handling a timeout during a connection attempt.
        """
        self.__flashBrokerStatusLabel(self.tr("Connection timed out"))
        self.__setConnectButtonState()

    @pyqtSlot(int)
    def __brokerDisconnected(self, rc):
        """
        Private slot to handle a disconnection from the broker.

        @param rc MQTT error result code
        @type int
        """
        self.__connectedToBroker = False

        # ensure, the client loop is stopped
        self.__client.stopLoop()

        msg = (
            mqttErrorMessage(rc)
            if rc > 0 else
            self.tr("Connection to Broker shut down cleanly.")
        )
        self.__flashBrokerStatusLabel(msg)

        self.connectButton.setIcon(UI.PixmapCache.getIcon("ircConnect"))
        self.__setConnectButtonState()

        # forget all subscriptions and pending (un)subscribe requests
        self.__subscribedTopics = []
        self.__topicQueue = {}
        self.__updateUnsubscribeTopicComboBox()
        self.__updatePublishTopicComboBox()

        self.subscribeGroup.setEnabled(False)
        self.unsubscribeGroup.setEnabled(False)
        self.publishGroup.setEnabled(False)
        self.brokerStatusButton.setEnabled(False)

        self.__statusLoadValues.clear()

    @pyqtSlot(int, str)
    def __clientLog(self, level, message):
        """
        Private slot to handle the receipt of a log message.

        @param level log level
        @type int
        @param message log message
        @type str
        """
        # skip messages below the selected log level; messages with a level
        # missing from the log level map are always shown
        with contextlib.suppress(KeyError):
            if MqttClient.LogLevelMap[level] < self.logLevelComboBox.itemData(
                self.logLevelComboBox.currentIndex()
            ):
                return

        # remember the scroll position to restore it if not following
        scrollbarValue = self.logEdit.verticalScrollBar().value()

        textCursor = self.logEdit.textCursor()
        if not self.logEdit.document().isEmpty():
            textCursor.movePosition(QTextCursor.MoveOperation.End)
            self.logEdit.setTextCursor(textCursor)
            self.logEdit.insertPlainText("\n")

        textBlockFormat = textCursor.blockFormat()
        try:
            textBlockFormat.setBackground(
                self.__logMessagesBackgrounds[MqttClient.LogLevelMap[level]])
        except KeyError:
            # unknown log level => use the 'LogDisabled' background
            textBlockFormat.setBackground(
                self.__logMessagesBackgrounds[MqttClient.LogDisabled])
        textCursor.setBlockFormat(textBlockFormat)
        textCursor.movePosition(QTextCursor.MoveOperation.End)
        self.logEdit.setTextCursor(textCursor)

        txt = self.tr("{0}: {1}").format(mqttLogLevelString(level), message)
        self.logEdit.insertPlainText(Utilities.filterAnsiSequences(txt))

        if self.followLogMessagesCheckBox.isChecked():
            self.logEdit.ensureCursorVisible()
        else:
            self.logEdit.verticalScrollBar().setValue(scrollbarValue)

    @pyqtSlot(str, bytes, int, bool)
    def __messageReceived(self, topic, payload, qos, retain):
        """
        Private slot to handle the receipt of a message.

        @param topic topic of the message
        @type str
        @param payload payload (i.e. data) of the message
        @type bytes
        @param qos quality of service indicator
        @type int
        @param retain flag indicating a retained message (unused)
        @type bool
        """
        if topic.startswith(MqttMonitorWidget.BrokerStatusTopicPrefix):
            # handle broker status messages
            self.__handleBrokerStatusMessage(topic, payload)
        else:
            self.__appendMessage(topic, payload, qos)

    @pyqtSlot(int)
    def __messagePublished(self, mid):
        """
        Private slot to handle a message being published.

        @param mid ID of the published message
        @type int
        """
        # intentionally empty: nothing is done for a published message

    @pyqtSlot(int, tuple)
    def __topicSubscribed(self, mid, grantedQos):
        """
        Private slot to handle being subscribed to topics.

        @param mid ID of the subscribe request
        @type int
        @param grantedQos tuple of granted quality of service (unused)
        @type tuple of int
        """
        if mid in self.__topicQueue:
            topic = self.__topicQueue.pop(mid)
            # a topic subscribed repeatedly must be listed just once;
            # otherwise a stale entry would survive the unsubscription
            if topic not in self.__subscribedTopics:
                self.__subscribedTopics.append(topic)
            self.subscribeTopicEdit.clear()

            self.__updateUnsubscribeTopicComboBox()
            self.__updatePublishTopicComboBox()

    @pyqtSlot(int)
    def __topicUnsubscribed(self, mid):
        """
        Private slot to handle being unsubscribed from a topic.

        @param mid ID of the unsubscribe request
        @type int
        """
        if mid in self.__topicQueue:
            topic = self.__topicQueue.pop(mid)
            with contextlib.suppress(ValueError):
                self.__subscribedTopics.remove(topic)
                self.__updateUnsubscribeTopicComboBox()
                self.__updatePublishTopicComboBox()

    #######################################################################
    ## Slots handling UI interactions
    #######################################################################

    @pyqtSlot(str)
    def __flashBrokerStatusLabel(self, message):
        """
        Private slot to show the broker status label with some text for
        5 seconds.

        @param message message to be shown
        @type str
        """
        self.brokerStatusLabel.setText(message)
        self.brokerStatusLabel.show()
        QTimer.singleShot(5000, self.brokerStatusLabel.hide)

    @pyqtSlot()
    def on_modeButton_clicked(self):
        """
        Private slot to switch between connection profiles and direct
        connection mode.
        """
        self.__setConnectionMode(not self.__connectionModeProfile)

    @pyqtSlot(str)
    def on_profileComboBox_currentIndexChanged(self, profileName):
        """
        Private slot handling the change of the selected profile.

        @param profileName name of the selected profile
        @type str
        """
        self.__setConnectButtonState()

    @pyqtSlot(str)
    def on_brokerComboBox_editTextChanged(self, host):
        """
        Private slot to handle entering or selecting a broker host name.

        @param host host name of the broker
        @type str
        """
        self.__setConnectButtonState()

    @pyqtSlot()
    def on_brokerConnectionOptionsButton_clicked(self):
        """
        Private slot to show a dialog to modify connection options or a
        dialog to edit connection profiles.
        """
        if self.__connectionModeProfile:
            from .MqttConnectionProfilesDialog import (
                MqttConnectionProfilesDialog
            )
            dlg = MqttConnectionProfilesDialog(
                self.__client, self.__plugin.getPreferences("BrokerProfiles"),
                parent=self)
            if dlg.exec() == QDialog.DialogCode.Accepted:
                profilesDict = dlg.getProfiles()
                self.__plugin.setPreferences("BrokerProfiles", profilesDict)
                self.__populateProfileComboBox()
        else:
            from .MqttConnectionOptionsDialog import (
                MqttConnectionOptionsDialog
            )
            dlg = MqttConnectionOptionsDialog(
                self.__client, self.__connectionOptions, parent=self)
            if dlg.exec() == QDialog.DialogCode.Accepted:
                self.__connectionOptions = dlg.getConnectionOptions()
                port = self.brokerPortComboBox.currentText().strip()
                if self.__connectionOptions["TlsEnable"]:
                    if port == "1883":
                        # it is default non-encrypted port => set to TLS port
                        self.brokerPortComboBox.setEditText("8883")
                elif port == "8883":
                    # it is default TLS port => set to non-encrypted port
                    self.brokerPortComboBox.setEditText("1883")

    @pyqtSlot()
    def on_connectButton_clicked(self):
        """
        Private slot to handle a connect or disconnect request.
        """
        if self.__connectedToBroker:
            self.__client.disconnectFromServer()
        elif self.__connectionModeProfile:
            self.__profileConnectToBroker()
        else:
            self.__directConnectToBroker()

    @pyqtSlot(str)
    def on_subscribeTopicEdit_textChanged(self, topic):
        """
        Private slot to handle a change of the entered topic.

        @param topic entered topic text
        @type str
        """
        self.subscribeButton.setEnabled(bool(topic))

    @pyqtSlot()
    def on_subscribeTopicEdit_returnPressed(self):
        """
        Private slot handling the user pressing the return button to
        subscribe a topic.
        """
        self.on_subscribeButton_clicked()

    @pyqtSlot()
    def on_subscribeButton_clicked(self):
        """
        Private slot to subscribe to the entered topic.
        """
        topic = self.subscribeTopicEdit.text()
        qos = self.subscribeQosSpinBox.value()
        if topic:
            if topic.startswith(MqttMonitorWidget.BrokerStatusTopicPrefix):
                EricMessageBox.warning(
                    self,
                    self.tr("Subscribe to Topic"),
                    self.tr("Subscriptions to the Status topic '$SYS' shall"
                            " be done on the 'Status' tab."))
            else:
                # remember the topic under the ID of the subscribe request
                # (second element of the result); it is added to the list
                # of subscribed topics once the request is acknowledged
                self.__topicQueue[
                    self.__client.subscribe(topic, qos)[1]] = topic

    @pyqtSlot(str)
    def on_unsubscribeTopicComboBox_currentIndexChanged(self, topic):
        """
        Private slot to handle the selection of a topic to unsubscribe from.

        @param topic topic text
        @type str
        """
        self.unsubscribeButton.setEnabled(bool(topic))

    @pyqtSlot()
    def on_unsubscribeButton_clicked(self):
        """
        Private slot to unsubscribe from the selected topic.
        """
        topic = self.unsubscribeTopicComboBox.currentText()
        if topic:
            # remember the topic under the ID of the unsubscribe request
            self.__topicQueue[
                self.__client.unsubscribe(topic)[1]] = topic

    @pyqtSlot(str)
    def on_publishTopicComboBox_editTextChanged(self, topic):
        """
        Private slot to handle changes of the publish topic name.

        @param topic topic text
        @type str
        """
        self.publishButton.setEnabled(bool(topic))

    @pyqtSlot()
    def on_publishButton_clicked(self):
        """
        Private slot to publish the entered message.

        The payload is read from the selected payload file, if that exists
        and does not exceed the payload size limit. Otherwise the text of
        the payload edit is sent.
        """
        topic = self.publishTopicComboBox.currentText()
        qos = self.publishQosSpinBox.value()
        retain = self.publishRetainCheckBox.isChecked()
        payloadFile = self.publishPayloadFilePicker.text()
        # NOTE(review): a payload file that is missing or too big is not
        # reported; the text of the payload edit is published instead
        if (
            bool(payloadFile) and
            os.path.exists(payloadFile) and
            os.path.getsize(payloadFile) <= 268435455
        ):
            # payload size limit is 268,435,455 bytes
            try:
                with open(payloadFile, "rb") as f:
                    payloadStr = f.read()
            except OSError as err:
                EricMessageBox.critical(
                    self,
                    self.tr("Read Payload from File"),
                    self.tr("""<p>The file <b>{0}</b> could not be read."""
                            """ Aborting...</p><p>Reason: {1}</p>""").format(
                        payloadFile, str(err)))
                return
        else:
            payloadStr = self.publishPayloadEdit.toPlainText()
        if not payloadStr:
            # use empty string together with the retain flag to clean
            # a retained message by sending None instead
            payloadStr = None

        msgInfo = self.__client.publish(topic, payloadStr, qos, retain)
        if msgInfo.rc == 0:
            if topic not in self.__publishedTopics:
                self.__publishedTopics.append(topic)
            self.__updatePublishTopicComboBox(resetTopic=False)
            if self.clearPublishCheckBox.isChecked():
                self.on_publishClearButton_clicked()

    @pyqtSlot()
    def on_publishClearButton_clicked(self):
        """
        Private slot to clear the publish data fields.
        """
        self.publishTopicComboBox.clearEditText()
        self.publishPayloadEdit.clear()
        self.publishQosSpinBox.setValue(0)
        self.publishRetainCheckBox.setChecked(False)
        self.publishPayloadFilePicker.clear()

    @pyqtSlot(str)
    def on_publishPayloadFilePicker_textChanged(self, path):
        """
        Private slot handling a change of path of the payload file.

        @param path path of the payload file
        @type str
        """
        # the payload edit is usable only, if no payload file is given
        self.publishPayloadEdit.setEnabled(not bool(path))

    @pyqtSlot()
    def on_brokerStatusButton_clicked(self):
        """
        Private slot to subscribe or unsubscribe the broker status topic.
        """
        if self.__brokerStatusTopicSubscribed:
            # unsubscribe status topic
            rc, _ = self.__client.unsubscribe(
                MqttMonitorWidget.BrokerStatusTopic)
            if rc == 0:
                # successfully sent
                self.__setBrokerStatusSubscribed(False)
        else:
            # subscribe status topic
            rc, _ = self.__client.subscribe(
                MqttMonitorWidget.BrokerStatusTopic)
            if rc == 0:
                # successfully sent
                self.__setBrokerStatusSubscribed(True)

    @pyqtSlot(int)
    def on_messagesEdit_blockCountChanged(self, newBlockCount):
        """
        Private slot handling changes of received messages.

        @param newBlockCount (ignored)
        @type int
        """
        enable = not self.messagesEdit.document().isEmpty()
        self.saveMessagesButton.setEnabled(enable)
        self.clearMessagesButton.setEnabled(enable)

    @pyqtSlot()
    def on_saveMessagesButton_clicked(self):
        """
        Private slot to save the received messages.
        """
        self.__saveTextEditContents(
            self.tr("Save Messages"),
            self.tr("Messages Files (*.txt);;All Files (*)"),
            self.messagesEdit)

    @pyqtSlot(int)
    def on_logEdit_blockCountChanged(self, newBlockCount):
        """
        Private slot handling changes of the log messages.

        @param newBlockCount (ignored)
        @type int
        """
        enable = not self.logEdit.document().isEmpty()
        self.saveLogMessagesButton.setEnabled(enable)
        self.clearLogMessagesButton.setEnabled(enable)

    @pyqtSlot()
    def on_saveLogMessagesButton_clicked(self):
        """
        Private slot to save the log messages.
        """
        self.__saveTextEditContents(
            self.tr("Save Log Messages"),
            self.tr("Log Files (*.log);;All Files (*)"),
            self.logEdit)

    #######################################################################
    ## Utility methods
    #######################################################################

    def __saveTextEditContents(self, title, fileFilters, textEdit):
        """
        Private method to save the plain text of a text edit to a file
        selected by the user.

        The user is asked for a file name and for a confirmation, if the
        file exists already. A failure to write the file is reported with
        a message box.

        @param title (translated) title of the dialogs
        @type str
        @param fileFilters (translated) file name filters of the file
            selection dialog
        @type str
        @param textEdit reference to the text edit whose text is saved
        @type QPlainTextEdit or QTextEdit
        """
        fn, selectedFilter = EricFileDialog.getSaveFileNameAndFilter(
            self,
            title,
            "",
            fileFilters,
            "",
            EricFileDialog.DontConfirmOverwrite)

        if not fn:
            # dialog was cancelled
            return

        if fn.endswith("."):
            fn = fn[:-1]

        if not QFileInfo(fn).suffix():
            # append the extension of the selected filter; this is empty
            # for a filter like 'All Files (*)' or an unexpected filter
            ex = selectedFilter.partition("(*")[2].partition(")")[0]
            if ex:
                fn += ex
        if QFileInfo(fn).exists():
            res = EricMessageBox.yesNo(
                self,
                title,
                self.tr("<p>The file <b>{0}</b> already exists."
                        " Overwrite it?</p>").format(fn),
                icon=EricMessageBox.Warning)
            if not res:
                return

        fn = Utilities.toNativeSeparators(fn)
        try:
            # write as UTF-8 because the text may contain characters (e.g.
            # the replacement character for undecodable payload bytes) the
            # locale encoding cannot represent
            with open(fn, "w", encoding="utf-8") as f:
                f.write(textEdit.toPlainText())
        except OSError as err:
            EricMessageBox.critical(
                self,
                title,
                self.tr("""<p>The file <b>{0}</b> could not be written."""
                        """</p><p>Reason: {1}</p>""").format(
                    fn, str(err)))

    def __setBrokerStatusSubscribed(self, subscribed):
        """
        Private method to set the subscription status for the broker status
        topics.

        @param subscribed subscription status for the broker status topics
        @type bool
        """
        self.__brokerStatusTopicSubscribed = subscribed
        if subscribed:
            self.brokerStatusButton.setText(self.tr("Unsubscribe"))
            self.brokerStatusButton.setToolTip(
                self.tr("Press to deactivate the status display"))
        else:
            self.brokerStatusButton.setText(self.tr("Subscribe"))
            self.brokerStatusButton.setToolTip(
                self.tr("Press to activate the status display"))

    def __addBrokerToRecent(self, host, port):
        """
        Private method to add a host name to the list of recently connected
        brokers.

        @param host host name of broker
        @type str
        @param port port number of the connection
        @type int
        """
        brokerList = self.__plugin.getPreferences("RecentBrokersWithPort")
        hostAndPort = [host, port]
        # move an already known entry to the front of the list
        if hostAndPort in brokerList:
            brokerList.remove(hostAndPort)
        brokerList.insert(0, hostAndPort)
        # limit to most recently used 20 entries
        brokerList = brokerList[:20]
        self.__plugin.setPreferences("RecentBrokersWithPort", brokerList)

        self.__populateBrokerComboBoxes()

    def __populateBrokerComboBoxes(self):
        """
        Private method to populate the broker name and port combo boxes.
        """
        brokerPortList = self.__plugin.getPreferences("RecentBrokersWithPort")

        # step 1: clear combo boxes
        self.brokerComboBox.clear()
        self.brokerPortComboBox.clear()

        # step 2a: populate the broker name list
        # (the current name is stripped like the list entries in order to
        # be found in the list)
        currentBroker = brokerPortList[0][0].strip() if brokerPortList else ""
        brokerSet = {b[0].strip() for b in brokerPortList}
        self.brokerComboBox.addItems(sorted(brokerSet))
        index = self.brokerComboBox.findText(currentBroker)
        self.brokerComboBox.setCurrentIndex(index)

        # step 2b: populate the broker ports list
        # (the ports are formatted with a width of 5 characters)
        currentPort = brokerPortList[0][1] if brokerPortList else 1883
        currentPortStr = "{0:5}".format(currentPort)
        portsSet = {b[1] for b in brokerPortList}
        portsSet.update({1883, 8883})
        self.brokerPortComboBox.addItems(
            sorted("{0:5}".format(p) for p in portsSet))
        index = self.brokerPortComboBox.findText(currentPortStr)
        self.brokerPortComboBox.setCurrentIndex(index)

        # step 3: update the connect button state
        self.__setConnectButtonState()

    def __populateProfileComboBox(self):
        """
        Private method to populate the profiles selection box.
        """
        profilesDict = self.__plugin.getPreferences("BrokerProfiles")
        mostRecentProfile = self.__plugin.getPreferences("MostRecentProfile")

        self.profileComboBox.clear()
        self.profileComboBox.addItems(sorted(profilesDict.keys()))
        if mostRecentProfile:
            index = self.profileComboBox.findText(mostRecentProfile)
            if index >= 0:
                self.profileComboBox.setCurrentIndex(index)

        self.__setConnectButtonState()

    def __updateUnsubscribeTopicComboBox(self):
        """
        Private method to update the unsubscribe topic combo box.
        """
        self.unsubscribeTopicComboBox.clear()
        self.unsubscribeTopicComboBox.addItems(sorted(self.__subscribedTopics))
        self.unsubscribeButton.setEnabled(len(self.__subscribedTopics) > 0)

    def __updatePublishTopicComboBox(self, resetTopic=True):
        """
        Private method to update the publish topic combo box.

        The box offers the sorted union of the published and the subscribed
        topics.

        @param resetTopic flag indicating to reset the topic
        @type bool
        """
        currentTopic = self.publishTopicComboBox.currentText()
        self.publishTopicComboBox.clear()
        self.publishTopicComboBox.addItems(
            sorted(set(self.__publishedTopics + self.__subscribedTopics)))
        if resetTopic:
            self.publishTopicComboBox.clearEditText()
        else:
            topicIndex = self.publishTopicComboBox.findText(currentTopic)
            self.publishTopicComboBox.setCurrentIndex(topicIndex)

    def __appendMessage(self, topic, payload, qos):
        """
        Private method to append a received message to the output.

        @param topic topic of the received message
        @type str
        @param payload payload of the received message
        @type bytes
        @param qos quality of service indicator (0, 1, 2)
        @type int
        """
        # remember the scroll position to restore it if not following
        scrollbarValue = self.messagesEdit.verticalScrollBar().value()

        textCursor = self.messagesEdit.textCursor()
        if not self.messagesEdit.document().isEmpty():
            textCursor.movePosition(QTextCursor.MoveOperation.End)
            self.messagesEdit.setTextCursor(textCursor)
            self.messagesEdit.insertPlainText("\n")

        # alternate the background from message to message
        textBlockFormat = textCursor.blockFormat()
        if self.__isAlternate:
            textBlockFormat.setBackground(
                self.messagesEdit.palette().alternateBase())
        else:
            textBlockFormat.setBackground(
                self.messagesEdit.palette().base())
        textCursor.setBlockFormat(textBlockFormat)
        textCursor.movePosition(QTextCursor.MoveOperation.End)
        self.messagesEdit.setTextCursor(textCursor)

        self.messagesEdit.setCurrentCharFormat(self.__messagesTopicFormat)
        self.messagesEdit.insertPlainText(topic + "\n")

        self.messagesEdit.setCurrentCharFormat(self.__messagesQosFormat)
        self.messagesEdit.insertPlainText(self.tr("QoS: {0}\n").format(qos))

        # undecodable bytes are shown as the Unicode replacement character
        payloadStr = str(payload, encoding="utf-8", errors="replace")
        self.messagesEdit.setCurrentCharFormat(self.__messagesFormat)
        self.messagesEdit.insertPlainText(
            Utilities.filterAnsiSequences(payloadStr))

        if self.followMessagesCheckBox.isChecked():
            self.messagesEdit.ensureCursorVisible()
        else:
            self.messagesEdit.verticalScrollBar().setValue(scrollbarValue)

        self.__isAlternate = not self.__isAlternate

    def __handleBrokerStatusMessage(self, topic, payload):
        """
        Private method to show a received broker status message in the
        associated status label.

        Messages of the load topics are delegated to the load status
        handler; messages of topics without a label are ignored.

        @param topic topic of the received message
        @type str
        @param payload payload of the received message
        @type bytes
        """
        payloadStr = str(payload, encoding="utf-8", errors="replace").strip()
        topic = topic.strip()

        if topic.startswith(MqttMonitorWidget.BrokerStatusTopicLoadPrefix):
            self.__handleBrokerLoadStatusMessage(topic, payloadStr)
        else:
            with contextlib.suppress(KeyError):
                label = self.__statusLabelMapping[topic]
                label.setText(payloadStr)

    def __handleBrokerLoadStatusMessage(self, topic, payloadStr):
        """
        Private method to show a received broker load status message in the
        associated status label.

        The last element of the topic selects the value to be updated; the
        label shows the '1min', '5min' and '15min' values of the topic.

        @param topic topic of the received message
        @type str
        @param payloadStr string representation of the payload of the
            received message
        @type str
        """
        subtopic, topicElement = topic.rsplit("/", 1)
        self.__statusLoadValues[subtopic][topicElement] = payloadStr

        # load topics without an associated label are ignored
        with contextlib.suppress(KeyError):
            label = self.__statusLabelMapping[subtopic]
            label.setText("{0} / {1} / {2}".format(
                self.__statusLoadValues[subtopic]["1min"],
                self.__statusLoadValues[subtopic]["5min"],
                self.__statusLoadValues[subtopic]["15min"],
            ))

    def __clearBrokerStatusLabels(self):
        """
        Private method to clear the broker status labels.
        """
        for statusLabelKey in self.__statusLabelMapping:
            label = (
                "- / - / -"
                if statusLabelKey.startswith(
                    MqttMonitorWidget.BrokerStatusTopicLoadPrefix) else
                "-"
            )
            self.__statusLabelMapping[statusLabelKey].setText(label)

    def __loadDefaultDictFactory(self):
        """
        Private method to populate non-existing load items.

        @return default dictionary entry
        @rtype dict
        """
        return {
            "1min": "-",
            "5min": "-",
            "15min": "-",
        }

    def __setConnectionMode(self, profileMode):
        """
        Private method to set the connection mode.

        @param profileMode flag indicating the profile connection mode
        @type bool
        """
        self.__connectionModeProfile = profileMode
        if profileMode:
            self.modeButton.setIcon(UI.PixmapCache.getIcon(
                os.path.join("MqttMonitor", "icons",
                             "profiles-{0}".format(self.__iconSuffix))
            ))
        else:
            self.modeButton.setIcon(UI.PixmapCache.getIcon(
                os.path.join("MqttMonitor", "icons",
                             "quickopen-{0}".format(self.__iconSuffix))
            ))

        self.profileComboBox.setVisible(profileMode)
        self.brokerConnectionWidget.setVisible(not profileMode)
        self.__setConnectButtonState()

    def __setConnectButtonState(self):
        """
        Private method to set the enabled state of the connect button.
        """
        if self.__connectionModeProfile:
            self.connectButton.setEnabled(
                bool(self.profileComboBox.currentText()))
        else:
            self.connectButton.setEnabled(
                bool(self.brokerComboBox.currentText()))

    def __directConnectToBroker(self):
        """
        Private method to connect to the broker with entered data.
        """
        host = self.brokerComboBox.currentText()
        port = self.brokerPortComboBox.currentText().strip()
        try:
            port = int(port)
        except ValueError:
            # use standard port at 1883
            port = 1883
        if host:
            self.brokerStatusLabel.setText(
                self.tr("Connecting to {0}:{1} ...").format(
                    host, port))
            self.brokerStatusLabel.show()

            self.__addBrokerToRecent(host, port)
            self.connectButton.setEnabled(False)
            if self.__connectionOptions is None:
                self.__client.connectToServer(host, port=port)
            else:
                self.__client.connectToServerWithOptions(
                    host, port=port, options=self.__connectionOptions)

    def __profileConnectToBroker(self):
        """
        Private method to connect to the broker with selected profile.
        """
        profileName = self.profileComboBox.currentText()
        if profileName:
            self.__plugin.setPreferences("MostRecentProfile", profileName)

            profilesDict = self.__plugin.getPreferences("BrokerProfiles")
            profile = copy.copy(profilesDict[profileName])     # play it safe
            host = profile["BrokerAddress"]
            port = profile["BrokerPort"]

            self.brokerStatusLabel.setText(
                self.tr("Connecting to {0}:{1} ...").format(
                    host, port))
            self.brokerStatusLabel.show()

            self.connectButton.setEnabled(False)

            self.__client.connectToServerWithOptions(host, port=port,
                                                     options=profile)