🐿️

Amazon EventBridge のパートナーにMomentoが追加されました!サンプルハンズオンの捕捉と解説

2024/08/19に公開

Amazon EventBridgeのパートナーにMomentoが追加されました!

先に本内容を紹介くださっている記事がありますので合わせてお読みいただけますと幸いです。

当記事は主に以下サンプルハンズオンの実行を捕捉する目的の内容となります。
https://github.com/momentohq/client-sdk-javascript/tree/main/examples/nodejs/aws/eventbridge

概要

構成図(※上記リポジトリから拝借)

こちらのREADME.mdページが本手順となりますので画面翻訳等しつつ合わせてお読みください。

前提

・Mac環境を前提としている事
・ハンズオンに利用可能なAWSアカウントを所持しておりリソースにかかる費用を許容出来る事
・AWS CDK利用を前提としている事

※上記が難しい場合や、難易度の高さをお感じになられた場合は再掲ですが、以下ブログをまずお読みいただけますと幸いです。
https://zenn.dev/momentobigfun/articles/f3fe271f6c9ef9

※今回に限った事ではありませんが、何らかの操作ミス等影響を受けて欲しくない他リソース(特に商用や本番環境)が存在する等のAWS環境を利用しない事を強くお勧めしています。当記事や元のハンズオンから発生したあらゆる出来事はあくまでも自己責任としてお考えください。

※以降の手順でリージョンの選択を迷われた場合、AWS、Momentoどちらにおいても、AWSにおける東京リージョンである「ap-northeast-1」をご利用ください。

※若干の不親切部分として片付け用のスクリプトのみ用意されていません。本記事手順に加えておりますので慣れている方はそのままcdk destroyコマンドを実行していただければと思いますが、CloudFormationコンソール画面からスタック自体を削除するでも良いかと思います。(途中に作成するMomento Cache自体は削除しないでも料金がかかり続けるものではありません。)

それでは早速やっていきたいと思います。

①リポジトリのクローン

まずは任意のディレクトリで以下コマンドを実行しリポジトリをクローンします。

$ mkdir momentoEventBridgeSampleHandson && cd momentoEventBridgeSampleHandson
$ git clone https://github.com/momentohq/client-sdk-javascript.git

②サンプルCDK部分への移動

次に実際のサンプルCDK部分に移動しましょう。

$ cd client-sdk-javascript/examples/nodejs/aws/eventbridge/infrastructure/

私はVS Codeを使っていますので、以下コマンドでカレントディレクトリを開いておきます。
※中身を見る為に利用しているだけですので、テキストエディタは何で開いていただいても構いません。

$ code .

ディレクトリ構成は以下である事が確認出来るかと思います。

├ bin/
│    └ eventbridge.ts
├ lib/
│    └ eventbridge-stack.ts
├ .gitignore
├ .npmignore
├ cdk.json
├ jest.conffig.json
├ package-lock.json
├ package.json
└ tsconfig.json 

後ほど実際のリソースを定義しているeventbridge-stack.tsを皆さんと一緒に見てみる事にしますが、一旦手順を進めていきましょう。

③AWS AccessKeyとSecretAccessKeyを作成・取得

以下ドキュメントから作成の手順をご確認ください。
https://docs.aws.amazon.com/ja_jp/IAM/latest/UserGuide/id_credentials_access-keys.html

④Momentoにサインアップしコンソールへ(お済みの方はログイン)

手順を畳んでいます。

以下momentoのページへアクセスします。
https://jp.gomomento.com/

画面少し下へスクロールし「無料でお試し」をクリック。


サインアップします。


6桁のverification codeが記載されたメール届きます。


入力してcontinueをクリック。


コンソールに入る事が出来ました。

⑤Momento APIキーを作成・取得

以下ドキュメントからコンソールでの作成の手順をご確認ください。
https://docs.momentohq.com/ja/cache/develop/authentication/api-keys

⑥先立って指定の名前でMomentoキャッシュを作成

今回は「momento-eventbridge-cache」という名前のキャッシュの作成を必要とします。
公式ドキュメントから手動作成の手順をご確認いただくでも良いですが、後半の手順でMomento CLIを利用するスクリプトがありますので、Momento CLIを未インストールの方は以下を実行してください。

$ brew tap momentohq/tap
$ brew install momento-cli
$ momento configure --quick

Please paste your Momento auth token.  (If you do not have an auth token, use `momento account` to generate one.)
Windows users: if CTRL-V does not work, try right-click or SHIFT-INSERT to paste.

Token: <ここに先ほど作成したMomentoAPIキーを貼付してEnterを押してください>

$ momento cache create momento-eventbridge-cache

完成図

⑦AWS CDKが利用出来るように準備

