Sat, 23 Dec 2023 15:48:12 +0100
Updated copyright for 2024.
9857 | 1 | # -*- coding: utf-8 -*- |
2 | ||
10439
21c28b0f9e41
Updated copyright for 2024.
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
10090
diff
changeset
|
3 | # Copyright (c) 2023 - 2024 Detlev Offenbach <detlev@die-offenbachs.de> |
9857 | 4 | # |
5 | ||
6 | """ | |
7 | Module implementing a class to parse and store the Bluetooth device advertisement data. | |
8 | """ | |
9 | ||
10090 | 10 | import contextlib |
11 | import os | |
9857 | 12 | import struct |
13 | import uuid | |
14 | ||
10090 | 15 | import yaml |
16 | ||
9857 | 17 | ADV_IND = 0 |
9859 | 18 | ADV_DIRECT_IND = 1 |
9857 | 19 | ADV_SCAN_IND = 2 |
20 | ADV_NONCONN_IND = 3 | |
21 | SCAN_RSP = 4 | |
22 | ||
23 | ADV_TYPE_UUID16_INCOMPLETE = 0x02 | |
24 | ADV_TYPE_UUID16_COMPLETE = 0x03 | |
25 | ADV_TYPE_UUID32_INCOMPLETE = 0x04 | |
26 | ADV_TYPE_UUID32_COMPLETE = 0x05 | |
27 | ADV_TYPE_UUID128_INCOMPLETE = 0x06 | |
28 | ADV_TYPE_UUID128_COMPLETE = 0x07 | |
29 | ADV_TYPE_SHORT_NAME = 0x08 | |
30 | ADV_TYPE_COMPLETE_NAME = 0x09 | |
31 | ADV_TYPE_TX_POWER_LEVEL = 0x0A | |
32 | ADV_TYPE_SVC_DATA = 0x16 | |
33 | ADV_TYPE_MANUFACTURER = 0xFF | |
34 | ||
10090 | 35 | ManufacturerIDs = None |
36 | ServiceIDs = None | |
37 | ||
38 | ||
39 | def _loadManufacturerIDs(): | |
40 | """ | |
41 | Function to load the manufacturer IDs. | |
42 | """ | |
43 | global ManufacturerIDs | |
44 | ||
45 | idsFile = os.path.join( | |
46 | os.path.dirname(__file__), "data", "company_identifiers.yaml" | |
47 | ) | |
48 | with contextlib.suppress(OSError): | |
49 | with open(idsFile, "r") as f: | |
50 | idsDict = yaml.safe_load(f) | |
51 | ||
52 | ManufacturerIDs = { | |
53 | entry["value"]: entry["name"] for entry in idsDict["company_identifiers"] | |
54 | } | |
55 | ||
56 | ||
57 | def _loadServiceUUIDs(): | |
58 | """ | |
59 | Function to load the service UUIDs. | |
60 | """ | |
61 | global ServiceIDs | |
62 | ||
63 | ServiceIDs = {} | |
64 | ||
65 | for uuidFilename in ("member_uuids.yaml", "sdo_uuids.yaml", "service_uuids.yaml"): | |
66 | uuidFilepath = os.path.join(os.path.dirname(__file__), "data", uuidFilename) | |
67 | with contextlib.suppress(OSError): | |
68 | with open(uuidFilepath, "r") as f: | |
69 | uuidDict = yaml.safe_load(f) | |
70 | ||
71 | ServiceIDs.update({u["uuid"]: u["name"] for u in uuidDict["uuids"]}) | |
9857 | 72 | |
73 | ||
74 | class BluetoothAdvertisement: | |
75 | """ | |
76 | Class to parse and store the Bluetooth device advertisement data. | |
77 | """ | |
78 | ||
79 | def __init__(self, address): | |
80 | """ | |
81 | Constructor | |
82 | ||
83 | @param address address of the device advertisement | |
84 | @type str | |
85 | """ | |
86 | self.__address = address | |
87 | self.__rssi = 0 | |
88 | self.__connectable = False | |
89 | ||
90 | self.__advData = None | |
91 | self.__respData = None | |
92 | ||
93 | def update(self, advType, rssi, advData): | |
94 | """ | |
95 | Public method to update the advertisement data. | |
96 | ||
97 | @param advType type of advertisement data | |
98 | @type int | |
99 | @param rssi RSSI value in dBm | |
100 | @type int | |
101 | @param advData advertisement data | |
102 | @type bytes | |
103 | """ | |
104 | if rssi != self.__rssi: | |
105 | self.__rssi = rssi | |
106 | ||
107 | if advType in (ADV_IND, ADV_NONCONN_IND): | |
108 | if advData != self.__advData: | |
109 | self.__advData = advData | |
110 | self.__connectable = advType == ADV_IND | |
111 | elif advType == ADV_SCAN_IND: | |
112 | self.__advData = advData | |
113 | elif advType == SCAN_RSP and advData and advData != self.__respData: | |
114 | self.__respData = advData | |
115 | ||
116 | def __str__(self): | |
117 | """ | |
118 | Special method to generate a string representation. | |
119 | ||
120 | @return string representation | |
121 | @rtype str | |
122 | """ | |
123 | return "Scan result: {0} {1}".format(self.__address, self.__rssi) | |
124 | ||
125 | def __decodeField(self, *advType): | |
126 | """ | |
127 | Private method to get all fields of the specified types. | |
128 | ||
129 | @param *advType type of fields to be extracted | |
130 | @type int | |
131 | @yield requested fields | |
132 | @ytype bytes | |
133 | """ | |
134 | # Advertising payloads are repeated packets of the following form: | |
135 | # 1 byte data length (N + 1) | |
10089 | 136 | # 1 byte type (see constants at top) |
9857 | 137 | # N bytes type-specific data |
138 | for payload in (self.__advData, self.__respData): | |
139 | if not payload: | |
140 | continue | |
141 | ||
142 | i = 0 | |
143 | while i + 1 < len(payload): | |
144 | if payload[i + 1] in advType: | |
145 | yield payload[i + 2 : i + payload[i] + 1] | |
146 | i += 1 + payload[i] | |
147 | ||
148 | def __splitBytes(self, data, chunkSize): | |
149 | """ | |
150 | Private method to split some data into chunks of given size. | |
151 | ||
152 | @param data data to be chunked | |
153 | @type bytes, bytearray, str | |
154 | @param chunkSize size for each chunk | |
155 | @type int | |
156 | @return list of chunks | |
157 | @rtype list of bytes, bytearray, str | |
158 | """ | |
159 | start = 0 | |
160 | dataChunks = [] | |
161 | while start < len(data): | |
162 | end = start + chunkSize | |
163 | dataChunks.append(data[start:end]) | |
164 | start = end | |
165 | return dataChunks | |
166 | ||
167 | @property | |
9863
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
168 | def completeName(self): |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
169 | """ |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
170 | Public method to get the complete advertised name, if available. |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
171 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
172 | @return advertised name |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
173 | @rtype str |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
174 | """ |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
175 | for n in self.__decodeField(ADV_TYPE_COMPLETE_NAME): |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
176 | return str(n, "utf-8").replace("\x00", "") if n else "" |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
177 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
178 | return "" |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
179 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
180 | @property |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
181 | def shortName(self): |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
182 | """ |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
183 | Public method to get the shortened advertised name, if available. |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
184 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
185 | @return advertised name |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
186 | @rtype str |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
187 | """ |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
188 | for n in self.__decodeField(ADV_TYPE_SHORT_NAME): |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
189 | return str(n, "utf-8").replace("\x00", "") if n else "" |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
190 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
191 | return "" |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
192 | |
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
193 | @property |
9857 | 194 | def name(self): |
195 | """ | |
196 | Public method to get the complete or shortened advertised name, if available. | |
197 | ||
198 | @return advertised name | |
199 | @rtype str | |
200 | """ | |
9863
5f2377b32716
BluetoothAdvertisement
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9859
diff
changeset
|
201 | return self.completeName or self.shortName |
9857 | 202 | |
203 | @property | |
204 | def rssi(self): | |
205 | """ | |
206 | Public method to get the RSSI value. | |
207 | ||
208 | @return RSSI value in dBm | |
209 | @rtype int | |
210 | """ | |
211 | return self.__rssi | |
212 | ||
213 | @property | |
214 | def address(self): | |
215 | """ | |
216 | Public method to get the address string. | |
217 | ||
218 | @return address of the device | |
219 | @rtype str | |
220 | """ | |
221 | return self.__address | |
222 | ||
223 | @property | |
224 | def txPower(self): | |
225 | """ | |
226 | Public method to get the advertised power level in dBm. | |
227 | ||
228 | @return transmit power of the device (in dBm) | |
229 | @rtype int | |
230 | """ | |
231 | for txLevel in self.__decodeField(ADV_TYPE_TX_POWER_LEVEL): | |
9859 | 232 | return struct.unpack("<b", txLevel)[0] |
9857 | 233 | |
234 | return 0 | |
235 | ||
236 | @property | |
237 | def services(self): | |
238 | """ | |
239 | Public method to get the service IDs. | |
240 | ||
10090 | 241 | @return list of tuples containing the advertised service ID, the associated |
242 | service name (if available) and a flag indicating a complete ID | |
9857 | 243 | @rtype list of tuple of (str, bool) |
244 | """ | |
10090 | 245 | if ServiceIDs is None: |
246 | _loadServiceUUIDs() | |
247 | ||
9857 | 248 | result = [] |
249 | ||
250 | for u in self.__decodeField(ADV_TYPE_UUID16_INCOMPLETE): | |
251 | for v in self.__splitBytes(u, 2): | |
10090 | 252 | uid = struct.unpack("<H", v)[0] |
253 | result.append((hex(uid), ServiceIDs.get(uid, ""), False)) | |
9857 | 254 | for u in self.__decodeField(ADV_TYPE_UUID16_COMPLETE): |
255 | for v in self.__splitBytes(u, 2): | |
10090 | 256 | uid = struct.unpack("<H", v)[0] |
257 | result.append((hex(uid), ServiceIDs.get(uid, ""), False)) | |
9857 | 258 | |
259 | for u in self.__decodeField(ADV_TYPE_UUID32_INCOMPLETE): | |
260 | for v in self.__splitBytes(u, 4): | |
10090 | 261 | result.append((hex(struct.unpack("<I", v)), "", False)) |
9857 | 262 | for u in self.__decodeField(ADV_TYPE_UUID32_COMPLETE): |
263 | for v in self.__splitBytes(u, 4): | |
10090 | 264 | result.append((hex(struct.unpack("<I", v)), "", True)) |
9857 | 265 | |
266 | for u in self.__decodeField(ADV_TYPE_UUID128_INCOMPLETE): | |
267 | for v in self.__splitBytes(u, 16): | |
268 | uid = uuid.UUID(bytes=bytes(reversed(v))) | |
10090 | 269 | result.append((str(uid), "", False)) |
9857 | 270 | for u in self.__decodeField(ADV_TYPE_UUID128_COMPLETE): |
271 | for v in self.__splitBytes(u, 16): | |
272 | uid = uuid.UUID(bytes=bytes(reversed(v))) | |
10090 | 273 | result.append((str(uid), "", True)) |
9857 | 274 | |
275 | return result | |
276 | ||
277 | def manufacturer(self, filterId=None, withName=False): | |
278 | """ | |
279 | Public method to get the manufacturer data. | |
280 | ||
281 | @param filterId manufacturer ID to filter on (defaults to None) | |
282 | @type int (optional) | |
283 | @param withName flag indicating to report the manufacturer name as well | |
284 | (if available) (defaults to False) | |
285 | @type bool | |
286 | @return tuple containing the manufacturer ID, associated data and manufacturer | |
287 | name | |
288 | @rtype tuple of (int, bytes, str) | |
289 | """ | |
10090 | 290 | if ManufacturerIDs is None: |
291 | _loadManufacturerIDs() | |
292 | ||
9857 | 293 | result = [] |
294 | for u in self.__decodeField(ADV_TYPE_MANUFACTURER): | |
295 | if len(u) < 2: | |
296 | continue | |
297 | ||
298 | m = struct.unpack("<H", u[0:2])[0] | |
9858
6518c336fcd3
Fixed some bugs in MicroPython Bluetooth support.
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9857
diff
changeset
|
299 | if filterId is None or m == filterId: |
10090 | 300 | name = ManufacturerIDs.get(m, "") if withName else None |
9857 | 301 | result.append((m, u[2:], name)) |
302 | return result | |
10060
b946699e9e79
Corrected some issues related to unused global variables.
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9952
diff
changeset
|
303 | |
10065
de4ae767b0e3
Corrected and checked some code style issues.
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
10060
diff
changeset
|
304 | |
10060
b946699e9e79
Corrected some issues related to unused global variables.
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9952
diff
changeset
|
305 | # |
b946699e9e79
Corrected some issues related to unused global variables.
Detlev Offenbach <detlev@die-offenbachs.de>
parents:
9952
diff
changeset
|
306 | # eflag: noqa = U200 |