src/eric7/MicroPython/MicroPythonSerialPort.py

branch
eric7
changeset 9209
b99e7fd55fd3
parent 8881
54e42bc2437a
child 9221
bf71ee032bb4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/eric7/MicroPython/MicroPythonSerialPort.py	Thu Jul 07 11:23:56 2022 +0200
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2019 - 2022 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing a QSerialPort with additional functionality for
+MicroPython devices.
+"""
+
+from PyQt6.QtCore import QIODevice, QTime, QCoreApplication, QEventLoop
+from PyQt6.QtSerialPort import QSerialPort
+
+
+class MicroPythonSerialPort(QSerialPort):
+    """
+    Class implementing a QSerialPort with additional functionality for
+    MicroPython devices.
+    """
+    def __init__(self, timeout=10000, parent=None):
+        """
+        Constructor
+        
+        @param timeout timout in milliseconds to be set
+        @type int
+        @param parent reference to the parent object
+        @type QObject
+        """
+        super().__init__(parent)
+        
+        self.__connected = False
+        self.__timeout = timeout      # 10s default timeout
+        self.__timedOut = False
+    
+    def setTimeout(self, timeout):
+        """
+        Public method to set the timeout for device operations.
+        
+        @param timeout timout in milliseconds to be set
+        @type int
+        """
+        self.__timeout = timeout
+    
+    def openSerialLink(self, port):
+        """
+        Public method to open a serial link to a given serial port.
+        
+        @param port port name to connect to
+        @type str
+        @return flag indicating success
+        @rtype bool
+        """
+        self.setPortName(port)
+        if self.open(QIODevice.OpenModeFlag.ReadWrite):
+            self.setDataTerminalReady(True)
+            # 115.200 baud, 8N1
+            self.setBaudRate(115200)
+            self.setDataBits(QSerialPort.DataBits.Data8)
+            self.setParity(QSerialPort.Parity.NoParity)
+            self.setStopBits(QSerialPort.StopBits.OneStop)
+            
+            self.__connected = True
+            return True
+        else:
+            return False
+    
+    def closeSerialLink(self):
+        """
+        Public method to close the open serial connection.
+        """
+        if self.__connected:
+            self.close()
+            
+            self.__connected = False
+    
+    def isConnected(self):
+        """
+        Public method to get the connection state.
+        
+        @return flag indicating the connection state
+        @rtype bool
+        """
+        return self.__connected
+    
+    def hasTimedOut(self):
+        """
+        Public method to check, if the last 'readUntil' has timed out.
+        
+        @return flag indicating a timeout
+        @@rtype bool
+        """
+        return self.__timedOut
+    
+    def readUntil(self, expected=b"\n", size=None):
+        r"""
+        Public method to read data until an expected sequence is found
+        (default: \n) or a specific size is exceeded.
+        
+        @param expected expected bytes sequence
+        @type bytes
+        @param size maximum data to be read
+        @type int
+        @return bytes read from the device including the expected sequence
+        @rtype bytes
+        """
+        data = bytearray()
+        self.__timedOut = False
+        
+        t = QTime.currentTime()
+        while True:
+            QCoreApplication.processEvents(
+                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
+            c = bytes(self.read(1))
+            if c:
+                data += c
+                if data.endswith(expected):
+                    break
+                if size is not None and len(data) >= size:
+                    break
+            if t.msecsTo(QTime.currentTime()) > self.__timeout:
+                self.__timedOut = True
+                break
+        
+        return bytes(data)

eric ide

mercurial