📝
AWS IoT から Timestream にデータを書き込んでみた
デバイスから送信したデータを Timestream に書き込んでクエリしてみました。
前提
- ダミーデバイスとして Cloud9 を使用
01. モノの作成
IoT Core でモノを作成します。
- モノの名前: SimulatedDevice
- 証明書: 自動生成
上記以外はデフォルト設定です。
モノ作成後、各証明書をダウンロードします。

02. ポリシーの作成
IoT Core でポリシーを作成します。
- ポリシー名: IoTDevicePolicy
- ポリシーステートメント: 以下の内容
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:Connect",
"iot:Publish",
"iot:Subscribe",
"iot:Receive"
],
"Resource": "*"
}
]
}
上記以外はデフォルト設定です。
03. 証明書にポリシーをアタッチ
手順 01 で作成された証明書に手順 02 のポリシーをアタッチします。

04. Timestream データベースの作成
以下の設定でデータベースを作成します。
- 標準データベース
- データベース名: IoTDatabase
次に、以下の設定でテーブルを作成します。
- データベース: IoTDatabase
- テーブル名: SensorTable
- デフォルトパーティショニング
04. IoT ルールの作成
以下の設定で作成します。
- ルール名: IotToTimestreamRule
- SQL ステートメント: 以下の内容
SELECT temperature, humidity, timestamp() as ts FROM 'topic/sensor-data'
- ルールアクション: Timestream table
- データベース名: IoTDatabase
- テーブル名: SensorTable
- ディメンション名: device_id
- ディメンション値: ${device_id}
- タイムスタンプ値: ${timestamp()}
- タイムスタンプの単位: MILLISECONDS
- IAM ロール: 新しいロールを作成
05. Cloud9 環境でダミーデバイス作成
- デフォルト設定で Cloud9 環境を起動
- iot-test というフォルダを作成
- iot-test フォルダに手順 01 でダウンロードした証明書をアップロード
- Python ライブラリのインストール
$ sudo yum install -y python3
$ pip3 install AWSIoTPythonSDK
- Python スクリプトの作成
iot_publish.py
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
import json
import random
client = AWSIoTMQTTClient("MyClientID")
client.configureEndpoint("your-iot-endpoint", 8883)
client.configureCredentials("AmazonRootCA1.pem", "your-private.pem.key", "your-certificate.pem.crt")
client.configureOfflinePublishQueueing(-1)
client.configureDrainingFrequency(2)
client.configureConnectDisconnectTimeout(10)
client.configureMQTTOperationTimeout(5)
print("Connecting to AWS IoT...")
client.connect()
print("Connected!")
while True:
message = {
"device_id": "SimulatedDevice", # この行を追加
"temperature": round(random.uniform(20.0, 30.0), 2),
"humidity": round(random.uniform(40.0, 80.0), 2)
}
messageJson = json.dumps(message)
client.publish("topic/sensor-data", messageJson, 1) # トピック名を修正
print(f"Published: {messageJson}")
time.sleep(5)
-
your-iot-endpointは各アカウントの IoT エンドポイントに置換 -
your-private.pem.keyは手順 01 でダウンロードした .pem.key ファイル名に置換 -
your-certificate.pem.crtは手順 01 でダウンロードした .pem.crt ファイル名に置換
06. 動作確認
手順 05 で作成した Python スクリプトを実行します。
$ python iot_simulator.py
Connecting to AWS IoT...
Connected!
Published: {"temperature": 20.17, "humidity": 55.51}
Published: {"temperature": 22.05, "humidity": 51.7}
Published: {"temperature": 22.19, "humidity": 73.27}
上記のようなログが出力されれば IoT Core への送信は成功しています。
次に Timestream のテーブル > アクション > クエリテーブルで以下のクエリを実行します。
SELECT * FROM "IoTDatabase"."SensorTable"
ORDER BY time DESC
LIMIT 10
以下のようなクエリ結果が出力されれば Timestream への送信も成功しています。

Timestream 側でクエリ結果が表示されない場合は、IoT ルールのエラーアクションで CloudWatch Logs にログを出力してエラー内容を確認してください。
まとめ
今回は AWS IoT から Timestream にデータを書き込んでみました。
どなたかの参考になれば幸いです。
Discussion