Amazon SQSにてPublish/SubscribeするLambdaをCDKでデプロイする

2023/07/23に公開

1.はじめに

暑いです。朝8時から草野球。5時起き。ヘロヘロ...これ書いたら昼寝します。

さて。
今回はAmazon SQSを使って、Pub/SubするLambdaをCDKでデプロイしてみます。
(訳あってアレが進まないときのお助け情報として。岡田監督の"アレ"じゃないです。)

ソースはGitHubに。
https://github.com/motucraft/sqs_pub_sub

Amazon SQSのキューは、StandardとFIFOがあります。
Standardは順序が保証されずベストエフォートであり同じメッセージが複数配信される可能性があります。そのため、Subscriberが冪等ではない場合に不都合がありますね。

ここでは、FIFOを利用してみます。

2.CDKのプロジェクト作成

GitHubに格納したプロジェクトのREADMEにも記載してありますが、以下のようにプロジェクトを作成しました。

$ npm install -g aws-cdk
$ mkdir sqs_pub_sub
$ cd sqs_pub_sub
$ cdk init --language typescript

もちろん、AWSアカウントは用意されていて、profileが用意されている(アクセスキーやシークレットアクセスキーを入手して設定済み)という前提です。

3.Publisher

publisherとなるLambdaは、APIとして用意します。
APIを呼び出すとSQSへメッセージをpublishします。

import { APIGatewayEvent, APIGatewayProxyResult, Context } from 'aws-lambda'
import { SQS } from 'aws-sdk'

const sqs = new SQS()

export const handler = async (event: APIGatewayEvent, context: Context): Promise<APIGatewayProxyResult> => {
  console.log(`Event: ${JSON.stringify(event, null, 2)}`)
  console.log(`Context: ${JSON.stringify(context, null, 2)}`)

  const params = {
    MessageBody: JSON.stringify(event),
    QueueUrl: process.env.QUEUE_URL!,
    MessageGroupId: context.awsRequestId,
    MessageDeduplicationId: context.awsRequestId,
  }

  try {
    const data = await sqs.sendMessage(params).promise()
    console.log('Success, message posted to', params.QueueUrl)
    console.log('MessageID is', data.MessageId)
  } catch (err) {
    console.log('Error', err)
    throw err
  }

  return {
    statusCode: 200,
    headers: {
      'Access-Control-Allow-Origin': '*',
      'Access-Control-Allow-Headers': 'Content-Type',
      'Access-Control-Allow-Methods': 'GET, OPTIONS'
    },
    body: JSON.stringify({
      message: 'Publish!',
    }),
  }
}

MessageBodyにはeventを放り込んでみます。
メッセージを送信するにはキューのURLが必要です。
ここでは、環境変数から入手しています。

FIFOはメッセージの順序が保証されるわけですが、保証されるのは「同じMessageGroupIdのメッセージ」です。このサンプルでは特にメッセージグループのことは気にせず、awsRequestIdを入れています。
MessageDeduplicationIdはメッセージの重複を防ぐための一意の識別子ですが、これも同様にawsRequestIdを入れています。

4.Subscriber

Subscriberはたったこれだけです。コンソールへメッセージの内容を出力するのみです。

import { Context, SQSEvent } from 'aws-lambda'

export const handler = async (event: SQSEvent, context: Context): Promise<void> => {
  console.log(`Event: ${JSON.stringify(event, null, 2)}`)
  console.log(`Context: ${JSON.stringify(context, null, 2)}`)

  for (const record of event.Records) {
    console.log(`Message body: ${record.body}`)
  }
}

5.CDK

CDKにてFIFOキュー(Dead Letter Queueも)、API Gateway、Lambdaといった各種リソースをデプロイします。

なぜ、CDKにてキューを作成するのか(AWSコンソールから作成ではダメなのか?)と言うと、ブランチごとに環境を用意したいという意図があります。複数のブランチでそれぞれCI/CDが回っているとき、キューがアカウント内に一つしかなければ、全てのブランチのSubscriberが同じキューを監視することになりますよね。そのような状況にしてしまうと、どのブランチのSubscriberがメッセージを受信することになるのか分かりません。それに、どのブランチ宛のメッセージかをSubscriberの処理内で判定、なんてしたくありませんよね。そのためです。
ここではそこまで実施していませんが、ブランチ名をキューの名称に埋め込むなどして環境を分離したいのです。

