🙆

xiao nRF52480でBLE通信を試す(データ構造を3つ)

2023/06/10に公開

前回はESP32でBLE通信をmicropythonを使って書きました。
今回はxiao nRF52480を使います。

xiao nRF52480はファームウェアの仕様上、micropythonのubluetoothを使えません。
色々試しましたが、2023年6月現在、circuitPythonをMu editorで書く方法に落ち着いています。

キャラクタリスティックのプロパティはnotifyで書いています。

1. 単一サービスでJSONとする

webアプリを作っている者には、データ構造をJSONとした方が扱いやすいです。

payload = {
  "divice_label":"1号機",
  "data":{
    "temp":20.3,
    "humid":50.2
  }
}
クリックで展開

サンプルコードをさらに単純にしています。

まずはサービスクラスのSensorService。
(サンプルコードではsensorsという変数名でしたが、今回はpayloadという変数名に変えています。変数名は分かりやすければ何でも良いです)

payloadにJSONCharacteristicを用いることで、データをJSON形式で保持できるんですね。

from adafruit_ble.uuid import VendorUUID
from adafruit_ble.services import Service
from adafruit_ble.characteristics import Characteristic
from adafruit_ble.characteristics.json import JSONCharacteristic

class SensorService(Service):
    # pylint: disable=too-few-public-methods

    uuid = VendorUUID("51ad213f-e568-4e35-84e4-67af89c79ef0")

    payload = JSONCharacteristic(
        uuid=VendorUUID("e077bdec-f18b-4944-9e9e-8b3a815162b4"),
        properties=Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
    )

    def __init__(self, service=None):
        super().__init__(service=service)
        self.connectable = True

code.pyでは、SensorSeviceクラスを用いてserviceインスタンスを作成しています。
以下のコードで大事な部分はservice.payloadの部分だけです。
ここの部分にJSON形式のデータを指定すると、自動的にnotifyとしてセントラル側に飛んでいきます。

import time
import random
from ble_json_service import SensorService
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement


# Create BLE radio, custom service, and advertisement.
ble = BLERadio()
service = SensorService()
advertisement = ProvideServicesAdvertisement(service)


# Function to get some fake weather sensor readings for this example in the desired unit.
def measure():
    temperature = random.uniform(0.0, 10.0)
    moisture = random.uniform(0.0, 100.0)
    return {"temperature": temperature, "moisture": moisture}


while True:
    print("Advertise services")
    ble.stop_advertising()  # you need to do this to stop any persistent old advertisement
    ble.start_advertising(advertisement)
    
    print("Waiting for connection...")
    while not ble.connected:
        led.on()
        blue_led.off()
        pass
    
    print("Connected")
    while ble.connected:
        service.payload = {
          "device_label": "1号機",
          "data": measure()
        }
    print("Disconnected")

2. 複数サービスを設ける方法

sensorsTとsensorsHの2つのサービスを設け、それぞれのサービスが温度、湿度を提供する方法です。

この方法の場合、受け取り側(ラズパイ側)ではsensorTとsensorHのそれぞれについて受け取った後に呼び出されるコールバック関数を記述する必要があります。

notifyでセントラル側に送る時は、str型で書く方法とbytearrayにする方法があります(セントラル側でデコードすればどちらでも人が読める形になります)

一般的にbytearrayは様々な通信プロトコル(TCP/IPやBluetoothなど)でデータを送受信する際の一般的な形式であるため、この形式に変換しておくと通信が容易になります。二つ目は、浮動小数点数のバイト列は原則として数値そのものの比較的小さい固定サイズの表現であるため、メモリや帯域幅を節約することができます。

ってchatGPTさんが言ってた。

nRF52480側

クリックで展開
# SPDX-FileCopyrightText: 2020 Mark Raleson
# SPDX-License-Identifier: MIT

from adafruit_ble.uuid import VendorUUID
from adafruit_ble.services import Service
from adafruit_ble.characteristics import Characteristic
# from adafruit_ble.characteristics.stream import StreamIn
from adafruit_ble.characteristics.stream import StreamOut


class SensorService(Service):

    uuid = VendorUUID("51ad213f-e568-4e35-84e4-67af89c79ef0")

    sensorsT = StreamOut(
        uuid=VendorUUID("e077bdec-f18b-4944-9e9e-8b3a815162b4"),
        properties=Characteristic.READ | Characteristic.NOTIFY,
    )

    sensorsH = StreamOut(
        uuid=VendorUUID("528ff74b-fdb8-444c-9c64-3dd5da4135ae"),
        properties=Characteristic.READ | Characteristic.NOTIFY,
    )

    def __init__(self, service=None):
        super().__init__(service=service)
        self.connectable = True
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT
import time
import struct
import board
# import adafruit_ahtx0
from SwitchBot import SensorService
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement

