🐡

Powertools for AWS (Python) を用いた DynamoDB Streams Trigger のカスタマイズ方法

2024/02/19に公開

こんにちは。hayata-yamamoto です。

今回は、Powertools for AWS (Python) を用いて、DynamoDB Streams で動く Lambda 関数を実装する方法を共有します。直近、ゼロETL統合が発表されたばかりの部分ですが、自前でハンドラを書きたいケースはおそらくまだまだ多いはずです。まずは、ゼロ統合ETLでシンプルに解決できるかを考えた後に、もしそれでも実装が必要なら本記事を参考にしてみてくださいませ!

https://aws.amazon.com/jp/blogs/news/amazon-dynamodb-zero-etl-integration-with-amazon-opensearch-service-is-now-generally-available/

https://docs.powertools.aws.dev/lambda/python/latest/utilities/data_classes/#dynamodb-streams

Powertools記事

DynamoDB Streams とは

公式によりますと、

DynamoDB Streams は、DynamoDB テーブル内の項目レベルの変更に関するシーケンスを時間順にキャプチャし、その情報を最大 24 時間ログに保存します。アプリケーションは、このログにアクセスし、データ項目の変更前および変更後の内容をほぼリアルタイムで参照できます。

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.html

と紹介されています。DynamoDB のアイテムの変更に関する情報を時間順に取得できるサービスです。この DynamoDB Streams は Lambda と統合されていて、アイテムの変更が発生した時に追加の処理をカスタマイズして実装することができる便利な機能となっています。

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.html

具体的な利用シーンは、「あるアイテムの情報が更新されたら別の DynamoDB や別のリソースに情報を反映したい」というシーンで単純なデータ同期の場合もあるでしょうし、CQRSのようなアーキテクチャを採用されている場合は、コマンドを一括に格納して別のデータベースに書き込むなどの利用用途も想像できます。

使い方

さて、そんな DynamoDB Streams を用いた Lambda 関数の実装方法ですが、Powertools の公式ではこちらで紹介されています。正直あまり説明がないのですが、雰囲気を感じるのには一読されることをお勧めします。

https://docs.powertools.aws.dev/lambda/python/latest/utilities/data_classes/#dynamodb-streams

今回は以下のようなシーケンスを想定した実装を紹介いたします。

実装例は以下のような感じになります。実際の処理の内容はケースバイケースですが、DynamoDB への書き込みだったり、別DBへの書き込み、S3などのストレージなどへの必要な情報のアップロードなどを追加すると良いでしょう。

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext


def do_something(record: DynamoDBRecord) -> None:
    # DynamoDBRecord の中身はこちらで確認できます
    # https://github.com/aws-powertools/powertools-lambda-python/blob/9f1e42da9e513a92fc35a8c6ea0f7848894801b2/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py#L186

    if record.event_name == DynamoDBRecordEventName.REMOVE:
       # アイテムの削除イベントの時の処理を追加
       # イベントはいくつか種類があり、利用できるイベントは DynamoDB の設定によって決まります
       # https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling
       print(event) 
       return 
   if record.dynamodb and (new_image := record.dynamodb.new_image): 
       print(new_image)


@event_source(data_class=DynamoDBStreamEvent)
def handler(event: DynamoDBStreamEvent, _: LambdaContext) -> None:
    # event_source デコレターを書いとくのがお勧め
    # DynamoDBStreamEvent の中身はこちらで確認できます
    # https://github.com/aws-powertools/powertools-lambda-python/blob/9f1e42da9e513a92fc35a8c6ea0f7848894801b2/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py#L232
    print(f"{event.raw_event=}")
    try:
        for record in event.records:
            do_something(record) 
    except Exception as e: 
        # Streams の処理は時系列に行われるため、
        # 例外が起きて処理が止まることを許容できない場合は別途例外時の処理を書いておく方がよいです
        print(e) 

おわりに

もしご興味お持ちいただけた方は、以下の採用情報もご確認の上ぜひカジュアル面談などにお越しくださいませ。

トドケールテックブログ

Discussion