src/eric7/MicroPython/BluetoothDialogs/BluetoothAdvertisement.py

branch
mpy_network
changeset 9857
0122ae72618d
child 9858
6518c336fcd3
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/eric7/MicroPython/BluetoothDialogs/BluetoothAdvertisement.py	Wed Mar 08 14:25:24 2023 +0100
@@ -0,0 +1,236 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2023 Detlev Offenbach <detlev@die-offenbachs.de>
+#
+
+"""
+Module implementing a class to parse and store the Bluetooth device advertisement data.
+"""
+
+import struct
+import uuid
+
+ADV_IND = 0
+ADV_SCAN_IND = 2
+ADV_NONCONN_IND = 3
+SCAN_RSP = 4
+
+ADV_TYPE_UUID16_INCOMPLETE = 0x02
+ADV_TYPE_UUID16_COMPLETE = 0x03
+ADV_TYPE_UUID32_INCOMPLETE = 0x04
+ADV_TYPE_UUID32_COMPLETE = 0x05
+ADV_TYPE_UUID128_INCOMPLETE = 0x06
+ADV_TYPE_UUID128_COMPLETE = 0x07
+ADV_TYPE_SHORT_NAME = 0x08
+ADV_TYPE_COMPLETE_NAME = 0x09
+ADV_TYPE_TX_POWER_LEVEL = 0x0A
+ADV_TYPE_SVC_DATA = 0x16
+ADV_TYPE_MANUFACTURER = 0xFF
+
+ManufacturerId = {
+    0x4C: "Apple, Inc.",
+    0xE0: "Google",
+    0x75: "Samsung Electronics Co. Ltd.",
+    0x87: "Garmin International Inc.",
+}
+
+
+class BluetoothAdvertisement:
+    """
+    Class to parse and store the Bluetooth device advertisement data.
+    """
+
+    def __init__(self, address):
+        """
+        Constructor
+
+        @param address address of the device advertisement
+        @type str
+        """
+        self.__address = address
+        self.__name = ""
+        self.__rssi = 0
+        self.__connectable = False
+
+        self.__advData = None
+        self.__respData = None
+
+    def update(self, advType, rssi, advData):
+        """
+        Public method to update the advertisement data.
+
+        @param advType type of advertisement data
+        @type int
+        @param rssi RSSI value in dBm
+        @type int
+        @param advData advertisement data
+        @type bytes
+        """
+        if rssi != self.__rssi:
+            self.__rssi = rssi
+
+        if advType in (ADV_IND, ADV_NONCONN_IND):
+            if advData != self.__advData:
+                self.__advData = advData
+                self.__connectable = advType == ADV_IND
+        elif advType == ADV_SCAN_IND:
+            self.__advData = advData
+        elif advType == SCAN_RSP and advData and advData != self.__respData:
+            self.__respData = advData
+
+    def __str__(self):
+        """
+        Special method to generate a string representation.
+
+        @return string representation
+        @rtype str
+        """
+        return "Scan result: {0} {1}".format(self.__address, self.__rssi)
+
+    def __decodeField(self, *advType):
+        """
+        Private method to get all fields of the specified types.
+
+        @param *advType type of fields to be extracted
+        @type int
+        @yield requested fields
+        @ytype bytes
+        """
+        # Advertising payloads are repeated packets of the following form:
+        #   1 byte data length (N + 1)
+        #   1 byte type (see constants below)
+        #   N bytes type-specific data
+        for payload in (self.__advData, self.__respData):
+            if not payload:
+                continue
+
+            i = 0
+            while i + 1 < len(payload):
+                if payload[i + 1] in advType:
+                    yield payload[i + 2 : i + payload[i] + 1]
+                i += 1 + payload[i]
+
+    def __splitBytes(self, data, chunkSize):
+        """
+        Private method to split some data into chunks of given size.
+
+        @param data data to be chunked
+        @type bytes, bytearray, str
+        @param chunkSize size for each chunk
+        @type int
+        @return list of chunks
+        @rtype list of bytes, bytearray, str
+        """
+        start = 0
+        dataChunks = []
+        while start < len(data):
+            end = start + chunkSize
+            dataChunks.append(data[start:end])
+            start = end
+        return dataChunks
+
+    @property
+    def name(self):
+        """
+        Public method to get the complete or shortened advertised name, if available.
+
+        @return advertised name
+        @rtype str
+        """
+        for n in self.__decodeField(ADV_TYPE_COMPLETE_NAME, ADV_TYPE_SHORT_NAME):
+            return str(n, "utf-8").replace("\x00", "") if n else ""
+
+        return ""
+
+    @property
+    def rssi(self):
+        """
+        Public method to get the RSSI value.
+
+        @return RSSI value in dBm
+        @rtype int
+        """
+        return self.__rssi
+
+    @property
+    def address(self):
+        """
+        Public method to get the address string.
+
+        @return address of the device
+        @rtype str
+        """
+        return self.__address
+
+    @property
+    def txPower(self):
+        """
+        Public method to get the advertised power level in dBm.
+
+        @return transmit power of the device (in dBm)
+        @rtype int
+        """
+        for txLevel in self.__decodeField(ADV_TYPE_TX_POWER_LEVEL):
+            return struct.unpack("<b", txLevel)
+
+        return 0
+
+    @property
+    def services(self):
+        """
+        Public method to get the service IDs.
+
+        @return list of tuples containing the advertised service ID and a
+            flag indicating a complete ID
+        @rtype list of tuple of (str, bool)
+        """
+        result = []
+
+        for u in self.__decodeField(ADV_TYPE_UUID16_INCOMPLETE):
+            for v in self.__splitBytes(u, 2):
+                result.append((hex(struct.unpack("<H", v)[0]), False))
+        for u in self.__decodeField(ADV_TYPE_UUID16_COMPLETE):
+            for v in self.__splitBytes(u, 2):
+                result.append((hex(struct.unpack("<H", v)[0]), True))
+
+        for u in self.__decodeField(ADV_TYPE_UUID32_INCOMPLETE):
+            for v in self.__splitBytes(u, 4):
+                result.append((hex(struct.unpack("<I", v)), False))
+        for u in self.__decodeField(ADV_TYPE_UUID32_COMPLETE):
+            for v in self.__splitBytes(u, 4):
+                result.append((hex(struct.unpack("<I", v)), True))
+
+        for u in self.__decodeField(ADV_TYPE_UUID128_INCOMPLETE):
+            for v in self.__splitBytes(u, 16):
+                uid = uuid.UUID(bytes=bytes(reversed(v)))
+                result.append((str(uid), False))
+        for u in self.__decodeField(ADV_TYPE_UUID128_COMPLETE):
+            for v in self.__splitBytes(u, 16):
+                uid = uuid.UUID(bytes=bytes(reversed(v)))
+                result.append((str(uid), True))
+
+        return result
+
+    def manufacturer(self, filterId=None, withName=False):
+        """
+        Public method to get the manufacturer data.
+
+        @param filterId manufacturer ID to filter on (defaults to None)
+        @type int (optional)
+        @param withName flag indicating to report the manufacturer name as well
+            (if available) (defaults to False)
+        @type bool
+        @return tuple containing the manufacturer ID, associated data and manufacturer
+            name
+        @rtype tuple of (int, bytes, str)
+        """
+        result = []
+        for u in self.__decodeField(ADV_TYPE_MANUFACTURER):
+            if len(u) < 2:
+                continue
+
+            m = struct.unpack("<H", u[0:2])[0]
+            if filter is None or m == filterId:
+                name = ManufacturerId.get(m, "") if withName else None
+                result.append((m, u[2:], name))
+        return result

eric ide

mercurial