🐓

Raspberry Pi Pico WでBLE通信を使ってみる(micropython)

2024/05/11に公開

はじめに

Raspberry Pi Pico WでBLE通信を実践してみたので、まとめてみました

対象とする読者

以下の条件を満たしている人向けです(中級者向け?)
そのため、細かい用語は解説しません

  • Raspberry Pi Pico Wの基本的な使い方が分かる
    • programの書き込み・実行
  • micropythonのコードが読める(ライブラリの使い方など)
  • BLE関連用語(Central, Peripheral等)

動作環境

書き込み用PC

  • Thonny:4.1.4

Raspberry Pi Pico W

  • micropython:v1.22.2
    ※aiobleを使うにはv1.17以上が必須

1. プログラムの作成

micropython公式のコードを流用して、以下の変更を加えます

  • Central側:受信した温度センサ値をplotする
  • Peripheral側:変数tの値を、内臓温度センサ値にする

ライブラリは、「aioble」を使用します(以下、Githubです)
https://github.com/micropython/micropython-lib/tree/master/micropython/bluetooth/aioble

1.1 central側(受信側)のプログラム

examples/temp_client.py を改変して作ります

プログラムと適当な解説コメントを記載しておきます(未来の自分向け)

from micropython import const
import uasyncio as asyncio
import aioble
import bluetooth
import struct

# UUIDの定義
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)  # org.bluetooth.service.environmental_sensing
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)  # org.bluetooth.characteristic.temperature


# Helper to decode the temperature characteristic encoding (sint16, hundredths of a degree).
def _decode_temperature(data):
    """
    受信したデータを、温度センサ値にデコードする関数

    具体的には、以下のことを行っている
    ・binaryをバイトに変換(unpack)
    ・温度センサ値を1/100する

    Peripheral側のコードにある_encode_temperature関数 と対の関係になる
    """
    return struct.unpack("<h", data)[0] / 100


async def find_temp_sensor():
    # 5秒間、周辺のAdvertiseをスキャン
    async with aioble.scan(5000, interval_us=30000, window_us=30000, active=True) as scanner:
        async for result in scanner:
            # 目的の端末(「名称がmpy-temp」かつ「サービスに環境センシングが含まれている」)の場合、デバイス情報を返す
            if result.name() == "mpy-temp" and _ENV_SENSE_UUID in result.services():
                return result.device
    return None


async def main():
    device = await find_temp_sensor()
    if not device:  # 目的の端末が見つからなかった場合
        print("Temperature sensor not found")
        return

    # 目的の端末が見つかった場合、以下の処理も実行される
    try:
        print("Connecting to", device)
        connection = await device.connect()  # 端末に接続を試みる
    except asyncio.TimeoutError:
        print("Timeout during connection")  # 接続がTimeoutErrorとなった場合
        return

    # device.connect()が成功した場合、以下の処理も実行される
    async with connection:
        try:
            temp_service = await connection.service(_ENV_SENSE_UUID)  # 環境センシングのサービス
            temp_characteristic = await temp_service.characteristic(_ENV_SENSE_TEMP_UUID)
        except asyncio.TimeoutError:
            print("Timeout discovering services/characteristics")
            return

        # キャラクタリスティックが取得できた場合、以下の処理も実行される
        while True:
            temp_deg_c = _decode_temperature(await temp_characteristic.read())  # 受信データから温度センサ値を復元
            print("Temperature: {:.2f}".format(temp_deg_c))  # Thonnyのplotterに表示する
            await asyncio.sleep_ms(1000)  # 1000ms一時定理


asyncio.run(main())

受信した温度センサ値をplotする

print文を1行追加するだけです。 ラクラク~

        while True:
            temp_deg_c = _decode_temperature(await temp_characteristic.read())
            print("Temperature: {:.2f}".format(temp_deg_c))  # 追加したコード
            await asyncio.sleep_ms(1000)

1.2 Peripheral側(送信側)のプログラム

examples/temp_sensor.py を改変して作ります

こちらも、プログラムと適当な解説コメントを記載しておきます(需要は不明)

from micropython import const
import uasyncio as asyncio
import aioble
import bluetooth
import struct


# UUIDの定義等
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)  # org.bluetooth.service.environmental_sensing
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)  # org.bluetooth.characteristic.temperature
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)  # org.bluetooth.characteristic.gap.appearance.xml

_ADV_INTERVAL_MS = 250_000  # Advertiseの送信頻度(ms)


# GATTサーバの登録
temp_service = aioble.Service(_ENV_SENSE_UUID)  # サービスの定義
temp_characteristic = aioble.Characteristic(
    temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True
)  # 温度キャラクタリスティックの設定(読み出し・通知可能)
aioble.register_services(temp_service)  # サービスの登録(環境センシング)

