--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/eric7/MicroPython/MicroPythonDeviceInterface.py Tue Feb 14 18:10:30 2023 +0100 @@ -0,0 +1,305 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2019 - 2023 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing some file system commands for MicroPython. +""" + +from PyQt6.QtCore import ( + QCoreApplication, + QEventLoop, + QObject, + QThread, + QTimer, + pyqtSignal, + pyqtSlot, +) + +from eric7 import Preferences + +from .MicroPythonSerialPort import MicroPythonSerialPort + + +class MicroPythonDeviceInterface(QObject): + """ + Class implementing some file system commands for MicroPython. + + Commands are provided to perform operations on the file system of a + connected MicroPython device. Supported commands are: + <ul> + <li>ls: directory listing</li> + <li>lls: directory listing with meta data</li> + <li>cd: change directory</li> + <li>pwd: get the current directory</li> + <li>put: copy a file to the connected device</li> + <li>putData: write data to a file of the connected device</li> + <li>get: get a file from the connected device</li> + <li>getData: read data of a file of the connected device</li> + <li>rm: remove a file from the connected device</li> + <li>rmrf: remove a file/directory recursively (like 'rm -rf' in bash) + <li>mkdir: create a new directory</li> + <li>rmdir: remove an empty directory</li> + <li>fileSystemInfo: get information about the file system + </ul> + + There are additional non file systemcommands. + <ul> + <li>getBoardData: get information about the connected board</li> + <li>getDeviceData: get version info about MicroPython and some implementation + information</li> + <li>getModules: get a list of built-in modules</li> + <li>getTime: get the current time</li> + <li>syncTime: synchronize the time of the connected device</li> + <li>showTime: show the current time of the connected device</li> + </ul> + + @signal executeAsyncFinished() emitted to indicate the end of an + asynchronously executed list of commands (e.g. a script) + @signal dataReceived(data) emitted to send data received via the serial + connection for further processing + """ + + executeAsyncFinished = pyqtSignal() + dataReceived = pyqtSignal(bytes) + + def __init__(self, parent=None): + """ + Constructor + + @param parent reference to the parent object + @type QObject + """ + super().__init__(parent) + + self.__repl = parent + + self.__blockReadyRead = False + + self.__serial = MicroPythonSerialPort( + timeout=Preferences.getMicroPython("SerialTimeout"), parent=self + ) + self.__serial.readyRead.connect(self.__readSerial) + + @pyqtSlot() + def __readSerial(self): + """ + Private slot to read all available serial data and emit it with the + "dataReceived" signal for further processing. + """ + if not self.__blockReadyRead: + data = bytes(self.__serial.readAll()) + self.dataReceived.emit(data) + + @pyqtSlot() + def connectToDevice(self, port): + """ + Public slot to start the manager. + + @param port name of the port to be used + @type str + @return flag indicating success + @rtype bool + """ + return self.__serial.openSerialLink(port) + + @pyqtSlot() + def disconnectFromDevice(self): + """ + Public slot to stop the thread. + """ + self.__serial.closeSerialLink() + + def isConnected(self): + """ + Public method to get the connection status. + + @return flag indicating the connection status + @rtype bool + """ + return self.__serial.isConnected() + + @pyqtSlot() + def handlePreferencesChanged(self): + """ + Public slot to handle a change of the preferences. + """ + self.__serial.setTimeout(Preferences.getMicroPython("SerialTimeout")) + + def write(self, data): + """ + Public method to write data to the connected device. + + @param data data to be written + @type bytes or bytearray + """ + self.__serial.isConnected() and self.__serial.write(data) + + def __rawOn(self): + """ + Private method to switch the connected device to 'raw' mode. + + Note: switching to raw mode is done with synchronous writes. + + @return flag indicating success + @@rtype bool + """ + if not self.__serial: + return False + + rawReplMessage = b"raw REPL; CTRL-B to exit\r\n>" + + self.__serial.write(b"\x02") # end raw mode if required + written = self.__serial.waitForBytesWritten(500) + # time out after 500ms if device is not responding + if not written: + return False + for _i in range(3): + # CTRL-C three times to break out of loops + self.__serial.write(b"\r\x03") + written = self.__serial.waitForBytesWritten(500) + # time out after 500ms if device is not responding + if not written: + return False + QThread.msleep(10) + self.__serial.readAll() # read all data and discard it + self.__serial.write(b"\r\x01") # send CTRL-A to enter raw mode + self.__serial.readUntil(rawReplMessage) + if self.__serial.hasTimedOut(): + # it timed out; try it again and than fail + self.__serial.write(b"\r\x01") # send CTRL-A again + self.__serial.readUntil(rawReplMessage) + if self.__serial.hasTimedOut(): + return False + + QCoreApplication.processEvents( + QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents + ) + self.__serial.readAll() # read all data and discard it + return True + + def __rawOff(self): + """ + Private method to switch 'raw' mode off. + """ + if self.__serial: + self.__serial.write(b"\x02") # send CTRL-B to cancel raw mode + self.__serial.readUntil(b">>> ") # read until Python prompt + self.__serial.readAll() # read all data and discard it + + def probeDevice(self): + """ + Public method to check the device is responding. + + If the device has not been flashed with a MicroPython formware, the + probe will fail. + + @return flag indicating a communicating MicroPython device + @rtype bool + """ + if not self.__serial: + return False + + if not self.__serial.isConnected(): + return False + + # switch on raw mode + self.__blockReadyRead = True + ok = self.__rawOn() + if not ok: + self.__blockReadyRead = False + return False + + # switch off raw mode + QThread.msleep(10) + self.__rawOff() + self.__blockReadyRead = False + + return True + + def execute(self, commands): + """ + Public method to send commands to the connected device and return the + result. + + If no serial connection is available, empty results will be returned. + + @param commands list of commands to be executed + @type str or list of str + @return tuple containing stdout and stderr output of the device + @rtype tuple of (bytes, bytes) + """ + if not self.__serial: + return b"", b"" + + if not self.__serial.isConnected(): + return b"", b"Device not connected or not switched on." + + result = bytearray() + err = b"" + + if isinstance(commands, str): + commands = [commands] + + # switch on raw mode + self.__blockReadyRead = True + ok = self.__rawOn() + if not ok: + self.__blockReadyRead = False + return (b"", b"Could not switch to raw mode. Is the device switched on?") + + # send commands + QThread.msleep(10) + for command in commands: + if command: + commandBytes = command.encode("utf-8") + self.__serial.write(commandBytes + b"\x04") + QCoreApplication.processEvents( + QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents + ) + ok = self.__serial.readUntil(b"OK") + if ok != b"OK": + return ( + b"", + "Expected 'OK', got '{0}', followed by '{1}'".format( + ok, self.__serial.readAll() + ).encode("utf-8"), + ) + + # read until prompt + response = self.__serial.readUntil(b"\x04>") + if self.__serial.hasTimedOut(): + self.__blockReadyRead = False + return b"", b"Timeout while processing commands." + if b"\x04" in response[:-2]: + # split stdout, stderr + out, err = response[:-2].split(b"\x04") + result += out + else: + err = b"invalid response received: " + response + if err: + result = b"" + break + + # switch off raw mode + QThread.msleep(10) + self.__rawOff() + self.__blockReadyRead = False + + return bytes(result), err + + def executeAsync(self, commandsList): + """ + Public method to execute a series of commands over a period of time + without returning any result (asynchronous execution). + + @param commandsList list of commands to be execute on the device + @type list of bytes + """ + if commandsList: + command = commandsList.pop(0) + self.__serial.write(command) + QTimer.singleShot(2, lambda: self.executeAsync(commandsList)) + else: + self.executeAsyncFinished.emit()