😊

S3 Event Notification → Lambda → Momento Topics

2024/06/06に公開

https://zenn.dev/momentobigfun/articles/28382c05d9def7
https://zenn.dev/momentobigfun/articles/c6f3377d5a18bc
前回までの記事で
Momento Topics → Webhook → AWS Lambda URL → EventBridge → AWS Lambda → Amazon S3 とメッセージが伝わるところまでを作りました。メッセージの度はまだまだ続きます。
次にAmazon S3 にメッセージがJSONオブジェクトとしてPutされたイベントをもとに新たなAWS Lambda 関数を起動させ Momento Topics にメッセージをパブリッシュするところまでを作ります。こうすることでさらにEvent Driven Architecture のハブとしてMomento Topics を活用することが可能です。

さっそくやってみる

Lambda 関数の作成

まずは単純にMomento TopicsへメッセージをPublishするLambda関数を作成します。
tomomentoと名前を付けてNode 20.xで関数を作成します。その他は全てデフォルトで問題ありません。

Lambda関数は@gomomento/sdkを読み込ませる必要があるため手元でzipを作成しアップロードする形態をとるため、任意の環境で作業を行いますがnpmを使える必要があります。

mkdir tomomento
cd tomomento
npm init -y
npm install @gomomento/sdk

index.jsを以下の内容で作成します。

index.js
const { TopicClient, TopicConfigurations,CredentialProvider, TopicPublish, TopicSubscribe,} = require('@gomomento/sdk');

const MOMENTO_API_KEY = process.env.MOMENTO_API_KEY;

exports.handler = async (event) => {
  const topicClient = new TopicClient({
    configuration: TopicConfigurations.Default.latest(),
    credentialProvider: CredentialProvider.fromString({
      authToken: MOMENTO_API_KEY,
    }),
  });
  const result = await topicClient.publish('default-cache', 'fromaws', 'fromAWSLambda');
  console.log('Message published successfully:', result);
};

フォルダ構造はこうなります。

次にこれらのファイルをzip圧縮しますが、注意点としてはtomomentoフォルダを含まずフォルダの中身だけをzip圧縮します。ファイル名はlambda.zipにしましょう。つまりこうなります。

Lambdaのマネージメントコンソールから作成されたzipをアップロードします。


コードの中身はいかがポイントとなっていて

  const result = await topicClient.publish('default-cache', 'fromaws', 'fromAWSLambda');

Momento の'default-cache'というキャッシュに存在しているfromawsというTopicチャネルにfroAWSLambdaというメッセージをパブリッシュします。後ほどfroAWSLambdaはS3のJSON Objectを読み取るように変更します。
fromawsではなくtoawsとせってしてしまうとMomento Topicsが再度受け取ったメッセージをLambdaに送ってMomentoとAWSの間で無限ループが発生します。Lambdaのサーキットブレイカーが発生するまで途方もない課金が発生します。
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
fromawsを設定するのです!!!
ちゃんといいましたからね!

次にConfigurationタブのEnvironment variablesから環境変数をセットするためEditをクリックします。

Add environment variableをクリックします。

Momentoのマネージメントコンソールから取得したAPI Keyの値をこちらに投入してSaveを押します。MOMENTO_API_KEYが最新の変数名になります。

Momento Topicsのコンソール画面でfromawsチャネルからメッセージを読める状態にしておきます。

LambdaマネージメントコンソールのTestタブからTestボタンを押すとLambdaからメッセージがPublishされたことがわかります。
AWS
Momento

Lambda 関数 と S3 Event Notificationの連携

Lambda関数の起動トリガーを設定していきます。Add triggerボタンを押します。

前の手順で作成したS3バケットを使用します。

PUTを選びます。

チェックボックスを付けてAddを押します。

この状態でMomento Topicsのコンソールを2つ開きます。
toawsのサブスクライブ

fromawsのサブスクライブ

toawsから送ったメッセージがAWSを一周して数秒後fromawsに出てきます。ただこの時点でfromawsが受け取るメッセージはまだfromAWSLambdaのままなのでtoawsに送ったメッセージを戻してくれるようにLambda関数を修正します。

Lambda 関数が S3 バケットのオブジェクトを読むように修正

まずはindex.jsを以下に変更します。

const { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3');
const s3 = new S3Client({});

const { TopicClient, TopicConfigurations,CredentialProvider, TopicPublish, TopicSubscribe,} = require('@gomomento/sdk');
const MOMENTO_API_KEY = process.env.MOMENTO_API_KEY;

exports.handler = async (event) => {
  // S3イベント情報の取得
  const bucket = event.Records[0].s3.bucket.name;
  const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));

  const command = new GetObjectCommand({
    Bucket: bucket,
    Key: key,
});
  const response = await s3.send(command);
  const str = await response.Body.transformToString();

  console.log(str);

  const topicClient = new TopicClient({
    configuration: TopicConfigurations.Default.latest(),
    credentialProvider: CredentialProvider.fromString({
      authToken: MOMENTO_API_KEY,
    }),
  });
  const result = await topicClient.publish('default-cache', 'fromaws', str);
  console.log('Message published successfully:', result);
};

先ほどと同じようにzipでアップロードします。
次にConfigurationタブのPermissionでIAMロールをクリックします。

Add permissionをクリックしてS3へのアクセス権をつけてあります。

この手順では簡略化のためにS3FullAccessを付けてあります。
この状態でtoawsにメッセージをパブリッシュするとfromawsには以下の値が出てきます。

{ "version": "0", "id": "1347e522-99b4-ad5f-8482-f309fc6851ef", "detail-type": "default-cache#toaws", "source": "Momento", "account": "968281453546", "time": "2024-06-06T05:38:37Z", "region": "us-west-2", "resources": [], "detail": { "cache": "default-cache", "topic": "toaws", "event_timestamp": 1717652317681, "publish_timestamp": 1717652317682, "topic_sequence_number": 7, "token_id": "", "text": "test" } }

ずいぶんおっきくなって戻ってきました!

Discussion