Momento Topic のメッセージを Webhook で Amazon EventBridge につなげる
この記事は3回連載を予定しています。Momento Topics は PublishされたメッセージをイベントとしてWebhookを任意のURLに出すことが出来ます。 これによりEDA(Event Driven Architecture)のハブとしての機能をMomento Topicsに持たせることができます。Momento Topicsはメッセージの到達性を保証しているわけではありませんが体感としてメッセージがドロップされるケースは今までありませんでしたし、例えばLINEやSMSなどもメッセージの到達性を保証しているわけではないことから、ミッションクリティカルなデータを取り扱う以外のケースではあまり気にする必要はないかと思います。
このシリーズ記事では以下の内容を予定しています。
1.(この記事です)Momento Topics から Amazon EventBridge まで
2.Amazon EventBridge → AWS Lambda → Amazon S3 まで
3.Amazon S3 → Cloudflare Queue → Cloudflare Workers → TiDB Serverless
いろんなベンダーサービスの間をメッセージがぐるぐる回る環境を作ることで、それぞれのサービスの基礎を学んでいこうという趣旨です。
1.についてはMomento 側が公式のドキュメントを出してくれていますのでそれをやっていきます。
さっそくやってみる
1. Momento のセットアップ

Momento TopicsにメッセージがパブリッシュされるとWebhookを出すことが出来るのは過去にまとめた通りです。このサンプルではLambda URL(URL エンドポイント付のLambda)がWebhookのPOSTを受け取りAmazon EventBridgeのEventBusにメッセージを書き込みます。
まずはMomentoのマネージメントコンソールでCacheがあることを確認します。一つもCacheがない人は以下を実行するとCacheが作成されます。
任意のキャッシュをクリックしてWebhooksをクリックします。

Create Webhookをクリックします。

名前とトピック名にtoawsと入力します。

ブラウザの別タブでhttps://webhook.site/を開きます。
WebhookをテストできるURLを作成してくれますのでその値をWebhook Destinationに入力してCreate Webhookをクリックします。
次にTopicsの画面で以下のように入力してSubscribeをクリックします。

適当なメッセージをPublishすると画面左にそれが受信されます。

https://webhook.site/でも以下の通りメッセージが受信されています。

{
"cache": "default-cache",
"topic": "toaws",
"event_timestamp": 1717559612250,
"publish_timestamp": 1717559612250,
"topic_sequence_number": 1,
"token_id": "",
"text": "test"
}
2. AWS の環境セットアップ
次にそのメッセージを受け取るAWS環境をセットアップしていきます。以下をクリックしてください。
準備されているCloudFormationテンプレートはオレゴン用ですので、AWSのリージョンもオレゴンで実行します。Momento Cache はオレゴンである必要はなくWebhookはWeb経由で送信されます。
MomentoSecretStringはMomentoの管理者画面から取得できます。API Keyのことです。

Momento Webhookを作成した画面からSecret Stringをコピーして貼り付けます。
チェックを付けてCreate stackをクリックします。

実行が終わるまで待ちます。

実行が完了するとOutputタブでLambda URLのURLが出力されています。

その値をMomento Console側のWebhook管理画面から先ほどテストに使用したURLの値を上書きします。これでWebhookの飛ばし先がwebhook.siteからLambda URLに変更されました。

Lambda関数のMonitoringやCloudWatchLogsを見ると起動されていることがわかります。
{
"timestamp": "2024-06-05T04:11:51.220765Z",
"level": "INFO",
"fields": {
"message": "Output of request=200",
"body": "Success"
},
"line_number": 119,
"span": {
"requestId": "cbb0df7b-9965-46af-b3f9-4ba2f3d8703b",
"xrayTraceId": "Root=1-665fe587-1b71d12b53f8e2b4703486e3;Parent=48fb0d1060a025ad;Sampled=0;Lineage=3e3b22ed:0",
"name": "Lambda runtime invoke"
},
"spans": [
{
"requestId": "cbb0df7b-9965-46af-b3f9-4ba2f3d8703b",
"xrayTraceId": "Root=1-665fe587-1b71d12b53f8e2b4703486e3;Parent=48fb0d1060a025ad;Sampled=0;Lineage=3e3b22ed:0",
"name": "Lambda runtime invoke"
}
]
}

EventBridgeのMonitoringでも何かが飛んできていることがわかります。

EventBridgeのログは体感的に5~10分程度遅延して表示されるようです。
次の記事ではこれをメッセージ単位でS3に出力する部分を作っていきます。
Discussion