🦷
ぼくのかんがえたさいきょうのBLE通信
BLEを用いた開発をするにあたり、自分なりに良い設計方法を作りました
設計思想
モデル
- サービスは一つだけ。そのサービスUUIDはNUS_UUIDと定める。
- そのサービスの中にTXとRXという二つのキャラスタリスティックを設ける
- セントラルからペリフェラルに送られたデータはRXキャラクタリスティックに格納する。
- ペリフェラルからセントラルに送るデータはTXキャラクタリスティックに格納する
TXキャラクタリスティックの構造
要素1 デバイス名
要素2 payload
RXキャラクタリスティックの構造
要素1 命令のコマンド。例えば'measure'、'sleep'など
要素2 payload。例えばsleep時間など
プログラム設計思想
- ペリフェラル側の動作は全てイベント(BLE.irq)で処理する。whileループさせない。
- circuitPythonではBLE.irqが使えないので、whileループで処理する
プログラムの流れ
①(ペリフェラル側)アドバタイズ開始
②(セントラル側)周囲のデバイスをスキャン
③(セントラル側)目的のデバイスをクライアント登録
④ セントラル→ペリフェラルに測定指示
⑤ ペリフェラルで測定し、データをTXキャラクタリスティックに書き込む
⑥ セントラル側でデータの信憑性を確認。OKならDBに書き込む
⑦ セントラル→ペリフェラルにディープスリープ指示
再び起動した時は①から繰り返します。
プログラムの詳細
1.(ペリフェラル側)アドバタイズ開始
def advertiser(self):
name = bytes(self.name, 'UTF-8')
adv_data = bytearray(b'x02x01x02') + bytearray((len(name) + 1, 0x09)) + name
self.ble.gap_advertise(100, adv_data) # 100msごとにアドバタイズ
print(f'start advertise, adv_data:{adv_data}')
2 (セントラル側)周囲のデバイスをスキャン
async def scan()->list[BleakClient]:
"""サービスのuuidがNUS_UUIDと一致したデバイスを、BleakClientのリスト形式で返します"""
scanner = BleakScanner()
devices = await scanner.discover(timeout=2)
for i,device in enumerate(devices):
print(f'{i+1} name:{device.name},address:{device.address}')
tasks = [connect_device(device) for device in devices]
return [client for client in await asyncio.gather(*tasks) if client is not None]
3. (セントラル側)目的のデバイスをクライアント登録
async def connect_device(device):
"""service.uuidがNUS_UUIDと一致したclientだけを返します"""
try:
async with BleakClient(device,timeout=2) as client:
for service in client.services:
if service.uuid == NUS_UUID:
print(f'{client.address} is target')
return client
except Exception:
return None
async def scan()->list[BleakClient]:
"""サービスのuuidがNUS_UUIDと一致したデバイスを、BleakClientのリスト形式で返します"""
scanner = BleakScanner()
devices = await scanner.discover(timeout=2)
for i,device in enumerate(devices):
print(f'{i+1} name:{device.name},address:{device.address}')
tasks = [connect_device(device) for device in devices]
return [client for client in await asyncio.gather(*tasks) if client is not None]
with構文を使うと、client.connect()とclient.disconnect()を裏でやってくれます。
asyncioを使うことで、複数のデバイスの接続作業を非同期でやってくれますので、完了までの時間を短縮できます。
4〜7 セントラル側からの指示とペリフェラル側の行動
指示と言っても、RXキャラクタリスティックに書き込んでいるだけです。
書き込みをすると、ペリフェラル側のBLE.irqイベントが発火しますので、RXキャラクタリスティックに書き込まれたデータを読み取って何らかのアクションをします。
セントラル側
async def task(client:BleakClient):
"""各デバイスとのデータのやりとりです"""
async with client:
# ペリフェラル側に'measure'指示
instruction = ('measure',0)
instruction_b = json.dumps(instruction).encode('utf-8')
await client.write_gatt_char(TX_UUID,instruction_b,True)
# 待機時間は適宜調整
await asyncio.sleep(5)
# ペリフェラル側から測定結果を受け取る
data_b = await client.read_gatt_char(RX_UUID)
data = json.loads(data_b)
print(data)
# 測定結果が妥当か判断
# DBに書き込み
# deepsleepを指示
instruction = ('sleep',10*1000)
instruction_b = json.dumps(instruction).encode('utf-8')
try:
print('sleep指示を送るよ')
await client.write_gatt_char(TX_UUID,instruction_b,True) # この処理はペリフェラルとの切断が切れてしまうのでエラーが発生する
await asyncio.sleep(2)
print('sleep指示を送ったよ')
except Exception:
print('clientの接続は切れました')
print('finish!')
ペリフェラル側
def ble_irq(self, event, data):
if event == 1: #_IRQ_CENTRAL_CONNECT:
self.is_connected = True
self.conn_handle = data[0] # Save the connection handle
print(f'connected!{data}')
elif event == 2: #_IRQ_CENTRAL_DISCONNECT:
print(f'disconnected!{data}')
self.is_connected = False
self.advertiser()
elif event == 3: #_IRQ_GATTS_WRITE:
buffer = self.ble.gatts_read(self.rx)
data_str = buffer.decode('utf-8')
instruction, val = ujson.loads(data_str)
if instruction=='sleep':
print('sleep!')
deepsleep(val)
elif instruction=='measure':
data = measure()
data_bytes = ujson.dumps(data).encode('utf-8')
self.ble.gatts_write(self.tx, data_bytes)
elif event == 4: #_IRQ_GATTS_READ:
# print(f'gatts_read!,{data}')
pass
ちなみに実践では以下のように進めています。
- セントラル側からRXキャラクタリスティックに('measure',)の書き込み
- ペリフェラル側では、BLE.irqのevent==3でRXキャラクタリスティックにmeasureが書き込まれたことを確認。measureメソッドを実行
- TXキャラクタリスティックに測定結果をjson形式で書き込み
- セントラル側ではTXキャラクタリスティックのデータを読み取り
- 読み取ったデータの信ぴょう性を確認 → もし疑わしいなら、もう一度測定するようペリフェラル側に指示(RXキャラクタリスティックに('measure',)の書き込み)
- 読み取ったデータをDBに書き込み
- セントラル側からペリフェラル側にsleep指示
(スリープ時間は現在時刻から6時or18時までの残り時間。
例えば3時間45分眠らせたい場合はgatts_writeに3*60*60*1000+45*60*1000
と書き込む) - ペリフェラル側では、BLE.irqのevent==3でRXキャラクタリスティックに
('sleep',3*60*60*1000+45*60*1000)
が書き込まれたことを確認。deepsleep(3*60*60*1000+45*60*1000)
を実行
Discussion