[CDK] SNSメッセージをKinesisFirehoseでバッファリングしてS3へ出力する
掲題通り、SNS+KinesisFirehose+S3の素振りメモです。
環境
- constructs 10.1.43
- aws-cdk-lib 2.46.0
- typescript 4.7.4
構成図
やはりタイトル通りなんですが、SNSトピックから送られてきたメッセージをKinesisFirehose配信ストリームでバッファリングしてS3バケットへファイル出力する、という構成です。
CDKコード
インフラ定義はAWS CDKで行います。
[AWS CDK] Kinesis Data FirehoseのデータのS3出力失敗時のログを記録してみた | DevelopersIO
上記を主に参考にさせて頂きました。こちらを少々アレンジしつつSNSトピックとそのサブスクリプション設定をつぎ足したのが以下のCDKコードになります。
import { Construct } from "constructs"
import * as cdk from "aws-cdk-lib"
export class KinesisFirehoseStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props)
const region = cdk.Stack.of(this).region
const accountId = cdk.Stack.of(this).account
// S3バケット
const streamDestinationBucket = new cdk.aws_s3.Bucket(
this,
"streamDestinationBucket",
{
bucketName: `stream-destination-bucket-${region}-${accountId}`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
blockPublicAccess: cdk.aws_s3.BlockPublicAccess.BLOCK_ALL,
encryption: cdk.aws_s3.BucketEncryption.S3_MANAGED,
}
)
// KinesisFirehose配信ストリーム用ロググループ
const deliveryStreamFailLogGroup = new cdk.aws_logs.LogGroup(
this,
"deliveryStreamFailLogGroup",
{
logGroupName: `/aws/kinesisfirehose/sample-stream-fail-log`,
}
)
// KinesisFirehose配信ストリーム用ログストリーム
const deliveryStreamLogStream = new cdk.aws_logs.LogStream(
this,
"deliveryStreamLogStream",
{
logGroup: deliveryStreamFailLogGroup,
logStreamName: "logs",
}
)
// KinesisFirehose配信ストリーム用ロール
const deliveryStreamRole = new cdk.aws_iam.Role(
this,
"deliveryStreamRole",
{
assumedBy: new cdk.aws_iam.ServicePrincipal("firehose.amazonaws.com"),
}
)
deliveryStreamRole.addToPolicy(
new cdk.aws_iam.PolicyStatement({
actions: [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
],
effect: cdk.aws_iam.Effect.ALLOW,
resources: [`arn:aws:kinesis:${region}:${accountId}:stream/*`],
})
)
deliveryStreamRole.addToPolicy(
new cdk.aws_iam.PolicyStatement({
actions: [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject",
],
effect: cdk.aws_iam.Effect.ALLOW,
resources: [
streamDestinationBucket.bucketArn,
`${streamDestinationBucket.bucketArn}/*`,
],
})
)
deliveryStreamRole.addToPolicy(
new cdk.aws_iam.PolicyStatement({
actions: ["logs:PutLogEvents"],
effect: cdk.aws_iam.Effect.ALLOW,
resources: [
`arn:aws:logs:${region}:${accountId}:log-group:/aws/kinesisfirehose/*`,
],
})
)
// KinesisFirehose配信ストリーム
const deliveryStream = new cdk.aws_kinesisfirehose.CfnDeliveryStream(
this,
"deliveryStream",
{
deliveryStreamName: "deliveryStream",
deliveryStreamType: "DirectPut",
s3DestinationConfiguration: {
bucketArn: streamDestinationBucket.bucketArn,
roleArn: deliveryStreamRole.roleArn,
//S3出力失敗時のログ記録設定
cloudWatchLoggingOptions: {
enabled: true,
logGroupName: deliveryStreamFailLogGroup.logGroupName,
logStreamName: "logs",
},
compressionFormat: "GZIP",
prefix: "/",
errorOutputPrefix: "errorOutput",
bufferingHints: {
intervalInSeconds: 60,
},
},
}
)
deliveryStream.addDependsOn(
streamDestinationBucket.node.defaultChild as cdk.CfnResource
)
deliveryStream.addDependsOn(
deliveryStreamFailLogGroup.node.defaultChild as cdk.CfnResource
)
deliveryStream.addDependsOn(
deliveryStreamLogStream.node.defaultChild as cdk.CfnResource
)
// SNSトピック
const topic = new cdk.aws_sns.CfnTopic(this, "topic", {})
// SNSサブスクリプション用ロール
const subscriptionRole = new cdk.aws_iam.Role(this, "subscriptionRole", {
assumedBy: new cdk.aws_iam.ServicePrincipal("sns.amazonaws.com"),
})
subscriptionRole.addToPolicy(
new cdk.aws_iam.PolicyStatement({
actions: ["firehose:PutRecord"],
effect: cdk.aws_iam.Effect.ALLOW,
resources: [deliveryStream.attrArn],
})
)
// SNSサブスクリプション
const subscription = new cdk.aws_sns.CfnSubscription(this, "subscription", {
topicArn: topic.attrTopicArn,
protocol: "firehose",
endpoint: deliveryStream.attrArn,
subscriptionRoleArn: subscriptionRole.roleArn,
})
subscription.addDependsOn(topic)
subscription.addDependsOn(deliveryStream)
subscription.addDependsOn(
subscriptionRole.node.defaultChild as cdk.CfnResource
)
}
}
動作確認
AWSマネコンよりSNSトピックへ以下2通のメッセージを発行。
- 1通目
- Subject:
test1
- Message:
test1
- Subject:
- 2通目
- Subject:
test2
- Message:
test2
- Subject:
約1分後、S3バケットに {年}/{月}/{日}/{番号}/xxxxxxxxxxxxxxxxx.gz
ファイルが作成されていました。
S3マネコンからダウンロードして解凍しようとすると Unable to expand xxxxxxx.gz
というエラーになりました(M1 MacのFinder上で操作)。どうやら下記の事情に起因するようです。
Firehose で S3 に保存したgzipファイルが正常に解凍出来ない問題の対処方法 | DevelopersIO
ということで自分の場合はgzファイルをそのままVSCodeで開くことで内容確認できました。
{"Type":"Notification","MessageId":"xxxxxxxxxxxxxxxxxxxxxxx","TopicArn":"arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:KinesisFirehoseStack-topic-xxTXyoP0G1qA","Subject":"test1","Message":"test1","Timestamp":"2022-11-12T10:23:43.286Z","UnsubscribeURL":"https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:xxxxxxxxxxxxxxxxxxxxxxx"}
{"Type":"Notification","MessageId":"xxxxxxxxxxxxxxxxxxxxxxx","TopicArn":"arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:KinesisFirehoseStack-topic-xxTXyoP0G1qA","Subject":"test2","Message":"test2","Timestamp":"2022-11-12T10:23:49.472Z","UnsubscribeURL":"https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:xxxxxxxxxxxxxxxxxxxxxxx"}
このように改行区切りでSNSメッセージのJSONが出力されていました。結果としてJSONLines形式となるようです。
配信データ重複の可能性があることに注意する
SNSはFIFO設定にしない限りはベストエフォート型の重複防止であり、この点注意する必要があります。
また、調べたところKinesisFirehoseについても配信時にデータ重複が生じる可能性があるようです。
よくある質問 - Amazon Kinesis Data Firehose | AWS
Amazon Kinesis Data Firehose では、データの配信に少なくとも 1 回のセマンティクスが使用されます。データ配信のリクエスト試行がタイムアウトして、配信が Firehose によって再試行されるなど、ごくまれに以前のリクエストが実行されて重複が発生する場合があります。
システム仕様として重複を許容する、DBに状態を持ってアプリケーションで重複チェックする、処理に冪等性を持たせるなど、上記予め留意した方針を建てると良さそうです。
参考
- Firehose で S3 に保存したgzipファイルが正常に解凍出来ない問題の対処方法 | DevelopersIO
- GitHubでmermaid記法が使えるようになったのでアーキテクチャーの図を書いてみた | DevelopersIO
- SQSとSNSによるPub/SubをCloudFormationで構築 - Qiita
- mermaid記法でFontAwesomeを使ってみた | DevelopersIO
- Kinesis Data Firehose をゼロからざっくり理解する | DevelopersIO
- CloudFormationでSNSトピックへの複数のイベント通知を設定したS3バケットを作成する | DevelopersIO
- Subscribe - Amazon Simple Notification Service
Discussion