部屋のCO2濃度をMQTT経由で取得してStreamlitで描画するまで
TL;DR
- CO2(TVOC/eCO2)濃度をマイコンで取得しMQTTブローカーに送信していたので、MQTTでデータ受信してCSVに保存、インタラクティブなデータ描画をするWebアプリをStreamlitで実現した。
- クラウドサービスは未使用。
はじめに
なんか作ろうの会の原田です。
原田イメージ
部屋のCO2濃度が高すぎると、めまい、頭痛など人の生活に良くない影響を及ぼすようです。[1]
以前それを知り、CO2の値が高かったら、GoogleHomeが「窓を開けましょう」と話しかけてくれる仕組みを作りました。それと同時に、そもそもCO2濃度をグラフなどで可視化したいと考えていました。
CO2濃度の値はTVOC/eCO2ガスセンサユニット及びマイコンM5Atomを通して得ており、データをMQTTブローカーに向けて送信をしていたので、あとはデータを取得して描画するだけの状態でした。
そしてスマートホーム基盤としてNode-REDを用いていたので、MQTTブローカーから簡易なグラフを出すだけならNode-REDのMQTTbrokerとdashboardだけで簡単に実現できました。
構築までの手間が少なくのは嬉しかったのですが、見た目とインタラクティブ性にあまり納得いきませんでしたw
もっとこう意味もなくグラフにズームしたいんや・・・とw
そこでひょんなことから知ったPythonのStreamlitを使ってデータを可視化することに取り組みました。
構成図
メッセージのやりとりにMQTTを使っているだけで、大したことはありません。
MQTTなので既存の仕組みに影響を及ぼさずに機能を追加できるのは嬉しいですね。
MQTTでデータを受信(Subscribe)、描画までの処理を今回Streamlitを使って作ったことになります。赤枠部分
Jetson Nano Developer Kitを用いていますが、Pythonが動いて動作に余裕ある機材なら何でも良いかなと。私も余っていた機材を用いました。
Node-REDはv3.1.3です。
コード
- MQTTブローカーへの接続:paho コード内ではDisconnect処理などはかなり端折っています。
- データの受信間隔:15秒(マイコン側でのデータ送信Publishの間隔)
- グラフの更新間隔:60秒
- データを受信したら毎回再描画とするのもできましたが、短時間で再描画するとズーム状態がリセットされてしまうので、グラフ描画はゆっくりで十分と判断しました。1回の描画更新あたり4点ほどデータが追加される形ですね。
- データを受信したらCSVに保存、グラフ描画時にCSVを開く形としました。
- Streamlitでの描画は散布図(st.scatter_chart)を使いました。
- CSVに保存するならStreamlitとは別の処理にしても良かったのですが、せっかくなのでデータ受信部分をworkerとして登録しました。
- 使う場合はMQTT_BROKER_IPに正しいMQTTブローカーのIPアドレスを入れてください。ここではRaspberry Pi4BのIPアドレスを指します。
import streamlit as st
import threading
import pandas as pd
import paho.mqtt.client as mqtt
import time
import datetime
LOCAL_CSV_FILE = "./data.csv"
MQTT_BROKER_IP = "192.168.XX.XX"
MQTT_BROKER_PORT = 1883
MQTT_TOPIC = "CO2Sensor"
# MQTT
## Subscribe MQTT Topic.
class Worker(threading.Thread):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.data = pd.DataFrame([], columns=['date', 'CO2[ppm]'])
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
def on_connect(self, client, userdata, flag, rc):
print("Connected with result code " + str(rc))
client.subscribe(MQTT_TOPIC)
def on_disconnect(self, client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
def on_message(self, client, userdata, msg):
print("Received message :'" + str(msg.payload) + "', topic :'" + msg.topic + "', with QoS :" + str(msg.qos))
df = pd.DataFrame([[datetime.datetime.now(), int(msg.payload)]], columns=['date','CO2'])
df.to_csv(LOCAL_CSV_FILE, mode='a', header=False, index=False)
def run(self):
self.client.connect(MQTT_BROKER_IP, MQTT_BROKER_PORT, 60)
self.client.loop_forever()
# Streamlit
st.title("Environment of the room")
placeholder = st.empty()
## Start worker.
if "worker" not in st.session_state:
st.session_state.worker = Worker()
st.session_state.worker.start()
st.session_state.worker.data = []
## Show scatter chart.
while True:
st.session_state.worker.data = pd.read_csv(LOCAL_CSV_FILE, names=['date', 'CO2'], header=None)
placeholder.scatter_chart(st.session_state.worker.data[-100:], x='date', y='CO2', y_label='CO2[ppm]') ## Show last 100 points.
time.sleep(60) ## Wait 60 sec.
感想
描画した様子
横軸時間 - 縦軸CO2濃度(ppm)です
意味もなくデータにズームできて嬉しいw
正直DataFrameを表示させる部分は何も装飾なくかなり手抜きですが、それでも簡単に描画できるのは魅力ですね。
ある程度自由に描けそうなので、CO2濃度の急な上昇を検知したら別途通知なども表現しても面白いでしょうね。
今後は別センサの情報なども1画面内に表示させるなど、描画機能の充実を図りたいですね。
ColとContainerを併用したら、グリッド表示ができるみたいですし。
いろいろ表示させてみたくなりました。
考察
なぜマイコンは時刻データを送っていないのか?
コードを見て気付いた方もいるかもですが、MQTT経由で送られているのはCO2濃度のPPMの数値だけです。時刻はPythonの中で得ています。
これは、時刻を得る処理をマイコン側からなくして、マイコンでの処理を軽量化するためです。
その処理を行うのは計算リソースに余裕のある側に任せたほうが良いので、Streamlitを動作させている側で行っています。
なお通信分、およそ数msec-100msec分程度マイコンがデータ取得した時刻からは遅延します。
今回のユースケースでは遅延が気にならないレベルです。
頻度高くセンサーデータを取得したい場合はどうするか?
少し頻繁に、の場合は即時通信を使いMQTT。(MQTTの1回あたりの送信データ量制限範囲内であれば複数送信も可能)
もっと頻繁に、という場合は通信回数を下げるためにまとめて送る方針をとり、かつプロトコルを変えHTTPで送れば良いでしょう。
まとめて送信の場合はいつのデータか?を明らかにするために、RTC処理で取得した時刻情報も送信内容に必要でしょう。
案外その分の計算/データ量/通信処理が馬鹿にならないケースもあるでしょうね。そこまで求めるなら通信は忘れてエッジでストレージを持って一旦溜め込んでくれ~
MQTTからデータを受信したときに一度CSVに入れている理由は?代替は?
StreamlitがPandasのDataFrameを元にグラフを描けますが、CSVファイルを元にDataFrameを作るのが簡単だったからそうしたまでです。
MQTTからデータを受信したらCSVではなくRDBにいれるのは真っ当な道です。
StreamlitとしてもSQL接続 が用意されているようですし、RDBへのアクセスは容易でしょうね。
もちろん公式が推してるSnowflakeなどもDWHとして使ってみても面白いでしょうが、ホームユース勢としてはオーバースペックなんですよね。
それと、自宅外から閲覧したいという気持ちもなかったので、クラウドを用いる必要がどこにもなかった。
データの前処理など考慮してる?
まったくしていません。なので処理としてはかなり手抜きです。
データ欠損?データ重複時処理(QoS0送信なので今回は起きないが)?切断時再接続?CSV書き込みエラー?CSVの定期クリア?CSV全読み込みによるI/O負荷?
他にも考えれば色々ありますけど、細かく挙げればきりが無いので気になったら取りかかればいいかなとw
MQTTブローカーって便利?
やはり既存機能に影響を及ぼさずに新規機能をアドオンできるので楽ですね。
令和時代は「一家に一台MQTTブローカー」と割と本気で思っています。
スマートホームの基盤としてとても有用ですよ。自宅のなにかを色々自動化したい方におすすめです。
ここまで見ていただきありがとうございました。
Discussion