🦷

ぼくのかんがえたさいきょうのBLE通信

2023/12/19に公開

BLEを用いた開発をするにあたり、自分なりに良い設計方法を作りました

設計思想

モデル

  • サービスは一つだけ。そのサービスUUIDはNUS_UUIDと定める。
  • そのサービスの中にTXとRXという二つのキャラスタリスティックを設ける
  • セントラルからペリフェラルに送られたデータはRXキャラクタリスティックに格納する。
  • ペリフェラルからセントラルに送るデータはTXキャラクタリスティックに格納する

TXキャラクタリスティックの構造

要素1 デバイス名
要素2 payload

RXキャラクタリスティックの構造

要素1 命令のコマンド。例えば'measure'、'sleep'など
要素2 payload。例えばsleep時間など

プログラム設計思想

  • ペリフェラル側の動作は全てイベント(BLE.irq)で処理する。whileループさせない。
  • circuitPythonではBLE.irqが使えないので、whileループで処理する

プログラムの流れ

①(ペリフェラル側)アドバタイズ開始
②(セントラル側)周囲のデバイスをスキャン
③(セントラル側)目的のデバイスをクライアント登録
④ セントラル→ペリフェラルに測定指示
⑤ ペリフェラルで測定し、データをTXキャラクタリスティックに書き込む
⑥ セントラル側でデータの信憑性を確認。OKならDBに書き込む
⑦ セントラル→ペリフェラルにディープスリープ指示

再び起動した時は①から繰り返します。

プログラムの詳細

1.(ペリフェラル側)アドバタイズ開始

    def advertiser(self):
        name = bytes(self.name, 'UTF-8')
        adv_data = bytearray(b'x02x01x02') + bytearray((len(name) + 1, 0x09)) + name
        self.ble.gap_advertise(100, adv_data) # 100msごとにアドバタイズ
        print(f'start advertise, adv_data:{adv_data}')

2 (セントラル側)周囲のデバイスをスキャン

async def scan()->list[BleakClient]:
    """サービスのuuidがNUS_UUIDと一致したデバイスを、BleakClientのリスト形式で返します"""
    scanner = BleakScanner()
    devices = await scanner.discover(timeout=2)
    for i,device in enumerate(devices):
        print(f'{i+1} name:{device.name},address:{device.address}')

    tasks = [connect_device(device) for device in devices]

    return [client for client in await asyncio.gather(*tasks) if client is not None]

3. (セントラル側)目的のデバイスをクライアント登録


async def connect_device(device):
    """service.uuidがNUS_UUIDと一致したclientだけを返します"""
    try:
        async with BleakClient(device,timeout=2) as client:
            for service in client.services:
                if service.uuid == NUS_UUID:
                    print(f'{client.address} is target')
                    return client
    except Exception:
        return None

async def scan()->list[BleakClient]:
    """サービスのuuidがNUS_UUIDと一致したデバイスを、BleakClientのリスト形式で返します"""
    scanner = BleakScanner()
    devices = await scanner.discover(timeout=2)
    for i,device in enumerate(devices):
        print(f'{i+1} name:{device.name},address:{device.address}')

    tasks = [connect_device(device) for device in devices]

    return [client for client in await asyncio.gather(*tasks) if client is not None]

with構文を使うと、client.connect()とclient.disconnect()を裏でやってくれます。
asyncioを使うことで、複数のデバイスの接続作業を非同期でやってくれますので、完了までの時間を短縮できます。

4〜7 セントラル側からの指示とペリフェラル側の行動

指示と言っても、RXキャラクタリスティックに書き込んでいるだけです。
書き込みをすると、ペリフェラル側のBLE.irqイベントが発火しますので、RXキャラクタリスティックに書き込まれたデータを読み取って何らかのアクションをします。

セントラル側

async def task(client:BleakClient):
    """各デバイスとのデータのやりとりです"""
    async with client:
        # ペリフェラル側に'measure'指示
        instruction = ('measure',0)
        instruction_b = json.dumps(instruction).encode('utf-8')
        await client.write_gatt_char(TX_UUID,instruction_b,True)       

        # 待機時間は適宜調整
        await asyncio.sleep(5)

        # ペリフェラル側から測定結果を受け取る
        data_b = await client.read_gatt_char(RX_UUID)
        data = json.loads(data_b)
        print(data)

        # 測定結果が妥当か判断

        # DBに書き込み


        # deepsleepを指示
        instruction = ('sleep',10*1000)
        instruction_b = json.dumps(instruction).encode('utf-8')
        try:
            print('sleep指示を送るよ')
            await client.write_gatt_char(TX_UUID,instruction_b,True) # この処理はペリフェラルとの切断が切れてしまうのでエラーが発生する
            await asyncio.sleep(2)
            print('sleep指示を送ったよ')
        except Exception:
            print('clientの接続は切れました')
        print('finish!')

ペリフェラル側

    def ble_irq(self, event, data):        
        if event == 1: #_IRQ_CENTRAL_CONNECT:
            self.is_connected = True
            self.conn_handle = data[0]  # Save the connection handle
            print(f'connected!{data}')

        elif event == 2: #_IRQ_CENTRAL_DISCONNECT:
            print(f'disconnected!{data}')                         
            self.is_connected = False
            self.advertiser()
        
        elif event == 3: #_IRQ_GATTS_WRITE:
            buffer = self.ble.gatts_read(self.rx)
            data_str = buffer.decode('utf-8')
            instruction, val = ujson.loads(data_str)
            if instruction=='sleep':
                print('sleep!')
                deepsleep(val)

            elif instruction=='measure':
                data = measure()
                data_bytes = ujson.dumps(data).encode('utf-8')
                self.ble.gatts_write(self.tx, data_bytes)

        elif event == 4: #_IRQ_GATTS_READ:
            # print(f'gatts_read!,{data}')
            pass

ちなみに実践では以下のように進めています。

  1. セントラル側からRXキャラクタリスティックに('measure',)の書き込み
  2. ペリフェラル側では、BLE.irqのevent==3でRXキャラクタリスティックにmeasureが書き込まれたことを確認。measureメソッドを実行
  3. TXキャラクタリスティックに測定結果をjson形式で書き込み
  4. セントラル側ではTXキャラクタリスティックのデータを読み取り
  5. 読み取ったデータの信ぴょう性を確認 → もし疑わしいなら、もう一度測定するようペリフェラル側に指示(RXキャラクタリスティックに('measure',)の書き込み)
  6. 読み取ったデータをDBに書き込み
  7. セントラル側からペリフェラル側にsleep指示
    (スリープ時間は現在時刻から6時or18時までの残り時間。
    例えば3時間45分眠らせたい場合はgatts_writeに3*60*60*1000+45*60*1000と書き込む)
  8. ペリフェラル側では、BLE.irqのevent==3でRXキャラクタリスティックに('sleep',3*60*60*1000+45*60*1000)が書き込まれたことを確認。deepsleep(3*60*60*1000+45*60*1000)を実行

Discussion