📝

AWS IoT から Timestream にデータを書き込んでみた

に公開

デバイスから送信したデータを Timestream に書き込んでクエリしてみました。

前提

  • ダミーデバイスとして Cloud9 を使用

01. モノの作成

IoT Core でモノを作成します。

  • モノの名前: SimulatedDevice
  • 証明書: 自動生成

上記以外はデフォルト設定です。
モノ作成後、各証明書をダウンロードします。

02. ポリシーの作成

IoT Core でポリシーを作成します。

  • ポリシー名: IoTDevicePolicy
  • ポリシーステートメント: 以下の内容
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Connect",
        "iot:Publish",
        "iot:Subscribe",
        "iot:Receive"
      ],
      "Resource": "*"
    }
  ]
}

上記以外はデフォルト設定です。

03. 証明書にポリシーをアタッチ

手順 01 で作成された証明書に手順 02 のポリシーをアタッチします。

04. Timestream データベースの作成

以下の設定でデータベースを作成します。

  • 標準データベース
  • データベース名: IoTDatabase

次に、以下の設定でテーブルを作成します。

  • データベース: IoTDatabase
  • テーブル名: SensorTable
  • デフォルトパーティショニング

04. IoT ルールの作成

以下の設定で作成します。

  • ルール名: IotToTimestreamRule
  • SQL ステートメント: 以下の内容
SELECT temperature, humidity, timestamp() as ts FROM 'topic/sensor-data'
  • ルールアクション: Timestream table
  • データベース名: IoTDatabase
  • テーブル名: SensorTable
  • ディメンション名: device_id
  • ディメンション値: ${device_id}
  • タイムスタンプ値: ${timestamp()}
  • タイムスタンプの単位: MILLISECONDS
  • IAM ロール: 新しいロールを作成

05. Cloud9 環境でダミーデバイス作成

  • デフォルト設定で Cloud9 環境を起動
  • iot-test というフォルダを作成
  • iot-test フォルダに手順 01 でダウンロードした証明書をアップロード
  • Python ライブラリのインストール
$ sudo yum install -y python3
$ pip3 install AWSIoTPythonSDK
  • Python スクリプトの作成
iot_publish.py
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
import json
import random

client = AWSIoTMQTTClient("MyClientID")
client.configureEndpoint("your-iot-endpoint", 8883)
client.configureCredentials("AmazonRootCA1.pem", "your-private.pem.key", "your-certificate.pem.crt")
client.configureOfflinePublishQueueing(-1)
client.configureDrainingFrequency(2)
client.configureConnectDisconnectTimeout(10)
client.configureMQTTOperationTimeout(5)

print("Connecting to AWS IoT...")
client.connect()
print("Connected!")

while True:
    message = {
        "device_id": "SimulatedDevice",  # この行を追加
        "temperature": round(random.uniform(20.0, 30.0), 2),
        "humidity": round(random.uniform(40.0, 80.0), 2)
    }
    messageJson = json.dumps(message)
    client.publish("topic/sensor-data", messageJson, 1)  # トピック名を修正
    print(f"Published: {messageJson}")
    time.sleep(5)
  • your-iot-endpoint は各アカウントの IoT エンドポイントに置換
  • your-private.pem.key は手順 01 でダウンロードした .pem.key ファイル名に置換
  • your-certificate.pem.crt は手順 01 でダウンロードした .pem.crt ファイル名に置換

06. 動作確認

手順 05 で作成した Python スクリプトを実行します。

$ python iot_simulator.py
Connecting to AWS IoT...
Connected!
Published: {"temperature": 20.17, "humidity": 55.51}
Published: {"temperature": 22.05, "humidity": 51.7}
Published: {"temperature": 22.19, "humidity": 73.27}

上記のようなログが出力されれば IoT Core への送信は成功しています。
次に Timestream のテーブル > アクション > クエリテーブルで以下のクエリを実行します。

SELECT * FROM "IoTDatabase"."SensorTable" 
ORDER BY time DESC 
LIMIT 10

以下のようなクエリ結果が出力されれば Timestream への送信も成功しています。

Timestream 側でクエリ結果が表示されない場合は、IoT ルールのエラーアクションで CloudWatch Logs にログを出力してエラー内容を確認してください。

まとめ

今回は AWS IoT から Timestream にデータを書き込んでみました。
どなたかの参考になれば幸いです。

Discussion