|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 # Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de> |
|
4 # |
|
5 |
|
6 """ |
|
7 Module implementing a class to parse and store the Bluetooth device advertisement data. |
|
8 """ |
|
9 |
|
10 import struct |
|
11 import uuid |
|
12 |
|
13 ADV_IND = 0 |
|
14 ADV_SCAN_IND = 2 |
|
15 ADV_NONCONN_IND = 3 |
|
16 SCAN_RSP = 4 |
|
17 |
|
18 ADV_TYPE_UUID16_INCOMPLETE = 0x02 |
|
19 ADV_TYPE_UUID16_COMPLETE = 0x03 |
|
20 ADV_TYPE_UUID32_INCOMPLETE = 0x04 |
|
21 ADV_TYPE_UUID32_COMPLETE = 0x05 |
|
22 ADV_TYPE_UUID128_INCOMPLETE = 0x06 |
|
23 ADV_TYPE_UUID128_COMPLETE = 0x07 |
|
24 ADV_TYPE_SHORT_NAME = 0x08 |
|
25 ADV_TYPE_COMPLETE_NAME = 0x09 |
|
26 ADV_TYPE_TX_POWER_LEVEL = 0x0A |
|
27 ADV_TYPE_SVC_DATA = 0x16 |
|
28 ADV_TYPE_MANUFACTURER = 0xFF |
|
29 |
|
30 ManufacturerId = { |
|
31 0x4C: "Apple, Inc.", |
|
32 0xE0: "Google", |
|
33 0x75: "Samsung Electronics Co. Ltd.", |
|
34 0x87: "Garmin International Inc.", |
|
35 } |
|
36 |
|
37 |
|
38 class BluetoothAdvertisement: |
|
39 """ |
|
40 Class to parse and store the Bluetooth device advertisement data. |
|
41 """ |
|
42 |
|
43 def __init__(self, address): |
|
44 """ |
|
45 Constructor |
|
46 |
|
47 @param address address of the device advertisement |
|
48 @type str |
|
49 """ |
|
50 self.__address = address |
|
51 self.__name = "" |
|
52 self.__rssi = 0 |
|
53 self.__connectable = False |
|
54 |
|
55 self.__advData = None |
|
56 self.__respData = None |
|
57 |
|
58 def update(self, advType, rssi, advData): |
|
59 """ |
|
60 Public method to update the advertisement data. |
|
61 |
|
62 @param advType type of advertisement data |
|
63 @type int |
|
64 @param rssi RSSI value in dBm |
|
65 @type int |
|
66 @param advData advertisement data |
|
67 @type bytes |
|
68 """ |
|
69 if rssi != self.__rssi: |
|
70 self.__rssi = rssi |
|
71 |
|
72 if advType in (ADV_IND, ADV_NONCONN_IND): |
|
73 if advData != self.__advData: |
|
74 self.__advData = advData |
|
75 self.__connectable = advType == ADV_IND |
|
76 elif advType == ADV_SCAN_IND: |
|
77 self.__advData = advData |
|
78 elif advType == SCAN_RSP and advData and advData != self.__respData: |
|
79 self.__respData = advData |
|
80 |
|
81 def __str__(self): |
|
82 """ |
|
83 Special method to generate a string representation. |
|
84 |
|
85 @return string representation |
|
86 @rtype str |
|
87 """ |
|
88 return "Scan result: {0} {1}".format(self.__address, self.__rssi) |
|
89 |
|
90 def __decodeField(self, *advType): |
|
91 """ |
|
92 Private method to get all fields of the specified types. |
|
93 |
|
94 @param *advType type of fields to be extracted |
|
95 @type int |
|
96 @yield requested fields |
|
97 @ytype bytes |
|
98 """ |
|
99 # Advertising payloads are repeated packets of the following form: |
|
100 # 1 byte data length (N + 1) |
|
101 # 1 byte type (see constants below) |
|
102 # N bytes type-specific data |
|
103 for payload in (self.__advData, self.__respData): |
|
104 if not payload: |
|
105 continue |
|
106 |
|
107 i = 0 |
|
108 while i + 1 < len(payload): |
|
109 if payload[i + 1] in advType: |
|
110 yield payload[i + 2 : i + payload[i] + 1] |
|
111 i += 1 + payload[i] |
|
112 |
|
113 def __splitBytes(self, data, chunkSize): |
|
114 """ |
|
115 Private method to split some data into chunks of given size. |
|
116 |
|
117 @param data data to be chunked |
|
118 @type bytes, bytearray, str |
|
119 @param chunkSize size for each chunk |
|
120 @type int |
|
121 @return list of chunks |
|
122 @rtype list of bytes, bytearray, str |
|
123 """ |
|
124 start = 0 |
|
125 dataChunks = [] |
|
126 while start < len(data): |
|
127 end = start + chunkSize |
|
128 dataChunks.append(data[start:end]) |
|
129 start = end |
|
130 return dataChunks |
|
131 |
|
132 @property |
|
133 def name(self): |
|
134 """ |
|
135 Public method to get the complete or shortened advertised name, if available. |
|
136 |
|
137 @return advertised name |
|
138 @rtype str |
|
139 """ |
|
140 for n in self.__decodeField(ADV_TYPE_COMPLETE_NAME, ADV_TYPE_SHORT_NAME): |
|
141 return str(n, "utf-8").replace("\x00", "") if n else "" |
|
142 |
|
143 return "" |
|
144 |
|
145 @property |
|
146 def rssi(self): |
|
147 """ |
|
148 Public method to get the RSSI value. |
|
149 |
|
150 @return RSSI value in dBm |
|
151 @rtype int |
|
152 """ |
|
153 return self.__rssi |
|
154 |
|
155 @property |
|
156 def address(self): |
|
157 """ |
|
158 Public method to get the address string. |
|
159 |
|
160 @return address of the device |
|
161 @rtype str |
|
162 """ |
|
163 return self.__address |
|
164 |
|
165 @property |
|
166 def txPower(self): |
|
167 """ |
|
168 Public method to get the advertised power level in dBm. |
|
169 |
|
170 @return transmit power of the device (in dBm) |
|
171 @rtype int |
|
172 """ |
|
173 for txLevel in self.__decodeField(ADV_TYPE_TX_POWER_LEVEL): |
|
174 return struct.unpack("<b", txLevel) |
|
175 |
|
176 return 0 |
|
177 |
|
178 @property |
|
179 def services(self): |
|
180 """ |
|
181 Public method to get the service IDs. |
|
182 |
|
183 @return list of tuples containing the advertised service ID and a |
|
184 flag indicating a complete ID |
|
185 @rtype list of tuple of (str, bool) |
|
186 """ |
|
187 result = [] |
|
188 |
|
189 for u in self.__decodeField(ADV_TYPE_UUID16_INCOMPLETE): |
|
190 for v in self.__splitBytes(u, 2): |
|
191 result.append((hex(struct.unpack("<H", v)[0]), False)) |
|
192 for u in self.__decodeField(ADV_TYPE_UUID16_COMPLETE): |
|
193 for v in self.__splitBytes(u, 2): |
|
194 result.append((hex(struct.unpack("<H", v)[0]), True)) |
|
195 |
|
196 for u in self.__decodeField(ADV_TYPE_UUID32_INCOMPLETE): |
|
197 for v in self.__splitBytes(u, 4): |
|
198 result.append((hex(struct.unpack("<I", v)), False)) |
|
199 for u in self.__decodeField(ADV_TYPE_UUID32_COMPLETE): |
|
200 for v in self.__splitBytes(u, 4): |
|
201 result.append((hex(struct.unpack("<I", v)), True)) |
|
202 |
|
203 for u in self.__decodeField(ADV_TYPE_UUID128_INCOMPLETE): |
|
204 for v in self.__splitBytes(u, 16): |
|
205 uid = uuid.UUID(bytes=bytes(reversed(v))) |
|
206 result.append((str(uid), False)) |
|
207 for u in self.__decodeField(ADV_TYPE_UUID128_COMPLETE): |
|
208 for v in self.__splitBytes(u, 16): |
|
209 uid = uuid.UUID(bytes=bytes(reversed(v))) |
|
210 result.append((str(uid), True)) |
|
211 |
|
212 return result |
|
213 |
|
214 def manufacturer(self, filterId=None, withName=False): |
|
215 """ |
|
216 Public method to get the manufacturer data. |
|
217 |
|
218 @param filterId manufacturer ID to filter on (defaults to None) |
|
219 @type int (optional) |
|
220 @param withName flag indicating to report the manufacturer name as well |
|
221 (if available) (defaults to False) |
|
222 @type bool |
|
223 @return tuple containing the manufacturer ID, associated data and manufacturer |
|
224 name |
|
225 @rtype tuple of (int, bytes, str) |
|
226 """ |
|
227 result = [] |
|
228 for u in self.__decodeField(ADV_TYPE_MANUFACTURER): |
|
229 if len(u) < 2: |
|
230 continue |
|
231 |
|
232 m = struct.unpack("<H", u[0:2])[0] |
|
233 if filter is None or m == filterId: |
|
234 name = ManufacturerId.get(m, "") if withName else None |
|
235 result.append((m, u[2:], name)) |
|
236 return result |