Wed, 21 Jul 2021 20:10:36 +0200
Started implementing support for MQTT v5 user properties.
# -*- coding: utf-8 -*-

# Copyright (c) 2018 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing the MQTT Monitor plug-in.
"""

import os
import json

from PyQt6.QtCore import Qt, QObject, QTranslator, QCoreApplication
from PyQt6.QtGui import QKeySequence

from EricWidgets.EricApplication import ericApp
from EricGui.EricAction import EricAction

import UI.PixmapCache
import Preferences

# Start-Of-Header
name = "MQTT Monitor Plugin"
author = "Detlev Offenbach <detlev@die-offenbachs.de>"
autoactivate = True
deactivateable = True
version = "1.0.1"
className = "MqttMonitorPlugin"
packageName = "MqttMonitor"
shortDescription = "Plug-in implementing a tool to connect to a MQTT broker"
longDescription = (
    """Plug-in implementing a tool to connect to a MQTT broker, subscribe"""
    """ to topics, present received messages and publish messages."""
    """ It displays broker statistics (the $SYS/# topic tree) and"""
    """ log messages of the underlying paho-mqtt client. This tool"""
    """ supports unencrypted connections (port 1883) as well as encrypted"""
    """ SSL/TLS connections (port 8883)."""
)
needsRestart = False
pyqtApi = 2
# End-Of-Header

error = ""


def exeDisplayData():
    """
    Module function reporting the version of the paho-mqtt package.

    If the package cannot be imported, a translated placeholder text is
    reported in place of the version number.

    @return dictionary with the entries "programEntry", "header", "text"
        and "version"
    @rtype dict
    """
    try:
        import paho.mqtt
        pahoVersion = paho.mqtt.__version__
    except ImportError:
        pahoVersion = QCoreApplication.translate(
            "MqttMonitorPlugin", "(package not available)")

    data = {"programEntry": False}
    data["header"] = QCoreApplication.translate("MqttMonitorPlugin", "MQTT")
    data["text"] = QCoreApplication.translate(
        "MqttMonitorPlugin", "paho-mqtt")
    data["version"] = pahoVersion
    return data


def prepareUninstall():
    """
    Module function removing the settings stored under the plug-in's
    preferences key.
    """
    settings = Preferences.Prefs.settings
    settings.remove(MqttMonitorPlugin.PreferencesKey)


