🔗

Amazon EventBridge Pipesの機能をマネジメントコンソールとCloudFormationで試して概要を掴む

2023/04/08に公開

Amazon EventBridge PipesがGA

こんにちわ!

DevelopersIO BASECAMP参加者の加藤です!

2022年12月1日「Amazon EventBridge Pipes」がGAになっていたようです!

https://aws.amazon.com/jp/about-aws/whats-new/2022/12/amazon-eventbridge-pipes-generally-available/

当記事では、

・マネジメントコンソール
・CloudFormation

で確認していきたいと思います!


概要

「イベント駆動型アーキテクチャ開発時、専門知識・統合コード不要でソースをターゲットに接続。途中にフィルタリングを追加し、情報を拡張するオプションがある」との事です!


EventBridge Pipesの4ステップの概要

Amazon EventBridge Pipesには4つのステップがあります。

公式ドキュメントを確認いただくのが正確ですが、最低限に思える部分を抜粋して畳んでいます。

ソース

≒イベントデータ元

利用可能なソース一覧

  • Amazon DynamoDB ストリーム
  • Amazon Kinesis Streams
  • Amazon MQ ブローカー
  • Amazon MSK ストリーム
  • セルフマネージド Apache Kafka ストリーム
  • Amazon SQS キュー
    (※2023/4/8現在)
フィルタリング

≒フィルター条件「FilterCriteria」というフィルタリングパターン (Pattern) を定義可能な、リスト(Filters)構成のオブジェクト構造があり、JSON文字列で表現。

{
  "Filters": [
    {"Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}"
    }
  ]
}
わかりやすくする為に一部展開
{
  "Metadata1": [ pattern1 ],
  "data": {"Data1": [ pattern2 ]}
}

とすると、

  • メタデータプロパティ
    →FilterCriteria.Metadata1 のように参照可能。

  • データプロパティ
    →FilterCriteria.Data1 のように参照可能。

エンリッチメント(強化)

≒ソースからのデータをターゲットに送信する前に情報を拡張するステージ。

Stepfunctionsに関しては「エクスプレスワークフローをのみサポートしている」との事です。

ターゲット

≒送信先

  • API 送信先
  • API Gateway
  • バッチジョブのキュー
  • CloudWatch ロググループ
  • ECS タスク
  • 同じアカウントとリージョンのイベントバス
  • Firehose 配信ストリーム
  • Inspector 評価テンプレート
  • Kinesis ストリーミング
  • Lambda 関数 (同期または非同期)
  • Redshift クラスターデータ API クエリ
  • SageMaker パイプライン
  • SNS トピック
  • SQS キュー
  • Step Functions ステートマシン
  • Express ワークフロー (SYNC または ASYNC)
  • Standard ワークフロー (ASYNC)

(※2023/4/8現在)

ちなみに、エンリッチメントまたはターゲットの呼び出しタイプには

  • 同期(=REQUEST_RESPONSE) ※デフォルト
    レスポンスを待って処理実行。

  • 非同期(=FIRE_AND_FORGET)
    レスポンスを待たずに処理実行。

があるそうです。


マネジメントコンソールで確認

早速、Eventbridgeコンソールに移動します。

「パイプ」という項目が追加されています。


「パイプを作成」をクリック。


名前と説明を入力し再び「パイプを作成」をクリック。

「パイプを構築」タブが選択されています。

前章「概要と用語」の通り、4ステップが確認出来ます。


ソース

今回はDynamoDB(ストリーム)を選択してみます。


事前準備等何もしていなかった為、DynamoDBコンソールに移動し、テーブルとストリームを作成しました。

更新ボタンを押して表示されたストリームを選択し「次へ」をクリックします。


フィルタリング

フィルタリングステップに移動しました。
※このステップは必須ではなくオプションのようです。

「サンプルイベントタイプ」を選択出来るようです。

※中身
サンプルイベントとしてデフォルトで入力のあったもの
{
  "eventID": "1",
  "eventVersion": "1.0",
  "dynamodb": {
    "Keys": {
      "Id": {
        "N": "101"
      }
    },
    "NewImage": {
      "Message": {
        "S": "New item!"
      },
      "Id": {
        "N": "101"
      }
    },
    "StreamViewType": "NEW_AND_OLD_IMAGES",
    "SequenceNumber": "111",
    "SizeBytes": 26
  },
  "awsRegion": "us-west-2",
  "eventName": "INSERT",
  "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable",
  "eventSource": "aws:dynamodb"
}


更に下にスクロールするとイベントパターンを定義する箇所もありました。

以下のようにマッチングを選択し挿入出来るようです。

フィルタリングステップはオプションの為、今回は触らずそのまま「次へ」をクリックします。


エンリッチメント(強化)

サービスの選択画面

こちらもオプションの為、今回は次のステップへ進みます。


ターゲット

ターゲットの選択画面。
ここではCloudWatchログを選択しました。


ロググループ名のみ入力し、「パイプを作成」をクリックして作成が完了です。


(ちなみに)「パイプを設定」タブ

ロールの作成・選択、その他の設定が可能です。


確認

作成が完了し以下のようになりました。


CloudFormationで再現

先ほどDynamoDBストリームやCloudWatchログを選択してみた事には特段理由はないのですが、
ここまでマネジメントコンソールで作成したリソースをCloudFromationテンプレートでも再現してみたいと思います。

テンプレート

sample.yml
AWSTemplateFormatVersion: 2010-09-09
Description: Sample.
Resources:
  # ------------------------------------------------------------#
  # Pipes
  # ------------------------------------------------------------#
  TestPipe:
    Type: AWS::Pipes::Pipe
    Properties: 
      Description: test
      Name: test-pipe
      RoleArn: !GetAtt TestPipeRole.Arn
      Source: !GetAtt TestPipeDynamoDBTable.StreamArn
      SourceParameters: 
        DynamoDBStreamParameters:
          StartingPosition: LATEST
          BatchSize: 1
      Target: !GetAtt TestPipeTargetLogGloup.Arn
  # ------------------------------------------------------------#
  # DynamoDB
  # ------------------------------------------------------------#
  TestPipeDynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties: 
      TableName: testdynamodbtable
      AttributeDefinitions:
        - AttributeName: testpartition
          AttributeType: S
        - AttributeName: testsort
          AttributeType: S
      KeySchema: 
        - AttributeName: testpartition
          KeyType: HASH
        - AttributeName: testsort
          KeyType: RANGE
      StreamSpecification:
        StreamViewType: KEYS_ONLY
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1
  # ------------------------------------------------------------#
  # Logs
  # ------------------------------------------------------------#
  TestPipeTargetLogGloup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/events/test-pipe
  # ------------------------------------------------------------#
  # IAM
  # ------------------------------------------------------------#
  TestPipeRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: testpiperole
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - pipes.amazonaws.com
            Action:
              - sts:AssumeRole

  TestPipePolicy:
    Type: AWS::IAM::Policy
    Properties: 
      PolicyName: testpipepolicy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - dynamodb:DescribeStream
              - dynamodb:GetRecords
              - dynamodb:GetShardIterator
              - dynamodb:ListStreams
            Resource: !GetAtt TestPipeDynamoDBTable.StreamArn
      Roles:
        - !Ref TestPipeRole

実行


再現出来ました!

終わりに

今回はざっくりと概要を掴む所まででしたが、次回はもう一歩内容にも踏み込んで触ってみたい思います。

お付き合いいただき有難うございました。

デベキャン

Discussion