S3 Event Notification → Lambda → Momento Topics
Momento Topics → Webhook → AWS Lambda URL → EventBridge → AWS Lambda → Amazon S3 とメッセージが伝わるところまでを作りました。メッセージの度はまだまだ続きます。
次にAmazon S3 にメッセージがJSONオブジェクトとしてPutされたイベントをもとに新たなAWS Lambda 関数を起動させ Momento Topics にメッセージをパブリッシュするところまでを作ります。こうすることでさらにEvent Driven Architecture のハブとしてMomento Topics を活用することが可能です。
さっそくやってみる
Lambda 関数の作成
まずは単純にMomento TopicsへメッセージをPublishするLambda関数を作成します。
tomomento
と名前を付けてNode 20.x
で関数を作成します。その他は全てデフォルトで問題ありません。
Lambda関数は@gomomento/sdk
を読み込ませる必要があるため手元でzipを作成しアップロードする形態をとるため、任意の環境で作業を行いますがnpm
を使える必要があります。
mkdir tomomento
cd tomomento
npm init -y
npm install @gomomento/sdk
index.js
を以下の内容で作成します。
const { TopicClient, TopicConfigurations,CredentialProvider, TopicPublish, TopicSubscribe,} = require('@gomomento/sdk');
const MOMENTO_API_KEY = process.env.MOMENTO_API_KEY;
exports.handler = async (event) => {
const topicClient = new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromString({
authToken: MOMENTO_API_KEY,
}),
});
const result = await topicClient.publish('default-cache', 'fromaws', 'fromAWSLambda');
console.log('Message published successfully:', result);
};
フォルダ構造はこうなります。
次にこれらのファイルをzip圧縮しますが、注意点としてはtomomento
フォルダを含まずフォルダの中身だけをzip圧縮します。ファイル名はlambda.zip
にしましょう。つまりこうなります。
Lambdaのマネージメントコンソールから作成されたzipをアップロードします。
コードの中身はいかがポイントとなっていて
const result = await topicClient.publish('default-cache', 'fromaws', 'fromAWSLambda');
Momento の'default-cache'というキャッシュに存在しているfromaws
というTopicチャネルにfroAWSLambda
というメッセージをパブリッシュします。後ほどfroAWSLambda
はS3のJSON Objectを読み取るように変更します。
fromaws
ではなくtoaws
とせってしてしまうとMomento Topicsが再度受け取ったメッセージをLambdaに送ってMomentoとAWSの間で無限ループが発生します。Lambdaのサーキットブレイカーが発生するまで途方もない課金が発生します。
fromaws
を設定するのです!!!
fromaws
を設定するのです!!!
fromaws
を設定するのです!!!
fromaws
を設定するのです!!!
fromaws
を設定するのです!!!
fromaws
を設定するのです!!!
fromaws
を設定するのです!!!
ちゃんといいましたからね!
次にConfiguration
タブのEnvironment variables
から環境変数をセットするためEdit
をクリックします。
Add environment variable
をクリックします。
Momentoのマネージメントコンソールから取得したAPI Keyの値をこちらに投入してSave
を押します。MOMENTO_API_KEY
が最新の変数名になります。
Momento Topicsのコンソール画面でfromaws
チャネルからメッセージを読める状態にしておきます。
LambdaマネージメントコンソールのTest
タブからTest
ボタンを押すとLambdaからメッセージがPublishされたことがわかります。
Lambda 関数 と S3 Event Notificationの連携
Lambda関数の起動トリガーを設定していきます。Add trigger
ボタンを押します。
前の手順で作成したS3バケットを使用します。
PUT
を選びます。
チェックボックスを付けてAdd
を押します。
この状態でMomento Topicsのコンソールを2つ開きます。
toawsのサブスクライブ
fromawsのサブスクライブ
toaws
から送ったメッセージがAWSを一周して数秒後fromaws
に出てきます。ただこの時点でfromaws
が受け取るメッセージはまだfromAWSLambda
のままなのでtoaws
に送ったメッセージを戻してくれるようにLambda関数を修正します。
Lambda 関数が S3 バケットのオブジェクトを読むように修正
まずはindex.js
を以下に変更します。
const { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3');
const s3 = new S3Client({});
const { TopicClient, TopicConfigurations,CredentialProvider, TopicPublish, TopicSubscribe,} = require('@gomomento/sdk');
const MOMENTO_API_KEY = process.env.MOMENTO_API_KEY;
exports.handler = async (event) => {
// S3イベント情報の取得
const bucket = event.Records[0].s3.bucket.name;
const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const command = new GetObjectCommand({
Bucket: bucket,
Key: key,
});
const response = await s3.send(command);
const str = await response.Body.transformToString();
console.log(str);
const topicClient = new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromString({
authToken: MOMENTO_API_KEY,
}),
});
const result = await topicClient.publish('default-cache', 'fromaws', str);
console.log('Message published successfully:', result);
};
先ほどと同じようにzipでアップロードします。
次にConfiguration
タブのPermission
でIAMロールをクリックします。
Add permission
をクリックしてS3へのアクセス権
をつけてあります。
この手順では簡略化のためにS3FullAccess
を付けてあります。
この状態でtoaws
にメッセージをパブリッシュするとfromaws
には以下の値が出てきます。
{ "version": "0", "id": "1347e522-99b4-ad5f-8482-f309fc6851ef", "detail-type": "default-cache#toaws", "source": "Momento", "account": "968281453546", "time": "2024-06-06T05:38:37Z", "region": "us-west-2", "resources": [], "detail": { "cache": "default-cache", "topic": "toaws", "event_timestamp": 1717652317681, "publish_timestamp": 1717652317682, "topic_sequence_number": 7, "token_id": "", "text": "test" } }
ずいぶんおっきくなって戻ってきました!
Discussion