Mon, 24 Oct 2022 18:01:45 +0200
Adapted the import statements to the new structure.
# -*- coding: utf-8 -*- # Copyright (c) 2018 - 2022 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the MQTT Monitor widget. """ import collections import contextlib import copy import os from PyQt6.QtCore import pyqtSlot, Qt, QTimer, QFileInfo, QPoint from PyQt6.QtGui import QFont, QTextCursor, QBrush, QColor from PyQt6.QtWidgets import QWidget, QDialog, QMenu from eric7 import Utilities try: from eric7.EricGui import EricPixmapCache except ImportError: from UI import PixmapCache as EricPixmapCache from eric7.EricWidgets import EricMessageBox, EricFileDialog from eric7.EricWidgets.EricApplication import ericApp from eric7.EricWidgets.EricPathPicker import EricPathPickerModes from .Ui_MqttMonitorWidget import Ui_MqttMonitorWidget from .MqttClient import ( MqttClient, mqttConnackMessage, mqttErrorMessage, mqttLogLevelString, ) from .MqttReasonCodes import mqttReasonCode from .MqttProtocols import MqttProtocols class MqttMonitorWidget(QWidget, Ui_MqttMonitorWidget): """ Class implementing the MQTT Monitor widget. """ BrokerStatusTopicPrefix = "$SYS/broker/" BrokerStatusTopic = "$SYS/broker/#" BrokerStatusTopicLoadPrefix = "$SYS/broker/load/" def __init__(self, plugin, usesDarkPalette, parent=None): """ Constructor @param plugin reference to the plug-in object @type MqttMonitorPlugin @param usesDarkPalette flag indicating the use of a dark application palette @type bool @param parent reference to the parent widget @type QWidget """ super().__init__(parent) self.setupUi(self) self.layout().setContentsMargins(0, 3, 0, 0) self.__plugin = plugin self.__iconSuffix = "dark" if usesDarkPalette else "light" self.__connectedToBroker = False self.__brokerStatusTopicSubscribed = False with contextlib.suppress(AttributeError): # backward compatibility if not ericApp().usesSmallScreen(): self.pixmapLabel.setPixmap( EricPixmapCache.getPixmap( os.path.join( "MqttMonitor", "icons", "mqtt48-{0}".format(self.__iconSuffix), ) ) ) self.publishPayloadFilePicker.setMode(EricPathPickerModes.OPEN_FILE_MODE) self.publishPayloadFilePicker.setFilters(self.tr("All Files (*)")) self.brokerComboBox.lineEdit().setClearButtonEnabled(True) self.publishTopicComboBox.lineEdit().setClearButtonEnabled(True) self.__messagesFormat = self.messagesEdit.currentCharFormat() self.__messagesTopicFormat = self.messagesEdit.currentCharFormat() self.__messagesTopicFormat.setFontWeight(QFont.Weight.Bold) self.__messagesQosFormat = self.messagesEdit.currentCharFormat() self.__messagesQosFormat.setFontItalic(True) self.__messagesSubheaderFormat = self.messagesEdit.currentCharFormat() self.__messagesSubheaderFormat.setFontUnderline(True) self.__propertiesFormat = self.propertiesEdit.currentCharFormat() self.__propertiesTopicFormat = self.propertiesEdit.currentCharFormat() self.__propertiesTopicFormat.setFontWeight(QFont.Weight.Bold) self.__propertiesNameFormat = self.propertiesEdit.currentCharFormat() self.__propertiesNameFormat.setFontItalic(True) self.messagesSearchWidget.attachTextEdit(self.messagesEdit) self.messagesSearchWidget.setWidthForHeight(False) self.__isMessageAlternate = False self.__isPropertiesAlternate = False for logLevel in ( MqttClient.LogDisabled, MqttClient.LogDebug, MqttClient.LogInfo, MqttClient.LogNotice, MqttClient.LogWarning, MqttClient.LogError, ): self.logLevelComboBox.addItem( mqttLogLevelString(logLevel, isMqttLogLevel=False), logLevel ) self.logLevelComboBox.setCurrentIndex(self.logLevelComboBox.count() - 1) if usesDarkPalette: self.__logMessagesBackgrounds = { MqttClient.LogDebug: QBrush(QColor("#2f2f2f")), MqttClient.LogInfo: QBrush(QColor("#868686")), MqttClient.LogNotice: QBrush(QColor("#009900")), MqttClient.LogWarning: QBrush(QColor("#999900")), MqttClient.LogError: QBrush(QColor("#990000")), MqttClient.LogDisabled: QBrush(QColor("#990099")), # reuse LogDisabled for unknown log levels } else: self.__logMessagesBackgrounds = { MqttClient.LogDebug: QBrush(Qt.GlobalColor.white), MqttClient.LogInfo: QBrush(Qt.GlobalColor.lightGray), MqttClient.LogNotice: QBrush(Qt.GlobalColor.green), MqttClient.LogWarning: QBrush(Qt.GlobalColor.yellow), MqttClient.LogError: QBrush(Qt.GlobalColor.red), MqttClient.LogDisabled: QBrush(Qt.GlobalColor.magenta) # reuse LogDisabled for unknown log levels } self.logSearchWidget.attachTextEdit(self.logEdit) self.logSearchWidget.setWidthForHeight(False) self.brokerWidget.setCurrentIndex(0) self.__connectionModeProfile = True self.__setConnectionMode(True) # initial mode is 'profile connection' self.__populateProfileComboBox() self.connectButton.setIcon(EricPixmapCache.getIcon("ircConnect")) self.brokerConnectionOptionsButton.setIcon( EricPixmapCache.getIcon( os.path.join( "MqttMonitor", "icons", "connectionOptions-{0}".format(self.__iconSuffix), ) ) ) self.__populateBrokerComboBoxes() self.brokerStatusLabel.hide() self.clearWillButton.setIcon(EricPixmapCache.getIcon("certificateDelete")) self.subscribeTopicComboBox.lineEdit().setClearButtonEnabled(True) self.subscribeTopicComboBox.lineEdit().returnPressed.connect( self.on_subscribeButton_clicked ) self.__populateSubscribeTopicComboBox() self.subscribeButton.setIcon(EricPixmapCache.getIcon("plus")) self.subscribeButton.setEnabled(False) self.subscribePropertiesButton.setIcon(EricPixmapCache.getIcon("listSelection")) self.subscribePropertiesButton.setEnabled(False) self.subscribePropertiesButton.setVisible(False) self.unsubscribeButton.setIcon(EricPixmapCache.getIcon("minus")) self.unsubscribeButton.setEnabled(False) self.unsubscribePropertiesButton.setIcon( EricPixmapCache.getIcon("listSelection") ) self.unsubscribePropertiesButton.setEnabled(False) self.unsubscribePropertiesButton.setVisible(False) self.__initPropertiesEditMenu() self.__subscribedTopics = [] self.__topicQueue = {} self.__updateUnsubscribeTopicComboBox() self.__publishedTopics = [] self.__updatePublishTopicComboBox() self.publishButton.setEnabled(False) self.publishPropertiesButton.setIcon(EricPixmapCache.getIcon("listSelection")) self.publishPropertiesButton.setEnabled(False) self.publishPropertiesButton.setVisible(False) self.__connectionOptions = None prefix = MqttMonitorWidget.BrokerStatusTopicPrefix self.__statusLabelMapping = { # broker prefix + "version": self.versionLabel, prefix + "timestamp": self.timestampLabel, prefix + "uptime": self.uptimeLabel, prefix + "subscriptions/count": self.subscriptionsLabel, # clients prefix + "clients/connected": self.clientsConnectedLabel, prefix + "clients/disconnected": self.clientsDisconnectedLabel, prefix + "clients/expired": self.clientsExpiredLabel, prefix + "clients/maximum": self.clientsMaximumLabel, prefix + "clients/total": self.clientsTotalLabel, # messages prefix + "messages/sent": self.messagesSentLabel, prefix + "messages/received": self.messagesReceivedLabel, prefix + "messages/stored": self.messagesStoredLabel, prefix + "store/messages/count": self.messagesStoredLabel, prefix + "messages/inflight": self.messagesInflightLabel, prefix + "retained messages/count": self.messagesRetainedLabel, # publish messages prefix + "publish/messages/sent": self.publishMessagesSentLabel, prefix + "publish/messages/received": self.publishMessagesReceivedLabel, prefix + "publish/messages/dropped": self.publishMessagesDroppedLabel, # traffic prefix + "bytes/sent": self.bytesSentLabel, prefix + "bytes/received": self.bytesReceivedLabel, # load prefix + "load/bytes/sent": self.loadBytesSentLabel, prefix + "load/bytes/received": self.loadBytesReceivedLabel, prefix + "load/messages/sent": self.loadMessagesSentLabel, prefix + "load/messages/received": self.loadMessagesReceivedLabel, prefix + "load/publish/sent": self.loadPublishSentLabel, prefix + "load/publish/received": self.loadPublishReceivedLabel, prefix + "load/publish/dropped": self.loadPublishDroppedLabel, prefix + "load/connections": self.loadConnectionsLabel, prefix + "load/sockets": self.loadSocketsLabel, } self.__statusLoadValues = collections.defaultdict(self.__loadDefaultDictFactory) def __createClient(self, clientId="", cleanSession=None, protocol=None): """ Private method to instantiate a MQTT client for a given protocol. @param clientId ID to be used for the client @type str @param cleanSession flag indicating to start a clean session @type bool @param protocol MQTT protocol version to be used (defaults to None) @type MqttProtocols or int (optional) @return created and connected MQTT client object @rtype MqttClient """ if protocol is None: protocol = self.__plugin.getPreferences("DefaultProtocol") client = MqttClient( clientId=clientId, cleanSession=cleanSession, protocol=protocol ) # connect the MQTT client signals client.onConnectV3.connect(self.__brokerConnected) client.onConnectV5.connect(self.__brokerConnected) client.onDisconnectedV3.connect(self.__brokerDisconnected) client.onDisconnectedV5.connect(self.__brokerDisconnected) client.onLog.connect(self.__clientLog) client.onMessageV3.connect(self.__messageReceived) client.onMessageV5.connect(self.__messageReceived) client.onPublish.connect(self.__messagePublished) client.onSubscribeV3.connect(self.__topicSubscribed) client.onSubscribeV5.connect(self.__topicSubscribedV5) client.onUnsubscribeV3.connect(self.__topicUnsubscribed) client.onUnsubscribeV5.connect(self.__topicUnsubscribedV5) client.connectTimeout.connect(self.__connectTimeout) return client def __initPropertiesEditMenu(self): """ Private method to create the properties output context menu. """ self.__propertiesEditMenu = QMenu(self) self.__copyPropertiesAct = self.__propertiesEditMenu.addAction( EricPixmapCache.getIcon("editCopy"), self.tr("Copy"), self.propertiesEdit.copy, ) self.__propertiesEditMenu.addSeparator() self.__selectAllPropertiesAct = self.__propertiesEditMenu.addAction( EricPixmapCache.getIcon("editSelectAll"), self.tr("Select All"), self.propertiesEdit.selectAll, ) self.__propertiesEditMenu.addSeparator() self.__clearPropertiesAct = self.__propertiesEditMenu.addAction( EricPixmapCache.getIcon("editDelete"), self.tr("Clear"), self.propertiesEdit.clear, ) self.propertiesEdit.copyAvailable.connect(self.__copyPropertiesAct.setEnabled) self.__copyPropertiesAct.setEnabled(False) ####################################################################### ## Slots handling MQTT related signals ####################################################################### @pyqtSlot(dict, int) @pyqtSlot(dict, int, int, dict) def __brokerConnected(self, flags, rc, packetType=None, properties=None): """ Private slot to handle being connected to a broker. @param flags flags set for the connection @type dict @param rc CONNACK result code or MQTTv5 reason code @type int @param packetType packet type as reported by the client @type int @param properties dictionary containing the received connection properties @type dict """ self.brokerStatusLabel.hide() if rc == 0: self.__connectedToBroker = True self.__connectionOptions = None try: sessionPresent = flags["session present"] == 1 except KeyError: sessionPresent = False msg = ( mqttReasonCode(rc, packetType) if packetType is not None else mqttConnackMessage(rc) ) if sessionPresent: msg = self.tr("{0} - Session still present").format(msg) self.__flashBrokerStatusLabel(msg) if properties: self.__showProperties("Connect", properties) self.connectButton.setEnabled(True) if rc == 0: self.__connectedToBroker = True self.__connectionOptions = None self.connectButton.setIcon(EricPixmapCache.getIcon("ircDisconnect")) self.subscribeGroup.setEnabled(True) self.subscribePropertiesButton.setVisible( self.__client.getProtocol() == MqttProtocols.MQTTv5 ) self.unsubscribeGroup.setEnabled(True) self.unsubscribePropertiesButton.setVisible( self.__client.getProtocol() == MqttProtocols.MQTTv5 ) self.publishGroup.setEnabled(True) self.brokerStatusButton.setEnabled(True) self.publishPropertiesButton.setVisible( self.__client.getProtocol() == MqttProtocols.MQTTv5 ) self.__statusLoadValues.clear() self.__clearBrokerStatusLabels() self.__setBrokerStatusSubscribed(False) else: self.__client.stopLoop() @pyqtSlot() def __connectTimeout(self): """ Private slot handling a timeout during a connection attempt. """ self.__flashBrokerStatusLabel(self.tr("Connection timed out")) self.__setConnectButtonState() @pyqtSlot(int) @pyqtSlot(int, int) def __brokerDisconnected(self, rc, packetType=None): """ Private slot to handle a disconnection from the broker. @param rc MQTT error result code @type int @param packetType packet type as reported by the client @type int """ self.__connectedToBroker = False # ensure, the client loop is stopped self.__client.stopLoop() msg = ( # MQTT v5 mqttReasonCode(rc, packetType) if packetType is not None else # MQTT v3 ( mqttErrorMessage(rc) if rc > 0 else self.tr("Connection to Broker shut down cleanly.") ) ) self.__flashBrokerStatusLabel(msg) self.connectButton.setIcon(EricPixmapCache.getIcon("ircConnect")) self.__setConnectButtonState() self.__subscribedTopics = [] self.__topicQueue = {} self.__updateUnsubscribeTopicComboBox() self.__updatePublishTopicComboBox() self.subscribeGroup.setEnabled(False) self.subscribePropertiesButton.setVisible(False) self.unsubscribeGroup.setEnabled(False) self.unsubscribePropertiesButton.setVisible(False) self.publishGroup.setEnabled(False) self.publishPropertiesButton.setVisible(False) self.brokerStatusButton.setEnabled(False) self.__statusLoadValues.clear() @pyqtSlot(int, str) def __clientLog(self, level, message): """ Private slot to handle the receipt of a log message. @param level log level @type int @param message log message @type str """ with contextlib.suppress(KeyError): if MqttClient.LogLevelMap[level] < self.logLevelComboBox.itemData( self.logLevelComboBox.currentIndex() ): return scrollbarValue = self.logEdit.verticalScrollBar().value() textCursor = self.logEdit.textCursor() if not self.logEdit.document().isEmpty(): textCursor.movePosition(QTextCursor.MoveOperation.End) self.logEdit.setTextCursor(textCursor) self.logEdit.insertPlainText("\n") textBlockFormat = textCursor.blockFormat() try: textBlockFormat.setBackground( self.__logMessagesBackgrounds[MqttClient.LogLevelMap[level]] ) except KeyError: textBlockFormat.setBackground( self.__logMessagesBackgrounds[MqttClient.LogDisabled] ) textCursor.setBlockFormat(textBlockFormat) textCursor.movePosition(QTextCursor.MoveOperation.End) self.logEdit.setTextCursor(textCursor) txt = self.tr("{0}: {1}").format(mqttLogLevelString(level), message) self.logEdit.insertPlainText(Utilities.filterAnsiSequences(txt)) if self.followLogMessagesCheckBox.isChecked(): self.logEdit.ensureCursorVisible() else: self.logEdit.verticalScrollBar().setValue(scrollbarValue) @pyqtSlot(str, bytes, int, bool) @pyqtSlot(str, bytes, int, bool, dict) def __messageReceived(self, topic, payload, qos, retain, properties=None): """ Private slot to handle the receipt of a message. @param topic topic of the message @type str @param payload payload (i.e. data) of the message @type bytes @param qos quality of service indicator @type int @param retain flag indicating a retained message @type bool @param properties properties sent with the message (MQTT v5) @type Properties """ if topic.startswith(MqttMonitorWidget.BrokerStatusTopicPrefix): # handle broker status messages self.__handleBrokerStatusMessage(topic, payload) else: self.__appendMessage(topic, payload, qos, retain, properties=properties) @pyqtSlot(int) def __messagePublished(self, mid): """ Private slot to handle a message being published. @param mid ID of the published message @type int """ # nothing to show for this pass @pyqtSlot(int) def __topicSubscribed(self, mid): """ Private slot to handle being subscribed to topics (MQTT v3.1, MQTT v3.1.1). @param mid ID of the subscribe request @type int """ if mid in self.__topicQueue: topic = self.__topicQueue.pop(mid) self.__subscribedTopics.append(topic) self.__addTopicToRecent(topic) self.__updateUnsubscribeTopicComboBox() self.__updatePublishTopicComboBox() @pyqtSlot(int, list, dict) def __topicSubscribedV5(self, mid, reasonCodes, properties): """ Private slot to handle being subscribed to topics (MQTT v5). @param mid ID of the subscribe request @type int @param reasonCodes list of reason codes, one for each topic @type list of ReasonCodes @param properties dictionary containing the received subscribe properties @type dict """ msg = mqttReasonCode(reasonCodes[0].value, reasonCodes[0].packetType) self.__flashBrokerStatusLabel(msg) if properties: self.__showProperties("Subscribe", properties) self.__topicSubscribed(mid) @pyqtSlot(int) def __topicUnsubscribed(self, mid): """ Private slot to handle being unsubcribed from a topic (MQTT v3.1, MQTT v3.1.1). @param mid ID of the unsubscribe request @type int """ if mid in self.__topicQueue: topic = self.__topicQueue.pop(mid) with contextlib.suppress(ValueError): self.__subscribedTopics.remove(topic) self.__updateUnsubscribeTopicComboBox() self.__updatePublishTopicComboBox() @pyqtSlot(int, int, int, dict) def __topicUnsubscribedV5(self, mid, rc, packetType, properties): """ Private slot to handle being unsubscribed to topics (MQTT v5). @param mid ID of the subscribe request @type int @param rc MQTTv5 reason code @type int @param packetType packet type as reported by the client @type int @param properties dictionary containing the received subscribe properties @type dict """ msg = mqttReasonCode(rc, packetType) self.__flashBrokerStatusLabel(msg) if properties: self.__showProperties("Unsubscribe", properties) self.__topicUnsubscribed(mid) ####################################################################### ## Slots handling UI interactions ####################################################################### @pyqtSlot() def __flashBrokerStatusLabel(self, message): """ Private slot to show the broker status label with some text for 5 seconds. @param message message to be shown @type str """ self.brokerStatusLabel.setText(message) self.brokerStatusLabel.show() QTimer.singleShot(5000, self.brokerStatusLabel.hide) @pyqtSlot() def on_modeButton_clicked(self): """ Private slot to switch between connection profiles and direct connection mode. """ self.__setConnectionMode(not self.__connectionModeProfile) @pyqtSlot(str) def on_profileComboBox_currentIndexChanged(self, profileName): """ Private slot handling the change of the selected profile. @param profileName name of the selected profile @type str """ self.__setConnectButtonState() @pyqtSlot(str) def on_brokerComboBox_editTextChanged(self, host): """ Private slot to handling entering or selecting a broker host name. @param host host name of the broker @type str """ self.__setConnectButtonState() @pyqtSlot() def on_brokerConnectionOptionsButton_clicked(self): """ Private slot to show a dialog to modify connection options or a dialog to edit connection profiles. """ if self.__connectionModeProfile: from .MqttConnectionProfilesDialog import MqttConnectionProfilesDialog profileName = self.profileComboBox.currentText() dlg = MqttConnectionProfilesDialog( self.__plugin.getPreferences("BrokerProfiles"), currentProfile=profileName, parent=self, ) if dlg.exec() == QDialog.DialogCode.Accepted: profilesDict = dlg.getProfiles() self.__plugin.setPreferences("BrokerProfiles", profilesDict) self.__populateProfileComboBox() else: from .MqttConnectionOptionsDialog import MqttConnectionOptionsDialog dlg = MqttConnectionOptionsDialog(self.__connectionOptions, parent=self) if dlg.exec() == QDialog.DialogCode.Accepted: self.__connectionOptions = dlg.getConnectionOptions() if self.__connectionOptions["TlsEnable"]: port = self.brokerPortComboBox.currentText().strip() if port == "1883": # it is default non-encrypted port => set to TLS port self.brokerPortComboBox.setEditText("8883") else: port = self.brokerPortComboBox.currentText().strip() if port == "8883": # it is default TLS port => set to non-encrypted port self.brokerPortComboBox.setEditText("1883") @pyqtSlot() def on_connectButton_clicked(self): """ Private slot to handle a connect or disconnect request. """ if self.__connectedToBroker: self.__client.disconnectFromServer() else: if self.__connectionModeProfile: self.__profileConnectToBroker() else: self.__directConnectToBroker() @pyqtSlot() def on_subscribePropertiesButton_clicked(self): """ Private slot to edit the subscribe user properties. """ topic = self.subscribeTopicComboBox.currentText() self.__editProperties( "subscribe", self.tr("SUBSCRIBE: User Properties for '{0}'").format(topic), topic, ) @pyqtSlot(str) def on_subscribeTopicComboBox_editTextChanged(self, topic): """ Private slot to handle a change of the entered topic. @param topic entered topic text @type str """ self.subscribeButton.setEnabled(bool(topic)) self.subscribePropertiesButton.setEnabled(bool(topic)) @pyqtSlot() def on_subscribeButton_clicked(self): """ Private slot to subscribe to the entered topic. """ topic = self.subscribeTopicComboBox.currentText() qos = self.subscribeQosSpinBox.value() if topic: if topic.startswith(MqttMonitorWidget.BrokerStatusTopicPrefix): EricMessageBox.warning( self, self.tr("Subscribe to Topic"), self.tr( "Subscriptions to the Status topic '$SYS' shall" " be done on the 'Status' tab." ), ) else: properties = ( self.__plugin.getPreferences("SubscribeProperties").get(topic, []) if self.__client.getProtocol() == MqttProtocols.MQTTv5 else None ) result, mid = self.__client.subscribe( topic, qos=qos, properties=properties ) self.__topicQueue[mid] = topic @pyqtSlot() def on_unsubscribePropertiesButton_clicked(self): """ Private slot to edit the unsubscribe user properties. """ topic = self.unsubscribeTopicComboBox.currentText() self.__editProperties( "unsubscribe", self.tr("UNSUBSCRIBE: User Properties for '{0}'").format(topic), topic, ) @pyqtSlot(str) def on_unsubscribeTopicComboBox_currentIndexChanged(self, topic): """ Private slot to handle the selection of a topic to unsubscribe from. @param topic topic text @type str """ self.unsubscribeButton.setEnabled(bool(topic)) self.unsubscribePropertiesButton.setEnabled(bool(topic)) @pyqtSlot() def on_unsubscribeButton_clicked(self): """ Private slot to unsubscribe from the selected topic. """ topic = self.unsubscribeTopicComboBox.currentText() if topic: properties = ( self.__plugin.getPreferences("SubscribeProperties").get(topic, []) if self.__client.getProtocol() == MqttProtocols.MQTTv5 else None ) result, mid = self.__client.unsubscribe(topic, properties=properties) self.__topicQueue[mid] = topic @pyqtSlot() def on_publishPropertiesButton_clicked(self): """ Private slot to edit the publish user properties. """ topic = self.publishTopicComboBox.currentText() self.__editProperties( "publish", self.tr("PUBLISH: User Properties for '{0}'").format(topic), topic, ) @pyqtSlot(str) def on_publishTopicComboBox_editTextChanged(self, topic): """ Private slot to handle changes of the publish topic name. @param topic topic text @type str """ self.publishButton.setEnabled(bool(topic)) self.publishPropertiesButton.setEnabled(bool(topic)) @pyqtSlot() def on_publishButton_clicked(self): """ Private slot to publish the entered message. """ topic = self.publishTopicComboBox.currentText() qos = self.publishQosSpinBox.value() retain = self.publishRetainCheckBox.isChecked() payloadFile = self.publishPayloadFilePicker.text() if ( bool(payloadFile) and os.path.exists(payloadFile) and os.path.getsize(payloadFile) <= 268435455 ): # payload size limit is 268,435,455 bytes try: with open(payloadFile, "rb") as f: payloadStr = f.read() except EnvironmentError as err: EricMessageBox.critical( self, self.tr("Read Payload from File"), self.tr( """<p>The file <b>{0}</b> could not be read.""" """ Aborting...</p><p>Reason: {1}</p>""" ).format(payloadFile, str(err)), ) return else: payloadStr = self.publishPayloadEdit.toPlainText() if not payloadStr: # use empty string together with the retain flag to clean # a retained message by sending None instead payloadStr = None properties = ( self.__plugin.getPreferences("PublishProperties").get(topic, []) if self.__client.getProtocol() == MqttProtocols.MQTTv5 else None ) msgInfo = self.__client.publish( topic, payload=payloadStr, qos=qos, retain=retain, properties=properties ) if msgInfo.rc == 0: if topic not in self.__publishedTopics: self.__publishedTopics.append(topic) self.__updatePublishTopicComboBox(resetTopic=False) if self.clearPublishCheckBox.isChecked(): self.on_publishClearButton_clicked() @pyqtSlot() def on_publishClearRetainedButton_clicked(self): """ Private slot to clear the retained messages for the topic. """ topic = self.publishTopicComboBox.currentText() properties = ( self.__plugin.getPreferences("PublishProperties").get(topic, []) if self.__client.getProtocol() == MqttProtocols.MQTTv5 else None ) msgInfo = self.__client.publish( topic, payload=None, retain=True, properties=properties ) if msgInfo.rc == 0: if topic not in self.__publishedTopics: self.__publishedTopics.append(topic) self.__updatePublishTopicComboBox(resetTopic=False) @pyqtSlot() def on_publishClearButton_clicked(self): """ Private slot to clear the publish data fields. """ self.publishTopicComboBox.clearEditText() self.publishPayloadEdit.clear() self.publishQosSpinBox.setValue(0) self.publishRetainCheckBox.setChecked(False) self.publishPayloadFilePicker.clear() @pyqtSlot(str) def on_publishPayloadFilePicker_textChanged(self, path): """ Private slot handling a change of path of the payload file. @param path path of the payload file @type str """ self.publishPayloadEdit.setEnabled(not bool(path)) @pyqtSlot(QPoint) def on_propertiesEdit_customContextMenuRequested(self, pos): """ Private slot to show the context menu for the properties output. @param pos the position of the mouse pointer @type QPoint """ self.__propertiesEditMenu.popup(self.propertiesEdit.mapToGlobal(pos)) @pyqtSlot() def on_brokerStatusButton_clicked(self): """ Private slot to subscribe or unsubscribe the broker status topic. """ if self.__brokerStatusTopicSubscribed: # unsubscribe status topic rc, _ = self.__client.unsubscribe(MqttMonitorWidget.BrokerStatusTopic) if rc == 0: # successfully sent self.__setBrokerStatusSubscribed(False) else: # subscribe status topic rc, _ = self.__client.subscribe(MqttMonitorWidget.BrokerStatusTopic) if rc == 0: # successfully sent self.__setBrokerStatusSubscribed(True) @pyqtSlot(int) def on_messagesEdit_blockCountChanged(self, newBlockCount): """ Private slot handling changes of received messages. @param newBlockCount (ignored) @type int """ enable = not self.messagesEdit.document().isEmpty() self.saveMessagesButton.setEnabled(enable) self.clearMessagesButton.setEnabled(enable) @pyqtSlot() def on_saveMessagesButton_clicked(self): """ Private slot to save the received messages. """ fn, selectedFilter = EricFileDialog.getSaveFileNameAndFilter( self, self.tr("Save Messages"), "", self.tr("Messages Files (*.txt);;All Files (*)"), "", EricFileDialog.DontConfirmOverwrite, ) if fn: if fn.endswith("."): fn = fn[:-1] ext = QFileInfo(fn).suffix() if not ext: ex = selectedFilter.split("(*")[1].split(")")[0] if ex: fn += ex if QFileInfo(fn).exists(): res = EricMessageBox.yesNo( self, self.tr("Save Messages"), self.tr( "<p>The file <b>{0}</b> already exists." " Overwrite it?</p>" ).format(fn), icon=EricMessageBox.Warning, ) if not res: return fn = Utilities.toNativeSeparators(fn) try: with open(fn, "w") as f: f.write(self.messagesEdit.toPlainText()) except EnvironmentError as err: EricMessageBox.critical( self, self.tr("Save Messages"), self.tr( """<p>The file <b>{0}</b> could not be written.""" """</p><p>Reason: {1}</p>""" ).format(fn, str(err)), ) @pyqtSlot(int) def on_logEdit_blockCountChanged(self, newBlockCount): """ Private slot handling changes of received messages. @param newBlockCount (ignored) @type int """ enable = not self.logEdit.document().isEmpty() self.saveLogMessagesButton.setEnabled(enable) self.clearLogMessagesButton.setEnabled(enable) @pyqtSlot() def on_saveLogMessagesButton_clicked(self): """ Private slot to save the log messages. """ fn, selectedFilter = EricFileDialog.getSaveFileNameAndFilter( self, self.tr("Save Log Messages"), "", self.tr("Log Files (*.log);;All Files (*)"), "", EricFileDialog.DontConfirmOverwrite, ) if fn: if fn.endswith("."): fn = fn[:-1] ext = QFileInfo(fn).suffix() if not ext: ex = selectedFilter.split("(*")[1].split(")")[0] if ex: fn += ex if QFileInfo(fn).exists(): res = EricMessageBox.yesNo( self, self.tr("Save Log Messages"), self.tr( "<p>The file <b>{0}</b> already exists." " Overwrite it?</p>" ).format(fn), icon=EricMessageBox.Warning, ) if not res: return fn = Utilities.toNativeSeparators(fn) try: with open(fn, "w") as f: f.write(self.logEdit.toPlainText()) except EnvironmentError as err: EricMessageBox.critical( self, self.tr("Save Log Messages"), self.tr( """<p>The file <b>{0}</b> could not be written.""" """</p><p>Reason: {1}</p>""" ).format(fn, str(err)), ) ####################################################################### ## Utility methods ####################################################################### def __setBrokerStatusSubscribed(self, subscribed): """ Private method to set the subscription status for the broker status topics. @param subscribed subscription status for the broker status topics @type bool """ self.__brokerStatusTopicSubscribed = subscribed if subscribed: self.brokerStatusButton.setText(self.tr("Unsubscribe")) self.brokerStatusButton.setToolTip( self.tr("Press to deactivate the status display") ) else: self.brokerStatusButton.setText(self.tr("Subscribe")) self.brokerStatusButton.setToolTip( self.tr("Press to activate the status display") ) def __addBrokerToRecent(self, host, port): """ Private method to add a host name to the list of recently connected brokers. @param host host name of broker @type str @param port port number of the connection @type int """ brokerList = self.__plugin.getPreferences("RecentBrokersWithPort") hostAndPort = [host, port] if hostAndPort in brokerList: brokerList.remove(hostAndPort) brokerList.insert(0, hostAndPort) # limit the most recently used entries maxBrokers = self.__plugin.getPreferences("RecentBrokersNumber") brokerList = brokerList[:maxBrokers] self.__plugin.setPreferences("RecentBrokersWithPort", brokerList) self.__populateBrokerComboBoxes() def __populateBrokerComboBoxes(self): """ Private method to populate the broker name and port combo boxes. """ brokerPortList = self.__plugin.getPreferences("RecentBrokersWithPort") # step 1: clear combo boxes self.brokerComboBox.clear() self.brokerPortComboBox.clear() # step 2a: populate the broker name list currentBroker = brokerPortList[0][0] if brokerPortList else "" brokerSet = {b[0].strip() for b in brokerPortList} self.brokerComboBox.addItems(sorted(brokerSet)) index = self.brokerComboBox.findText(currentBroker) self.brokerComboBox.setCurrentIndex(index) # step 2b: populate the broker ports list currentPort = brokerPortList[0][1] if brokerPortList else 1883 currentPortStr = "{0:5}".format(currentPort) portsSet = {b[1] for b in brokerPortList} portsSet.update({1883, 8883}) self.brokerPortComboBox.addItems(sorted("{0:5}".format(p) for p in portsSet)) index = self.brokerPortComboBox.findText(currentPortStr) self.brokerPortComboBox.setCurrentIndex(index) # step 3: update the connect button state self.__setConnectButtonState() def __populateProfileComboBox(self): """ Private method to populate the profiles selection box. """ profilesDict = self.__plugin.getPreferences("BrokerProfiles") mostRecentProfile = self.__plugin.getPreferences("MostRecentProfile") self.profileComboBox.clear() self.profileComboBox.addItems(sorted(profilesDict.keys())) if mostRecentProfile: index = self.profileComboBox.findText(mostRecentProfile) if index >= 0: self.profileComboBox.setCurrentIndex(index) self.__setConnectButtonState() def __addTopicToRecent(self, topic): """ Private method to add a topic to the list of recently subscribed topics. @param topic subscribed topic @type str """ topicsList = self.__plugin.getPreferences("RecentTopics") if topic in topicsList: topicsList.remove(topic) topicsList.insert(0, topic) # limit the most recently used entries maxTopics = self.__plugin.getPreferences("RecentTopicsNumber") topicsList = topicsList[:maxTopics] self.__plugin.setPreferences("RecentTopics", topicsList) self.__populateSubscribeTopicComboBox() def __populateSubscribeTopicComboBox(self): """ Private method to populate the subscribe topic combo box. """ topicsList = self.__plugin.getPreferences("RecentTopics") self.subscribeTopicComboBox.clear() self.subscribeTopicComboBox.addItems(sorted(topicsList)) self.subscribeTopicComboBox.clearEditText() def __updateUnsubscribeTopicComboBox(self): """ Private method to update the unsubcribe topic combo box. """ self.unsubscribeTopicComboBox.clear() self.unsubscribeTopicComboBox.addItems(sorted(self.__subscribedTopics)) self.unsubscribeButton.setEnabled(bool(self.__subscribedTopics)) self.unsubscribePropertiesButton.setEnabled(bool(self.__subscribedTopics)) def __updatePublishTopicComboBox(self, resetTopic=True): """ Private method to update the publish topic combo box. @param resetTopic flag indicating to reset the topic @type bool """ currentTopic = self.publishTopicComboBox.currentText() self.publishTopicComboBox.clear() self.publishTopicComboBox.addItems( list(set(self.__publishedTopics + self.__subscribedTopics)) ) if resetTopic: self.publishTopicComboBox.clearEditText() else: topicIndex = self.publishTopicComboBox.findText(currentTopic) self.publishTopicComboBox.setCurrentIndex(topicIndex) def __appendMessage(self, topic, payload, qos, retain, properties=None): """ Private method to append a received message to the output. @param topic topic of the received message @type str @param payload payload of the received message @type bytes @param qos quality of service indicator (0, 1, 2) @type int @param retain flag indicating a retained message @type bool @param properties properties sent with the message (MQTT v5) @type dict """ scrollbarValue = self.messagesEdit.verticalScrollBar().value() textCursor = self.messagesEdit.textCursor() if not self.messagesEdit.document().isEmpty(): textCursor.movePosition(QTextCursor.MoveOperation.End) self.messagesEdit.setTextCursor(textCursor) self.messagesEdit.insertPlainText("\n") textBlockFormat = textCursor.blockFormat() if self.__isMessageAlternate: textBlockFormat.setBackground(self.messagesEdit.palette().alternateBase()) else: textBlockFormat.setBackground(self.messagesEdit.palette().base()) textCursor.setBlockFormat(textBlockFormat) textCursor.movePosition(QTextCursor.MoveOperation.End) self.messagesEdit.setTextCursor(textCursor) self.messagesEdit.setCurrentCharFormat(self.__messagesTopicFormat) self.messagesEdit.insertPlainText(topic + "\n") self.messagesEdit.setCurrentCharFormat(self.__messagesQosFormat) self.messagesEdit.insertPlainText(self.tr("QoS: {0}\n").format(qos)) if retain: self.messagesEdit.setCurrentCharFormat(self.__messagesQosFormat) self.messagesEdit.insertPlainText(self.tr("Retained Message\n")) if properties: self.messagesEdit.setCurrentCharFormat(self.__messagesSubheaderFormat) self.messagesEdit.insertPlainText(self.tr("Properties:\n")) self.messagesEdit.setCurrentCharFormat(self.__messagesFormat) for name, value in sorted(properties.items()): self.messagesEdit.insertPlainText( self.tr("{0}: {1}\n", "property name, property value").format( name, value ) ) self.messagesEdit.setCurrentCharFormat(self.__messagesSubheaderFormat) self.messagesEdit.insertPlainText(self.tr("Message:\n")) payloadStr = str(payload, encoding="utf-8", errors="replace") payloadStr = Utilities.filterAnsiSequences(payloadStr) self.messagesEdit.setCurrentCharFormat(self.__messagesFormat) if payloadStr: self.messagesEdit.insertPlainText(payloadStr) else: self.messagesEdit.insertPlainText(self.tr("<empty>")) if self.followMessagesCheckBox.isChecked(): self.messagesEdit.ensureCursorVisible() else: self.messagesEdit.verticalScrollBar().setValue(scrollbarValue) self.__isMessageAlternate = not self.__isMessageAlternate def __handleBrokerStatusMessage(self, topic, payload): """ Private method to handle a status message of the broker. @param topic topic of the received message @type str @param payload payload of the received message @type bytes """ payloadStr = str(payload, encoding="utf-8", errors="replace").strip() topic = topic.strip() if topic.startswith(MqttMonitorWidget.BrokerStatusTopicLoadPrefix): self.__handleBrokerLoadStatusMessage(topic, payloadStr) else: with contextlib.suppress(KeyError): label = self.__statusLabelMapping[topic] label.setText(payloadStr) def __handleBrokerLoadStatusMessage(self, topic, payloadStr): """ Private method to append a received message to the output. @param topic topic of the received message @type str @param payloadStr string representation of the payload of the received message @type str """ subtopic, topicElement = topic.rsplit("/", 1) self.__statusLoadValues[subtopic][topicElement] = payloadStr with contextlib.suppress(KeyError): label = self.__statusLabelMapping[subtopic] label.setText( "{0} / {1} / {2}".format( self.__statusLoadValues[subtopic]["1min"], self.__statusLoadValues[subtopic]["5min"], self.__statusLoadValues[subtopic]["15min"], ) ) def __clearBrokerStatusLabels(self): """ Private method to clear the broker status labels. """ for statusLabelKey in self.__statusLabelMapping: label = ( "- / - / -" if statusLabelKey.startswith( MqttMonitorWidget.BrokerStatusTopicLoadPrefix ) else "-" ) self.__statusLabelMapping[statusLabelKey].setText(label) def __loadDefaultDictFactory(self): """ Private method to populate non-existing load items. @return default dictionary entry @rtype dict """ return { "1min": "-", "5min": "-", "15min": "-", } def __setConnectionMode(self, profileMode): """ Private method to set the connection mode. @param profileMode flag indicating the profile connection mode @type bool """ self.__connectionModeProfile = profileMode if profileMode: self.modeButton.setIcon( EricPixmapCache.getIcon( os.path.join( "MqttMonitor", "icons", "profiles-{0}".format(self.__iconSuffix) ) ) ) else: self.modeButton.setIcon( EricPixmapCache.getIcon( os.path.join( "MqttMonitor", "icons", "quickopen-{0}".format(self.__iconSuffix), ) ) ) self.profileComboBox.setVisible(profileMode) self.brokerConnectionWidget.setVisible(not profileMode) self.__setConnectButtonState() def __setConnectButtonState(self): """ Private method to set the enabled state of the connect button. """ if self.__connectionModeProfile: self.connectButton.setEnabled(bool(self.profileComboBox.currentText())) else: self.connectButton.setEnabled(bool(self.brokerComboBox.currentText())) def __directConnectToBroker(self): """ Private method to connect to the broker with entered data. """ host = self.brokerComboBox.currentText() port = self.brokerPortComboBox.currentText().strip() try: port = int(port) except ValueError: # use standard port at 1883 port = 1883 if host: self.brokerStatusLabel.setText( self.tr("Connecting to {0}:{1} ...").format(host, port) ) self.brokerStatusLabel.show() self.__addBrokerToRecent(host, port) self.connectButton.setEnabled(False) if self.clearWillButton.isChecked(): clearWill = True self.clearWillButton.setChecked(False) else: clearWill = False if self.__connectionOptions is None: self.__client = self.__createClient() self.__client.connectToServer(host, port=port, clearWill=clearWill) else: self.__client = self.__createClient( clientId=self.__connectionOptions["ClientId"], cleanSession=self.__connectionOptions["CleanSession"], protocol=self.__connectionOptions["Protocol"], ) self.__client.connectToServerWithOptions( host, port=port, options=self.__connectionOptions, clearWill=clearWill, ) def __profileConnectToBroker(self): """ Private method to connect to the broker with selected profile. """ profileName = self.profileComboBox.currentText() if profileName: self.__plugin.setPreferences("MostRecentProfile", profileName) profilesDict = self.__plugin.getPreferences("BrokerProfiles") connectionProfile = copy.deepcopy(profilesDict[profileName]) host = connectionProfile["BrokerAddress"] port = connectionProfile["BrokerPort"] try: protocol = connectionProfile["Protocol"] except KeyError: protocol = MqttProtocols( self.__plugin.getPreferences("DefaultProtocol") ) self.brokerStatusLabel.setText( self.tr("Connecting to {0}:{1} ...").format(host, port) ) self.brokerStatusLabel.show() self.connectButton.setEnabled(False) if self.clearWillButton.isChecked(): clearWill = True self.clearWillButton.setChecked(False) else: clearWill = False self.__client = self.__createClient( clientId=connectionProfile["ClientId"], cleanSession=connectionProfile["CleanSession"], protocol=protocol, ) self.__client.connectToServerWithOptions( host, port=port, options=connectionProfile, clearWill=clearWill ) def __showProperties(self, typeStr, properties): """ Private method to display the received properties in the properties pane. @param typeStr message type @type str @param properties dictionary containing the relevant properties @type dict """ textCursor = self.propertiesEdit.textCursor() if not self.propertiesEdit.document().isEmpty(): textCursor.movePosition(QTextCursor.MoveOperation.End) self.propertiesEdit.setTextCursor(textCursor) textBlockFormat = textCursor.blockFormat() if self.__isPropertiesAlternate: textBlockFormat.setBackground(self.propertiesEdit.palette().alternateBase()) else: textBlockFormat.setBackground(self.propertiesEdit.palette().base()) textCursor.setBlockFormat(textBlockFormat) textCursor.movePosition(QTextCursor.MoveOperation.End) self.propertiesEdit.setTextCursor(textCursor) self.propertiesEdit.setCurrentCharFormat(self.__propertiesTopicFormat) self.propertiesEdit.insertPlainText(typeStr + "\n") for name, value in sorted(properties.items()): self.propertiesEdit.setCurrentCharFormat(self.__propertiesNameFormat) self.propertiesEdit.insertPlainText("{0}: ".format(name)) self.propertiesEdit.setCurrentCharFormat(self.__propertiesFormat) self.propertiesEdit.insertPlainText("{0}\n".format(str(value))) self.propertiesEdit.ensureCursorVisible() self.__isPropertiesAlternate = not self.__isPropertiesAlternate def __editProperties(self, propertiesType, header, key): """ Private method to edit user properties of a given type. @param propertiesType properties type (one of 'subscribe', 'unsubscribe', 'publish') @type str @param header header to be shown in the edit dialog @type str @param key key to retrieve the right properties @type str """ from .MqttUserPropertiesEditor import MqttUserPropertiesEditorDialog preferencesKey = "{0}Properties".format(propertiesType.capitalize()) properties = self.__plugin.getPreferences(preferencesKey) dlg = MqttUserPropertiesEditorDialog(header, properties.get(key, []), self) if dlg.exec() == QDialog.DialogCode.Accepted: properties[key] = dlg.getProperties() self.__plugin.setPreferences(preferencesKey, properties)