Amazon EventBridge Pipesの機能をマネジメントコンソールとCloudFormationで試して概要を掴む
Amazon EventBridge PipesがGA
こんにちわ!
DevelopersIO BASECAMP参加者の加藤です!
2022年12月1日「Amazon EventBridge Pipes」がGAになっていたようです!
当記事では、
・マネジメントコンソール
・CloudFormation
で確認していきたいと思います!
概要
「イベント駆動型アーキテクチャ開発時、専門知識・統合コード不要でソースをターゲットに接続。途中にフィルタリングを追加し、情報を拡張するオプションがある」との事です!
EventBridge Pipesの4ステップの概要
Amazon EventBridge Pipesには4つのステップがあります。
公式ドキュメントを確認いただくのが正確ですが、最低限に思える部分を抜粋して畳んでいます。
ソース
≒イベントデータ元
利用可能なソース一覧
- Amazon DynamoDB ストリーム
- Amazon Kinesis Streams
- Amazon MQ ブローカー
- Amazon MSK ストリーム
- セルフマネージド Apache Kafka ストリーム
- Amazon SQS キュー
(※2023/4/8現在)
フィルタリング
≒フィルター条件「FilterCriteria」というフィルタリングパターン (Pattern) を定義可能な、リスト(Filters)構成のオブジェクト構造があり、JSON文字列で表現。
{
"Filters": [
{"Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}"
}
]
}
{
"Metadata1": [ pattern1 ],
"data": {"Data1": [ pattern2 ]}
}
とすると、
-
メタデータプロパティ
→FilterCriteria.Metadata1 のように参照可能。 -
データプロパティ
→FilterCriteria.Data1 のように参照可能。
エンリッチメント(強化)
≒ソースからのデータをターゲットに送信する前に情報を拡張するステージ。
Stepfunctionsに関しては「エクスプレスワークフローをのみサポートしている」との事です。
ターゲット
≒送信先
- API 送信先
- API Gateway
- バッチジョブのキュー
- CloudWatch ロググループ
- ECS タスク
- 同じアカウントとリージョンのイベントバス
- Firehose 配信ストリーム
- Inspector 評価テンプレート
- Kinesis ストリーミング
- Lambda 関数 (同期または非同期)
- Redshift クラスターデータ API クエリ
- SageMaker パイプライン
- SNS トピック
- SQS キュー
- Step Functions ステートマシン
- Express ワークフロー (SYNC または ASYNC)
- Standard ワークフロー (ASYNC)
(※2023/4/8現在)
ちなみに、エンリッチメントまたはターゲットの呼び出しタイプには
-
同期(=REQUEST_RESPONSE) ※デフォルト
レスポンスを待って処理実行。 -
非同期(=FIRE_AND_FORGET)
レスポンスを待たずに処理実行。
があるそうです。
マネジメントコンソールで確認
早速、Eventbridgeコンソールに移動します。
「パイプ」という項目が追加されています。
↓
「パイプを作成」をクリック。
↓
名前と説明を入力し再び「パイプを作成」をクリック。
↓
「パイプを構築」タブが選択されています。
前章「概要と用語」の通り、4ステップが確認出来ます。
↓
ソース
今回はDynamoDB(ストリーム)を選択してみます。
↓
事前準備等何もしていなかった為、DynamoDBコンソールに移動し、テーブルとストリームを作成しました。
↓
更新ボタンを押して表示されたストリームを選択し「次へ」をクリックします。
↓
フィルタリング
フィルタリングステップに移動しました。
※このステップは必須ではなくオプションのようです。
「サンプルイベントタイプ」を選択出来るようです。
※中身
{
"eventID": "1",
"eventVersion": "1.0",
"dynamodb": {
"Keys": {
"Id": {
"N": "101"
}
},
"NewImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES",
"SequenceNumber": "111",
"SizeBytes": 26
},
"awsRegion": "us-west-2",
"eventName": "INSERT",
"eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable",
"eventSource": "aws:dynamodb"
}
↓
更に下にスクロールするとイベントパターンを定義する箇所もありました。
以下のようにマッチングを選択し挿入出来るようです。
フィルタリングステップはオプションの為、今回は触らずそのまま「次へ」をクリックします。
↓
エンリッチメント(強化)
サービスの選択画面
こちらもオプションの為、今回は次のステップへ進みます。
↓
ターゲット
ターゲットの選択画面。
ここではCloudWatchログを選択しました。
↓
ロググループ名のみ入力し、「パイプを作成」をクリックして作成が完了です。
(ちなみに)「パイプを設定」タブ
ロールの作成・選択、その他の設定が可能です。
↓
確認
作成が完了し以下のようになりました。
CloudFormationで再現
先ほどDynamoDBストリームやCloudWatchログを選択してみた事には特段理由はないのですが、
ここまでマネジメントコンソールで作成したリソースをCloudFromationテンプレートでも再現してみたいと思います。
テンプレート
AWSTemplateFormatVersion: 2010-09-09
Description: Sample.
Resources:
# ------------------------------------------------------------#
# Pipes
# ------------------------------------------------------------#
TestPipe:
Type: AWS::Pipes::Pipe
Properties:
Description: test
Name: test-pipe
RoleArn: !GetAtt TestPipeRole.Arn
Source: !GetAtt TestPipeDynamoDBTable.StreamArn
SourceParameters:
DynamoDBStreamParameters:
StartingPosition: LATEST
BatchSize: 1
Target: !GetAtt TestPipeTargetLogGloup.Arn
# ------------------------------------------------------------#
# DynamoDB
# ------------------------------------------------------------#
TestPipeDynamoDBTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: testdynamodbtable
AttributeDefinitions:
- AttributeName: testpartition
AttributeType: S
- AttributeName: testsort
AttributeType: S
KeySchema:
- AttributeName: testpartition
KeyType: HASH
- AttributeName: testsort
KeyType: RANGE
StreamSpecification:
StreamViewType: KEYS_ONLY
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
# ------------------------------------------------------------#
# Logs
# ------------------------------------------------------------#
TestPipeTargetLogGloup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/events/test-pipe
# ------------------------------------------------------------#
# IAM
# ------------------------------------------------------------#
TestPipeRole:
Type: AWS::IAM::Role
Properties:
RoleName: testpiperole
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- pipes.amazonaws.com
Action:
- sts:AssumeRole
TestPipePolicy:
Type: AWS::IAM::Policy
Properties:
PolicyName: testpipepolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- dynamodb:DescribeStream
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:ListStreams
Resource: !GetAtt TestPipeDynamoDBTable.StreamArn
Roles:
- !Ref TestPipeRole
実行
↓
再現出来ました!
終わりに
今回はざっくりと概要を掴む所まででしたが、次回はもう一歩内容にも踏み込んで触ってみたい思います。
お付き合いいただき有難うございました。
Discussion