💡

DynamoDB StreamをトリガーにLambdaを実行する処理をCDKで書いてみた

2023/04/28に公開

はじめに

  • DynamoDB StreamとLambdaを使ってDynamoDBのデータに変更があった場合(追加、編集、削除)に、その変更内容を別アプリに自動で反映する処理を実装しました。
  • 自動化した処理をCDK化しました。

DynamoDB Streamとは

DynamoDB Streamは、Amazon Web ServicesのDynamoDBサービスに含まれる機能の一つで、DynamoDBテーブルの変更をリアルタイムでキャプチャし、変更を処理するアプリケーションとの間でデータを配信するためのサービスです。DynamoDB Streamを使用することで、アプリケーション開発者は、DynamoDBテーブルの変更を非同期的に受信し、変更を処理することができます。この機能は、サーバーレスアーキテクチャを使用する場合に特に有用であり、システムの拡張性と柔軟性を高めるのに役立ちます。

やりたいこと(イメージ図)

1.DynamoDBに対する項目の追加、編集、削除をDynamoDB StreamのイベントとしてLambdaに渡す
2.イベントを受け取ったLambdaは、更新されたDynamoDBの値を別アプリのレコードに反映させる

CDK環境を構築

CDK環境を構築します。
参考:AWS CDK

ストリームを有効化する

DynamoDBの「エクスポートおよびストリーム」タブから、Streamを有効化します。新旧イメージを選択すると、項目の変更時にLambdaが変更前と変更後の値を受け取ることができます。
image.png

CDKのstackを定義

依頼実行Lambdaを定義

Lambdaを定義します
参考:class NodejsFunction (construct)

NodeJsFunctionのビルド時に必要な依存関係を解決できず苦戦しました。
nodeModules にライブラリを指定し、depsLockFilePath に明示的にバージョンを指定することで、エラーが解消されました。
参考:NodeJsFunctionのビルド時に依存関係を解決する3つの方法

example-stack.ts
const lambdaFunction = new NodejsFunction(
      this,
      "example",
      {
        bundling: {
          nodeModules: ["@kintone/rest-api-client"],
          externalModules: ["aws-sdk"],
          preCompilation: false,
          target: "es2020",
          minify: false,
          charset: Charset.UTF8,
        },
        architecture: Architecture.X86_64,
        runtime: Runtime.NODEJS_18_X,
        entry: "./lib/src/index.mjs",
        depsLockFilePath: "./lib/src/package-lock.json",
        handler: "handler",
        environment: environment,
        description:
          "ユーザー情報を別アプリ反映する",
      }
    );

dynamoDBのテーブルをインポート

テーブル名だけでなく、ストリームのArnも指定する必要がありました。

example-stack.ts
const table = dynamodb.Table.fromTableAttributes(
      this,
      tableEnvironment.tableName,
      {
        tableName: tableEnvironment.tableName,
        tableStreamArn: tableEnvironment.tableArn,
      }
    );

Streamの読み取り権限を追加

ここも苦戦しましたが、table.grantStreamRead(lambdaFunction)でStreamの読み取り権限を追加することができました。
参考:AWS CDK

example-stack.ts
 table.grantStreamRead(lambdaFunction);
    lambdaFunction.addEventSource(
      new DynamoEventSource(table, {
        startingPosition: lambda.StartingPosition.LATEST,
      })
    );

Lambdaの処理

このプログラムでは、event.Recordsが配列としてイベントを受け取っており、for文を使って一つ一つの処理を行っています。その中で、eventNameという変数に「INSERT」、「MODIFY」、「REMOVE」のいずれかが格納され、それぞれの変更の種類を教えてくれます。このようにして、event.Recordsの中身を順番に処理することで、データベースの変更を正確に把握することができます。
参考:DynamoDB StreamをトリガーにしてLambdaを実行する

index.mts

export const handler = async (event) => {};
  for (const record of event.Records) {
    console.log("イベント:", record.eventName);
    console.log("Record:", record.dynamodb);
    if (record.eventName === "INSERT" || record.eventName === "MODIFY") {
      //レコードがアップサートされた時の処理
    } else if (record.eventName === "REMOVE") {
      //レコードが削除された時の処理
    }
  }
};

CDKデプロイ

CDKをデプロイします。
以下の記事を参考にしてください。
参考:Create a new CDK project

コラボスタイル Developers

Discussion