DynamoDB Streams による Lambda 関数トリガーを Terraform で実装する

こんにちは。シンプルフォーム株式会社 にてインフラエンジニアをしています、山岸です。

DynamoDB Streams は、DynamoDB テーブルにおけるデータ変更イベントをキャプチャできる機能です。これにより、レコードの挿入・更新・削除といったイベントを Lambda 関数などのダウンストリームアプリケーションで非同期に処理できます。

機能自体は結構前からあるものですが、Terraform によるインフラ定義から Lambda 関数ハンドラの実装までを一気に扱っている記事が意外と少なかったので、今回備忘も兼ねて記事にしてみたいと思います。

実装

以下のようなシンプルなアーキテクチャを実装してみます。DynamoDB テーブル requests で発生した INSERT イベントをキャプチャし、DynamoDB Streams 経由で Lambda 関数 requests-processor に渡し、各レコードを処理します。

アプリケーションの中身は、ひとまず request_id (ハッシュキー) と payload (Map 属性) を持つ DynamoDB テーブル上のレコードを、Lambda 関数側でリクエスト毎に何らかの処理をする、といったものを想定します。

DynamoDB テーブル

まずは DynamoDB テーブルの Terraform 定義です。テーブル設計としては、ハッシュキーである request_id のみ指定しておきます。

DynamoDB Streams を利用する場合、stream_enabledtrue とした上で stream_view_type を要件に合う適切な値に設定します。

resource "aws_dynamodb_table" "default" {
  name     = "requests-table"
  hash_key = "request_id"

  attribute {
    name = "request_id"
    type = "S"
  }

  billing_mode                = "PAY_PER_REQUEST"
  deletion_protection_enabled = false

  # DynamoDB Streams
  stream_enabled   = true
  stream_view_type = "NEW_IMAGE"  # StreamViewType

  # Server-side encryption with AWS managed KMS key
  server_side_encryption {
    enabled = true
  }
}

StreamViewType は、ストリームレコードにどのデータを含めるかを制御するための設定です。指定可能な値は [ KEYS_ONLY | NEW_IMAGE | OLD_IMAGE | NEW_AND_OLD_IMAGES ] です。各設定値ごとの挙動は以下の記事で詳しく説明されていました。

まとめを引用させて頂くと、各 StreamViewType・各レコード操作についてアプリケーション側で取得可能なデータは以下のようになります。

StreamViewType INSERT MODIFY DELETE
KEYS_ONLY - - -
NEW_IMAGE NewImage NewImage -
OLD_IMAGE - OldImage OldImage
NEW_AND_OLD_IMAGES NewImage NewImage, OldImage OldImage

Lambda 関数

特に言及は不要かもしれませんが、Lambda 関数の Terraform 定義のサンプル実装も一応記載しておきます。以下はパッケージタイプとしてコンテナイメージを利用する場合の例です。

data "aws_ecr_repository" "default" {
  name = var.ecr_image.repository_name
}
data "aws_ecr_image" "default" {
  repository_name = var.ecr_image.repository_name
  image_tag       = var.ecr_image.image_tag
}

resource "aws_lambda_function" "default" {
  function_name = "requests-processor"

  role          = aws_iam_role.default.arn
  architectures = ["x86_64"]
  timeout       = 300

  package_type = "Image"
  image_uri    = "${data.aws_ecr_repository.default.repository_url}@${data.aws_ecr_image.default.image_digest}"

  image_config {
    command = ["src/main.handler"]
  }
  environment {
    variables = {}
  }
  vpc_config {
    subnet_ids         = var.network.private_subnet_ids
    security_group_ids = var.network.security_group_ids
  }
}

イベントソースマッピング

続いてイベントソースマッピングです。event_source_arn には DynamoDB テーブル構築時に作成されたストリームの ARN を、function_name には Lambda 関数の ARN を指定します。

data "aws_dynamodb_table" "requests" {
  name = "requests"
}

resource "aws_lambda_event_source_mapping" "stream-requests-to-processor" {
  event_source_arn = data.aws_dynamodb_table.requests.stream_arn
  function_name    = aws_lambda_function.default.arn

  batch_size        = 50
  enabled           = true
  starting_position = "LATEST"
}

イベントソースマッピングが設定されると、マネジメントコンソール上で Lambda 関数詳細を開いた際の表示が以下のようになります。

Lambda 関数ハンドラ

各種リソースが正常に作成されていれば、DynamoDB テーブル上のデータ変更イベントによって Lambda 関数がトリガーされるようになっています。

Lambda に渡されるストリームレコードは以下のような形式になっています。StreamViewType に NEW_IMAGE を指定しているため、INSERT, MODIFY イベントにおいて、"NEW_IMAGE" がレコードに表れています。ただし、REMOVE 時もキーのみですがレコードとしては流れてきます。

