Tue, 14 Feb 2023 18:10:30 +0100
Reorganized the MicroPython code even more.
# -*- coding: utf-8 -*- # Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de> # """ Module implementing some file system commands for MicroPython. """ from PyQt6.QtCore import ( QCoreApplication, QEventLoop, QObject, QThread, QTimer, pyqtSignal, pyqtSlot, ) from eric7 import Preferences from .MicroPythonSerialPort import MicroPythonSerialPort class MicroPythonDeviceInterface(QObject): """ Class implementing some file system commands for MicroPython. Commands are provided to perform operations on the file system of a connected MicroPython device. Supported commands are: <ul> <li>ls: directory listing</li> <li>lls: directory listing with meta data</li> <li>cd: change directory</li> <li>pwd: get the current directory</li> <li>put: copy a file to the connected device</li> <li>putData: write data to a file of the connected device</li> <li>get: get a file from the connected device</li> <li>getData: read data of a file of the connected device</li> <li>rm: remove a file from the connected device</li> <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash) <li>mkdir: create a new directory</li> <li>rmdir: remove an empty directory</li> <li>fileSystemInfo: get information about the file system </ul> There are additional non file systemcommands. <ul> <li>getBoardData: get information about the connected board</li> <li>getDeviceData: get version info about MicroPython and some implementation information</li> <li>getModules: get a list of built-in modules</li> <li>getTime: get the current time</li> <li>syncTime: synchronize the time of the connected device</li> <li>showTime: show the current time of the connected device</li> </ul> @signal executeAsyncFinished() emitted to indicate the end of an asynchronously executed list of commands (e.g. a script) @signal dataReceived(data) emitted to send data received via the serial connection for further processing """ executeAsyncFinished = pyqtSignal() dataReceived = pyqtSignal(bytes) def __init__(self, parent=None): """ Constructor @param parent reference to the parent object @type QObject """ super().__init__(parent) self.__repl = parent self.__blockReadyRead = False self.__serial = MicroPythonSerialPort( timeout=Preferences.getMicroPython("SerialTimeout"), parent=self ) self.__serial.readyRead.connect(self.__readSerial) @pyqtSlot() def __readSerial(self): """ Private slot to read all available serial data and emit it with the "dataReceived" signal for further processing. """ if not self.__blockReadyRead: data = bytes(self.__serial.readAll()) self.dataReceived.emit(data) @pyqtSlot() def connectToDevice(self, port): """ Public slot to start the manager. @param port name of the port to be used @type str @return flag indicating success @rtype bool """ return self.__serial.openSerialLink(port) @pyqtSlot() def disconnectFromDevice(self): """ Public slot to stop the thread. """ self.__serial.closeSerialLink() def isConnected(self): """ Public method to get the connection status. @return flag indicating the connection status @rtype bool """ return self.__serial.isConnected() @pyqtSlot() def handlePreferencesChanged(self): """ Public slot to handle a change of the preferences. """ self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout")) def write(self, data): """ Public method to write data to the connected device. @param data data to be written @type bytes or bytearray """ self.__serial.isConnected() and self.__serial.write(data) def __rawOn(self): """ Private method to switch the connected device to 'raw' mode. Note: switching to raw mode is done with synchronous writes. @return flag indicating success @@rtype bool """ if not self.__serial: return False rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>" self.__serial.write(b"\x02") # end raw mode if required written = self.__serial.waitForBytesWritten(500) # time out after 500ms if device is not responding if not written: return False for _i in range(3): # CTRL-C three times to break out of loops self.__serial.write(b"\r\x03") written = self.__serial.waitForBytesWritten(500) # time out after 500ms if device is not responding if not written: return False QThread.msleep(10) self.__serial.readAll() # read all data and discard it self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode self.__serial.readUntil(rawReplMessage) if self.__serial.hasTimedOut(): # it timed out; try it again and than fail self.__serial.write(b"\r\x01") # send CTRL-A again self.__serial.readUntil(rawReplMessage) if self.__serial.hasTimedOut(): return False QCoreApplication.processEvents( QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents ) self.__serial.readAll() # read all data and discard it return True def __rawOff(self): """ Private method to switch 'raw' mode off. """ if self.__serial: self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode self.__serial.readUntil(b">>> ") # read until Python prompt self.__serial.readAll() # read all data and discard it def probeDevice(self): """ Public method to check the device is responding. If the device has not been flashed with a MicroPython formware, the probe will fail. @return flag indicating a communicating MicroPython device @rtype bool """ if not self.__serial: return False if not self.__serial.isConnected(): return False # switch on raw mode self.__blockReadyRead = True ok = self.__rawOn() if not ok: self.__blockReadyRead = False return False # switch off raw mode QThread.msleep(10) self.__rawOff() self.__blockReadyRead = False return True def execute(self, commands): """ Public method to send commands to the connected device and return the result. If no serial connection is available, empty results will be returned. @param commands list of commands to be executed @type str or list of str @return tuple containing stdout and stderr output of the device @rtype tuple of (bytes, bytes) """ if not self.__serial: return b"", b"" if not self.__serial.isConnected(): return b"", b"Device not connected or not switched on." result = bytearray() err = b"" if isinstance(commands, str): commands = [commands] # switch on raw mode self.__blockReadyRead = True ok = self.__rawOn() if not ok: self.__blockReadyRead = False return (b"", b"Could not switch to raw mode. Is the device switched on?") # send commands QThread.msleep(10) for command in commands: if command: commandBytes = command.encode("utf-8") self.__serial.write(commandBytes + b"\x04") QCoreApplication.processEvents( QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents ) ok = self.__serial.readUntil(b"OK") if ok != b"OK": return ( b"", "Expected 'OK', got '{0}', followed by '{1}'".format( ok, self.__serial.readAll() ).encode("utf-8"), ) # read until prompt response = self.__serial.readUntil(b"\x04>") if self.__serial.hasTimedOut(): self.__blockReadyRead = False return b"", b"Timeout while processing commands." if b"\x04" in response[:-2]: # split stdout, stderr out, err = response[:-2].split(b"\x04") result += out else: err = b"invalid response received: " + response if err: result = b"" break # switch off raw mode QThread.msleep(10) self.__rawOff() self.__blockReadyRead = False return bytes(result), err def executeAsync(self, commandsList): """ Public method to execute a series of commands over a period of time without returning any result (asynchronous execution). @param commandsList list of commands to be execute on the device @type list of bytes """ if commandsList: command = commandsList.pop(0) self.__serial.write(command) QTimer.singleShot(2, lambda: self.executeAsync(commandsList)) else: self.executeAsyncFinished.emit()