src/eric7/MicroPython/Devices/RP2040Devices.py

branch
eric7
changeset 10089
5fe9bfafbc7c
parent 10069
435cc5875135
child 10138
56614cf9d03c
--- a/src/eric7/MicroPython/Devices/RP2040Devices.py	Sun Jun 04 15:16:12 2023 +0200
+++ b/src/eric7/MicroPython/Devices/RP2040Devices.py	Thu Jun 15 17:11:14 2023 +0200
@@ -1266,6 +1266,220 @@
         return out.decode("utf-8").strip() == "True", ""
 
     ##################################################################
+    ## Methods below implement Bluetooth related methods
+    ##################################################################
+
+    def hasBluetooth(self):
+        """
+        Public method to check the availability of Bluetooth.
+
+        @return flag indicating the availability of Bluetooth
+        @rtype bool
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+def has_bt():
+    try:
+        import bluetooth
+        if hasattr(bluetooth, 'BLE'):
+            return True
+    except ImportError:
+        pass
+
+    return False
+
+print(has_bt())
+del has_bt
+"""
+        out, err = self.executeCommands(command, mode=self._submitMode, timeout=10000)
+        if err:
+            raise OSError(self._shortError(err))
+        return out.strip() == b"True"
+
+    def getBluetoothStatus(self):
+        """
+        Public method to get Bluetooth status data of the connected board.
+
+        @return list of tuples containing the translated status data label and
+            the associated value
+        @rtype list of tuples of (str, str)
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+def ble_status():
+    import bluetooth
+    import ubinascii
+    import ujson
+
+    ble = bluetooth.BLE()
+
+    ble_active = ble.active()
+    if not ble_active:
+        ble.active(True)
+
+    res = {
+        'active': ble_active,
+        'mac': ubinascii.hexlify(ble.config('mac')[1], ':').decode(),
+        'addr_type': ble.config('mac')[0],
+        'name': ble.config('gap_name'),
+        'mtu': ble.config('mtu'),
+    }
+
+    if not ble_active:
+        ble.active(False)
+
+    print(ujson.dumps(res))
+
+ble_status()
+del ble_status
+"""
+        out, err = self.executeCommands(command, mode=self._submitMode)
+        if err:
+            raise OSError(self._shortError(err))
+
+        status = []
+        bleStatus = json.loads(out.decode("utf-8"))
+        status.append((self.tr("Active"), self.bool2str(bleStatus["active"])))
+        status.append((self.tr("Name"), bleStatus["name"]))
+        status.append((self.tr("MAC-Address"), bleStatus["mac"]))
+        status.append(
+            (
+                self.tr("Address Type"),
+                self.tr("Public") if bleStatus == 0 else self.tr("Random"),
+            )
+        )
+        status.append((self.tr("MTU"), self.tr("{0} Bytes").format(bleStatus["mtu"])))
+
+        return status
+
+    def activateBluetoothInterface(self):
+        """
+        Public method to activate the Bluetooth interface.
+
+        @return flag indicating the new state of the Bluetooth interface
+        @rtype bool
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+def activate_ble():
+    import bluetooth
+
+    ble = bluetooth.BLE()
+    if not ble.active():
+        ble.active(True)
+    print(ble.active())
+
+activate_ble()
+del activate_ble
+"""
+        out, err = self.executeCommands(command, mode=self._submitMode)
+        if err:
+            raise OSError(self._shortError(err))
+
+        return out.strip() == b"True"
+
+    def deactivateBluetoothInterface(self):
+        """
+        Public method to deactivate the Bluetooth interface.
+
+        @return flag indicating the new state of the Bluetooth interface
+        @rtype bool
+        @exception OSError raised to indicate an issue with the device
+        """
+        command = """
+def deactivate_ble():
+    import bluetooth
+
+    ble = bluetooth.BLE()
+    if ble.active():
+        ble.active(False)
+    print(ble.active())
+
+deactivate_ble()
+del deactivate_ble
+"""
+        out, err = self.executeCommands(command, mode=self._submitMode)
+        if err:
+            raise OSError(self._shortError(err))
+
+        return out.strip() == b"True"
+
+    def getDeviceScan(self, timeout=10):
+        """
+        Public method to perform a Bluetooth device scan.
+
+        @param timeout duration of the device scan in seconds (defaults
+            to 10)
+        @type int (optional)
+        @return tuple containing a dictionary with the scan results and
+            an error string
+        @rtype tuple of (dict, str)
+        """
+        from ..BluetoothDialogs.BluetoothAdvertisement import BluetoothAdvertisement
+
+        command = """
+_scan_done = False
+
+def ble_scan():
+    import bluetooth
+    import time
+    import ubinascii
+
+    IRQ_SCAN_RESULT = 5
+    IRQ_SCAN_DONE = 6
+
+    def _bleIrq(event, data):
+        global _scan_done
+        if event == IRQ_SCAN_RESULT:
+            addr_type, addr, adv_type, rssi, adv_data = data
+            if addr:
+                print({{
+                    'address': ubinascii.hexlify(addr,':').decode('utf-8'),
+                    'rssi': rssi,
+                    'adv_type': adv_type,
+                    'advertisement': bytes(adv_data),
+                }})
+        elif event == IRQ_SCAN_DONE:
+            _scan_done = True
+
+    ble = bluetooth.BLE()
+
+    ble_active = ble.active()
+    if not ble_active:
+        ble.active(True)
+
+    ble.irq(_bleIrq)
+    ble.gap_scan({0} * 1000, 1000000, 50000, True)
+    while not _scan_done:
+        time.sleep(0.2)
+
+    if not ble_active:
+        ble.active(False)
+
+ble_scan()
+del ble_scan, _scan_done
+""".format(
+            timeout
+        )
+        out, err = self.executeCommands(
+            command, mode=self._submitMode, timeout=(timeout + 5) * 1000
+        )
+        if err:
+            return {}, err
+
+        scanResults = {}
+        for line in out.decode("utf-8").splitlines():
+            res = ast.literal_eval(line)
+            address = res["address"]
+            if address not in scanResults:
+                scanResults[address] = BluetoothAdvertisement(address)
+            scanResults[address].update(
+                res["adv_type"], res["rssi"], res["advertisement"]
+            )
+
+        return scanResults, ""
+
+    ##################################################################
     ## Methods below implement Ethernet related methods
     ##################################################################
 

eric ide

mercurial