MQTTとCloud IoT Core
業務の中でMQTTとCloud IoT Coreを利用したアーキテクチャパターンを見かけたので、実際に構成するとどんな感じなのかな?と思って実際にGCP上で構築してみました。
MQTT
MQTTというのは、HTTPと比べると通信量が軽量なプロトコルで、通信時のオーバーヘッドが少ないらしく、スペックの高くないIoT機器で主に用いられているそうです。その辺はあまり詳しく無いので、時間がある時にもう少し調べてみようと思います。
構成図
今回試してみたGCPの構成は以下の通りです。ローカルマシンからMQTTプロトコルでIoT CoreにPublishを行い、そこからPub/Sub、Dataflowを通ってBigQueryまでデータが連携されることを目指します。
※構成図はGoogle純正の構成図ツール(Google Cloud Architecture Diagramming Tool)で作成してみたんですが、何やらフォントがポップな感じになってます・・。設定が良くわからなかったのでそのままにしています。
環境構築
BigQuery
初めに、蓄積先のBigQueryテーブルを作成しておきます。
データセット→テーブルの順で作成します。
データセットの作成
データセットの作成
データセット名を適当に入力(mqtt_dataset)して、データセットを作成
テーブルの作成
作成したデータセット上に、データを格納したいテーブルを作成する
テーブル名は適当に入力する。(mqtt_table)
Computerから送られる想定のJSONデータのスキーマに合わせて、BigQueryのスキーマ名を設定しておく。今回は以下の様なデータを送信する想定とする。
{"text":"string data from computer"}
その他はデフォルト値、または適当に設定して、テーブルを作成する。
Pub/Sub・Dataflow
IoT Coreからデータを受け取るPub/Subの設定を行います。
トピックの作成
トピック名を適当に入力(mqtt_pubsub)して、トピックを作成
Dataflowジョブの作成
Pub/SubからBigQueryに対してデータを送る為のDataflowジョブテンプレートが予め用意されているので、以下を押します。
Dataflowジョブの作成画面が表示されるので、BigQuery output tableの欄に、先程作成したテーブルへのパスを入力します。
GCPプロジェクト名:mqtt_dataset.mqtt_table
また、一時的なデータ格納場所としてGCSのバケット名を指定する必要があります。都合の良いバケットが無ければ、適当に作成して指定して下さい。
入力が完了したら、ジョブを実行します。
Dataflowのジョブ画面で、以下のようにストリーミングとして実行中になっていたらOKです。
※私は初めにジョブ実行した時はエラーになってしまいました。permisson関係のエラーメッセージが表示されていました。初めて利用する際はDataflow APIが勝手に有効化されますが、有効化のタイミングとジョブの実行タイミングが良くなかったようで、少し時間を置いて再度ジョブを実行したら成功しました。
IoT Core
ここまで来て、ようやくIoT Coreの設定を行います。
レジストリの作成
IoT Coreのサービス画面より、レジストリの作成を初めに行います。
- レジストリIDを適当に入力(mqtt_registry)します。
- 先程作成したPub/Subトピック(mqtt_pubsub)を指定しておきます。ここで設定することで、IoT CoreにPublishされたデータがPub/SubにPublishされるようになるみたいです。
- プロトコルの設定は、今回はMQTTで送ってみるテストなので、HTTPのチェックを外しておきます。
デバイスの作成
レジストリを作成したら、次にデバイスを作成します。
一旦デバイスIDだけ適当に入力して作成します。
公開鍵・秘密鍵ペアの作成
Cloud Console等を利用して、デバイス認証用の公開鍵・秘密鍵ペアの作成を行います。
参考:https://cloud.google.com/iot/docs/how-tos/credentials/keys?hl=ja
Cloud Consoleを起動して、以下コマンドを実行
openssl genpkey -algorithm RSA -out rsa_private.pem -pkeyopt rsa_keygen_bits:2048
openssl rsa -in rsa_private.pem -pubout -out rsa_public.pem
以下の2ファイルが作成されるので、Cloud Consoleからダウンロードしておきます。
- rsa_private.pem
- rsa_public.pem
デバイスに公開鍵を設定
デバイスの画面に戻り、認証タブを開き、公開鍵を追加を押します。
先程ダウンロードしたrsq_public.pemをテキストエディタ等で開いて、中身をコピーし、「公開鍵の値」の欄にペーストし、追加します。
ここまで設定出来たら、Cloud IoT Coreに対してMQTTプロトコルでデータを送信することで、BigQueryにデータが流れるような構成となっているはずです。実際に試していきます。
PythonからMQTTプロトコルでデータを送信する
実際にPythonでデータを送信し、BigQueryに蓄積されるかを確認してみたいと思います。
環境準備
以下のサンプルコードを利用して、MQTTでデータを送ってみます。
上記のGithubから、必要なファイルだけダウンロードしてワークフォルダに格納します。
- requirements.txt
- cloudiot_mqtt_example.py
ワークフォルダには、先程ダウンロードした秘密鍵ファイル(rsa_private.pem)及びGoogleのクライアント証明書(https://pki.goog/roots.pem)をダウンロードして配置しておきます。
最終的にワークフォルダはこんな構成になりました。
- requirements.txt
- cloudiot_mqtt_example.py
- rsa_private.pem
- roots.pem
以下を実行して、必要なライブラリをインストールしておきます。
pip install -r requirements.txt
サンプルコードの微修正
サンプルコードそのままだと以下のようなテキストデータのみが送られるようで、テンプレートのDataflowジョブではエラーが発生し、BigQueryまでデータが辿り着きませんでした。
xxxx/mqqqdevice-payload-xx
大分力技ですが、{"text":"xxxx/mqqqdevice-payload-xx"}
というようなJSONデータが送られるようにサンプルコードを修正します。
cloudiot_mqtt_example.py : 560
import json
client.publish(mqtt_topic, json.dumps({"text":payload}), qos=1)
サンプルコードの実行
以下の様に、引数にレジストリID、プロジェクトID、デバイスID等、今回作成したものを適切に設定し、Python実行します。
python cloudiot_mqtt_example.py --registry_id=mqtt_registry --project_id=avidxxxxxx --device_id=mqtt_device --algorithm=RS256 --private_key_file=./rsa_private.pem --cloud_region=us-central1
こんな感じで、データが送られます。
Publishing message 1/100: 'mqtt_registry/mqtt_device-payload-1'
Publishing message 2/100: 'mqtt_registry/mqtt_device-payload-2'
送られたデータがBigQuery上で以下の様に蓄積されていることを確認できました。
こんな感じで、システムの構築自体は凄く簡単に行うことが出来ましたが、構成上はかなりの負荷に耐えられるシステムになっているはずです。その辺がクラウド利用の凄さであり、利点になるのかなと思います。
参考にさせて頂いたページ
今回参考にさせて頂いたページが以下の通りです。お陰様でスムーズな検証を行うことが出来ました。大変感謝です。
Discussion