kafkaでBLE通信のデータを中継する
やりたいこと
サーバで受信したBLE(Bluetooth Low Energy)通信のアドバタイズメントパケットを、Kafkaを使って複数サーバへばらまきたいです。
なぜ?
我が家では、各部屋の温湿度センサからBLE通信経由でデータを取り、オブザーバビリティツールなりDBサーバなりに数値を保存しています。
このセンサは随時買い足していて、現在二つのメーカのセンサを運用しています。
本当は1メーカ1種のセンサに統一すると楽なのですが、都度最安値のものを選ぶことと勉強も兼ねて、複数種を運用することにしました。
すると、新種センサを追加する場合、BLE通信の中身を解析したりいろいろテストする状況が発生します。
BLE通信はUSBパススルー機能を使って物理サーバで受信したものを仮想サーバに渡しているので、常に1経路しか使えません。ということは解析や試験でこの仮想サーバを使うと、環境を汚してしまったり、通常運用していたセンサの計測を一時的に止めてしまうことになります。かといって物理サーバをもう1台買い足して開発系を作るのもなあ・・と思っていました。
そんな時、N:N通信のPUB/SUB(Kafka)について知る機会があったので、BLE通信の受け口はVM1台にして、BLE通信の内容Kafkaを使って複数VMにばらまけるような構成を作りたいと思いました。
前提
BLE通信をばらまくといっても、BLEの1:1通信(GATT)ではなくアドバタイズ通信(片方向)を対象としています。BLEではセンサ(ペリフェラル)が情報の収集先(セントラル)に対して定期的に自身の存在通知をしています。温湿度センサの場合、このアドバタイズに温湿度データを載せていることが多いので、アドバタイズデータをばらまくこととしました。ペリフェラルとセントラルがコネクションを張って双方向通信はじめた後の通信内容をばらまくにはメッセージキューイング以外の結構高度な内容が必要になると思われます。
参考:【サルでもわかるBLE入門】(2) アドバタイズとGATT通信
構成図
このような構成を作りました。
Physical Severで受けたBLE通信をPVEのUSBパススルー機能でVirtual Server 1へ送ります。Virtual Server 1上のプログラムがKafka Producerとなり、BLE通信の中のアドバタイズメントデータをKafka Brokerへ送信します。
- Physical Server Trigkey G4, Proxmox VE
- Virtual Server OS Debian 12.8
- Kafka Producer Pythonスクリプト
- Kafka Broker Kafka-python
- Kafka Consumer Pythonスクリプト
用意するもの
ライブラリ
手順
終始お世話になったのがこの記事。Kafkaを構成する要素がとても分かりやすく解説されている。
KafkaをローカルのDocker環境で、さくっと動かしてみました 第1回
Bluetoothデータの読み込みはbleakを使用。今回の用途ではほかのライブラリより使い勝手が良かった。
Kafka Producer
import asyncio
from bleak import BleakScanner
import time
import json
from datetime import date, datetime
import argparse
import pprint
from kafka import KafkaProducer
import base64
import pickle
# BrokerのIPアドレス:リッスンポート
KAFKA_BROKER = '192.168.1.119:29093'
# BLE通信のAdvertisementPacketからJSONデータ生成
def bluetooth_json_data(mac_addr, adv_data):
dt = datetime.now()
bt_item = json.dumps({
'mac_addr': mac_addr,
'datetime': dt.strftime('%Y/%m/%d %H:%M:%S.%f'),
#advertisementDataはオブジェクトなのでシリアライズ・BASE64エンコードでJSON化
'advertisement_data': base64.b64encode(pickle.dumps(adv_data)).decode('utf-8'),
}, ensure_ascii=False).encode('utf-8')
return bt_item
# Kafka Brokerへデータ送信
def to_kafkaBroker(producer, mac_addr, adv_data):
btjsondata = bluetooth_json_data(mac_addr, adv_data)
json_data = json.loads(btjsondata)
date = datetime.now().strftime("%Y/%m/%d")
#注:MACアドレスをTopicとして使用した場合、Topicにはコロンが使えないため、ハイフン等で置換が必要
#第1引数はkafka topic. 本実装ではMACアドレスをKeyとして使用
result = producer.send('Bluetooth', key=mac_addr.encode('utf-8'), value=json.dumps(json_data).encode('utf-8'))
async def main():
stop_event = asyncio.Event()
producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
# TODO: add something that calls stop_event.set()
def callback(device, advertising_data):
# TODO: do something with incoming data
# kafka brokerへデータ送信
to_kafkaBroker(producer, device.address, advertising_data)
async with BleakScanner(callback) as scanner:
# Important! Wait for an event to trigger stop, otherwise scanner
# will stop immediately.
await stop_event.wait()
# scanner stops when block exits
print("stop")
asyncio.run(main())
※Bluetooth通信にアクセスするために、Root権限での実行が必要。
kafka Consumer
外部からセンサ一覧のCSVファイルを読み込みます。Kafka Brokerからデータを引っ張り、中身を解析して目当てのデータはNewRelicへ送信します。
from bleak import BleakScanner
import time
import json
from datetime import date, datetime
import argparse
from kafka import KafkaConsumer
import base64
import pickle
import csv
import send_api_NR
# BrokerのIPアドレス:リッスンポート
KAFKA_BROKER = '192.168.1.119:29093'
# NewRelic用メタデータ
APIKEYFILE = 'NEWRELIC APIキーファイルの場所'
ENVIRONMENT = "production"
HOST = "ホスト名"
API_ENDPOINT = "https://metric-api.newrelic.com/metric/v1"
# バックグラウンド処理用
def consume_topic(consumer, sensor_data, apikey, mode):
#print(consumer.topics()) #Topic一覧
# Read data from kafka
try :
for message in consumer:
#BASE64デコード
pickled_adv_data = base64.b64decode(message.value['advertisement_data'])
#Pickle化(シリアライズ)解除
restored_adv_data = pickle.loads(pickled_adv_data)
for sensor in sensor_data:
if(sensor['mac_addr'] == message.key.decode()):
sensorVal = DecodeSensor(sensor['sensor_type'], restored_adv_data)
if(mode == 'TM'):
#ターミナル出力
print_terminal(sensor, sensorVal)
else:
#NewRelic出力
timestamp = int(time.time() * 1000)
json_str = create_JSON_NR(sensor, sensorVal, timestamp)
try:
send_api_NR.sendAPINewRelic(API_ENDPOINT, apikey, json.dumps(json_str))
excep Exception as e:
print(datetime.now())
print(e)
pass
else:
pass
except KeyboardInterrupt :
print('\r\n Output to Terminal - interrupted!')
return
except Exception as e:
print(e)
pass
# ターミナル表示用
def print_terminal(sensor, sensorVal):
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " , "
+ 'sensorNo=' + str(sensor['sensor_no']) + ', '
+ 'sensorLoc=' + sensor['sensor_location'] + ', '
+ '\ttemp=' + str(sensorVal['temp']) + ', humid=' + str(sensorVal['humid']) + ', battery=' + str(sensorVal['battery']) )
# NewRelic送付用JSONデータ作成
def create_JSON_NR(sensor, sensorVal, timestamp):
json_str = [
{
"common": {
"timestamp": timestamp,
"attributes": {
"host.name": HOST,
"environment": ENVIRONMENT,
"sensor_no": sensor['sensor_no']
}
},
"metrics": [
{
"name": "myhome.sensors.temperature",
"type": "gauge",
"value": sensorVal['temp'],
},
{
"name": "myhome.sensors.humidity",
"type": "gauge",
"value": sensorVal['humid'],
},
{
"name": "myhome.sensors.battery",
"type": "gauge",
"value": sensorVal['battery'],
}
]
}
]
return json_str
# Kafka Topic からのデータ受取
def get_kafka_topic():
# Initialize consumer variable and set property for JSON decode
consumer = KafkaConsumer ('Bluetooth',
bootstrap_servers = [KAFKA_BROKER],
#key_deserializer=lambda m: json.loads(m.decode('utf-8')),
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
return consumer
# BLE通信のadvertising dataから各センサデータを取り出す
def DecodeSensor(sensor_type, advertising_data):
sensorVal = {"temp": 0, "humid": 0, "battery": 0}
if(sensor_type == 'GV'): #GoVeeセンサ
data_manu = advertising_data.manufacturer_data.get(1)
if(data_manu[2]&0b10000000 == 0b10000000):
temp_t = (data_manu[2]&0b01111111) * 256*256 + data_manu[3] * 256 + data_manu[4]
sensorVal['temp'] = 0 - ((temp_t - (temp_t % 1000)) / 10000)
else:
temp_t = data_manu[2] * 256*256 + data_manu[3] * 256 + data_manu[4]
sensorVal['temp'] = (temp_t - (temp_t % 1000)) / 10000
sensorVal['humid'] = (temp_t % 1000) /10
sensorVal['battery'] = data_manu[5]
elif(sensor_type == 'SB'): #SwitchBotセンサ
data_manu = advertising_data.manufacturer_data.get(2409)
data_srv = advertising_data.service_data.get('0000fd3d-0000-1000-8000-00805f9b34fb')
sensorVal['temp'] = (data_manu[9] & 0b01111111) + (data_manu[8] & 0b00001111) / 10
sensorVal['humid'] = data_manu[10]
isOverZero=(data_manu[9] & 0b10000000)
if not isOverZero:
sensorVal['temp'] = 0 - sensorVal['temp']
if(data_srv != None):
sensorVal['battery'] = data_srv[2] & 0b01111111
return sensorVal
# センサ定義(CSV)から読み込んでリストで返す
def read_sensor_data(filename):
sensor_data = []
try:
with open(filename, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
# センサ番号を整数に変換
row['sensor_no'] = int(row['sensor_no'])
sensor_data.append(row)
except FileNotFoundError:
print(f"エラー:ファイル '{filename}' が見つかりませんでした。")
except Exception as e:
print(f"エラー:CSVファイルの読み込み中にエラーが発生しました:{e}")
return sensor_data
if __name__ == '__main__':
dt_now = datetime.now()
print('開始時刻')
print(dt_now)
parser = argparse.ArgumentParser(description='[Kafka Consumer]')
parser.add_argument ('--csv', type=str, default='sensors.csv', help='CSVファイル名')
parser.add_argument ('--mode', type=str, default='TM', help='TM or NR')
args = parser.parse_args()
print('センサーリスト')
sensor_data = read_sensor_data(args.csv)
if sensor_data:
for row in sensor_data:
print(row)
# New Relic インジェストライセンスキー取得
with open(APIKEYFILE, "r", encoding="utf-8") as f:
apikey = f.read()
apikey = apikey.rstrip()
start = time.time()
consumer = get_kafka_topic()
consume_topic(consumer, sensor_data, apikey, args.mode)
making_time = time.time() - start
print("")
print("Streamデータ取得待機時間:{0}".format(making_time) + " [sec]")
print("")
dt_now = datetime.now()
print('終了時刻')
print(dt_now)
kafka Broker
confluentが提供しているコンテナイメージを使っています。
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
hostname: zookeeper
container_name: zookeeper
ports:
- "32181:32181"
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:5.5.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
- "29093:29093"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
#PLAINTEXT:開発系コンテナ(broker以外)からアクセスするときのリスナ
#PLAINTEXT_HOST:開発系コンテナをホストしているデスクトップPCからアクセスするときのリスナ
#LISTENER_RPI:ラズパイなどコンテナをホストしているPC以外からアクセスするときのリスナ
# 外部のProducer側でのメソッドでdesktop:29092を宛先に使った場合、ブローカへの初期接続は可能だが、それに対するリスナからの
# 再接続先通知(advertised listenerがdocker networkの内側からしかアクセスできないホスト名(broker)で返信されてしまうため、外部から通信できない)
# そのため、外部接続用のリスナ29093が必要。
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,LISTENER_RPI:PLAINTEXT
#KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092,LISTENER_RPI://desktop:29093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092,LISTENER_RPI://192.168.1.119:29093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
KAFKA_LOG_RETENTION_MS: 1000 # 1時間
agent:
container_name: newrelic-infra
build:
context: .
dockerfile: newrelic-infra/newrelic-infra.dockerfile
cap_add:
- SYS_PTRACE
#network_mode: bridge
pid: host
privileged: true
#enviroment:
# TINI_SUBREAPER: 1
environment:
- TINI_SUBREAPER=1
- NRIA_LICENSE_KEY=4632f19b79e62dbb60e81352ae2e0527FFFFNRAL
volumes:
- "/:/host:ro"
- "/var/run/docker.sock:/var/run/docker.sock"
- "/var/run/docker.sock:/var/run/docker.sock:ro"
#- "/:/host:ro"
- "/run/containerd/containerd.sock:/run/containerd/containerd.sock:ro"
restart: unless-stopped
振り返り
- Kafkaを使ってデータの送信側と受信側を疎結合にできたのはいい勉強になりました。
- Kafkaの持っている各種非機能は全く活用できていないので、この程度のユースケースであれば他の手段が良かった可能性は大いにあります。
- Kafka Brokerにたまっているキューの状況など確認したいと思ったのですが、kafka-pythonではまだ用意されていないようです。(JAVA実装?ではありそう)NewRelic InfrastructureもエージェントがKafka使ってますか?観測しますか?と反応してくるのですが、Javaでないとだめそうです。
- 受信したBluetoothデバイスのMACアドレスを表示すると、我が家には結構な数のデバイスがあることが分かりました。
Discussion