🐷

[CDK] SNSメッセージをKinesisFirehoseでバッファリングしてS3へ出力する

2022/11/12に公開

掲題通り、SNS+KinesisFirehose+S3の素振りメモです。

環境

  • constructs 10.1.43
  • aws-cdk-lib 2.46.0
  • typescript 4.7.4

構成図

やはりタイトル通りなんですが、SNSトピックから送られてきたメッセージをKinesisFirehose配信ストリームでバッファリングしてS3バケットへファイル出力する、という構成です。

CDKコード

インフラ定義はAWS CDKで行います。

[AWS CDK] Kinesis Data FirehoseのデータのS3出力失敗時のログを記録してみた | DevelopersIO

上記を主に参考にさせて頂きました。こちらを少々アレンジしつつSNSトピックとそのサブスクリプション設定をつぎ足したのが以下のCDKコードになります。

import { Construct } from "constructs"
import * as cdk from "aws-cdk-lib"

export class KinesisFirehoseStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    const region = cdk.Stack.of(this).region
    const accountId = cdk.Stack.of(this).account

    // S3バケット
    const streamDestinationBucket = new cdk.aws_s3.Bucket(
      this,
      "streamDestinationBucket",
      {
        bucketName: `stream-destination-bucket-${region}-${accountId}`,
        removalPolicy: cdk.RemovalPolicy.DESTROY,
        blockPublicAccess: cdk.aws_s3.BlockPublicAccess.BLOCK_ALL,
        encryption: cdk.aws_s3.BucketEncryption.S3_MANAGED,
      }
    )

    // KinesisFirehose配信ストリーム用ロググループ
    const deliveryStreamFailLogGroup = new cdk.aws_logs.LogGroup(
      this,
      "deliveryStreamFailLogGroup",
      {
        logGroupName: `/aws/kinesisfirehose/sample-stream-fail-log`,
      }
    )

    // KinesisFirehose配信ストリーム用ログストリーム
    const deliveryStreamLogStream = new cdk.aws_logs.LogStream(
      this,
      "deliveryStreamLogStream",
      {
        logGroup: deliveryStreamFailLogGroup,
        logStreamName: "logs",
      }
    )

    // KinesisFirehose配信ストリーム用ロール
    const deliveryStreamRole = new cdk.aws_iam.Role(
      this,
      "deliveryStreamRole",
      {
        assumedBy: new cdk.aws_iam.ServicePrincipal("firehose.amazonaws.com"),
      }
    )
    deliveryStreamRole.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        actions: [
          "kinesis:DescribeStream",
          "kinesis:GetShardIterator",
          "kinesis:GetRecords",
        ],
        effect: cdk.aws_iam.Effect.ALLOW,
        resources: [`arn:aws:kinesis:${region}:${accountId}:stream/*`],
      })
    )
    deliveryStreamRole.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        actions: [
          "s3:AbortMultipartUpload",
          "s3:GetBucketLocation",
          "s3:GetObject",
          "s3:ListBucket",
          "s3:ListBucketMultipartUploads",
          "s3:PutObject",
        ],
        effect: cdk.aws_iam.Effect.ALLOW,
        resources: [
          streamDestinationBucket.bucketArn,
          `${streamDestinationBucket.bucketArn}/*`,
        ],
      })
    )
    deliveryStreamRole.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        actions: ["logs:PutLogEvents"],
        effect: cdk.aws_iam.Effect.ALLOW,
        resources: [
          `arn:aws:logs:${region}:${accountId}:log-group:/aws/kinesisfirehose/*`,
        ],
      })
    )

    // KinesisFirehose配信ストリーム
    const deliveryStream = new cdk.aws_kinesisfirehose.CfnDeliveryStream(
      this,
      "deliveryStream",
      {
        deliveryStreamName: "deliveryStream",
        deliveryStreamType: "DirectPut",
        s3DestinationConfiguration: {
          bucketArn: streamDestinationBucket.bucketArn,
          roleArn: deliveryStreamRole.roleArn,
          //S3出力失敗時のログ記録設定
          cloudWatchLoggingOptions: {
            enabled: true,
            logGroupName: deliveryStreamFailLogGroup.logGroupName,
            logStreamName: "logs",
          },
          compressionFormat: "GZIP",
          prefix: "/",
          errorOutputPrefix: "errorOutput",
          bufferingHints: {
            intervalInSeconds: 60,
          },
        },
      }
    )
    deliveryStream.addDependsOn(
      streamDestinationBucket.node.defaultChild as cdk.CfnResource
    )
    deliveryStream.addDependsOn(
      deliveryStreamFailLogGroup.node.defaultChild as cdk.CfnResource
    )
    deliveryStream.addDependsOn(
      deliveryStreamLogStream.node.defaultChild as cdk.CfnResource
    )

    // SNSトピック
    const topic = new cdk.aws_sns.CfnTopic(this, "topic", {})

    // SNSサブスクリプション用ロール
    const subscriptionRole = new cdk.aws_iam.Role(this, "subscriptionRole", {
      assumedBy: new cdk.aws_iam.ServicePrincipal("sns.amazonaws.com"),
    })
    subscriptionRole.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        actions: ["firehose:PutRecord"],
        effect: cdk.aws_iam.Effect.ALLOW,
        resources: [deliveryStream.attrArn],
      })
    )

    // SNSサブスクリプション
    const subscription = new cdk.aws_sns.CfnSubscription(this, "subscription", {
      topicArn: topic.attrTopicArn,
      protocol: "firehose",
      endpoint: deliveryStream.attrArn,
      subscriptionRoleArn: subscriptionRole.roleArn,
    })
    subscription.addDependsOn(topic)
    subscription.addDependsOn(deliveryStream)
    subscription.addDependsOn(
      subscriptionRole.node.defaultChild as cdk.CfnResource
    )
  }
}

