Thu, 11 Jul 2019 19:48:14 +0200
MicroPythonDevices: fixed an issue disconnecting from the device.
# -*- coding: utf-8 -*- # Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing the MicroPython REPL widget. """ from __future__ import unicode_literals import re from PyQt5.QtCore import ( pyqtSlot, pyqtSignal, Qt, QPoint, QEvent, QIODevice, QTimer ) from PyQt5.QtGui import QColor, QKeySequence, QTextCursor from PyQt5.QtWidgets import ( QWidget, QMenu, QApplication, QHBoxLayout, QSpacerItem, QSizePolicy) try: from PyQt5.QtSerialPort import QSerialPort HAS_QTSERIALPORT = True except ImportError: HAS_QTSERIALPORT = False try: from PyQt5.QtChart import QChart # __IGNORE_WARNING__ HAS_QTCHART = True except ImportError: HAS_QTCHART = False from E5Gui.E5ZoomWidget import E5ZoomWidget from E5Gui import E5MessageBox, E5FileDialog from E5Gui.E5Application import e5App from .Ui_MicroPythonReplWidget import Ui_MicroPythonReplWidget from . import MicroPythonDevices import Globals import UI.PixmapCache class MicroPythonReplWidget(QWidget, Ui_MicroPythonReplWidget): """ Class implementing the MicroPython REPL widget. @signal dataReceived(data) emitted to send data received via the serial connection for further processing """ ZoomMin = -10 ZoomMax = 20 DeviceTypeRole = Qt.UserRole DevicePortRole = Qt.UserRole + 1 dataReceived = pyqtSignal(bytes) def __init__(self, parent=None): """ Constructor @param parent reference to the parent widget @type QWidget """ super(MicroPythonReplWidget, self).__init__(parent) self.setupUi(self) self.deviceIconLabel.setPixmap(MicroPythonDevices.getDeviceIcon( "", False)) self.openButton.setIcon(UI.PixmapCache.getIcon("open")) self.saveButton.setIcon(UI.PixmapCache.getIcon("fileSaveAs")) self.checkButton.setIcon(UI.PixmapCache.getIcon("question")) self.runButton.setIcon(UI.PixmapCache.getIcon("start")) self.replButton.setIcon(UI.PixmapCache.getIcon("terminal")) self.filesButton.setIcon(UI.PixmapCache.getIcon("filemanager")) self.chartButton.setIcon(UI.PixmapCache.getIcon("chart")) self.disconnectButton.setIcon(UI.PixmapCache.getIcon("disconnect")) self.__zoomLayout = QHBoxLayout() spacerItem = QSpacerItem(40, 20, QSizePolicy.Expanding, QSizePolicy.Minimum) self.__zoomLayout.addSpacerItem(spacerItem) self.__zoom0 = self.replEdit.fontPointSize() self.__zoomWidget = E5ZoomWidget( UI.PixmapCache.getPixmap("zoomOut"), UI.PixmapCache.getPixmap("zoomIn"), UI.PixmapCache.getPixmap("zoomReset"), self) self.__zoomLayout.addWidget(self.__zoomWidget) self.layout().insertLayout( self.layout().count() - 1, self.__zoomLayout) self.__zoomWidget.setMinimum(self.ZoomMin) self.__zoomWidget.setMaximum(self.ZoomMax) self.__zoomWidget.valueChanged.connect(self.__doZoom) self.__currentZoom = 0 self.__serial = None self.__device = None self.setConnected(False) self.__replRunning = False self.__plotterRunning = False self.__fileManagerRunning = False if not HAS_QTSERIALPORT: self.replEdit.setHtml(self.tr( "<h3>The QtSerialPort package is not available.<br/>" "MicroPython support is deactivated.</h3>")) self.setEnabled(False) return self.__vt100Re = re.compile( r'(?P<count>[\d]*)(;?[\d]*)*(?P<action>[ABCDKm])') self.__populateDeviceTypeComboBox() self.replEdit.setAcceptRichText(False) self.replEdit.setUndoRedoEnabled(False) self.replEdit.setContextMenuPolicy(Qt.CustomContextMenu) self.replEdit.installEventFilter(self) self.replEdit.customContextMenuRequested.connect( self.__showContextMenu) def __populateDeviceTypeComboBox(self): """ Private method to populate the device type selector. """ self.deviceTypeComboBox.clear() self.deviceInfoLabel.clear() self.deviceTypeComboBox.addItem("", "") devices = MicroPythonDevices.getFoundDevices() if devices: self.deviceInfoLabel.setText( self.tr("%n supported device(s) detected.", n=len(devices))) index = 0 for device in sorted(devices): index += 1 self.deviceTypeComboBox.addItem( self.tr("{0} at {1}".format(device[1], device[2]))) self.deviceTypeComboBox.setItemData( index, device[0], self.DeviceTypeRole) self.deviceTypeComboBox.setItemData( index, device[2], self.DevicePortRole) else: self.deviceInfoLabel.setText( self.tr("No supported devices detected.")) self.on_deviceTypeComboBox_activated(0) @pyqtSlot(int) def on_deviceTypeComboBox_activated(self, index): """ Private slot handling the selection of a device type. @param index index of the selected device @type int """ deviceType = self.deviceTypeComboBox.itemData( index, self.DeviceTypeRole) self.deviceIconLabel.setPixmap(MicroPythonDevices.getDeviceIcon( deviceType, False)) self.__device = MicroPythonDevices.getDevice(deviceType, self) self.__device.setButtons() @pyqtSlot() def on_checkButton_clicked(self): """ Private slot to check for connected devices. """ self.__populateDeviceTypeComboBox() def setActionButtons(self, **kwargs): """ Public method to set the enabled state of the various action buttons. @param kwargs keyword arguments containg the enabled states (keys are 'run', 'repl', 'files', 'chart', 'open', 'save' @type dict """ if "open" in kwargs: self.openButton.setEnabled(kwargs["open"]) if "save" in kwargs: self.saveButton.setEnabled(kwargs["save"]) if "run" in kwargs: self.runButton.setEnabled(kwargs["run"]) if "repl" in kwargs: self.replButton.setEnabled(kwargs["repl"]) if "files" in kwargs: self.filesButton.setEnabled(kwargs["files"]) if "chart" in kwargs: self.chartButton.setEnabled(kwargs["chart"]) @pyqtSlot(QPoint) def __showContextMenu(self, pos): """ Privat slot to show the REPL context menu. @param pos position to show the menu at @type QPoint """ if Globals.isMacPlatform(): copyKeys = QKeySequence(Qt.CTRL + Qt.Key_C) pasteKeys = QKeySequence(Qt.CTRL + Qt.Key_V) else: copyKeys = QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_C) pasteKeys = QKeySequence(Qt.CTRL + Qt.SHIFT + Qt.Key_V) menu = QMenu(self) menu.addAction(self.tr("Copy"), self.replEdit.copy, copyKeys) menu.addAction(self.tr("Paste"), self.__paste, pasteKeys) menu.exec_(self.replEdit.mapToGlobal(pos)) def setConnected(self, connected): """ Public method to set the connection status LED. @param connected connection state @type bool """ if connected: self.deviceConnectedLed.setColor(QColor(Qt.green)) else: self.deviceConnectedLed.setColor(QColor(Qt.red)) self.deviceTypeComboBox.setEnabled(not connected) self.disconnectButton.setEnabled(connected) def __showNoDeviceMessage(self): """ Private method to show a message dialog indicating a missing device. """ E5MessageBox.critical( self, self.tr("No device attached"), self.tr("""Please ensure the device is plugged into your""" """ computer and selected.\n\nIt must have a version""" """ of MicroPython (or CircuitPython) flashed onto""" """ it before anything will work.\n\nFinally press""" """ the device's reset button and wait a few seconds""" """ before trying again.""")) @pyqtSlot() def on_replButton_clicked(self): """ Private slot to connect to the selected device and start a REPL. """ if not self.__device: self.__showNoDeviceMessage() return if self.__replRunning: self.dataReceived.disconnect(self.__processData) self.__disconnect() self.__replRunning = False self.__device.setRepl(False) else: ok, reason = self.__device.canStartRepl() if not ok: E5MessageBox.warning( self, self.tr("Start REPL"), self.tr("""<p>The REPL cannot be started.</p><p>Reason:""" """ {0}</p>""").format(reason)) return if not self.__serial: self.replEdit.clear() self.dataReceived.connect(self.__processData) self.__openSerialLink() if self.__serial: if self.__device.forceInterrupt(): # send a Ctrl-B (exit raw mode) self.__serial.write(b'\x02') # send Ctrl-C (keyboard interrupt) self.__serial.write(b'\x03') self.__replRunning = True self.__device.setRepl(True) @pyqtSlot() def on_disconnectButton_clicked(self): """ Private slot to disconnect from the currently connected device. """ if self.__replRunning: self.on_replButton_clicked() # TODO: add more def __disconnect(self): """ Private slot to disconnect the serial connection. """ self.__closeSerialLink() self.setConnected(False) def __activatePlotter(self): """ Private method to activate a data plotter widget. """ # TODO: not implemented yet def __deactivatePlotter(self): """ Private method to deactivate the plotter widget. """ # TODO: not implemented yet @pyqtSlot() def __paste(self): """ Private slot to perform a paste operation. """ clipboard = QApplication.clipboard() if clipboard and clipboard.text(): pasteText = clipboard.text().replace('\n\r', '\r') pasteText = pasteText.replace('\n', '\r') self.__serial and self.__serial.write(pasteText.encode("utf-8")) def eventFilter(self, obj, evt): """ Public method to process events for the REPL pane. @param obj reference to the object the event was meant for @type QObject @param evt reference to the event object @type QEvent @return flag to indicate that the event was handled @rtype bool """ if obj is self.replEdit and evt.type() == QEvent.KeyPress: # handle the key press event on behalve of the REPL pane key = evt.key() msg = bytes(evt.text(), 'utf8') if key == Qt.Key_Backspace: msg = b'\b' elif key == Qt.Key_Delete: msg = b'\x1B[\x33\x7E' elif key == Qt.Key_Up: msg = b'\x1B[A' elif key == Qt.Key_Down: msg = b'\x1B[B' elif key == Qt.Key_Right: msg = b'\x1B[C' elif key == Qt.Key_Left: msg = b'\x1B[D' elif key == Qt.Key_Home: msg = b'\x1B[H' elif key == Qt.Key_End: msg = b'\x1B[F' elif ((Globals.isMacPlatform() and evt.modifiers() == Qt.MetaModifier) or (not Globals.isMacPlatform() and evt.modifiers() == Qt.ControlModifier)): if Qt.Key_A <= key <= Qt.Key_Z: # devices treat an input of \x01 as Ctrl+A, etc. msg = bytes([1 + key - Qt.Key_A]) elif (evt.modifiers() == Qt.ControlModifier | Qt.ShiftModifier or (Globals.isMacPlatform() and evt.modifiers() == Qt.ControlModifier)): if key == Qt.Key_C: self.replEdit.copy() msg = b'' elif key == Qt.Key_V: self.__paste() msg = b'' self.__serial and self.__serial.write(msg) return True else: # standard event processing return super(MicroPythonReplWidget, self).eventFilter(obj, evt) def __processData(self, data): """ Private slot to process bytes received from the device. @param data bytes received from the device @type bytes """ tc = self.replEdit.textCursor() # the text cursor must be on the last line while tc.movePosition(QTextCursor.Down): pass index = 0 while index < len(data): if data[index] == 8: # \b tc.movePosition(QTextCursor.Left) self.replEdit.setTextCursor(tc) elif data[index] == 13: # \r pass elif (len(data) > index + 1 and data[index] == 27 and data[index + 1] == 91): # VT100 cursor command detected: <Esc>[ index += 2 # move index to after the [ match = self.__vt100Re.search(data[index:].decode("utf-8")) if match: # move to last position in control sequence # ++ will be done at end of loop index += match.end() - 1 if match.group("count") == "": count = 1 else: count = int(match.group("count")) action = match.group("action") if action == "A": # up tc.movePosition(QTextCursor.Up, n=count) self.replEdit.setTextCursor(tc) elif action == "B": # down tc.movePosition(QTextCursor.Down, n=count) self.replEdit.setTextCursor(tc) elif action == "C": # right tc.movePosition(QTextCursor.Right, n=count) self.replEdit.setTextCursor(tc) elif action == "D": # left tc.movePosition(QTextCursor.Left, n=count) self.replEdit.setTextCursor(tc) elif action == "K": # delete things if match.group("count") == "": # delete to eol tc.movePosition(QTextCursor.EndOfLine, mode=QTextCursor.KeepAnchor) tc.removeSelectedText() self.replEdit.setTextCursor(tc) # TODO: add handling of 'm' (colors) elif data[index] == 10: # \n tc.movePosition(QTextCursor.End) self.replEdit.setTextCursor(tc) self.replEdit.insertPlainText(chr(data[index])) else: tc.deleteChar() self.replEdit.setTextCursor(tc) self.replEdit.insertPlainText(chr(data[index])) index += 1 self.replEdit.ensureCursorVisible() def __doZoom(self, value): """ Private slot to zoom the REPL pane. @param value zoom value @param int """ if value < self.__currentZoom: self.replEdit.zoomOut(self.__currentZoom - value) elif value > self.__currentZoom: self.replEdit.zoomIn(value - self.__currentZoom) self.__currentZoom = value def __getCurrentPort(self): """ Private method to determine the port path of the selected device. @return path of the port of the selected device @rtype str """ portName = self.deviceTypeComboBox.itemData( self.deviceTypeComboBox.currentIndex(), self.DevicePortRole) if Globals.isWindowsPlatform(): # return unchanged return portName else: # return with device path prepended return "/dev/{0}".format(portName) def __openSerialLink(self): """ Private method to open a serial link to the selected device. """ port = self.__getCurrentPort() self.__serial = QSerialPort() self.__serial.setPortName(port) if self.__serial.open(QIODevice.ReadWrite): self.__serial.setDataTerminalReady(True) # 115.200 baud, 8N1 self.__serial.setBaudRate(115200) self.__serial.setDataBits(QSerialPort.Data8) self.__serial.setParity(QSerialPort.NoParity) self.__serial.setStopBits(QSerialPort.OneStop) self.__serial.readyRead.connect(self.__readSerial) self.setConnected(True) else: E5MessageBox.warning( self, self.tr("Serial Device Connect"), self.tr("""<p>Cannot connect to device at serial port""" """ <b>{0}</b>.</p>""").format(port)) self.__serial = None def __closeSerialLink(self): """ Private method to close the open serial connection. """ if self.__serial: self.__serial.close() self.__serial = None @pyqtSlot() def __readSerial(self): """ Private slot to read all available serial data and emit it with the "dataReceived" signal for further processing. """ data = bytes(self.__serial.readAll()) self.dataReceived.emit(data) def execute(self, commandsList): """ Public method to execute a series of commands over a period of time. @param commandsList list of commands to be execute on the device @type list of bytes """ if commandsList: command = commandsList[0] self.__serial.write(command) remainder = commandsList[1:] remainingTask = lambda commands=remainder: self.execute(commands) QTimer.singleShot(2, remainingTask) @pyqtSlot() def on_runButton_clicked(self): """ Private slot to execute the script of the active editor on the selected device. """ if not self.__device: self.__showNoDeviceMessage() return aw = e5App().getObject("ViewManager").activeWindow() if aw is None: E5MessageBox.critical( self, self.tr("Run Script"), self.tr("""There is no editor open. Abort...""")) return script = aw.text() if not script: E5MessageBox.critical( self, self.tr("Run Script"), self.tr("""The current editor does not contain a script.""" """ Abort...""")) return ok, reason = self.__device.canRunScript() if not ok: E5MessageBox.warning( self, self.tr("Run Script"), self.tr("""<p>Cannot run script.</p><p>Reason:""" """ {0}</p>""").format(reason)) return if not self.__replRunning: self.on_replButton_clicked() if self.__replRunning: self.__device.runScript(script) @pyqtSlot() def on_openButton_clicked(self): """ Private slot to open a file of the connected device. """ if not self.__device: self.__showNoDeviceMessage() return workspace = self.__device.getWorkspace() fileName = E5FileDialog.getOpenFileName( self, self.tr("Open Python File"), workspace, self.tr("Python3 Files (*.py)")) if fileName: e5App().getObject("ViewManager").openSourceFile(fileName) @pyqtSlot() def on_saveButton_clicked(self): """ Private slot to save the current editor to the connected device. """ if not self.__device: self.__showNoDeviceMessage() return workspace = self.__device.getWorkspace() aw = e5App().getObject("ViewManager").activeWindow() aw.saveFileAs(workspace)