src/eric7/MicroPython/MicroPythonSerialPort.py

Sat, 23 Dec 2023 15:48:12 +0100

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 23 Dec 2023 15:48:12 +0100
branch
eric7
changeset 10439
21c28b0f9e41
parent 10428
a071d4065202
child 11090
f5f5f5803935
permissions
-rw-r--r--

Updated copyright for 2024.

# -*- coding: utf-8 -*-

# Copyright (c) 2019 - 2024 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a QSerialPort with additional functionality for
MicroPython devices.
"""

import time

from PyQt6.QtCore import QCoreApplication, QEventLoop, QIODevice, QTime
from PyQt6.QtSerialPort import QSerialPort


class MicroPythonSerialPort(QSerialPort):
    """
    Class implementing a QSerialPort with additional functionality for
    MicroPython devices.

    It adds methods to open and close the serial link with fixed connection
    parameters (115200 baud, 8N1) and a polling 'readUntil' method with
    timeout handling.
    """

    def __init__(self, timeout=10000, parent=None):
        """
        Constructor

        @param timeout timeout in milliseconds to be set
        @type int
        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        self.__connected = False
        self.__timeout = timeout  # 10s default timeout
        self.__timedOut = False

    def setTimeout(self, timeout):
        """
        Public method to set the timeout for device operations.

        @param timeout timeout in milliseconds to be set
        @type int
        """
        self.__timeout = timeout

    def openSerialLink(self, port):
        """
        Public method to open a serial link to a given serial port.

        The port is opened for reading and writing and configured for
        115200 baud, 8 data bits, no parity and one stop bit.

        @param port port name to connect to
        @type str
        @return flag indicating success and an error message
        @rtype tuple of (bool, str)
        """
        self.setPortName(port)
        if not self.open(QIODevice.OpenModeFlag.ReadWrite):
            return False, self.errorString()

        self.setDataTerminalReady(True)
        # 115.200 baud, 8N1
        self.setBaudRate(115200)
        self.setDataBits(QSerialPort.DataBits.Data8)
        self.setParity(QSerialPort.Parity.NoParity)
        self.setStopBits(QSerialPort.StopBits.OneStop)

        self.__connected = True
        return True, ""

    def closeSerialLink(self):
        """
        Public method to close the open serial connection.

        Nothing is done, if the link was not opened via 'openSerialLink'.
        """
        if self.__connected:
            self.close()

            self.__connected = False

    def isConnected(self):
        """
        Public method to get the connection state.

        @return flag indicating the connection state
        @rtype bool
        """
        return self.__connected

    def hasTimedOut(self):
        """
        Public method to check, if the last 'readUntil' has timed out.

        @return flag indicating a timeout
        @rtype bool
        """
        return self.__timedOut

    def readUntil(self, expected=b"\n", size=None, timeout=0):
        r"""
        Public method to read data until an expected sequence is found
        (default: \n) or a specific size is exceeded.

        The data is read byte by byte while pending application events
        (except user input events) are processed. If the timeout expires
        before one of the stop conditions is met, the data read so far is
        returned and 'hasTimedOut' will report True.

        @param expected expected bytes sequence
        @type bytes
        @param size maximum data to be read (defaults to None)
        @type int (optional)
        @param timeout timeout in milliseconds (0 for configured default)
            (defaults to 0)
        @type int (optional)
        @return bytes read from the device including the expected sequence
        @rtype bytes
        """
        data = bytearray()
        self.__timedOut = False

        if timeout == 0:
            timeout = self.__timeout

        # Measure the elapsed time with a monotonic clock. A wall-clock time
        # of day wraps around at midnight and follows system clock
        # adjustments, which would yield negative or bogus elapsed times.
        startTime = time.monotonic()
        while True:
            # keep the application responsive while polling the device
            QCoreApplication.processEvents(
                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
            )
            c = bytes(self.read(1))
            if c:
                data += c
                if data.endswith(expected):
                    break
                if size is not None and len(data) >= size:
                    break
            # elapsed time is converted to milliseconds for the comparison
            if (time.monotonic() - startTime) * 1000 > timeout:
                self.__timedOut = True
                break

        return bytes(data)

eric ide

mercurial