DynamoDB StreamをトリガーにLambdaを実行する処理をCDKで書いてみた
はじめに
- DynamoDB StreamとLambdaを使ってDynamoDBのデータに変更があった場合(追加、編集、削除)に、その変更内容を別アプリに自動で反映する処理を実装しました。
- 自動化した処理をCDK化しました。
DynamoDB Streamとは
DynamoDB Streamは、Amazon Web ServicesのDynamoDBサービスに含まれる機能の一つで、DynamoDBテーブルの変更をリアルタイムでキャプチャし、変更を処理するアプリケーションとの間でデータを配信するためのサービスです。DynamoDB Streamを使用することで、アプリケーション開発者は、DynamoDBテーブルの変更を非同期的に受信し、変更を処理することができます。この機能は、サーバーレスアーキテクチャを使用する場合に特に有用であり、システムの拡張性と柔軟性を高めるのに役立ちます。
やりたいこと(イメージ図)
1.DynamoDBに対する項目の追加、編集、削除をDynamoDB StreamのイベントとしてLambdaに渡す
2.イベントを受け取ったLambdaは、更新されたDynamoDBの値を別アプリのレコードに反映させる
CDK環境を構築
CDK環境を構築します。
参考:AWS CDK
ストリームを有効化する
DynamoDBの「エクスポートおよびストリーム」タブから、Streamを有効化します。新旧イメージを選択すると、項目の変更時にLambdaが変更前と変更後の値を受け取ることができます。
CDKのstackを定義
依頼実行Lambdaを定義
Lambdaを定義します
参考:class NodejsFunction (construct)
NodeJsFunctionのビルド時に必要な依存関係を解決できず苦戦しました。
nodeModules
にライブラリを指定し、depsLockFilePath
に明示的にバージョンを指定することで、エラーが解消されました。
参考:NodeJsFunctionのビルド時に依存関係を解決する3つの方法
const lambdaFunction = new NodejsFunction(
this,
"example",
{
bundling: {
nodeModules: ["@kintone/rest-api-client"],
externalModules: ["aws-sdk"],
preCompilation: false,
target: "es2020",
minify: false,
charset: Charset.UTF8,
},
architecture: Architecture.X86_64,
runtime: Runtime.NODEJS_18_X,
entry: "./lib/src/index.mjs",
depsLockFilePath: "./lib/src/package-lock.json",
handler: "handler",
environment: environment,
description:
"ユーザー情報を別アプリ反映する",
}
);
dynamoDBのテーブルをインポート
テーブル名だけでなく、ストリームのArnも指定する必要がありました。
const table = dynamodb.Table.fromTableAttributes(
this,
tableEnvironment.tableName,
{
tableName: tableEnvironment.tableName,
tableStreamArn: tableEnvironment.tableArn,
}
);
Streamの読み取り権限を追加
ここも苦戦しましたが、table.grantStreamRead(lambdaFunction)
でStreamの読み取り権限を追加することができました。
参考:AWS CDK
table.grantStreamRead(lambdaFunction);
lambdaFunction.addEventSource(
new DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.LATEST,
})
);
Lambdaの処理
このプログラムでは、event.Recordsが配列としてイベントを受け取っており、for文を使って一つ一つの処理を行っています。その中で、eventNameという変数に「INSERT」、「MODIFY」、「REMOVE」のいずれかが格納され、それぞれの変更の種類を教えてくれます。このようにして、event.Recordsの中身を順番に処理することで、データベースの変更を正確に把握することができます。
参考:DynamoDB StreamをトリガーにしてLambdaを実行する
export const handler = async (event) => {};
for (const record of event.Records) {
console.log("イベント:", record.eventName);
console.log("Record:", record.dynamodb);
if (record.eventName === "INSERT" || record.eventName === "MODIFY") {
//レコードがアップサートされた時の処理
} else if (record.eventName === "REMOVE") {
//レコードが削除された時の処理
}
}
};
CDKデプロイ
CDKをデプロイします。
以下の記事を参考にしてください。
参考:Create a new CDK project
Discussion