class MqttMonitorPlugin(QObject):
    """
    Class implementing the MQTT Monitor plug-in.
    """
    PreferencesKey = "MqttMonitor"

    # settings with these keys are stored as JSON formatted strings
    __JsonKeys = (
        "RecentBrokersWithPort",
        "BrokerProfiles",
        "SubscribeProperties",
        "UnsubscribeProperties",
    )

    def __init__(self, ui):
        """
        Constructor

        @param ui reference to the user interface object
        @type UI.UserInterface
        """
        super().__init__(ui)

        self.__ui = ui

        # values handed to the settings object as fallback
        self.__defaults = {
            "RecentBrokersWithPort": "[]",  # JSON formatted empty list
            "BrokerProfiles": "{}",         # JSON formatted empty dict
            # __IGNORE_WARNING_M613__
            "MostRecentProfile": "",        # most recently used profile
            "SubscribeProperties": "{}",    # JSON formatted empty dict
            # __IGNORE_WARNING_M613__
            "UnsubscribeProperties": "{}",  # JSON formatted empty dict
            # __IGNORE_WARNING_M613__
        }
        self.__initialize()

        self.__translator = None
        self.__loadTranslator()

    def __initialize(self):
        """
        Private slot to (re)initialize the plugin.

        It resets the reference to the monitor widget to None.
        """
        self.__widget = None

    def activate(self):
        """
        Public method to activate this plug-in.

        The module level 'error' variable is cleared first and receives a
        message, if the 'paho-mqtt' package cannot be imported. Otherwise
        the monitor widget is added to the right side of the user interface
        and an action to give it the input focus is registered.

        @return tuple of None and activation status
        @rtype tuple of (None, bool)
        """
        global error
        error = ""      # forget the message of an earlier attempt

        try:
            import paho.mqtt        # __IGNORE_WARNING__
        except ImportError:
            error = self.tr("The 'paho-mqtt' package is not available.")
            return None, False

        from MqttMonitor.MqttMonitorWidget import MqttMonitorWidget

        darkMode = ericApp().usesDarkPalette()
        if darkMode:
            iconSuffix = "dark"
        else:
            iconSuffix = "light"
        iconPath = os.path.join(
            "MqttMonitor", "icons", "mqtt22-{0}".format(iconSuffix))

        ui = self.__ui
        self.__widget = MqttMonitorWidget(self, darkMode)
        ui.addSideWidget(
            ui.RightSide,
            self.__widget,
            UI.PixmapCache.getIcon(iconPath),
            self.tr("MQTT Monitor"))

        self.__activateAct = action = EricAction(
            self.tr('MQTT Monitor'),
            self.tr('M&QTT Monitor'),
            QKeySequence(self.tr("Alt+Shift+Q")),
            0, self, 'mqtt_monitor_activate')
        action.setStatusTip(self.tr(
            "Switch the input focus to the MQTT Monitor window."))
        action.setWhatsThis(self.tr(
            """<b>Activate MQTT Monitor</b>"""
            """<p>This switches the input focus to the MQTT Monitor"""
            """ window.</p>"""
        ))
        action.triggered.connect(self.__activateWidget)

        ui.addEricActions([action], 'ui')
        ui.getMenu("subwindow").addAction(action)

        return None, True

    def deactivate(self):
        """
        Public method to deactivate this plug-in.

        The activation action is removed from the "subwindow" menu and the
        registered actions, the monitor widget is removed from the user
        interface and the plug-in is reset.
        """
        ui = self.__ui
        ui.getMenu("subwindow").removeAction(self.__activateAct)
        ui.removeEricActions([self.__activateAct], 'ui')
        ui.removeSideWidget(self.__widget)

        self.__initialize()

    def __loadTranslator(self):
        """
        Private method to load the translation file.

        Nothing is done without a user interface object or if the locale
        is empty or "C". A file that fails to load is reported via print().
        """
        if self.__ui is None:
            return

        localeName = self.__ui.getLocale()
        if not localeName or localeName == "C":
            return

        baseName = "mqttmonitor_{0}".format(localeName)
        i18nDir = os.path.join(
            os.path.dirname(__file__), "MqttMonitor", "i18n")
        candidate = QTranslator(None)
        if not candidate.load(baseName, i18nDir):
            print("Warning: translation file '{0}' could not be"
                  " loaded.".format(baseName))
            print("Using default.")
            return

        # keep a reference to the translator that gets installed
        self.__translator = candidate
        ericApp().installTranslator(candidate)

    def __activateWidget(self):
        """
        Private slot to handle the activation of the MQTT Monitor.

        The container matching the layout type of the user interface is
        shown with the monitor widget as its current widget; for any other
        layout type the widget is shown directly. The widget receives the
        input focus afterwards.
        """
        ui = self.__ui
        monitor = self.__widget

        layoutType = ui.getLayoutType()
        if layoutType == "Toolboxes":
            ui.rToolboxDock.show()
            ui.rToolbox.setCurrentWidget(monitor)
        elif layoutType == "Sidebars":
            ui.rightSidebar.show()
            ui.rightSidebar.setCurrentWidget(monitor)
        else:
            monitor.show()

        monitor.setFocus(Qt.FocusReason.ActiveWindowFocusReason)

    def __settingsKey(self, key):
        """
        Private method to build the settings key for a setting.

        @param key key of the setting
        @type str
        @return key prefixed with the preferences key and a slash
        @rtype str
        """
        return self.PreferencesKey + "/" + key

    def getPreferences(self, key):
        """
        Public method to retrieve the various settings.

        Settings stored as JSON formatted strings are decoded before they
        are returned.

        @param key key of the setting
        @type str
        @return value of the requested setting
        @rtype Any
        """
        stored = Preferences.Prefs.settings.value(
            self.__settingsKey(key), self.__defaults[key])
        if key in self.__JsonKeys:
            return json.loads(stored)

        return stored

    def setPreferences(self, key, value):
        """
        Public method to store the various settings.

        Values of settings kept as JSON formatted strings are encoded
        before they are stored.

        @param key key of the setting to be set
        @type str
        @param value value to be set
        @type Any
        """
        settingsKey = self.__settingsKey(key)
        storable = json.dumps(value) if key in self.__JsonKeys else value
        Preferences.Prefs.settings.setValue(settingsKey, storable)


def installDependencies(pipInstall):
    """
    Function to install dependencies of this plug-in.

    The installation is requested only, if the 'paho.mqtt' package cannot
    be imported.

    @param pipInstall function to be called with a list of package names.
    @type function
    """
    try:
        import paho.mqtt        # __IGNORE_WARNING__
    except ImportError:
        missingPackages = ["paho-mqtt"]
        pipInstall(missingPackages)

#
# eflag: noqa = M801