動作確認

AWSマネコンよりSNSトピックへ以下2通のメッセージを発行。

  • 1通目
    • Subject: test1
    • Message: test1
  • 2通目
    • Subject: test2
    • Message: test2

約1分後、S3バケットに {年}/{月}/{日}/{番号}/xxxxxxxxxxxxxxxxx.gz ファイルが作成されていました。

S3マネコンからダウンロードして解凍しようとすると Unable to expand xxxxxxx.gz というエラーになりました(M1 MacのFinder上で操作)。どうやら下記の事情に起因するようです。

Firehose で S3 に保存したgzipファイルが正常に解凍出来ない問題の対処方法 | DevelopersIO

ということで自分の場合はgzファイルをそのままVSCodeで開くことで内容確認できました。

{"Type":"Notification","MessageId":"xxxxxxxxxxxxxxxxxxxxxxx","TopicArn":"arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:KinesisFirehoseStack-topic-xxTXyoP0G1qA","Subject":"test1","Message":"test1","Timestamp":"2022-11-12T10:23:43.286Z","UnsubscribeURL":"https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:xxxxxxxxxxxxxxxxxxxxxxx"}
{"Type":"Notification","MessageId":"xxxxxxxxxxxxxxxxxxxxxxx","TopicArn":"arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:KinesisFirehoseStack-topic-xxTXyoP0G1qA","Subject":"test2","Message":"test2","Timestamp":"2022-11-12T10:23:49.472Z","UnsubscribeURL":"https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:xxxxxxxxxxxxxxxxxxxxxxx"}

このように改行区切りでSNSメッセージのJSONが出力されていました。結果としてJSONLines形式となるようです。

配信データ重複の可能性があることに注意する

SNSはFIFO設定にしない限りはベストエフォート型の重複防止であり、この点注意する必要があります。

また、調べたところKinesisFirehoseについても配信時にデータ重複が生じる可能性があるようです。

よくある質問 - Amazon Kinesis Data Firehose | AWS

Amazon Kinesis Data Firehose では、データの配信に少なくとも 1 回のセマンティクスが使用されます。データ配信のリクエスト試行がタイムアウトして、配信が Firehose によって再試行されるなど、ごくまれに以前のリクエストが実行されて重複が発生する場合があります。

システム仕様として重複を許容する、DBに状態を持ってアプリケーションで重複チェックする、処理に冪等性を持たせるなど、上記予め留意した方針を建てると良さそうです。

参考

Discussion