# Create BLE radio, custom service, and advertisement.
ble = BLERadio()
sService = SensorService()
advertisement = ProvideServicesAdvertisement(sService)
ble.name = "AHT20 Humidity&Temperature"

# Create sensor object, communicating over the board's default I2C bus
# i2c = board.I2C()  # uses board.SCL and board.SDA
# sensor = adafruit_ahtx0.AHTx0(i2c)

class Sensor():
    temperature:float
    relative_humidity:float
    
    def __init__(self):
        self.temperature = 20.3
        self.relative_humidity = 50.2

sensor = Sensor()

while True:
    print("Advertise services")
    ble.stop_advertising()  # you need to do this to stop any persistent old advertisement
    ble.start_advertising(advertisement)

    print("Waiting for connection...")
    while not ble.connected:
        pass
    ble.stop_advertising()
    print("Connected")
    while ble.connected:
        sensor.temperature = 23.7
        sensor.relative_humidity = 50.2
        print("\nTemperature: %0.1f C" % sensor.temperature)
        print("Humidity: %0.1f %%" % sensor.relative_humidity)
        sendTemperature = sService.sensorsT
#         sendTemperature.write(str(sensor.temperature))  # string
        sendTemperature.write(struct.pack('>f', sensor.temperature))  # float
        sendHumidity = sService.sensorsH
#         sendHumidity.write(str(sensor.relative_humidity))  # string
        sendHumidity.write(struct.pack('>f', sensor.relative_humidity))  # float
        time.sleep(3)

    print("Disconnected")

ラズパイ側

クリックで展開
from ast import Pass
import struct # bytearrayをfloatに変換する
import asyncio
from bleak import BleakScanner, BleakClient

# Nordic UART Service (NUS)
NUS_UUID = '51ad213f-e568-4e35-84e4-67af89c79ef0'
RX_UUID = '93222d1f-2837-4f1d-88d0-e30b6d1935e1'  # RX Characteristic UUID (from ESP32 to Computer)
TX_UUID = '4836c2f5-001a-4d2b-a67f-a2701b1354e5'

async def scan():
    deviceNum = input('デバイスは何台ありますか:')
    target_clients = []

    allConnected:bool=False

    while not allConnected:
        clients:list(BleakClient) = []
        # 1. 周囲のBLE発信をスキャン
        print('スキャン開始')
        scanner = BleakScanner()
        devices = await scanner.discover()

        for i, device in enumerate(devices):
            print(f'{i+1} name:{device.name},address:{device.address}')
            client = BleakClient(device)
            clients.append(client)

        for i, client in enumerate(clients):
            client:BleakClient
            try:
                await asyncio.wait_for(client.connect(),3)
                print(f'{i+1}番目の接続は{client.is_connected}')
                if client.is_connected:
                    print(f'コネクトできたクライアント address:{client.address},{client.services.services}')

                    # 2 serviceから、Characteristicの情報を得る
                    for j ,service in client.services.services.items():
                        print(f'クライアントのservice.uuid:{service.uuid},{j}')
                        if service.uuid == NUS_UUID:
                            print(f'device.address:{client.address} のservice_uuidは、ペリフェラルのNUS_UUIDと一致しました')
                            print(f"service uuid:{service.uuid}, description:{service.description}")
                            target_clients.append(client)
                            # for c in service.characteristics:                            
                            #     print(f'characteristic properties:{c.properties},characteristic uuid:{c.uuid}')
                            #     target_clients.append(client)
                        if len(target_clients) == int(deviceNum):
                            print('コネクトできたデバイス数がdevicesNumと等しくなったので、早期リターン')
                            allConnected = True
            except:
                await client.disconnect()
            finally:
                Pass
                

            if allConnected:
                break

    print('スキャン完了')
    return target_clients

# コールバック関数: データが送信されたときに呼び出されます
def notification_handler(sender: int, payload: bytearray, **_kwargs):
    temp, = struct.unpack('>f', payload)
    print(f'1,{temp}, {sender}')


def notification_handler2(sender: int, payload: bytearray, **_kwargs):
    temp, = struct.unpack('>f', payload)
    print(f'2,{temp}, {sender}')


async def run():
    # スキャンしたBluetooth発信機のうち、デバイスのTX_UUIDを知っているもの
    clients = await scan()
    print(len(clients))

    try:
        # 2. クライアント(ESP32などのデバイス)とデータのやり取りをする
        for client in clients:
            await client.connect()
            await client.start_notify('e077bdec-f18b-4944-9e9e-8b3a815162b4', notification_handler)
            await client.start_notify('528ff74b-fdb8-444c-9c64-3dd5da4135ae', notification_handler2)
            
        while True:
            # 実際のアプリケーションではここで何らかの処理を行います
            await asyncio.sleep(1.0)
    finally:
        print(14)
        # for client in clients:
        #     await client.stop_notify(RX_UUID)
        #     await client.disconnect()

# asyncio.run(scan())
asyncio.run(run())

3. UART

Discussion