--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/eric7/MicroPython/BluetoothDialogs/BluetoothAdvertisement.py Wed Mar 08 14:25:24 2023 +0100 @@ -0,0 +1,236 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de> +# + +""" +Module implementing a class to parse and store the Bluetooth device advertisement data. +""" + +import struct +import uuid + +ADV_IND = 0 +ADV_SCAN_IND = 2 +ADV_NONCONN_IND = 3 +SCAN_RSP = 4 + +ADV_TYPE_UUID16_INCOMPLETE = 0x02 +ADV_TYPE_UUID16_COMPLETE = 0x03 +ADV_TYPE_UUID32_INCOMPLETE = 0x04 +ADV_TYPE_UUID32_COMPLETE = 0x05 +ADV_TYPE_UUID128_INCOMPLETE = 0x06 +ADV_TYPE_UUID128_COMPLETE = 0x07 +ADV_TYPE_SHORT_NAME = 0x08 +ADV_TYPE_COMPLETE_NAME = 0x09 +ADV_TYPE_TX_POWER_LEVEL = 0x0A +ADV_TYPE_SVC_DATA = 0x16 +ADV_TYPE_MANUFACTURER = 0xFF + +ManufacturerId = { + 0x4C: "Apple, Inc.", + 0xE0: "Google", + 0x75: "Samsung Electronics Co. Ltd.", + 0x87: "Garmin International Inc.", +} + + +class BluetoothAdvertisement: + """ + Class to parse and store the Bluetooth device advertisement data. + """ + + def __init__(self, address): + """ + Constructor + + @param address address of the device advertisement + @type str + """ + self.__address = address + self.__name = "" + self.__rssi = 0 + self.__connectable = False + + self.__advData = None + self.__respData = None + + def update(self, advType, rssi, advData): + """ + Public method to update the advertisement data. + + @param advType type of advertisement data + @type int + @param rssi RSSI value in dBm + @type int + @param advData advertisement data + @type bytes + """ + if rssi != self.__rssi: + self.__rssi = rssi + + if advType in (ADV_IND, ADV_NONCONN_IND): + if advData != self.__advData: + self.__advData = advData + self.__connectable = advType == ADV_IND + elif advType == ADV_SCAN_IND: + self.__advData = advData + elif advType == SCAN_RSP and advData and advData != self.__respData: + self.__respData = advData + + def __str__(self): + """ + Special method to generate a string representation. + + @return string representation + @rtype str + """ + return "Scan result: {0} {1}".format(self.__address, self.__rssi) + + def __decodeField(self, *advType): + """ + Private method to get all fields of the specified types. + + @param *advType type of fields to be extracted + @type int + @yield requested fields + @ytype bytes + """ + # Advertising payloads are repeated packets of the following form: + # 1 byte data length (N + 1) + # 1 byte type (see constants below) + # N bytes type-specific data + for payload in (self.__advData, self.__respData): + if not payload: + continue + + i = 0 + while i + 1 < len(payload): + if payload[i + 1] in advType: + yield payload[i + 2 : i + payload[i] + 1] + i += 1 + payload[i] + + def __splitBytes(self, data, chunkSize): + """ + Private method to split some data into chunks of given size. + + @param data data to be chunked + @type bytes, bytearray, str + @param chunkSize size for each chunk + @type int + @return list of chunks + @rtype list of bytes, bytearray, str + """ + start = 0 + dataChunks = [] + while start < len(data): + end = start + chunkSize + dataChunks.append(data[start:end]) + start = end + return dataChunks + + @property + def name(self): + """ + Public method to get the complete or shortened advertised name, if available. + + @return advertised name + @rtype str + """ + for n in self.__decodeField(ADV_TYPE_COMPLETE_NAME, ADV_TYPE_SHORT_NAME): + return str(n, "utf-8").replace("\x00", "") if n else "" + + return "" + + @property + def rssi(self): + """ + Public method to get the RSSI value. + + @return RSSI value in dBm + @rtype int + """ + return self.__rssi + + @property + def address(self): + """ + Public method to get the address string. + + @return address of the device + @rtype str + """ + return self.__address + + @property + def txPower(self): + """ + Public method to get the advertised power level in dBm. + + @return transmit power of the device (in dBm) + @rtype int + """ + for txLevel in self.__decodeField(ADV_TYPE_TX_POWER_LEVEL): + return struct.unpack("<b", txLevel) + + return 0 + + @property + def services(self): + """ + Public method to get the service IDs. + + @return list of tuples containing the advertised service ID and a + flag indicating a complete ID + @rtype list of tuple of (str, bool) + """ + result = [] + + for u in self.__decodeField(ADV_TYPE_UUID16_INCOMPLETE): + for v in self.__splitBytes(u, 2): + result.append((hex(struct.unpack("<H", v)[0]), False)) + for u in self.__decodeField(ADV_TYPE_UUID16_COMPLETE): + for v in self.__splitBytes(u, 2): + result.append((hex(struct.unpack("<H", v)[0]), True)) + + for u in self.__decodeField(ADV_TYPE_UUID32_INCOMPLETE): + for v in self.__splitBytes(u, 4): + result.append((hex(struct.unpack("<I", v)), False)) + for u in self.__decodeField(ADV_TYPE_UUID32_COMPLETE): + for v in self.__splitBytes(u, 4): + result.append((hex(struct.unpack("<I", v)), True)) + + for u in self.__decodeField(ADV_TYPE_UUID128_INCOMPLETE): + for v in self.__splitBytes(u, 16): + uid = uuid.UUID(bytes=bytes(reversed(v))) + result.append((str(uid), False)) + for u in self.__decodeField(ADV_TYPE_UUID128_COMPLETE): + for v in self.__splitBytes(u, 16): + uid = uuid.UUID(bytes=bytes(reversed(v))) + result.append((str(uid), True)) + + return result + + def manufacturer(self, filterId=None, withName=False): + """ + Public method to get the manufacturer data. + + @param filterId manufacturer ID to filter on (defaults to None) + @type int (optional) + @param withName flag indicating to report the manufacturer name as well + (if available) (defaults to False) + @type bool + @return tuple containing the manufacturer ID, associated data and manufacturer + name + @rtype tuple of (int, bytes, str) + """ + result = [] + for u in self.__decodeField(ADV_TYPE_MANUFACTURER): + if len(u) < 2: + continue + + m = struct.unpack("<H", u[0:2])[0] + if filter is None or m == filterId: + name = ManufacturerId.get(m, "") if withName else None + result.append((m, u[2:], name)) + return result