S3 Event Notification → Lambda → Momento Topics
前回までの記事で
Momento Topics → Webhook → AWS Lambda URL → EventBridge → AWS Lambda → Amazon S3 とメッセージが伝わるところまでを作りました。メッセージの度はまだまだ続きます。
次にAmazon S3 にメッセージがJSONオブジェクトとしてPutされたイベントをもとに新たなAWS Lambda 関数を起動させ Momento Topics にメッセージをパブリッシュするところまでを作ります。こうすることでさらにEvent Driven Architecture のハブとしてMomento Topics を活用することが可能です。
さっそくやってみる
Lambda 関数の作成
まずは単純にMomento TopicsへメッセージをPublishするLambda関数を作成します。
tomomentoと名前を付けてNode 20.xで関数を作成します。その他は全てデフォルトで問題ありません。

Lambda関数は@gomomento/sdkを読み込ませる必要があるため手元でzipを作成しアップロードする形態をとるため、任意の環境で作業を行いますがnpmを使える必要があります。
mkdir tomomento
cd tomomento
npm init -y
npm install @gomomento/sdk
index.jsを以下の内容で作成します。
const { TopicClient, TopicConfigurations,CredentialProvider, TopicPublish, TopicSubscribe,} = require('@gomomento/sdk');
const MOMENTO_API_KEY = process.env.MOMENTO_API_KEY;
exports.handler = async (event) => {
const topicClient = new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromString({
authToken: MOMENTO_API_KEY,
}),
});
const result = await topicClient.publish('default-cache', 'fromaws', 'fromAWSLambda');
console.log('Message published successfully:', result);
};
フォルダ構造はこうなります。

次にこれらのファイルをzip圧縮しますが、注意点としてはtomomentoフォルダを含まずフォルダの中身だけをzip圧縮します。ファイル名はlambda.zipにしましょう。つまりこうなります。

Lambdaのマネージメントコンソールから作成されたzipをアップロードします。


コードの中身はいかがポイントとなっていて
const result = await topicClient.publish('default-cache', 'fromaws', 'fromAWSLambda');
Momento の'default-cache'というキャッシュに存在しているfromawsというTopicチャネルにfroAWSLambdaというメッセージをパブリッシュします。後ほどfroAWSLambdaはS3のJSON Objectを読み取るように変更します。
fromawsではなくtoawsとせってしてしまうとMomento Topicsが再度受け取ったメッセージをLambdaに送ってMomentoとAWSの間で無限ループが発生します。Lambdaのサーキットブレイカーが発生するまで途方もない課金が発生します。
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
ちゃんといいましたからね!
次にConfigurationタブのEnvironment variablesから環境変数をセットするためEditをクリックします。

Add environment variableをクリックします。

Momentoのマネージメントコンソールから取得したAPI Keyの値をこちらに投入してSaveを押します。MOMENTO_API_KEYが最新の変数名になります。

Momento Topicsのコンソール画面でfromawsチャネルからメッセージを読める状態にしておきます。

LambdaマネージメントコンソールのTestタブからTestボタンを押すとLambdaからメッセージがPublishされたことがわかります。


Lambda 関数 と S3 Event Notificationの連携
Lambda関数の起動トリガーを設定していきます。Add triggerボタンを押します。

前の手順で作成したS3バケットを使用します。

PUTを選びます。

チェックボックスを付けてAddを押します。

この状態でMomento Topicsのコンソールを2つ開きます。
toawsのサブスクライブ

fromawsのサブスクライブ

toawsから送ったメッセージがAWSを一周して数秒後fromawsに出てきます。ただこの時点でfromawsが受け取るメッセージはまだfromAWSLambdaのままなのでtoawsに送ったメッセージを戻してくれるようにLambda関数を修正します。
Lambda 関数が S3 バケットのオブジェクトを読むように修正
まずはindex.jsを以下に変更します。
const { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3');
const s3 = new S3Client({});
const { TopicClient, TopicConfigurations,CredentialProvider, TopicPublish, TopicSubscribe,} = require('@gomomento/sdk');
const MOMENTO_API_KEY = process.env.MOMENTO_API_KEY;
exports.handler = async (event) => {
// S3イベント情報の取得
const bucket = event.Records[0].s3.bucket.name;
const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const command = new GetObjectCommand({
Bucket: bucket,
Key: key,
});
const response = await s3.send(command);
const str = await response.Body.transformToString();
console.log(str);
const topicClient = new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromString({
authToken: MOMENTO_API_KEY,
}),
});
const result = await topicClient.publish('default-cache', 'fromaws', str);
console.log('Message published successfully:', result);
};
先ほどと同じようにzipでアップロードします。
次にConfigurationタブのPermissionでIAMロールをクリックします。

Add permissionをクリックしてS3へのアクセス権をつけてあります。

この手順では簡略化のためにS3FullAccessを付けてあります。
この状態でtoawsにメッセージをパブリッシュするとfromawsには以下の値が出てきます。
{ "version": "0", "id": "1347e522-99b4-ad5f-8482-f309fc6851ef", "detail-type": "default-cache#toaws", "source": "Momento", "account": "968281453546", "time": "2024-06-06T05:38:37Z", "region": "us-west-2", "resources": [], "detail": { "cache": "default-cache", "topic": "toaws", "event_timestamp": 1717652317681, "publish_timestamp": 1717652317682, "topic_sequence_number": 7, "token_id": "", "text": "test" } }
ずいぶんおっきくなって戻ってきました!
Discussion