以下ドキュメントから手順をご確認ください。
https://docs.aws.amazon.com/ja_jp/cdk/v2/guide/getting_started.html

⑧.env ファイルの作成

現在のディレクトリに「.env」というファイルを作成します。

$ vim .env

以下を貼付します。
<>で囲まれた部分は先ほど作成したAPIキーの値に置換が必要である事に注意ください。
※置き換え後<>は不要です。
※最後の行のAWS_SESSION_TOKEN環境変数はコメントアウトにもあるように一時的なクレデンシャルを利用していない場合は行ごと削除してしまって構いません。

MOMENTO_API_KEY=<your-momento-api-key>
MOMENTO_API_ENDPOINT=<your-momento-api-endpoint>
AWS_ACCESS_KEY_ID=<your-aws-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-aws-secret-access-key>
AWS_REGION=<your-aws-region>
AWS_SESSION_TOKEN=<your-aws-session-token> # Optional, if you are using temporary credentials

念の為、catコマンドで意図通りのファイル内容になっている事も確認しておきましょう。

$ cat .env

⑨用意されたスクリプトファイルを順に実行する

今回README.mdや手順を手助けするスクリプトファイルが存在する場所よりひとつ下の階層(※infrastructure)に移動してしまっていた為ひとつ上の階層に上がります。

$ cd ../

CDKスタックのデプロイに以下コマンドでスクリプトファイルを実行します。

$ ./deploy-stack.sh

初めての方に複雑に感じられてしまう可能性がある為念の為触れておきますと、今回のようなスクリプトファイルはあくまでもサンプルハンズオン手順をスムーズに進める意図でハンズオン作者が良い意味でおせっかい気味に作成したものであり、通常は別に作成を必須としません。

ざっくりスクリプト内でやっている事も
①.envファイルの存在確認
②環境変数を読み込みMomentoとAWSの環境変数が設定されているか確認
③CDK Deployコマンドの実行
をしているだけです。

スクリーンショットには含まれていませんが「Deployed stack successfully.」と表示されていれば成功と捉えていただいて問題ありません。

ここでeventbridge-stack.tsを一緒に見ていきましょう

ファイル畳んであります。クリックください。
eventbridge-stack.ts
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as events from "aws-cdk-lib/aws-events";
import * as pipes from "aws-cdk-lib/aws-pipes";
import * as iam from "aws-cdk-lib/aws-iam";
import * as logs from "aws-cdk-lib/aws-logs";
import * as sqs from "aws-cdk-lib/aws-sqs";
import {Secret} from "aws-cdk-lib/aws-secretsmanager";

const cacheName: string = "momento-eventbridge-cache";
const topicName: string = "momento-eventbridge-topic";

