eric6/MicroPython/MicroPythonSerialPort.py

Sat, 20 Jul 2019 14:48:09 +0200

author
Detlev Offenbach <detlev@die-offenbachs.de>
date
Sat, 20 Jul 2019 14:48:09 +0200
branch
micropython
changeset 7070
3368ce0e7879
parent 7067
3fc4082fc6ba
child 7077
3b7475b7a1ef
permissions
-rw-r--r--

Started to implement the device file system interface.

# -*- coding: utf-8 -*-

# Copyright (c) 2019 Detlev Offenbach <detlev@die-offenbachs.de>
#

"""
Module implementing a QSerialPort with additional functionality for
MicroPython devices.
"""

from __future__ import unicode_literals

from PyQt5.QtCore import QIODevice, QTime
from PyQt5.QtSerialPort import QSerialPort


class MicroPythonSerialPort(QSerialPort):
    """
    Class implementing a QSerialPort with additional functionality for
    MicroPython devices.
    """
    def __init__(self, timeout=10000, parent=None):
        """
        Constructor
        
        @param timeout timout in milliseconds to be set
        @type int
        @param parent reference to the parent object
        @type QObject
        """
        super(MicroPythonSerialPort, self).__init__(parent)
        
        self.__connected = False
        self.__timeout = timeout      # 10s default timeout
    
    def setTimeout(self, timeout):
        """
        Public method to set the timeout for device operations.
        
        @param timeout timout in milliseconds to be set
        @type int
        """
        self.__timeout = timeout
    
    def openSerialLink(self, port):
        """
        Public method to open a serial link to a given serial port.
        
        @param port port name to connect to
        @type str
        @return flag indicating success
        @rtype bool
        """
        self.setPortName(port)
        if self.open(QIODevice.ReadWrite):
            self.setDataTerminalReady(True)
            # 115.200 baud, 8N1
            self.setBaudRate(115200)
            self.setDataBits(QSerialPort.Data8)
            self.setParity(QSerialPort.NoParity)
            self.setStopBits(QSerialPort.OneStop)
            
            self.__connected = True
            return True
        else:
            return False
    
    def closeSerialLink(self):
        """
        Public method to close the open serial connection.
        """
        if self.__connceted:
            self.close()
            
            self.__connected = False
    
    def isConnected(self):
        """
        Public method to get the connection state.
        
        @return flag indicating the connection state
        @rtype bool
        """
        return self.__connected
    
    def readUntil(self, expected=b"\n", size=None):
        r"""
        Public method to read data until an expected sequence is found
        (default: \n) or a specific size is exceeded.
        
        @param expected expected bytes sequence
        @type bytes
        @param size maximum data to be read
        @type int
        @return bytes read from the device including the expected sequence
        @rtype bytes
        """
        from PyQt5.QtCore import QCoreApplication, QEventLoop
        data = bytearray()
        
        t = QTime()
        t.start()
        while True:
            # TODO: check if this is still needed when used with a QThread
            QCoreApplication.processEvents(QEventLoop.ExcludeUserInputEvents)
##            if self.waitForReadyRead(self.__timeout):
            c = bytes(self.read(1))
            if c:
                data += c
                if data.endswith(expected):
                    break
                if size is not None and len(data) >= size:
                    break
#            else:
#                break
            if t.elapsed() > self.__timeout:
                break
##            else:
##                break
        
        return bytes(data)

eric ide

mercurial