Sat, 20 Jul 2019 14:48:09 +0200
Started to implement the device file system interface.
# -*- coding: utf-8 -*-

# Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a QSerialPort with additional functionality for
MicroPython devices.
"""

from __future__ import unicode_literals

from PyQt5.QtCore import QIODevice, QTime
from PyQt5.QtSerialPort import QSerialPort


class MicroPythonSerialPort(QSerialPort):
    """
    Class implementing a QSerialPort with additional functionality for
    MicroPython devices.

    It tracks the connection state itself and offers a blocking
    readUntil() helper that is bounded by a configurable timeout.
    """
    def __init__(self, timeout=10000, parent=None):
        """
        Constructor

        @param timeout timeout in milliseconds to be set
        @type int
        @param parent reference to the parent object
        @type QObject
        """
        super(MicroPythonSerialPort, self).__init__(parent)

        self.__connected = False
        self.__timeout = timeout      # 10s default timeout

    def setTimeout(self, timeout):
        """
        Public method to set the timeout for device operations.

        @param timeout timeout in milliseconds to be set
        @type int
        """
        self.__timeout = timeout

    def openSerialLink(self, port):
        """
        Public method to open a serial link to a given serial port.

        On success the port is configured for 115200 baud, 8N1 with DTR
        asserted and the connection flag is set.

        @param port port name to connect to
        @type str
        @return flag indicating success
        @rtype bool
        """
        self.setPortName(port)
        if self.open(QIODevice.ReadWrite):
            self.setDataTerminalReady(True)
            # 115.200 baud, 8N1
            self.setBaudRate(115200)
            self.setDataBits(QSerialPort.Data8)
            self.setParity(QSerialPort.NoParity)
            self.setStopBits(QSerialPort.OneStop)
            self.__connected = True
            return True
        else:
            return False

    def closeSerialLink(self):
        """
        Public method to close the open serial connection.

        Does nothing if no link was opened via openSerialLink().
        """
        # The attribute name must match the one assigned in __init__() and
        # openSerialLink(); a misspelled name raises an AttributeError
        # (private name mangling) and leaves the port open.
        if self.__connected:
            self.close()
            self.__connected = False

    def isConnected(self):
        """
        Public method to get the connection state.

        @return flag indicating the connection state
        @rtype bool
        """
        return self.__connected

    def readUntil(self, expected=b"\n", size=None):
        r"""
        Public method to read data until an expected sequence is found
        (default: \n) or a specific size is exceeded.

        Reading also stops once the configured timeout (measured from the
        start of this call) has elapsed; whatever was received up to then
        is returned.

        @param expected expected bytes sequence
        @type bytes
        @param size maximum data to be read
        @type int
        @return bytes read from the device including the expected sequence
        @rtype bytes
        """
        from PyQt5.QtCore import QCoreApplication, QEventLoop

        data = bytearray()

        t = QTime()
        t.start()
        while True:
            # TODO: check if this is still needed when used with a QThread
            # Let the event loop run (without user input) between reads.
            QCoreApplication.processEvents(QEventLoop.ExcludeUserInputEvents)
            c = bytes(self.read(1))
            if c:
                data += c
                if data.endswith(expected):
                    break
                if size is not None and len(data) >= size:
                    break
            # Checked on every iteration, so an idle device cannot keep
            # this loop spinning forever.
            if t.elapsed() > self.__timeout:
                break

        return bytes(data)