WT901BLECLにBLE接続して設定をいじる
WitMotionのWT901BLECLをBLEで接続したときに、設定をいじったり他のデータを取る方法を考えます。
BLE接続をする方法はこちら:
設定を送信する場所
設定の変更や指示などは、あるCharacteristicにデータを書き込むことで実現できます。
WT901BLECLの持つCharacteristic一覧はこちらから入手できることがわかっています。
その中でnotifyと同じServiceに含まれwriteコマンドを持つもの、つまり0000ffe9で始まるUUIDが設定用のCharacteristicです。
0000ffe5-0000-1000-8000-00805f9a34fb: Unknown: ["['write-without-response', 'write'],0000ffe9-0000-1000-8000-00805f9a34fb", "['notify'],0000ffe4-0000-1000-8000-00805f9a34fb"]
データ送信間隔を変更する
デフォルトのセンサデータを送る間隔を変更することができます。
設定データはビット列で構成され、接続されたBleakClientに対してwrite_gatt_char()を呼ぶことで指定のCharacteristicに書き込むことができます。
例えば送信間隔を1 Hzに変更したい場合、上に習ってFF AA 03 03 00
というビット列を用意します。PythonではByte型を使って下のように簡潔に表現可能です。
await client.write_gatt_char("0000ffe9-0000-1000-8000-00805f9a34fb", b'\xFF\xAA\x03\x03\x00')
実際に動かすコードは以下のようになります。デフォルトでは10 Hzなので毎秒10回データが届きますが、設定変更すると毎秒1回に変化することがわかります。
import asyncio
from bleak import BleakClient
mac_address = "EA:78:B5:4D:E3:21"
def notification_handler(sender, data: bytearray):
decoded = [int.from_bytes(data[i:i+2], byteorder='little', signed=True) for i in range(2, len(data), 2)]
ax = decoded[0] / 32768.0 * 16 * 9.8
ay = decoded[1] / 32768.0 * 16 * 9.8
az = decoded[2] / 32768.0 * 16 * 9.8
wx = decoded[3] / 32768.0 * 2000
wy = decoded[4] / 32768.0 * 2000
wz = decoded[5] / 32768.0 * 2000
roll = decoded[6] / 32768.0 * 180
pitch = decoded[7] / 32768.0 * 180
yaw = decoded[8] / 32768.0 * 180
print(f"ax: {ax:.3f}, ay: {ay:.3f}, az: {az:.3f}, wx: {wx:.3f}, wy: {wy:.3f}, wz: {wz:.3f}, roll: {roll:.3f}, pitch: {pitch:.3f}, yaw: {yaw:.3f}")
async def run(address, loop):
async with BleakClient(address, loop=loop) as client:
is_connected = client.is_connected
print(f"Connected: {is_connected}")
await client.start_notify("0000ffe4-0000-1000-8000-00805f9a34fb", notification_handler)
# return rateを1 Hzに設定
await client.write_gatt_char("0000ffe9-0000-1000-8000-00805f9a34fb", b'\xFF\xAA\x03\x03\x00')
while True:
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
loop.run_until_complete(run(mac_address, loop))
キャリブレーション
IMU内のセンサー、特に地磁気センサーは環境により大きくずれるので、正確な値を得るためにはキャリブレーションが必要です。
キャリブレーションは開始コードを書き込んでスタートし、終了コードを書き込んで終了することができます。
例えば地磁気センサーのキャリブレーションは以下のようにできます。
# 地磁気キャリブレーション開始
await client.write_gatt_char("0000ffe9-0000-1000-8000-00805f9a34fb", b'\xFF\xAA\x01\x07\x00')
# 10秒間キャリブレーションする
await asyncio.sleep(10)
# キャリブレーション終了
await client.write_gatt_char("0000ffe9-0000-1000-8000-00805f9a34fb",b'\xFF\xAA\x01\x00\x00')
地磁気キャリブレーションは鉄や磁石から20cm以上離した状態で、3次元的に何周かIMUを回す必要があります。
特殊なデータを読み取る
デフォルトで送られてくるデータは、加速度・ジャイロ・角度の3つですが、レジスタの値を読み取るコマンドを使うことで他のデータにもアクセスすることができます。
コマンドはFF AA 27 XX 00
です。XXに読み取りたい最初のレジスタ番号を入れると、8つの連続したレジスタの値を返してくれます。
マニュアルにはMagnetic Field、Quaternion、Temperatureの読み方の記述があります。
このコマンドを書き込むと、notify用のCharacteristic(通常データと同じ)に1度だけレジスタの値を持つパケットが流れてきます。通常データとの区別は、2byte目のSignの値からできます(通常データは55 61 ...
でレジスタデータは55 71 ...
)。
実際に3種類のデータを読み取るコードは以下のようになります。
import asyncio
from bleak import BleakClient
import numpy as np
import matplotlib.pyplot as plt
address = "F8:1B:3D:20:A9:FE" # scan_ble_devices.pyで取得したMACアドレスを指定
def notification_handler(sender, data):
header_bit = data[0]
assert header_bit == 0x55
flag_bit = data[1] # 0x51 or 0x71
if flag_bit == 0x61:
# Default Data Packet
decoded = [int.from_bytes(data[i:i + 2], byteorder='little', signed=True) for i in range(2, len(data), 2)]
ax = decoded[0] / 32768.0 * 16 * 9.8
ay = decoded[1] / 32768.0 * 16 * 9.8
az = decoded[2] / 32768.0 * 16 * 9.8
wx = decoded[3] / 32768.0 * 2000
wy = decoded[4] / 32768.0 * 2000
wz = decoded[5] / 32768.0 * 2000
roll = decoded[6] / 32768.0 * 180
pitch = decoded[7] / 32768.0 * 180
yaw = decoded[8] / 32768.0 * 180
print(f"ax:{ax}, ay:{ay}, az:{az}, wx:{wx}, wy:{wy}, wz:{wz}, roll:{roll}, pitch:{pitch}, yaw:{yaw}")
elif flag_bit == 0x71:
# Single Return Data Packet
reg_l = data[2]
reg_h = data[3]
if reg_l == 0x3A and reg_h == 0x00:
# Magnetic Field
decoded = [int.from_bytes(data[i:i + 2], byteorder='little', signed=True) for i in range(4, 10, 2)]
hx = decoded[0]
hy = decoded[1]
hz = decoded[2]
print("Magnetic Field:", hx,hy,hz)
elif reg_l == 0x51 and reg_h == 0x00:
# Quaternion
decoded = [int.from_bytes(data[i:i + 2], byteorder='little', signed=True) for i in range(4, 12, 2)]
q0 = decoded[0] / 32768.0
q1 = decoded[1] / 32768.0
q2 = decoded[2] / 32768.0
q3 = decoded[3] / 32768.0
print("Quaternion:", q0,q1,q2,q3)
elif reg_l == 0x40 and reg_h == 0x00:
temperature = int.from_bytes(data[4:6], byteorder='little', signed=True) / 100
print("Temperature:", temperature)
else:
raise Exception("Unknown data type")
else:
raise Exception("Unknown data type: ", data.hex())
async def run(address, loop):
notify_id = "0000ffe4-0000-1000-8000-00805f9a34fb"
write_id = "0000ffe9-0000-1000-8000-00805f9a34fb"
async with BleakClient(address, loop=loop) as client:
x = client.is_connected
print("Connected: {0}".format(x))
await client.start_notify(notify_id, notification_handler)
# data rate -> 1 Hz
await client.write_gatt_char(write_id, b'\xFF\xAA\x03\x03\x00')
while True:
await asyncio.sleep(0.2)
# Magnetic Field
await client.write_gatt_char(write_id,b'\xFF\xAA\x27\x3A\x00')
await asyncio.sleep(0.2)
# Quaternion
await client.write_gatt_char(write_id,b'\xFF\xAA\x27\x51\x00')
await asyncio.sleep(0.2)
# Temperature
await client.write_gatt_char(write_id,b'\xFF\xAA\x27\x40\x00')
loop = asyncio.get_event_loop()
loop.run_until_complete(run(address, loop))
> python wt901_data.py
Connected: True
Magnetic Field: 235 -540 -7
ax:-0.35888671875, ay:0.30146484375000004, az:9.771289062500001, wx:0.0, wy:0.0, wz:0.0, roll:1.7138671875, pitch:2.1148681640625, yaw:156.26953125
Quaternion: 0.20574951171875 -0.014984130859375 0.018463134765625 0.978271484375
Temperature: 21.13
Magnetic Field: 234 -540 -5
Quaternion: 0.20574951171875 -0.01483154296875 0.01849365234375 0.978271484375
ax:-0.3541015625, ay:0.2966796875, az:9.766503906250001, wx:0.0, wy:0.0, wz:0.0, roll:1.724853515625, pitch:2.098388671875, yaw:156.26953125
Temperature: 21.19
Quaternion: 0.20574951171875 -0.0147705078125 0.01849365234375 0.978271484375
Magnetic Field: 234 -542 -5
データシートを見ると、レジスタ番号とデータの対応表が載っているため他のデータでも同様にして入手できると考えられます。
参考
Discussion