Amazon SQSにてPublish/SubscribeするLambdaをCDKでデプロイする
1.はじめに
暑いです。朝8時から草野球。5時起き。ヘロヘロ...これ書いたら昼寝します。
さて。
今回はAmazon SQSを使って、Pub/SubするLambdaをCDKでデプロイしてみます。
(訳あってアレが進まないときのお助け情報として。岡田監督の"アレ"じゃないです。)
ソースはGitHubに。
Amazon SQSのキューは、StandardとFIFOがあります。
Standardは順序が保証されずベストエフォートであり同じメッセージが複数配信される可能性があります。そのため、Subscriberが冪等ではない場合に不都合がありますね。
ここでは、FIFOを利用してみます。
2.CDKのプロジェクト作成
GitHubに格納したプロジェクトのREADMEにも記載してありますが、以下のようにプロジェクトを作成しました。
$ npm install -g aws-cdk
$ mkdir sqs_pub_sub
$ cd sqs_pub_sub
$ cdk init --language typescript
もちろん、AWSアカウントは用意されていて、profileが用意されている(アクセスキーやシークレットアクセスキーを入手して設定済み)という前提です。
3.Publisher
publisherとなるLambdaは、APIとして用意します。
APIを呼び出すとSQSへメッセージをpublishします。
import { APIGatewayEvent, APIGatewayProxyResult, Context } from 'aws-lambda'
import { SQS } from 'aws-sdk'
const sqs = new SQS()
export const handler = async (event: APIGatewayEvent, context: Context): Promise<APIGatewayProxyResult> => {
console.log(`Event: ${JSON.stringify(event, null, 2)}`)
console.log(`Context: ${JSON.stringify(context, null, 2)}`)
const params = {
MessageBody: JSON.stringify(event),
QueueUrl: process.env.QUEUE_URL!,
MessageGroupId: context.awsRequestId,
MessageDeduplicationId: context.awsRequestId,
}
try {
const data = await sqs.sendMessage(params).promise()
console.log('Success, message posted to', params.QueueUrl)
console.log('MessageID is', data.MessageId)
} catch (err) {
console.log('Error', err)
throw err
}
return {
statusCode: 200,
headers: {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET, OPTIONS'
},
body: JSON.stringify({
message: 'Publish!',
}),
}
}
MessageBodyにはeventを放り込んでみます。
メッセージを送信するにはキューのURLが必要です。
ここでは、環境変数から入手しています。
FIFOはメッセージの順序が保証されるわけですが、保証されるのは「同じMessageGroupIdのメッセージ」です。このサンプルでは特にメッセージグループのことは気にせず、awsRequestIdを入れています。
MessageDeduplicationIdはメッセージの重複を防ぐための一意の識別子ですが、これも同様にawsRequestIdを入れています。
4.Subscriber
Subscriberはたったこれだけです。コンソールへメッセージの内容を出力するのみです。
import { Context, SQSEvent } from 'aws-lambda'
export const handler = async (event: SQSEvent, context: Context): Promise<void> => {
console.log(`Event: ${JSON.stringify(event, null, 2)}`)
console.log(`Context: ${JSON.stringify(context, null, 2)}`)
for (const record of event.Records) {
console.log(`Message body: ${record.body}`)
}
}
5.CDK
CDKにてFIFOキュー(Dead Letter Queueも)、API Gateway、Lambdaといった各種リソースをデプロイします。
なぜ、CDKにてキューを作成するのか(AWSコンソールから作成ではダメなのか?)と言うと、ブランチごとに環境を用意したいという意図があります。複数のブランチでそれぞれCI/CDが回っているとき、キューがアカウント内に一つしかなければ、全てのブランチのSubscriberが同じキューを監視することになりますよね。そのような状況にしてしまうと、どのブランチのSubscriberがメッセージを受信することになるのか分かりません。それに、どのブランチ宛のメッセージかをSubscriberの処理内で判定、なんてしたくありませんよね。そのためです。
ここではそこまで実施していませんが、ブランチ名をキューの名称に埋め込むなどして環境を分離したいのです。
FIFOキューの名称については、以下のとおりです。
The name of a FIFO queue must end with the .fifo suffix.
deadLetterQueue: { queue: dlQueue, maxReceiveCount: 3 }
にて、Subscriberが3回失敗するとメッセージがDLQへ格納されるようにしています。
import * as cdk from 'aws-cdk-lib'
import * as sqs from 'aws-cdk-lib/aws-sqs'
import { Construct } from 'constructs'
import * as path from 'path'
export class SqsPubSubStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props)
// Create the dead letter queue
const dlQueue = new sqs.Queue(this, 'DeadLetterQueue', {
fifo: true,
queueName: `${id}-DLQueue.fifo`,
visibilityTimeout: cdk.Duration.seconds(300)
})
const queue = new sqs.Queue(this, 'Queue', {
fifo: true,
queueName: `${id}-Queue.fifo`,
visibilityTimeout: cdk.Duration.seconds(300),
deadLetterQueue: { queue: dlQueue, maxReceiveCount: 3 }
})
this.createPublisher(queue)
this.createSubscriber(queue)
}
/**
* Create Publisher API
*/
createPublisher(queue: sqs.IQueue) {
const publisherApi = new cdk.aws_apigateway.RestApi(this, 'publisher_api', {
restApiName: 'publisher_api',
description: 'Publish a message to SQS.',
defaultCorsPreflightOptions: {
allowOrigins: cdk.aws_apigateway.Cors.ALL_ORIGINS,
allowMethods: cdk.aws_apigateway.Cors.ALL_METHODS,
allowHeaders: cdk.aws_apigateway.Cors.DEFAULT_HEADERS,
statusCode: 200,
},
})
const publisherLambda = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'publisher', {
functionName: 'publisher',
entry: path.join(__dirname, '../src/publisher.ts'),
handler: 'handler',
runtime: cdk.aws_lambda.Runtime.NODEJS_18_X,
bundling: {
forceDockerBundling: false,
},
environment: {
QUEUE_URL: queue.queueUrl
}
})
const policy = new cdk.aws_iam.PolicyStatement({
actions: [
'sqs:SendMessage'
],
resources: [queue.queueArn],
})
publisherLambda.role?.addToPrincipalPolicy(policy)
publisherApi.root.addResource('publisher').addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(publisherLambda))
}
/**
* Create Subscriber Lambda
*/
createSubscriber(queue: sqs.IQueue) {
const subscriberLambda = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'subscriber', {
functionName: 'subscriber',
entry: path.join(__dirname, '../src/subscriber.ts'),
handler: 'handler',
runtime: cdk.aws_lambda.Runtime.NODEJS_18_X,
bundling: {
forceDockerBundling: false,
},
})
const policy = new cdk.aws_iam.PolicyStatement({
actions: [
'sqs:ReceiveMessage',
'sqs:DeleteMessage',
'sqs:GetQueueAttributes'
],
resources: [queue.queueArn],
})
subscriberLambda.role?.addToPrincipalPolicy(policy)
subscriberLambda.addEventSource(new cdk.aws_lambda_event_sources.SqsEventSource(queue))
}
}
6.bootstrap & deploy
bootstrapして、deployします。
$ cdk bootstrap
$ cdk deploy
cdk bootstrap
で、Cloud Formationのスタックが作成されるはずです。
キューも作成されているはずです。
できてますね。
7.Publish & Subscribe
bootstrapとdeployに成功したら、API Gatewayも作成されているはずです。作成したAPI GatewayへGETリクエストを投げてPublisherをトリガーします。
% curl https://<API Gatewayで確認>/publisher
{"message":"Publish!"}
JSONの応答が返ってきました。Publisherは正常動作したようです。
CloudWatchからSubscriberのコンソール出力を見てみます。
動いていますね。メッセージも正常に取れているようです。
8.おわりに
実際に利用する場合は、異常系のユースケースに備えた、SQS周辺の各種パラメータの考慮が必要ですね。(今回は適当に設定しているvisibilityTimeoutですとか、他にも色々。)
あとはDLQをどう処理していくか、運用設計にも絡むような話です。
そして、もうおねむです。昼寝します。。
Discussion