🥌

AWS IoT Core → DynamoDB

2023/02/04に公開

はじめに

AWS IoT Core は Rule を使って受け取ったデータをいろんなサービスに渡すことができます。今回は DynamoDB にデータを渡すところのお話しです[1]。既にいろんなドキュメントがあるので新規要素は少なめです。

Rule は DynamoDB について DynamoDB と DynamoDBv2 をサポートしています。この違いについては以下を見るとざっくりわかります。

要はデータをフラットに扱いたいのであれば、とりあえず DynamoDBv2 使っとけ。Dynamo はPartition Key と(テーブル作成時に指定した場合は)Sort Key がないとデータを受け取らないのでこれらを Rule から渡すのを忘れるな。ということかと思います。

例えば今回のケースだと Dynamo のキーはこんな感じでデバイスごとに時系列にデータをいれてみる[2]

  • テーブル名は telemetry
  • Partition Keyをdevice_id - String
  • Sort Keyをtime_stamp - Number

Device からはこんな JSON を MQTT Topic telemetry に送ることにします。Partition Key になる device_id をちゃんと送ります。また Device のタイムスタンプ device_time も付けておきます。

{
  "device_id": "device_8",
  "device_time": "1675499216153",
  "temperature": 101.71,
  "humidity": 100.14
}

ルールのクエリはこんな感じにして、Sort Key となる time_stamp を追加して渡すようにします。つまり Sort Key はデバイスのタイムスタンプじゃなくて IoT Core のタイムスタンプです。

SELECT *, timestamp() as time_stamp FROM 'telemetry'

マッチしたメッセージの行き先はもちろんdynamoDBv2telemetryテーブル。

最初はなんやかんやで繋がらないので Rule に Error action を設定しておくとよい。Republish to AWS IoT topic にして、Republish 先のトピックを error に設定。これを MQTT クライアントで Subscribe しておくと簡単にエラーの原因がわかり、切り分けが捗る。

DynamoDBのデータを確認

Scan

aws dynamodb scan \
--table-name telemetry \
--return-consumed-capacity TOTAL

Count

aws dynamodb scan --table-name telemetry --select "COUNT"

Partition Key

aws dynamodb query \
--table-name telemetry \
--key-condition-expression 'device_id = :device_id' \
--expression-attribute-values '{ ":device_id": { "S": "device_9" } }'

Partition Key and Sort key

aws dynamodb query \
--table-name telemetry \
--key-condition-expression 'device_id = :device_id and time_stamp = :time_stamp' \
--expression-attribute-values '{ ":device_id": { "S": "device_9" }, ":time_stamp": { "N": "1671333820303" } }'

パフォーマンス

デフォルトのキャパシティ設定だと、0.2秒毎に1000メッセージ送信した場合、DynamoDBでは79%, 77%の受信率。0.3秒毎だと100%が保存された。メッセージが多すぎるとスロットルが発生して ProvisionedThroughputExceededException のエラーが帰ってくる。今回は Rule の設定によりトピック error にエラーを流すように設定。

"errorMessage": "DynamoDBv2 Put record failed. The error received was The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: 4SA6KER1VAJ9JFEEKIEJLFU7A7VV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null). Message arrived on: telemetry, Action: dynamoDBv2, Table: telemetry"
脚注
  1. 実際には直接 DynamoDB じゃなくて他のサービスでバッファすることも多い ↩︎

  2. 公式ドキュメントの例だとこれが逆で、Partition Keyをtime_stamp、Sort Keyをdevice_idにしているがこれのメリットはなんだろうか?時間をキーにしてクエリしますかね? ↩︎

Discussion