SwitchBotAPIを利用して自宅の温湿度をBigQueryに定期転送する
モチベーション
- SwitchBot※のアプリはデータの保持日数に制限があるが、気にせずにデータを貯めておきたい
- 使ってみたいものがある
- Pub/SubのBigQueryサブスクリプション
- Workflows
※ SwitchBotはIoTデバイスのシリーズで、リモコン制御、温湿度計、カーテンレールロボットなどいろいろあります。それらのデバイスの状態取得や制御ができるAPIが公開されています。
全体アーキテクチャと各構成要素
Workflows
GCPのHTTPベースのジョブオーケストレーションのサービスです。
今回は、CloudFunctionsの実行とPub/Subへのpushといったワークフロー全体制御に利用しました。
オーケストレーション用のシステムを導入・構築したり管理したりしたくないので採用してみました。
(Workflows触ってみたかっただけ)
実行時は↓のようにdevice_idを指定します。
{"device_ids":["HOGEHOGE1234","FOOBAR3456"]}
このワークフローを、CloudSchedulerで毎分実行するようにスケジュールしました。
CloudFunctions
SwitchBotのAPIを叩くためだけのPythonスクリプトが乗っています。
作成したFunctionsをトリガーするURLをWorkflowsのtransferThermalInfo
のgetThermalInfo
内のURLに記載します。
Workflows内で実現できた気がする
Pub/SubとBigQueryサブスクリプション
BigQueryへの書き込みでは、使ってみたかったBigQueryサブスクリプションを利用しました。
下記諸々を作成し、Pub/Subにメッセージを投げるとBigQueryにレコードが追加されるようにしました。
Pub/Sub周りを用意する必要があるのと、BigQueryサブスクリプションが利用しているBigQuery Storage Write APIは取り込みで課金が発生するので、そのあたりを考えると素朴にINSERTクエリを投げるほうがが良かったかもしれない
-
Pub/Subスキーマの作成
テーブル書き込みを前提としたPub/Subトピックになるので、スキーマを作成しました。{ "type": "record", "name": "Avro", "fields": [ { "name": "device_id", "type": "string" }, { "name": "humidity", "type": "double" }, { "name": "temperature", "type": "double" }, { "name": "measured_at", "type": "string", "logicalType": "local-timestamp" } ] }
-
Pub/Subトピックの作成
「スキーマを使用する」で1.で作成したスキーマを指定しました。今回はBigQueryサブスクリプションを追って設定するので、「デフォルトのサブスクリプションを追加する」は外しました。
メッセージの保持はpushされ次第BigQueryに放り込むので設定しませんでした。また、作成したトピックIDはWorkflowsの
transferThermalInfo
のtransferToBigquery
に記入します。 -
BigQueryテーブルの作成
DDLのクエリをBigQuery上で実行して書き込み先のテーブルを作成しました。スキャン量やクエリパフォーマンス対策の為、パーティションとクラスタはよしなに切っておきます。
CREATE TABLE `project.dataset.table_name` ( device_id STRING NOT NULL OPTIONS(description="デバイスID"), measured_at DATETIME NOT NULL OPTIONS(description="測定日時(測定APIを叩いた時刻)"), temperature FLOAT64 OPTIONS(description="摂氏温度"), humidity FLOAT64 OPTIONS(description="湿度") ) PARTITION BY DATETIME_TRUNC(measured_at, MONTH) CLUSTER BY device_id;
-
BigQueryサブスクリプションの作成
2.で作成したトピックをサブスクし、3.で作成したテーブルにデータを投入するサブスクリプションを作成しました。トピックは2.で作成したトピックを指定します。
配信タイプで「BigQueryへの書き込み」を選択し、3.で作成したテーブルを入力します。
SchemaOptionは「トピックスキーマを使用する」を利用しました。(テーブルスキーマを利用できるっぽいので、Pub/Subスキーマなくても良かった説)
作ってみて
とりあえずデータが溜まるようになった
ベランダと自室に温度計があるのですが、5.8度とか出ているのがベランダで、21.8度のが自室です。
Workflowsが思ったよりお金かかる
ある日ふと請求を見たら毎日40円ぐらい課金されていて、そのうちのほとんどがWorkflowsのInternalStepsということがわかりました。
無料分で月5000ステップが用意されているのですが、いつの間にか突破してしまっていたようです…
改めて、直近実行ステップ数(45ぐらい)から毎分実行を1ヶ月続けるた場合を見積もってみました。
45 [step / execution] * 1 [execution / min] * 43,200 [min / month]
= 1,944,000 [steps/month] (→ $20/月 ぐらい)
現状無料トライアル分で賄えているのですが、$20は結構でかいので早急に改善しておきたいと思いました。
保守性を度外視してWorkflowsのステップ数を削るのはやりたくないのと、月5000ステップを切るのは多分無理なので、おそらくCloudFunctionsにすべて統合とかになるのかなと考えています。。
大方処理が完成して1回あたりのステップ数が見えた段階で見積もりをしておくべきでした。。
使ってみたかったものの使用感
Workflows
- GCP内のサービスを呼ぶのを組み合わせるのには便利そう
- 動作確認がデプロイしないとできないのが辛い
- 日次とか月次のバッチ処理の制御に良さそうだが、分単位の処理にはコスト的には向かなそう
BigQueryサブスクリプション
- イベントベースで書き込むのに便利そう
- データ取り込みでも課金が発生するので注意
これからやってみたいこと
防水温湿度計を使っているので絶対湿度も追加したり、dbtとかでBigQuery側のTransformをいい感じにしたりしたいけど、とりあえずまずはWorkflowsの課金をなんとかしたい🔥
03/15追記: Workflowsから早速CloudFunctionsに乗り換えました。
課金対策としてWorkflowsで行っていた処理をFunctionsにまとめてしまいました。
主な変更点
- 全体のフロー制御(そうはいってもSwitchBotAPIを叩いてPub/Subに流すだけ)をFunctionsで実施
- SwitchBotAPIはtenacityでよしなにリトライ
- デバイスIDはFunctions側では持たずCloudSchedulerがクエリパラメータとして渡す形にした
結果
課金収まりました
(青がWorkflowsで、3/12にデプロイしたため、3/13以降は料金が抑えられています)
試算
CloudFunctionsのコンソールを眺めた感じ99percentileで実行時間は7秒ぐらいだったので、1ヶ月あたりのリソース消費は
CPU: 0.083 * 7 [vCPU秒/回] * 43200[回/month] = 25099.2[vCPU秒/month]
メモリ: 256 * 7 [MiB秒/回] * 43200[回/month] = 7525[GiB秒/month]
です。
2024年3月現在CloudRunのCPUとメモリの突き当りの無料枠はそれぞれ毎月 180,000 vCPU 秒
、毎月 360,000 GiB 秒
なので、無料枠内に収まってしまいました。
念の為無料枠を超えて課金が発生した場合を計算すると、
CPU: 25099.2[vCPU秒/month] * 0.00002400 [USD/vCPU秒] ≒ 0.6 [USD/month]
メモリ: 7525[GiB秒/month] * 0.00000250 [USD/GiB秒] ≒ 0.018 [USD/month]
計: 0.6 + 0.018 [USD/month] ≒ 100 [円/月]
と月100円ぐらいなので、workflowsのときに比べて1/20以下になりました。
とりあえず課金減ってよかった。
このぐらいの簡単なフローであればWorkflowsまでは使わずにゴリゴリ書いちゃったほうが安上がりですね。。
Discussion