# 追加分
sensor_temp = machine.ADC(4)  # 内臓温度センサ値取得用のPINを定義
conversion_factor = 3.3 / (65535)  # 変換式の係数を事前に計算


# 追加分 内臓温度センサ値を取得
def _read_temperature():
    reading = sensor_temp.read_u16() * conversion_factor
    temperature = 27 - (reading - 0.706)/0.001721
    return temperature


def _encode_temperature(temp_deg_c):
    """
    温度センサ値を、送信用のフォーマットにエンコードする関数
    具体的には、以下のことを行っている
    ・温度センサ値を100倍(整数値にする)
    ・バイト列をbinaryに変換(pack)

    Central側のコードにある_decode_temperature関数 と対の関係になる
    """
    return struct.pack("<h", int(temp_deg_c * 100))


async def sensor_task():
    """
    送信する温度センサ値を更新する関数
    ・温度センサ値を読み取り
    ・キャラクタリスティックに書き込み
    ・1000ms一時停止
    """
    # t = 24.5  # ここは削除
    while True:
        # t += random.uniform(-0.5, 0.5)
        t = _read_temperature()  # 実際の内臓温度センサ値を使用
        temp_characteristic.write(_encode_temperature(t))
        await asyncio.sleep_ms(1000)


async def peripheral_task():
    """
    接続するまでadvertiseを送信
     接続した場合、つながったCentral端末のMacアドレスを表示
     接続が切れるまで、待つ(await)
    """
    while True:
        async with await aioble.advertise(
            _ADV_INTERVAL_MS,
            name="mpy-temp",
            services=[_ENV_SENSE_UUID],
            appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
        ) as connection:
            print("Connection from", connection.device)
            await connection.disconnected()


async def main():
    """
    2つのタスクをasyncで動かす関数
     t1(task1):温度センサ値の更新タスク
     t2(task2):Peripheralとしてのタスク(centralとの接続等)
    """
    t1 = asyncio.create_task(sensor_task())
    t2 = asyncio.create_task(peripheral_task())
    await asyncio.gather(t1, t2)


asyncio.run(main())

1.2.1 温度センサ値取得の準備

pico Wの内臓温度センサ値は、pin4を使用して取得可能
ADCで電圧値を読み取り、温度値に変換します

# 追加分
sensor_temp = machine.ADC(4)  # 内臓温度センサ値取得用のPINを定義
conversion_factor = 3.3 / (65535)  # 変換式の係数を事前に計算

# 追加分 内臓温度センサ値を取得
def _read_temperature():
    reading = sensor_temp.read_u16() * conversion_factor
    temperature = 27 - (reading - 0.706)/0.001721
    return temperature

Peripheral側のプログラムは、PCに接続した状態で使用するため、書き込まなくてOKです。

1.2.2 温度値をtaskで変数tに書き込む

先ほど追加した_read_temperature()関数の値を、変数tに代入するようにします
(元のコードの一部をコメントアウトしてます)

# This would be periodically polling a hardware sensor.
async def sensor_task():
    # t = 24.5
    while True:
        # t += random.uniform(-0.5, 0.5)
        t = _read_temperature()  # 実際の内臓温度センサ値を使用
        temp_characteristic.write(_encode_temperature(t))
        await asyncio.sleep_ms(1000)

Peripheral側は、モバイルバッテリーで駆動させるため「main.py」というファイル名で書き込みしておきます

これで、準備完了です。

2. 実際に動かす

実行前の配線等を確認しておきます

  • (右)Peripheral側の端末:モバイルバッテリーでmicroUSB経由で給電
  • (左)Central側の端末:PCにUSB経由で接続し、Thonnyから実行する

2.1 実行結果

PC側のThonnyのplot画面に、Peripheralから送付された温度センサ値がグラフとして表示されました

念のため、実物の画面も載せておきます

3. 動かしてみて気が付いたこと

実際に動かしてみて気が付いた課題を記載しておきます

3.1 たまにErrorが発生する

下記のようにCentral側のプログラムにてTypeErrorが発生することがある。
実際に使用する際はTry-Except等で復帰できるように対処したほうがよさそう。

Error時の出力(Peripheral側)

「CancelledError:」というものが発生する

Error時の出力(Central側)

「TypeError: can't convert NoneType to int」が発生する

3.2 内臓温度センサの精度は低い

(これは公式も言ってることですが)pico Wの内臓温度センサはそんなに精度良くないです
値が飛ぶこともあります(突発的に25℃くらいまで下がったりしている)

まとめ

  • Raspberry Pi Pico WのBLE通信のサンプルコードを(ちょっと変えて)動かしてみた
  • aiobleは元のbluetoothライブラリよりも使いやすく、短いコードでBLE通信を行うことができる
  • BLE通信の専門用語については、別途勉強する必要がある
    • サービスの概念、キャラクタリスティック等、用語や概念を理解していないとプログラムを理解するのが難しい

Discussion