AWS IoT Core → DynamoDB
はじめに
AWS IoT Core は Rule を使って受け取ったデータをいろんなサービスに渡すことができます。今回は DynamoDB にデータを渡すところのお話しです[1]。既にいろんなドキュメントがあるので新規要素は少なめです。
Rule は DynamoDB について DynamoDB と DynamoDBv2 をサポートしています。この違いについては以下を見るとざっくりわかります。
- IoT Coreルールアクションの DynamoDB と DynamoDBv2 の違いを調べてみた
- AWS IoT Coreのルールアクション DynamoDBとDynamoDBv2の違いを図解してみる
要はデータをフラットに扱いたいのであれば、とりあえず DynamoDBv2 使っとけ。Dynamo はPartition Key と(テーブル作成時に指定した場合は)Sort Key がないとデータを受け取らないのでこれらを Rule から渡すのを忘れるな。ということかと思います。
例えば今回のケースだと Dynamo のキーはこんな感じでデバイスごとに時系列にデータをいれてみる[2]
- テーブル名は
telemetry
- Partition Keyを
device_id
- String - Sort Keyを
time_stamp
- Number
Device からはこんな JSON を MQTT Topic telemetry
に送ることにします。Partition Key になる device_id
をちゃんと送ります。また Device のタイムスタンプ device_time
も付けておきます。
{
"device_id": "device_8",
"device_time": "1675499216153",
"temperature": 101.71,
"humidity": 100.14
}
ルールのクエリはこんな感じにして、Sort Key となる time_stamp
を追加して渡すようにします。つまり Sort Key はデバイスのタイムスタンプじゃなくて IoT Core のタイムスタンプです。
SELECT *, timestamp() as time_stamp FROM 'telemetry'
マッチしたメッセージの行き先はもちろんdynamoDBv2
でtelemetry
テーブル。
最初はなんやかんやで繋がらないので Rule に Error action を設定しておくとよい。Republish to AWS IoT topic にして、Republish 先のトピックを error
に設定。これを MQTT クライアントで Subscribe しておくと簡単にエラーの原因がわかり、切り分けが捗る。
DynamoDBのデータを確認
Scan
aws dynamodb scan \
--table-name telemetry \
--return-consumed-capacity TOTAL
Count
aws dynamodb scan --table-name telemetry --select "COUNT"
Partition Key
aws dynamodb query \
--table-name telemetry \
--key-condition-expression 'device_id = :device_id' \
--expression-attribute-values '{ ":device_id": { "S": "device_9" } }'
Partition Key and Sort key
aws dynamodb query \
--table-name telemetry \
--key-condition-expression 'device_id = :device_id and time_stamp = :time_stamp' \
--expression-attribute-values '{ ":device_id": { "S": "device_9" }, ":time_stamp": { "N": "1671333820303" } }'
パフォーマンス
デフォルトのキャパシティ設定だと、0.2秒毎に1000メッセージ送信した場合、DynamoDBでは79%, 77%の受信率。0.3秒毎だと100%が保存された。メッセージが多すぎるとスロットルが発生して ProvisionedThroughputExceededException
のエラーが帰ってくる。今回は Rule の設定によりトピック error にエラーを流すように設定。
"errorMessage": "DynamoDBv2 Put record failed. The error received was The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: 4SA6KER1VAJ9JFEEKIEJLFU7A7VV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null). Message arrived on: telemetry, Action: dynamoDBv2, Table: telemetry"
Discussion