|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2019 - 2021 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing a QSerialPort with additional functionality for |
|
8 MicroPython devices. |
|
9 """ |
|
10 |
|
from PyQt5.QtCore import (
    QIODevice, QTime, QCoreApplication, QEventLoop, QElapsedTimer
)
from PyQt5.QtSerialPort import QSerialPort
|
13 |
|
14 |
|
class MicroPythonSerialPort(QSerialPort):
    """
    Class implementing a QSerialPort with additional functionality for
    MicroPython devices.

    In addition to the plain serial port it tracks whether a link was
    opened through this class and offers a blocking 'readUntil' with a
    configurable timeout.
    """
    def __init__(self, timeout=10000, parent=None):
        """
        Constructor

        @param timeout timeout in milliseconds to be set
        @type int
        @param parent reference to the parent object
        @type QObject
        """
        super().__init__(parent)

        self.__connected = False
        self.__timeout = timeout        # 10s default timeout
        self.__timedOut = False

    def setTimeout(self, timeout):
        """
        Public method to set the timeout for device operations.

        @param timeout timeout in milliseconds to be set
        @type int
        """
        self.__timeout = timeout

    def openSerialLink(self, port):
        """
        Public method to open a serial link to a given serial port.

        The port is opened for reading and writing. On success DTR is
        asserted and the line is configured for 115200 baud, 8N1.

        @param port port name to connect to
        @type str
        @return flag indicating success
        @rtype bool
        """
        self.setPortName(port)
        if not self.open(QIODevice.OpenModeFlag.ReadWrite):
            # the connection state is left untouched if opening fails
            return False

        self.setDataTerminalReady(True)
        # 115.200 baud, 8N1
        self.setBaudRate(115200)
        self.setDataBits(QSerialPort.DataBits.Data8)
        self.setParity(QSerialPort.Parity.NoParity)
        self.setStopBits(QSerialPort.StopBits.OneStop)

        self.__connected = True
        return True

    def closeSerialLink(self):
        """
        Public method to close the open serial connection.

        The port is only closed, if it was opened via 'openSerialLink'.
        """
        if self.__connected:
            self.close()

            self.__connected = False

    def isConnected(self):
        """
        Public method to get the connection state.

        @return flag indicating the connection state
        @rtype bool
        """
        return self.__connected

    def hasTimedOut(self):
        """
        Public method to check, if the last 'readUntil' has timed out.

        @return flag indicating a timeout
        @rtype bool
        """
        return self.__timedOut

    def readUntil(self, expected=b"\n", size=None):
        r"""
        Public method to read data until an expected sequence is found
        (default: \n) or a specific size is exceeded.

        The device is polled one byte at a time. If the configured timeout
        elapses before one of the stop conditions is met, reading stops,
        the data received so far is returned and 'hasTimedOut' reports
        True until the next call of this method.

        @param expected expected bytes sequence
        @type bytes
        @param size maximum data to be read
        @type int
        @return bytes read from the device including the expected sequence
        @rtype bytes
        """
        data = bytearray()
        self.__timedOut = False

        # QElapsedTimer is monotonic; QTime.start()/elapsed() are obsolete
        # and follow the wall clock, which may jump or wrap around.
        timer = QElapsedTimer()
        timer.start()
        while True:
            # keep the event loop serviced while polling, but do not
            # process user input events
            QCoreApplication.processEvents(
                QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
            c = bytes(self.read(1))
            if c:
                data += c
                if data.endswith(expected):
                    break
                if size is not None and len(data) >= size:
                    break
            if timer.elapsed() > self.__timeout:
                self.__timedOut = True
                break

        return bytes(data)