FIFOキューの名称については、以下のとおりです。

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html

The name of a FIFO queue must end with the .fifo suffix.

deadLetterQueue: { queue: dlQueue, maxReceiveCount: 3 }にて、Subscriberが3回失敗するとメッセージがDLQへ格納されるようにしています。

import * as cdk from 'aws-cdk-lib'
import * as sqs from 'aws-cdk-lib/aws-sqs'
import { Construct } from 'constructs'
import * as path from 'path'

export class SqsPubSubStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    // Create the dead letter queue
    const dlQueue = new sqs.Queue(this, 'DeadLetterQueue', {
      fifo: true,
      queueName: `${id}-DLQueue.fifo`,
      visibilityTimeout: cdk.Duration.seconds(300)
    })

    const queue = new sqs.Queue(this, 'Queue', {
      fifo: true,
      queueName: `${id}-Queue.fifo`,
      visibilityTimeout: cdk.Duration.seconds(300),
      deadLetterQueue: { queue: dlQueue, maxReceiveCount: 3 }
    })

    this.createPublisher(queue)
    this.createSubscriber(queue)
  }

  /**
   * Create Publisher API
   */
  createPublisher(queue: sqs.IQueue) {
    const publisherApi = new cdk.aws_apigateway.RestApi(this, 'publisher_api', {
      restApiName: 'publisher_api',
      description: 'Publish a message to SQS.',
      defaultCorsPreflightOptions: {
        allowOrigins: cdk.aws_apigateway.Cors.ALL_ORIGINS,
        allowMethods: cdk.aws_apigateway.Cors.ALL_METHODS,
        allowHeaders: cdk.aws_apigateway.Cors.DEFAULT_HEADERS,
        statusCode: 200,
      },
    })

    const publisherLambda = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'publisher', {
      functionName: 'publisher',
      entry: path.join(__dirname, '../src/publisher.ts'),
      handler: 'handler',
      runtime: cdk.aws_lambda.Runtime.NODEJS_18_X,
      bundling: {
        forceDockerBundling: false,
      },
      environment: {
        QUEUE_URL: queue.queueUrl
      }
    })

    const policy = new cdk.aws_iam.PolicyStatement({
      actions: [
        'sqs:SendMessage'
      ],
      resources: [queue.queueArn],
    })

    publisherLambda.role?.addToPrincipalPolicy(policy)

    publisherApi.root.addResource('publisher').addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(publisherLambda))
  }

  /**
   * Create Subscriber Lambda
   */
  createSubscriber(queue: sqs.IQueue) {
    const subscriberLambda = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'subscriber', {
      functionName: 'subscriber',
      entry: path.join(__dirname, '../src/subscriber.ts'),
      handler: 'handler',
      runtime: cdk.aws_lambda.Runtime.NODEJS_18_X,
      bundling: {
        forceDockerBundling: false,
      },
    })

    const policy = new cdk.aws_iam.PolicyStatement({
      actions: [
        'sqs:ReceiveMessage',
        'sqs:DeleteMessage',
        'sqs:GetQueueAttributes'
      ],
      resources: [queue.queueArn],
    })

    subscriberLambda.role?.addToPrincipalPolicy(policy)

    subscriberLambda.addEventSource(new cdk.aws_lambda_event_sources.SqsEventSource(queue))
  }
}

6.bootstrap & deploy

bootstrapして、deployします。

$ cdk bootstrap
$ cdk deploy

cdk bootstrapで、Cloud Formationのスタックが作成されるはずです。
キューも作成されているはずです。

できてますね。

7.Publish & Subscribe

bootstrapとdeployに成功したら、API Gatewayも作成されているはずです。作成したAPI GatewayへGETリクエストを投げてPublisherをトリガーします。

% curl https://<API Gatewayで確認>/publisher
{"message":"Publish!"}

JSONの応答が返ってきました。Publisherは正常動作したようです。
CloudWatchからSubscriberのコンソール出力を見てみます。

動いていますね。メッセージも正常に取れているようです。

8.おわりに

実際に利用する場合は、異常系のユースケースに備えた、SQS周辺の各種パラメータの考慮が必要ですね。(今回は適当に設定しているvisibilityTimeoutですとか、他にも色々。)
あとはDLQをどう処理していくか、運用設計にも絡むような話です。

そして、もうおねむです。昼寝します。。

Discussion