BLE接続でESP32とラズパイで相互通信する①
概要
BLE通信を理解するため、ここではESP32をペリフェラル、ラズパイ(3B+や4)をセントラルとして、データのやり取りをします。
BLE通信の概要
無線コネクト様の解説が超絶わかりやすい。サルレベルの知識の私でも理解できました。ウッキッキー♪
あとこちらのクラゲさん。この人のブログにもいつもお世話になってます。
デバイスの中に、複数のサービスがあり、サービスの中にデータの本体であるキャラクタリスティックがあるということ
アドバタイズやGATT通信といった基本用語は頭に入れときましょう。
今回はNUS(Nordic UART Service)というNordic社が提供するUARTサービスアプリケーションを使ってBLE通信を実装します。
よく知らないのですが、BLEの通信規格は様々なものが存在しており、NUSは通信規格の一つです。
シリアル通信のように実装できるので、わかりやすいのが特徴のようです。
実際のコードはこちらのYoutubeを参考にしました。
まあ、だいぶchatGPTさんに修正入れてもらいましたけどね。ペリフェラル(ESP32)側
BLE通信をするにあたって、ペリフェラルの役割は3つあります。
- サービスとキャラクタリスティックの構成
- アドバタイズを送信
- データ送受信(GATT通信)
class ESP32_BLE
class ESP32_BLE():
def __init__(self, name):
self.name = name
self.ble = ubluetooth.BLE()
self.ble.active(True)
self.disconnected()
self.ble.irq(self.ble_irq)
self.register()
self.advertiser()
def connected(self):
global is_ble_connected
is_ble_connected = True
def disconnected(self):
global is_ble_connected
is_ble_connected = False
def ble_irq(self, event, data):
global ble_msg
if event == 1: #_IRQ_CENTRAL_CONNECT:
# A central has connected to this peripheral
self.connected()
elif event == 2: #_IRQ_CENTRAL_DISCONNECT:
# A central has disconnected from this peripheral.
self.advertiser()
self.disconnected()
elif event == 3: #_IRQ_GATTS_WRITE:
# A client has written to this characteristic or descriptor.
buffer = self.ble.gatts_read(self.rx)
ble_msg = buffer.decode('UTF-8').strip()
def register(self):
BLE_NUS = ubluetooth.UUID(NUS_UUID)
BLE_RX = (ubluetooth.UUID(RX_UUID), ubluetooth.FLAG_WRITE)
BLE_TX = (ubluetooth.UUID(TX_UUID), ubluetooth.FLAG_NOTIFY)
BLE_UART = (BLE_NUS, (BLE_TX, BLE_RX,))
SERVICES = (BLE_UART, )
((self.tx, self.rx,), ) = self.ble.gatts_register_services(SERVICES)
def send(self, data:):
self.ble.gatts_notify(0, self.tx, data)
def advertiser(self):
name = bytes(self.name, 'UTF-8')
adv_data = bytearray(b'x02x01x02') + bytearray((len(name) + 1, 0x09)) + name
self.ble.gap_advertise(100, adv_data) # 100msごとにアドバタイズ
print(adv_data)
解説
register
def register(self):
BLE_NUS = ubluetooth.UUID(NUS_UUID)
BLE_RX = (ubluetooth.UUID(RX_UUID), ubluetooth.FLAG_WRITE)
BLE_TX = (ubluetooth.UUID(TX_UUID), ubluetooth.FLAG_NOTIFY)
BLE_UART = (BLE_NUS, (BLE_TX, BLE_RX,))
SERVICES = (BLE_UART, )
((self.tx, self.rx,), ) = self.ble.gatts_register_services(SERVICES)
最後のgatts_register_services(SERVICES)というのは、指定したサービスでサーバーを構成するという意味です。
サルでもわかるBLE基礎で、ひとつのデバイスには複数のサービスがあるという概念を読みました。
各サービス は サービス自体にUUID と キャラクタリスティック のリストを含む2項目のタプルで構成されています。
send
def send(self, data):
self.ble.gatts_notify(0, self.tx, data)
self.bleはBLEインスタンスを指しています。このインスタンスはESP32のBLE機能を制御します。
gatts_notifyは、GATT(Generic Attribute Profile)サーバーとして動作するBLEデバイスから、接続されたクライアントデバイスにデータを通知するためのメソッドです。
最初のパラメータ0は通知を送信する接続のインデックスを示しています。ほとんどの場合、0を使用します(これは通常、最初に接続されたデバイスを示します)。
self.txは送信するための特性(Characteristic)を指します。
ちなみにここで設定したTx_UUIDを手がかりに、ラズパイ側ではどのBluetooth発信デバイスをつなぐべきかを判断します
dataは実際に送信するデータです。このデータはバイト列またはバイト列に変換可能な形式である必要があります。dict型の場合はjson.dumps()でjson型に変換してから渡してやりましょう
advertiser
def advertiser(self):
name = bytes(self.name, 'UTF-8')
adv_data = bytearray(b'x02x01x02') + bytearray((len(name) + 1, 0x09)) + name
self.ble.gap_advertise(100, adv_data)
print(adv_data)
adv_dataはbyteで示すと、b'x02x01x02++ESP32BLE'となります。
bytearray(b'x02x01x02'):これはアドバタイジングパケットのフラグを設定しています。x02x01x02は、「長さ: 2バイト」「ADタイプ: 0x01」「フラグの値: 0x02」を表しています。フラグの値0x02は「一般的な発見可能モード」を意味します。
bytearray((len(name) + 1, 0x09)):これはESP32の名前をアドバタイジングパケットに追加するための長さとタイプを設定しています。len(name) + 1は名前の長さを示し、0x09は名前のフィールドタイプを示しています(この場合は完全なデバイス名)。
name:これは上記でUTF-8に変換したデバイス名です。
最後にself.ble.gap_advertise(100, adv_data)でアドバタイジングパケットを送信します。このメソッドはアドバタイジングの間隔(この場合は100ミリ秒)とパケットデータを引数に取ります。
実際のコード
クリックで展開
from machine import RTC
from time import sleep_ms
import ubluetooth
import ujson
ble_msg = ""
is_ble_connected = False
# Nordic UART Service (NUS)
NUS_UUID = 'a72c3f53-ac6f-4367-8d65-c855ee89acee'
RX_UUID = '4836c2f5-001a-4d2b-a67f-a2701b1354e5'
TX_UUID = '93222d1f-2837-4f1d-88d0-e30b6d1935e1'
ble = ESP32_BLE("ESP32BLE")
rtc = RTC()
rtc.datetime((2023,6,3,1,0,0,0,0))
while True:
if is_ble_connected:
now = rtc.datetime()
payload = {
"deviceName":"zero",
"data":{
"temp":now[5],
"hum":now[6]
}
}
ble.send(ujson.dumps(payload))
sleep_ms(1000)
仮で時刻を渡しています。
配信の確認
これで本当にデータを発信できているか確認するために、スマホでBLEをスキャンできるBLE Scannerを使います。
このようにESP32という名前で出てくるので、(この名前は変えられない?)、Connectをタップすると、DEVICE UUIDやADVERTIMENT DATAが表示されます。
ESP32の内容
DEVICE UUIDは、そのデバイス固有のUUIDで、自分で自由に設定することはできませんし、接続を切ってもこのUUIDは変わりません。
ADVERTIMENT DATAはアドバタイズに載せているデータだと思いますが、詳細はまだ理解していません。
ble=ESP32_BLE("ESP32BLE")のESP32BLEの文字列を変えてもDevice Local Nameは変わらないのでどうやら別物らしい
ESP32から発信されるデータはSERVICESに入っています。
SERVICES
サービスとして、NotifyとWriteがあります。
サンプルコードではESP32から現在時刻を発信していますので、それらしきデータはNotifyにありました。
なおdef registerで定めたRX-UUIDとTX-UUIDはここに表示されていますね。
(データの送受信に必要?)
セントラル(ラズパイ)側
esp32からbluetoothでデータが発信されていることを確認できたところで、ラズパイ側の実装をしていきましょう
1. 周囲のBLE発信をスキャン
scanner = BleakScanner()
devices = await scanner.discover()
clients = []
for device in devices:
print(f'name:{device.name},address:{device.address}')
name:None,address:96960FF0-A7DD-48FC-E9BB-16ED3FF71DB8
name:None,address:AF544B3C-75CA-175C-61B7-D153883ABD15
name:ESP32,address:6F2C7666-8A98-D478-4C08-31C166490C58
name:None,address:5B1BF915-9712-3A8F-00D2-E707A7C44B1E
name:None,address:3E2C877B-A66A-4566-1D53-F51072F03D3D
name:None,address:B0B034C6-C6C6-708E-A771-A5054CF9BB0A
・・・以下省略
nameにESP32とか書いてあるとわかりやすいんですけど、私が試した時には機種によってはESP32と書かれていないものもありました。
また、ESP32をリセットするたびにaddressが変わってしまいます(変わる時と変わらない時があります)
理由はよくわからないので、device.addressで機種を特定することは避け、後述するnotify_uuidで識別します。
2. 各デバイスと接続
client = BleakClient(device)
await client.connect()
if client.is_connect:
# Characteristicの情報を得る
for service in client.services:
print('---------------------')
print(f"service uuid:{service.uuid}, description:{service.description}")
[print(f'{c.properties},{c.uuid}') for c in service.characteristics]
await client.start_notify('93222d1f-2837-4f1d-88d0-e30b6d1935e1', notification_handler)
各デバイスと接続します。
デバイスには、サービスというフォルダのような概念があるということは冒頭で学んだ通りです。
サービスの一つ一つについて、ファイルに相当するキャラクタリスティックをみていきます。
キャラクタリスティックはproperty(notify, write, readのいずれか)と、ファイルIDに相当するuuid、そしてファイル内容に相当するvalueから構成されています。
(キャラクタリスティックはペリフェラル側で組み立てるのが一般的です。ESP32側で実装したESP32_BLEクラスのregisterメソッドを見てください)
service uuid:a72c3f53-ac6f-4367-8d65-c855ee89acee,description:Unknown
['notify'],93222d1f-2837-4f1d-88d0-e30b6d1935e1
['write'],4836c2f5-001a-4d2b-a67f-a2701b1354e5
ちなみにNUSを使った場合、これらのuuidは、以下のように対応しています
ESP32側 | ラズパイ側 |
---|---|
NUS_UUID | service uuid |
TX_UUID | notifyのcharacterristic uuid |
RX_UUID | writeのcharacterristic uuid |
1の説明で、デバイスのaddressは起動のたびに変わると説明しました。ですので、現状では自分が用意したESP32に限らず、そのへんのbluetoothデバイスとバカスカ接続してしまっています
自分が用意したESP32であれば、NUS_UUIDを決めているはずなので、これを使ってデバイスを特定すると良いでしょう。
3. データ通信
ペリフェラル側とセントラル側のデータのやり取りをGATT通信と呼びます。
- セントラルのリクエストに応じてペリフェラルがデータを返すread
- セントラルの値をペリフェラルに書き込むwrite
- ペリフェラルからセントラルにデータを送るnotify
これらをセントラル側で実装する方法はBleakのドキュメントを見たら良いのですが、一応解説します
# read
data:bytearray = client.read_gatt_char(c.uuid)
# write
client.write_gatt_char(c.uuid,data)
# notify
def notification_handler(sender: int, data: bytearray, **_kwargs):
print(f"Received: {data.decode()} (from {sender})")
await client.start_notify(c.uuid, notification_handler)
セントラル側コード
以上をまとめると、セントラル側のコードは以下のようになります
クリックで展開
import asyncio
from bleak import BleakScanner, BleakClient
# ESP32のデバイスを識別するためのUUID (これはデバイスにより異なります)
ESP32_UUIDs = ["6F2C7666-8A98-D478-4C08-31C166490C58"]
# Nordic UART Service (NUS)
NUS_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
RX_UUID = '93222d1f-2837-4f1d-88d0-e30b6d1935e1' # RX Characteristic UUID (from ESP32 to Computer)
# コールバック関数: データが送信されたときに呼び出されます
def notification_handler(sender: int, data: bytearray, **_kwargs):
print(f"Received: {data.decode()} (from {sender})")
async def run():
# 1. 周囲のBLE発信をスキャン
scanner = BleakScanner()
devices = await scanner.discover()
clients = []
for device in devices:
print(f'name:{device.name},address:{device.address}')
if device.address in ESP32_UUIDs:
client = BleakClient(device)
clients.append(client)
try:
# 2. クライアント(ESP32などのデバイス)とデータのやり取りをする
for client in clients:
await client.connect()
# Characteristicの情報を得るために記述。本番ではコメントアウトしても良い
for service in client.services:
print('---------------------')
print(f"service uuid:{service.uuid}, description:{service.description}")
[print(f'{c.properties},{c.uuid}') for c in service.characteristics]
await client.start_notify('93222d1f-2837-4f1d-88d0-e30b6d1935e1', notification_handler)
while True:
# 実際のアプリケーションではここで何らかの処理を行います
await asyncio.sleep(1.0)
finally:
print(14)
# for client in clients:
# await client.stop_notify(RX_UUID)
# await client.disconnect()
asyncio.run(run())
Discussion