eric7/MicroPython/MicroPythonSerialPort.py

branch
eric7
changeset 8312
800c432b34c8
parent 8218
7c09585bd960
child 8318
962bce857696
equal deleted inserted replaced
8311:4e8b98454baa 8312:800c432b34c8
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2019 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
4 #
5
6 """
7 Module implementing a QSerialPort with additional functionality for
8 MicroPython devices.
9 """
10
11 from PyQt5.QtCore import QIODevice, QTime, QCoreApplication, QEventLoop
12 from PyQt5.QtSerialPort import QSerialPort
13
14
15 class MicroPythonSerialPort(QSerialPort):
16 """
17 Class implementing a QSerialPort with additional functionality for
18 MicroPython devices.
19 """
20 def __init__(self, timeout=10000, parent=None):
21 """
22 Constructor
23
24 @param timeout timout in milliseconds to be set
25 @type int
26 @param parent reference to the parent object
27 @type QObject
28 """
29 super().__init__(parent)
30
31 self.__connected = False
32 self.__timeout = timeout # 10s default timeout
33 self.__timedOut = False
34
35 def setTimeout(self, timeout):
36 """
37 Public method to set the timeout for device operations.
38
39 @param timeout timout in milliseconds to be set
40 @type int
41 """
42 self.__timeout = timeout
43
44 def openSerialLink(self, port):
45 """
46 Public method to open a serial link to a given serial port.
47
48 @param port port name to connect to
49 @type str
50 @return flag indicating success
51 @rtype bool
52 """
53 self.setPortName(port)
54 if self.open(QIODevice.OpenModeFlag.ReadWrite):
55 self.setDataTerminalReady(True)
56 # 115.200 baud, 8N1
57 self.setBaudRate(115200)
58 self.setDataBits(QSerialPort.DataBits.Data8)
59 self.setParity(QSerialPort.Parity.NoParity)
60 self.setStopBits(QSerialPort.StopBits.OneStop)
61
62 self.__connected = True
63 return True
64 else:
65 return False
66
67 def closeSerialLink(self):
68 """
69 Public method to close the open serial connection.
70 """
71 if self.__connected:
72 self.close()
73
74 self.__connected = False
75
76 def isConnected(self):
77 """
78 Public method to get the connection state.
79
80 @return flag indicating the connection state
81 @rtype bool
82 """
83 return self.__connected
84
85 def hasTimedOut(self):
86 """
87 Public method to check, if the last 'readUntil' has timed out.
88
89 @return flag indicating a timeout
90 @@rtype bool
91 """
92 return self.__timedOut
93
94 def readUntil(self, expected=b"\n", size=None):
95 r"""
96 Public method to read data until an expected sequence is found
97 (default: \n) or a specific size is exceeded.
98
99 @param expected expected bytes sequence
100 @type bytes
101 @param size maximum data to be read
102 @type int
103 @return bytes read from the device including the expected sequence
104 @rtype bytes
105 """
106 data = bytearray()
107 self.__timedOut = False
108
109 t = QTime()
110 t.start()
111 while True:
112 QCoreApplication.processEvents(
113 QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
114 c = bytes(self.read(1))
115 if c:
116 data += c
117 if data.endswith(expected):
118 break
119 if size is not None and len(data) >= size:
120 break
121 if t.elapsed() > self.__timeout:
122 self.__timedOut = True
123 break
124
125 return bytes(data)

eric ide

mercurial