DynamoDB Streams による Lambda 関数トリガーを Terraform で実装する
こんにちは。シンプルフォーム株式会社 にてインフラエンジニアをしています、山岸です。
DynamoDB Streams は、DynamoDB テーブルにおけるデータ変更イベントをキャプチャできる機能です。これにより、レコードの挿入・更新・削除といったイベントを Lambda 関数などのダウンストリームアプリケーションで非同期に処理できます。
機能自体は結構前からあるものですが、Terraform によるインフラ定義から Lambda 関数ハンドラの実装までを一気に扱っている記事が意外と少なかったので、今回備忘も兼ねて記事にしてみたいと思います。
実装
以下のようなシンプルなアーキテクチャを実装してみます。DynamoDB テーブル requests
で発生した INSERT イベントをキャプチャし、DynamoDB Streams 経由で Lambda 関数 requests-processor
に渡し、各レコードを処理します。
アプリケーションの中身は、ひとまず request_id
(ハッシュキー) と payload
(Map 属性) を持つ DynamoDB テーブル上のレコードを、Lambda 関数側でリクエスト毎に何らかの処理をする、といったものを想定します。
DynamoDB テーブル
まずは DynamoDB テーブルの Terraform 定義です。テーブル設計としては、ハッシュキーである request_id
のみ指定しておきます。
DynamoDB Streams を利用する場合、stream_enabled
を true
とした上で stream_view_type
を要件に合う適切な値に設定します。
resource "aws_dynamodb_table" "default" {
name = "requests-table"
hash_key = "request_id"
attribute {
name = "request_id"
type = "S"
}
billing_mode = "PAY_PER_REQUEST"
deletion_protection_enabled = false
# DynamoDB Streams
stream_enabled = true
stream_view_type = "NEW_IMAGE" # StreamViewType
# Server-side encryption with AWS managed KMS key
server_side_encryption {
enabled = true
}
}
StreamViewType は、ストリームレコードにどのデータを含めるかを制御するための設定です。指定可能な値は [ KEYS_ONLY
| NEW_IMAGE
| OLD_IMAGE
| NEW_AND_OLD_IMAGES
] です。各設定値ごとの挙動は以下の記事で詳しく説明されていました。
- DynamoDB Streamで渡されるeventデータの表示タイプごとの内容をまとめてみた - DevelopersIO
まとめを引用させて頂くと、各 StreamViewType・各レコード操作についてアプリケーション側で取得可能なデータは以下のようになります。
StreamViewType | INSERT | MODIFY | DELETE |
---|---|---|---|
KEYS_ONLY |
- | - | - |
NEW_IMAGE |
NewImage | NewImage | - |
OLD_IMAGE |
- | OldImage | OldImage |
NEW_AND_OLD_IMAGES |
NewImage | NewImage, OldImage | OldImage |
Lambda 関数
特に言及は不要かもしれませんが、Lambda 関数の Terraform 定義のサンプル実装も一応記載しておきます。以下はパッケージタイプとしてコンテナイメージを利用する場合の例です。
data "aws_ecr_repository" "default" {
name = var.ecr_image.repository_name
}
data "aws_ecr_image" "default" {
repository_name = var.ecr_image.repository_name
image_tag = var.ecr_image.image_tag
}
resource "aws_lambda_function" "default" {
function_name = "requests-processor"
role = aws_iam_role.default.arn
architectures = ["x86_64"]
timeout = 300
package_type = "Image"
image_uri = "${data.aws_ecr_repository.default.repository_url}@${data.aws_ecr_image.default.image_digest}"
image_config {
command = ["src/main.handler"]
}
environment {
variables = {}
}
vpc_config {
subnet_ids = var.network.private_subnet_ids
security_group_ids = var.network.security_group_ids
}
}
イベントソースマッピング
続いてイベントソースマッピングです。event_source_arn
には DynamoDB テーブル構築時に作成されたストリームの ARN を、function_name
には Lambda 関数の ARN を指定します。
data "aws_dynamodb_table" "requests" {
name = "requests"
}
resource "aws_lambda_event_source_mapping" "stream-requests-to-processor" {
event_source_arn = data.aws_dynamodb_table.requests.stream_arn
function_name = aws_lambda_function.default.arn
batch_size = 50
enabled = true
starting_position = "LATEST"
}
イベントソースマッピングが設定されると、マネジメントコンソール上で Lambda 関数詳細を開いた際の表示が以下のようになります。
Lambda 関数ハンドラ
各種リソースが正常に作成されていれば、DynamoDB テーブル上のデータ変更イベントによって Lambda 関数がトリガーされるようになっています。
Lambda に渡されるストリームレコードは以下のような形式になっています。StreamViewType に NEW_IMAGE
を指定しているため、INSERT, MODIFY イベントにおいて、"NEW_IMAGE"
がレコードに表れています。ただし、REMOVE 時もキーのみですがレコードとしては流れてきます。
INSERT 時のストリームレコード
{
"Records": [
{
"eventID": "e6c89addac325c0aa4f5034a3babdf9d",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-northeast-1",
"dynamodb": {
"ApproximateCreationDateTime": 1713774119,
"Keys": {
"request_id": {
"S": "01hw2ebjb940aqbejqcb42z7cg"
}
},
"NewImage": {
"request_id": {
"S": "01hw2ebjb94pch8sd6e696ptwq"
},
"payload": {
"M": {"key": "val"}
}
},
"SequenceNumber": "32028700000000016141821660",
"SizeBytes": 248,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/requests/stream/2024-04-17T23:46:37.621"
},
{
"eventID": "0e4bb56a42ded17c4b28a5c12bf0445c",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-northeast-1",
"dynamodb": {
"ApproximateCreationDateTime": 1713774119,
"Keys": {
"request_id": {
"S": "01hw2ebjb94pch8sd6e696ptwq"
}
},
"NewImage": {
"request_id": {
"S": "01hw2ebjb94pch8sd6e696ptwq"
},
"payload": {
"M": {"key": "val"}
}
},
"SequenceNumber": "32028800000000016141821687",
"SizeBytes": 254,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/requests/stream/2024-04-17T23:46:37.621"
}
]
}
MODIFY 時のストリームレコード
{
"Records": [
{
"eventID": "a731e36b327fe4814977190647d3d82c",
"eventName": "MODIFY",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-northeast-1",
"dynamodb": {
"ApproximateCreationDateTime": 1714177678,
"Keys": {
"request_id": {
"S": "01hvrmxrn5f6k01ewsj1vtc05k"
}
},
"NewImage": {
"request_id": {
"S": "01hvrmxrn5f6k01ewsj1vtc05k"
},
"payload": {
"M": {"key": "val"}
}
},
"SequenceNumber": "53681800000000027444253289",
"SizeBytes": 224,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/requests/stream/2024-04-17T23:46:37.621"
}
]
}
REMOVE 時のストリームレコード
{
"Records": [
{
"eventID": "73092f3dfa8bc5a28055f08a5eb3932a",
"eventName": "REMOVE",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-northeast-1",
"dynamodb": {
"ApproximateCreationDateTime": 1714113756,
"Keys": {
"request_id": {
"S": "01hw2ebjb8v3sk49ry3r693x9p"
}
},
"SequenceNumber": "50515000000000057901517456",
"SizeBytes": 71,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/requests/stream/2024-04-17T23:46:37.621"
}
]
}
今回は INSERT イベント発生時のみ何らかの処理をさせる想定なので、以下のサンプル実装ではイベント変数の Records
に含まれる eventName
の値が INSERT
の場合のみ、処理を継続して some_process()
関数を実行しています。
from aws_lambda_powertools import Logger
logger = Logger()
def some_process(request_id: str, payload: dict):
pass
@logger.inject_lambda_context(log_event=True)
def handler(event, context):
records = event.get("Records", [])
logger.info(records)
results = []
for record in records:
if record.get("eventName") != "INSERT":
continue
# Get new image
new_image = record.get("dynamodb", {}).get("NewImage")
logger.info(new_image)
# Get attributes from new image
request_id = new_image.get("request_id", {}).get("S")
payload = new_image.get("payload", {}).get("M")
# Process each record
result = some_process(request_id, payload)
results.append(result)
return {
"statusCode": 200,
"body": {
"Records": records,
"Results": results,
}
}
落穂拾い
イベントソースマッピングにおけるレコードフィルタリング
これまで見てきた通り、StreamViewType をいずれに設定しても、INSERT, MODIFY, REMOVE の全てのデータ変更イベントがストリームレコードとして流れてきます。しかし、今回のように INSERT のみが処理対象である場合、Lambda 関数ハンドラ内でレコードを無視するのではなく、そもそも Invoke 自体させないようにできるのが望ましいです。
これとは異なるユースケースではありますが、以下のようにイベントソースマッピングの設定でレコードフィルタリングを実現できるようです。(以下は、TTL によって削除されるレコードに対して、バックアップ等の処理をするようなケースを想定したドキュメントです)
- DynamoDB Streams and Time to Live - Amazon DynamoDB Developer Guide
この設定を利用して以下のようなフィルターを追加してみたところ、INSERT イベントが発生した場合のみ Lambda 関数が呼び出されるようになりました。
resource "aws_lambda_event_source_mapping" "stream-requests-to-processor" {
event_source_arn = data.aws_dynamodb_table.default["requests"].stream_arn
function_name = aws_lambda_function.default["processor"].arn
batch_size = 50
enabled = true
starting_position = "LATEST"
# INSERT イベントのみをフィルタリングを追加
filter_criteria {
filter {
pattern = jsonencode({
eventName = ["INSERT"]
})
}
}
}
以上、DynamoDB Streams の開発 Tips でした!
最後まで読んで頂き、ありがとうございました。
リアルタイム法人調査システム「SimpleCheck」を開発・運営するシンプルフォーム株式会社の開発チームのメンバーが、日々の開発で得た知見や試してみた技術などについて発信していきます。 Publication 運用への移行前の記事は zenn.dev/simpleform からご覧ください。
Discussion