INSERT 時のストリームレコード
{
    "Records": [
        {
            "eventID": "e6c89addac325c0aa4f5034a3babdf9d",
            "eventName": "INSERT",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "ap-northeast-1",
            "dynamodb": {
                "ApproximateCreationDateTime": 1713774119,
                "Keys": {
                    "request_id": {
                        "S": "01hw2ebjb940aqbejqcb42z7cg"
                    }
                },
                "NewImage": {
                    "request_id": {
                        "S": "01hw2ebjb94pch8sd6e696ptwq"
                    },
                     "payload": {
                        "M": {"key": "val"}
                    }
                },
                "SequenceNumber": "32028700000000016141821660",
                "SizeBytes": 248,
                "StreamViewType": "NEW_IMAGE"
            },
            "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/requests/stream/2024-04-17T23:46:37.621"
        },
        {
            "eventID": "0e4bb56a42ded17c4b28a5c12bf0445c",
            "eventName": "INSERT",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "ap-northeast-1",
            "dynamodb": {
                "ApproximateCreationDateTime": 1713774119,
                "Keys": {
                    "request_id": {
                        "S": "01hw2ebjb94pch8sd6e696ptwq"
                    }
                },
                "NewImage": {
                    "request_id": {
                        "S": "01hw2ebjb94pch8sd6e696ptwq"
                    },
                    "payload": {
                        "M": {"key": "val"}
                    }
                },
                "SequenceNumber": "32028800000000016141821687",
                "SizeBytes": 254,
                "StreamViewType": "NEW_IMAGE"
            },
            "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/requests/stream/2024-04-17T23:46:37.621"
        }
    ]
}
MODIFY 時のストリームレコード
{
    "Records": [
        {
            "eventID": "a731e36b327fe4814977190647d3d82c",
            "eventName": "MODIFY",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "ap-northeast-1",
            "dynamodb": {
                "ApproximateCreationDateTime": 1714177678,
                "Keys": {
                    "request_id": {
                        "S": "01hvrmxrn5f6k01ewsj1vtc05k"
                    }
                },
                "NewImage": {
                    "request_id": {
                        "S": "01hvrmxrn5f6k01ewsj1vtc05k"
                    },
                    "payload": {
                        "M": {"key": "val"}
                    }
                },
                "SequenceNumber": "53681800000000027444253289",
                "SizeBytes": 224,
                "StreamViewType": "NEW_IMAGE"
            },
            "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/requests/stream/2024-04-17T23:46:37.621"
        }
    ]
}
REMOVE 時のストリームレコード
{
    "Records": [
        {
            "eventID": "73092f3dfa8bc5a28055f08a5eb3932a",
            "eventName": "REMOVE",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "ap-northeast-1",
            "dynamodb": {
                "ApproximateCreationDateTime": 1714113756,
                "Keys": {
                    "request_id": {
                        "S": "01hw2ebjb8v3sk49ry3r693x9p"
                    }
                },
                "SequenceNumber": "50515000000000057901517456",
                "SizeBytes": 71,
                "StreamViewType": "NEW_IMAGE"
            },
            "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/requests/stream/2024-04-17T23:46:37.621"
        }
    ]
}

今回は INSERT イベント発生時のみ何らかの処理をさせる想定なので、以下のサンプル実装ではイベント変数の Records に含まれる eventName の値が INSERT の場合のみ、処理を継続して some_process() 関数を実行しています。

main.py
from aws_lambda_powertools import Logger

logger = Logger()

def some_process(request_id: str, payload: dict):
    pass

@logger.inject_lambda_context(log_event=True)
def handler(event, context):
    records = event.get("Records", [])
    logger.info(records)

    results = []
    for record in records:
        if record.get("eventName") != "INSERT":
            continue
        
        # Get new image
        new_image = record.get("dynamodb", {}).get("NewImage")
        logger.info(new_image)

        # Get attributes from new image
        request_id = new_image.get("request_id", {}).get("S")
        payload = new_image.get("payload", {}).get("M")

        # Process each record
        result = some_process(request_id, payload)
        results.append(result)

    return {
        "statusCode": 200,
        "body": {
            "Records": records,
            "Results": results,
        }
    }

落穂拾い

イベントソースマッピングにおけるレコードフィルタリング

これまで見てきた通り、StreamViewType をいずれに設定しても、INSERT, MODIFY, REMOVE の全てのデータ変更イベントがストリームレコードとして流れてきます。しかし、今回のように INSERT のみが処理対象である場合、Lambda 関数ハンドラ内でレコードを無視するのではなく、そもそも Invoke 自体させないようにできるのが望ましいです。

これとは異なるユースケースではありますが、以下のようにイベントソースマッピングの設定でレコードフィルタリングを実現できるようです。(以下は、TTL によって削除されるレコードに対して、バックアップ等の処理をするようなケースを想定したドキュメントです)

この設定を利用して以下のようなフィルターを追加してみたところ、INSERT イベントが発生した場合のみ Lambda 関数が呼び出されるようになりました。

resource "aws_lambda_event_source_mapping" "stream-requests-to-processor" {
  event_source_arn = data.aws_dynamodb_table.default["requests"].stream_arn
  function_name    = aws_lambda_function.default["processor"].arn

  batch_size        = 50
  enabled           = true
  starting_position = "LATEST"

  # INSERT イベントのみをフィルタリングを追加
  filter_criteria {
    filter {
      pattern = jsonencode({
        eventName = ["INSERT"]
      })
    }
  }
}

以上、DynamoDB Streams の開発 Tips でした!
最後まで読んで頂き、ありがとうございました。

SimpleForm Tech Blog

Discussion