Thu, 09 Mar 2023 16:56:24 +0100
MicroPython
- fixed a few bugs
- fixed support for ESP devices with CircuitPython with one USB port
9857 | 1 | # -*- coding: utf-8 -*- |
2 | ||
3 | # Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de> | |
4 | # | |
5 | ||
6 | """ | |
7 | Module implementing a class to parse and store the Bluetooth device advertisement data. | |
8 | """ | |
9 | ||
10 | import struct | |
11 | import uuid | |
12 | ||
13 | ADV_IND = 0 | |
9859 | 14 | ADV_DIRECT_IND = 1 |
9857 | 15 | ADV_SCAN_IND = 2 |
16 | ADV_NONCONN_IND = 3 | |
17 | SCAN_RSP = 4 | |
18 | ||
19 | ADV_TYPE_UUID16_INCOMPLETE = 0x02 | |
20 | ADV_TYPE_UUID16_COMPLETE = 0x03 | |
21 | ADV_TYPE_UUID32_INCOMPLETE = 0x04 | |
22 | ADV_TYPE_UUID32_COMPLETE = 0x05 | |
23 | ADV_TYPE_UUID128_INCOMPLETE = 0x06 | |
24 | ADV_TYPE_UUID128_COMPLETE = 0x07 | |
25 | ADV_TYPE_SHORT_NAME = 0x08 | |
26 | ADV_TYPE_COMPLETE_NAME = 0x09 | |
27 | ADV_TYPE_TX_POWER_LEVEL = 0x0A | |
28 | ADV_TYPE_SVC_DATA = 0x16 | |
29 | ADV_TYPE_MANUFACTURER = 0xFF | |
30 | ||
31 | ManufacturerId = { | |
9866 | 32 | 0x0006: "Microsoft", |
33 | 0x004C: "Apple, Inc.", | |
34 | 0x0075: "Samsung Electronics Co. Ltd.", | |
35 | 0x0087: "Garmin International Inc.", | |
36 | 0x00E0: "Google", | |
37 | 0x0822: "adafruit industries", | |
9857 | 38 | } |
39 | ||
40 | ||
41 | class BluetoothAdvertisement: | |
42 | """ | |
43 | Class to parse and store the Bluetooth device advertisement data. | |
44 | """ | |
45 | ||
46 | def __init__(self, address): | |
47 | """ | |
48 | Constructor | |
49 | ||
50 | @param address address of the device advertisement | |
51 | @type str | |
52 | """ | |
53 | self.__address = address | |
9863
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
54 | ##self.__name = "" |
9857 | 55 | self.__rssi = 0 |
56 | self.__connectable = False | |
57 | ||
58 | self.__advData = None | |
59 | self.__respData = None | |
60 | ||
61 | def update(self, advType, rssi, advData): | |
62 | """ | |
63 | Public method to update the advertisement data. | |
64 | ||
65 | @param advType type of advertisement data | |
66 | @type int | |
67 | @param rssi RSSI value in dBm | |
68 | @type int | |
69 | @param advData advertisement data | |
70 | @type bytes | |
71 | """ | |
72 | if rssi != self.__rssi: | |
73 | self.__rssi = rssi | |
74 | ||
75 | if advType in (ADV_IND, ADV_NONCONN_IND): | |
76 | if advData != self.__advData: | |
77 | self.__advData = advData | |
78 | self.__connectable = advType == ADV_IND | |
79 | elif advType == ADV_SCAN_IND: | |
80 | self.__advData = advData | |
81 | elif advType == SCAN_RSP and advData and advData != self.__respData: | |
82 | self.__respData = advData | |
83 | ||
84 | def __str__(self): | |
85 | """ | |
86 | Special method to generate a string representation. | |
87 | ||
88 | @return string representation | |
89 | @rtype str | |
90 | """ | |
91 | return "Scan result: {0} {1}".format(self.__address, self.__rssi) | |
92 | ||
93 | def __decodeField(self, *advType): | |
94 | """ | |
95 | Private method to get all fields of the specified types. | |
96 | ||
97 | @param *advType type of fields to be extracted | |
98 | @type int | |
99 | @yield requested fields | |
100 | @ytype bytes | |
101 | """ | |
102 | # Advertising payloads are repeated packets of the following form: | |
103 | # 1 byte data length (N + 1) | |
104 | # 1 byte type (see constants below) | |
105 | # N bytes type-specific data | |
106 | for payload in (self.__advData, self.__respData): | |
107 | if not payload: | |
108 | continue | |
109 | ||
110 | i = 0 | |
111 | while i + 1 < len(payload): | |
112 | if payload[i + 1] in advType: | |
113 | yield payload[i + 2 : i + payload[i] + 1] | |
114 | i += 1 + payload[i] | |
115 | ||
116 | def __splitBytes(self, data, chunkSize): | |
117 | """ | |
118 | Private method to split some data into chunks of given size. | |
119 | ||
120 | @param data data to be chunked | |
121 | @type bytes, bytearray, str | |
122 | @param chunkSize size for each chunk | |
123 | @type int | |
124 | @return list of chunks | |
125 | @rtype list of bytes, bytearray, str | |
126 | """ | |
127 | start = 0 | |
128 | dataChunks = [] | |
129 | while start < len(data): | |
130 | end = start + chunkSize | |
131 | dataChunks.append(data[start:end]) | |
132 | start = end | |
133 | return dataChunks | |
134 | ||
135 | @property | |
9863
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
136 | def completeName(self): |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
137 | """ |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
138 | Public method to get the complete advertised name, if available. |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
139 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
140 | @return advertised name |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
141 | @rtype str |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
142 | """ |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
143 | for n in self.__decodeField(ADV_TYPE_COMPLETE_NAME): |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
144 | return str(n, "utf-8").replace("\x00", "") if n else "" |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
145 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
146 | return "" |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
147 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
148 | @property |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
149 | def shortName(self): |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
150 | """ |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
151 | Public method to get the shortened advertised name, if available. |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
152 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
153 | @return advertised name |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
154 | @rtype str |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
155 | """ |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
156 | for n in self.__decodeField(ADV_TYPE_SHORT_NAME): |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
157 | return str(n, "utf-8").replace("\x00", "") if n else "" |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
158 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
159 | return "" |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
160 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
161 | @property |
9857 | 162 | def name(self): |
163 | """ | |
164 | Public method to get the complete or shortened advertised name, if available. | |
165 | ||
166 | @return advertised name | |
167 | @rtype str | |
168 | """ | |
9863
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
169 | return self.completeName or self.shortName |
9857 | 170 | |
171 | @property | |
172 | def rssi(self): | |
173 | """ | |
174 | Public method to get the RSSI value. | |
175 | ||
176 | @return RSSI value in dBm | |
177 | @rtype int | |
178 | """ | |
179 | return self.__rssi | |
180 | ||
181 | @property | |
182 | def address(self): | |
183 | """ | |
184 | Public method to get the address string. | |
185 | ||
186 | @return address of the device | |
187 | @rtype str | |
188 | """ | |
189 | return self.__address | |
190 | ||
191 | @property | |
192 | def txPower(self): | |
193 | """ | |
194 | Public method to get the advertised power level in dBm. | |
195 | ||
196 | @return transmit power of the device (in dBm) | |
197 | @rtype int | |
198 | """ | |
199 | for txLevel in self.__decodeField(ADV_TYPE_TX_POWER_LEVEL): | |
9859 | 200 | return struct.unpack("<b", txLevel)[0] |
9857 | 201 | |
202 | return 0 | |
203 | ||
204 | @property | |
205 | def services(self): | |
206 | """ | |
207 | Public method to get the service IDs. | |
208 | ||
209 | @return list of tuples containing the advertised service ID and a | |
210 | flag indicating a complete ID | |
211 | @rtype list of tuple of (str, bool) | |
212 | """ | |
213 | result = [] | |
214 | ||
215 | for u in self.__decodeField(ADV_TYPE_UUID16_INCOMPLETE): | |
216 | for v in self.__splitBytes(u, 2): | |
217 | result.append((hex(struct.unpack("<H", v)[0]), False)) | |
218 | for u in self.__decodeField(ADV_TYPE_UUID16_COMPLETE): | |
219 | for v in self.__splitBytes(u, 2): | |
220 | result.append((hex(struct.unpack("<H", v)[0]), True)) | |
221 | ||
222 | for u in self.__decodeField(ADV_TYPE_UUID32_INCOMPLETE): | |
223 | for v in self.__splitBytes(u, 4): | |
224 | result.append((hex(struct.unpack("<I", v)), False)) | |
225 | for u in self.__decodeField(ADV_TYPE_UUID32_COMPLETE): | |
226 | for v in self.__splitBytes(u, 4): | |
227 | result.append((hex(struct.unpack("<I", v)), True)) | |
228 | ||
229 | for u in self.__decodeField(ADV_TYPE_UUID128_INCOMPLETE): | |
230 | for v in self.__splitBytes(u, 16): | |
231 | uid = uuid.UUID(bytes=bytes(reversed(v))) | |
232 | result.append((str(uid), False)) | |
233 | for u in self.__decodeField(ADV_TYPE_UUID128_COMPLETE): | |
234 | for v in self.__splitBytes(u, 16): | |
235 | uid = uuid.UUID(bytes=bytes(reversed(v))) | |
236 | result.append((str(uid), True)) | |
237 | ||
238 | return result | |
239 | ||
240 | def manufacturer(self, filterId=None, withName=False): | |
241 | """ | |
242 | Public method to get the manufacturer data. | |
243 | ||
244 | @param filterId manufacturer ID to filter on (defaults to None) | |
245 | @type int (optional) | |
246 | @param withName flag indicating to report the manufacturer name as well | |
247 | (if available) (defaults to False) | |
248 | @type bool | |
249 | @return tuple containing the manufacturer ID, associated data and manufacturer | |
250 | name | |
251 | @rtype tuple of (int, bytes, str) | |
252 | """ | |
253 | result = [] | |
254 | for u in self.__decodeField(ADV_TYPE_MANUFACTURER): | |
255 | if len(u) < 2: | |
256 | continue | |
257 | ||
258 | m = struct.unpack("<H", u[0:2])[0] | |
9858
6518c336fcd3
Fixed some bugs in MicroPython Bluetooth support.
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9857
diff
changeset
|
259 | if filterId is None or m == filterId: |
9857 | 260 | name = ManufacturerId.get(m, "") if withName else None |
261 | result.append((m, u[2:], name)) | |
262 | return result |