export class EventbridgeStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const { apiKeySecret, momentoApiEndpointParameter, logGroup } = this.createUtilities();

    // Define the DynamoDB table for the weather stats demo
    const weatherStatsDemoTable = new dynamodb.Table(this, "weather-stats-demo-table", {
      tableName: "weather-stats-demo",
      partitionKey: { name: "Location", type: dynamodb.AttributeType.STRING },
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    // Define the connection for the event bridge
    const connection = new events.Connection(this, 'weather-stats-demo-connection', {
      connectionName: 'weather-stats-demo-connection',
      authorization: events.Authorization.apiKey('Authorization', cdk.SecretValue.secretsManager(apiKeySecret.secretName)),
      description: 'Connection with API Key Authorization',
    });

    // Define the API destination for the cache put operation.
    const cachePutApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-put-api-destination", {
      apiDestinationName: "weather-stats-demo-cache-put-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
      description: "Cache Set API",
      httpMethod: events.HttpMethod.PUT,
    });

    // Define the API destination for the topic publish operation
    const topicPublishApiDestination = new events.ApiDestination(this, "weather-stats-demo-topic-publish-api-destination", {
      apiDestinationName: "weather-stats-demo-topic-publish-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/topics/*/*`,
      description: "Topic Publish API",
      httpMethod: events.HttpMethod.POST,
    });

    // Define the API destination for the cache delete operation
    const cacheDeleteApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-delete-api-destination", {
      apiDestinationName: "weather-stats-demo-cache-delete-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
      description: "Cache Delete API",
      httpMethod: events.HttpMethod.DELETE,
    });
    // Define the dead letter queue for inspecting failed events to event bridge
    const deadLetterQueue = new sqs.Queue(this, "DeadLetterQueue", {
      queueName: "weather-stats-demo-dlq",
      retentionPeriod: cdk.Duration.days(14),
    });

    // Define the role for the event bridge
    const role = new iam.Role(this, "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache", {
      roleName: "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache",
      assumedBy: new iam.ServicePrincipal("pipes.amazonaws.com"),
    });
    this.addPolicyForEventBridgeRole(role, cachePutApiDestination, cacheDeleteApiDestination, topicPublishApiDestination, weatherStatsDemoTable, deadLetterQueue);


    // Define the pipe for the cache put operation
    const cachePutCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-put-pipe", {
      name: "weather-stats-demo-cache-put-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["INSERT", "MODIFY"]}',
          }],
        },
      },
      target: cachePutApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Define the pipe for the topic publish operation
    const topicPublishCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-topic-publish-pipe", {
      name: "weather-stats-demo-topic-publish-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
      },
      target: topicPublishApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Define the pipe for the cache delete operation
    const cacheDeleteCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-delete-pipe", {
      name: "weather-stats-demo-cache-delete-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["REMOVE"]}',
          }],
        },
      },
      target: cacheDeleteApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Add target parameters to the pipes
    cachePutCfnPipe.targetParameters = {
      inputTemplate: "{\n  \"Location\": <$.dynamodb.Keys.Location.S>, \n  \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n  \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n  \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
      httpParameters: {
        pathParameterValues: [cacheName],
        queryStringParameters: {
          key: "$.dynamodb.Keys.Location.S",
          ttl_seconds: "$.dynamodb.NewImage.TTL.N"
        },
      },
    };

    topicPublishCfnPipe.targetParameters = {
      inputTemplate: "{\n \"EventType\": <$.eventName>,  \"Location\": <$.dynamodb.Keys.Location.S>, \n  \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n  \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n  \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
      httpParameters: {
        pathParameterValues: [cacheName, topicName],
      },
    };

    cacheDeleteCfnPipe.targetParameters = {
      httpParameters: {
        pathParameterValues: [cacheName],
        queryStringParameters: {
          key: "$.dynamodb.Keys.Location.S"
        },
      },
    };

    // Add dependencies to the pipes
    cachePutCfnPipe.node.addDependency(weatherStatsDemoTable);
    cachePutCfnPipe.node.addDependency(cachePutApiDestination);
    topicPublishCfnPipe.node.addDependency(weatherStatsDemoTable);
    topicPublishCfnPipe.node.addDependency(topicPublishApiDestination);
    cacheDeleteCfnPipe.node.addDependency(weatherStatsDemoTable);
    cacheDeleteCfnPipe.node.addDependency(cacheDeleteApiDestination);
  }

  private createUtilities() {
    // Define the Momento API Key parameter
    const momentoApiKeyParameter = new cdk.CfnParameter(this, 'MomentoApiKey', {
      type: 'String',
      description: 'The API key for Momento.',
    });

    // Define the Momento API Endpoint parameter
    const momentoApiEndpointParameter = new cdk.CfnParameter(this, 'MomentoApiEndpoint', {
      type: 'String',
      description: 'The API endpoint for Momento.',
    });

    // Define the API key secret for the connection. The API key is stored in AWS Secrets Manager.
    const apiKeySecret = new Secret(this, 'MomentoEventbridgeApiKey', {
      secretName: 'momento-eventbridge-api-key',
      secretStringValue: new cdk.SecretValue(momentoApiKeyParameter.valueAsString),
    });

    // Define the log group for the access logs
    const logGroup = new logs.LogGroup(this, "AccessLogs", {
      retention: 90,
      logGroupName: cdk.Fn.sub(
        `weather-stats-demo-logs-\${AWS::Region}`,
      ),
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    return { apiKeySecret, momentoApiEndpointParameter, logGroup };
  }

  private addPolicyForEventBridgeRole(role: iam.Role, cachePutApiDestination: events.ApiDestination, cacheDeleteApiDestination: events.ApiDestination, topicPublishApiDestination: events.ApiDestination, weatherStatsDemoTable: dynamodb.Table, deadLetterQueue: sqs.Queue) {
    // Define the role policy for restricting access to the API destinations
    const apiDestinationPolicy = new iam.PolicyStatement({
      actions: ["events:InvokeApiDestination"],
      resources: [cachePutApiDestination.apiDestinationArn, cacheDeleteApiDestination.apiDestinationArn, topicPublishApiDestination.apiDestinationArn],
      effect: iam.Effect.ALLOW,
    });
    role.addToPolicy(apiDestinationPolicy);

    // Define the role policy for accessing the DynamoDB stream
    const dynamoDbStreamPolicy = new iam.PolicyStatement({
      actions: [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams",
      ],
      effect: iam.Effect.ALLOW,
      resources: [weatherStatsDemoTable.tableStreamArn!],
    });
    role.addToPolicy(dynamoDbStreamPolicy);

    const sqsPolicy = new iam.PolicyStatement({
      actions: [
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl"
      ],
      effect: iam.Effect.ALLOW,
      resources: [deadLetterQueue.queueArn],
    });
    role.addToPolicy(sqsPolicy);
  }
}

最上段のimport部分を見ればAWSでどんなリソースを立てているかの大枠は理解出来るかと思います。
細かくは「new」という文字列をファイル内検索してみていただくとより詳細に理解出来ます。今回は私で以下に列挙しました。

・dynamodb.Table
・events.Connection
・events.ApiDestination ×3
・sqs.Queue
・iam.Role
・pipes.CfnPipe ×3
・cdk.CfnParameter
・Secret
・cdk.SecretValue
・iam.PolicyStatement ×3

「.」の前部分は

import * as dynamodb from "aws-cdk-lib/aws-dynamodb";

のように作成者がasで「〜として」という名付けを行ったものとなる為、適宜その元リソースと照らし合わせてご覧ください。

全て細かく説明すると読む方も大変かと思いますのでざっくりとした説明ですが、既に概要を掴んでいる方は飛ばしていただいて結構です。

・iam.Roleやその中身となるiam.PolicyStatementはこれらの機能を成立させる為にリソースに必要な権限を付与している部分。
・dynamodb.TableはNoSQL型のデータベースとなりますのでKVでキーと値を保管するもの。
・Secretやcdk.SecretValueは秘匿にすべき値を扱うもの
・events.Connection、events.ApiDestination、pipes.CfnPipeはAWSにおけるEventBridgeというサーバーレスイベントバスサービスの各機能であり、順番にEventBridgeを使用して外部のSaaSアプリケーションやサービスと安全に通信するための接続構成。EventBridgeのイベントを外部APIエンドポイントに送信するための設定。イベントソース(データの発生元)とターゲット(データの送信先)の間でデータの流れを簡単に構築・管理するためのパイプライン機能(このあたりの言語化は生成AIに任せました。)

といった所でしょうか。
これらのリソースが適切な依存関係順にCDKというラッパーを通じCloudFormationというAWSのIaCサービスでスタックという纏まりの中で作成されたという訳です。

①⓪ブラウザデモの実行

次に以下スクリプトを実行します。

$ ./run-webapp.sh

①先程同様.envファイルの存在と中身の確認
②DynamoDBのテーブル名と項目属性の設定、Momentoキャッシュとトピック名の設定
③DynamoDBにアイテムを追加、取得
④Momentoキャッシュからアイテムを取得
⑤DynamoDBからアイテムを削除し、再度DynamoDBおよびMomentoキャッシュからアイテムを取得
というような事をしています。

以下を実行し、ブラウザでlocalhost:5173を開きます。

$ open http://localhost:5173/

以下のようなデモアプリが開けるかと思います。
※わかりやすくする為ブラウザの画面翻訳をしています。

上部のラジオボタンで気象情報の「作成」「消去」の切り替えが可能です。
それぞれの情報を作成した後右下のアイテムを取得で結果窓に値を表示する事が出来ます。
是非MomentoコンソールやDynamoDBコンソールと見比べながら触ってみていただければ幸いです。

①①TypeScript CLIデモの実行

$ ./run-typescript-cli-app.sh

以下のような結果が返ります。

# ※前半部分は省略して表示
Subscribed to topic
Creating a record in DynamoDB
Getting the record from DynamoDB
Record from DynamoDB: {
  "Location": "San Diego",
  "MaxTemp": "42",
  "MinTemp": "31",
  "ChancesOfPrecipitation": "91"
}
Received item from topic subscription; {
 "EventType": "INSERT",  "Location": "San Diego",
  "MaxTemp": "42",
  "MinTemp": "31",
  "ChancesOfPrecipitation": "91"
}
Getting the item from cache
Item from cache: {
  "Location": "San Diego",
  "MaxTemp": "42",
  "MinTemp": "31",
  "ChancesOfPrecipitation": "91"
}
Unsubscribing from topic subscription
Done!

①②Bash CLIデモの実行

二つターミナルウインドウを開きます。
片方で以下を実行します。

/subscribe-to-topic.sh

もう片方で以下を実行します。

$ ./run-bash-cli-app.sh

①③片付け

冒頭触れましたが、残念ながらcdk destoryを実行するスクリプトはハンズオン自体の説明手順やリポジトリ内には用意されていません。
以下コマンド実行し「y」 + Enterでスタックを削除し、念の為CloudFormationコンソールで結果を確認ください。

$ cd infrastructure/
$ cdk destroy
Are you sure you want to delete: momento-ddb-eventbridge-stack (y/n)?

終わりに

以上でした。

本ハンズオンにて作成されたアーキテクチャについて、改めてどういう所に旨みがあるかや勘所などまた別途の記事にて紹介させていただきたいと思います。

有り難うございました!

Discussion