NestJS+SQSで非同期処理を実装してみる
はじめに
NestJSを使った実践的な実装、第2段としてNestJS+SQSを使用した、
非同期処理を実装してみました。
非同期処理について
ChatGPTに聞いてみたら下記回答が返ってきました。
API を呼び出したときに「重たい処理(時間がかかる処理)」をその場で全部やろうとすると、ユーザーは処理が終わるまで待たされてしまい、アプリケーションも他の仕事ができなくなってしまいます。
そこで「重たい処理だけをキューに渡して、バックグラウンドで処理させる」方法を取ると、API 呼び出しは「作業をキューに依頼した」段階ですぐに返事を返せるため、ユーザーは待たずに先に進められます。このように**「実際の処理はあとで進めてもらい、呼び出し元はそれを待たずに先に進む」**のが非同期処理の考え方です。
APIリクエストではタイムアウトしてしまうような重たい処理やAPIで同時更新させたくないような処理をバックグラウンドで処理させるための仕組みというのが私の認識です。
使用パッケージ
今回以下を使って実装してみました。
- nestjs-sqs
- nestjs-modules/ioredis
処理の流れ
フローは下記のような想定です。
実装
Docker
ローカルでSQSを扱えるようにlocalstackを使用しました。
docker compose up
でlocalstackを起動ごlocalstack-initでキューの作成を行うようにしました。
localstack:
image: localstack/localstack:latest
ports:
- 4566:4566
environment:
- SERVICES=sqs
- DEFAULT_REGION=ap-northeast-1
- DOCKER_HOST=unix:///var/run/docker.sock
volumes:
- '${LOCALSTACK_VOLUME_DIR:-./my-localstack-store}:/var/lib/localstack'
- '/var/run/docker.sock:/var/run/docker.sock'
healthcheck:
test: ['CMD', 'curl', '-f', 'http://localhost:4566/_localstack/health']
interval: 5s
timeout: 5s
retries: 3
localstack-init:
image: amazon/aws-cli
environment:
- AWS_ACCESS_KEY_ID=dummy
- AWS_SECRET_ACCESS_KEY=dummy
- AWS_DEFAULT_REGION=ap-northeast-1
- AWS_DEFAULT_OUTPUT=json
entrypoint: /bin/sh
volumes:
- ./init-aws.sh:/init-aws.sh
command: /init-aws.sh
depends_on:
localstack:
condition: service_healthy
redis:
image: redis:latest
ports:
- 6379:6379
restart: always
networks:
- my-swagger
volumes:
- ./my-redis-store:/data
command: redis-server --appendonly yes
Producerの実装
SQSへDispatchするためのモジュールの定義になります。
import { SqsModule } from '@ssut/nestjs-sqs'
import { Module } from '@nestjs/common'
import { ConfigModule, ConfigService } from '@nestjs/config'
@Module({
imports: [
ConfigModule.forRoot(),
SqsModule.registerAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
producers: [
{
name: configService.get('AWS_SQS_QUEUE_NAME'),
queueUrl: configService.get('AWS_SQS_QUEUE_URL'),
},
],
region: 'ap-northeast-1',
}),
inject: [ConfigService],
}),
],
})
export class SqsProducerModule {}
Consumer
ConsumerでSQSをポーリングしてメッセージを受け取り処理するものになります。
MyQueueHandlerがSQSのイベントを処理する実装になります。
Module定義
モジュールで使用するキューの名前とURLをしてします。
import { ConfigModule, ConfigService } from '@nestjs/config'
import { SqsModule } from '@ssut/nestjs-sqs'
import { Module } from '@nestjs/common'
import { MyQueueHandler } from './consumer/my-queue-handler'
import { TestModule } from 'src/modules/test/test.module'
import { RedisModule } from '@nestjs-modules/ioredis'
@Module({
imports: [
TestModule,
RedisModule,
ConfigModule.forRoot(),
SqsModule.registerAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
consumers: [
{
name: configService.get('AWS_SQS_QUEUE_NAME'),
queueUrl: configService.get('AWS_SQS_QUEUE_URL'),
},
],
region: 'ap-northeast-1',
}),
inject: [ConfigService],
}),
],
providers: [MyQueueHandler],
})
export class SqsConsumerModule {}
Handler
@SqsMessageHandlerデコレーターで使用するキューの名前を指定するだけで、
バックグラウンドでキューをポーリングしてくれます。
@SqsConsumerEventHandlerでそれぞれタイムアウトした時のイベントで処理が失敗した時のイベントもリッスンしています。
import { Injectable } from '@nestjs/common'
import { SqsConsumerEventHandler, SqsMessageHandler } from '@ssut/nestjs-sqs'
import * as Sqs from '@aws-sdk/client-sqs'
import { MyQueueMessage } from '../types/message'
import { TestService } from 'src/modules/test/test.service'
@Injectable()
export class MyQueueHandler {
constructor(private readonly testService: TestService) {}
@SqsMessageHandler(process.env.AWS_SQS_QUEUE_NAME, false)
async handleMessage(message: Sqs.Message) {
if (!message.Body) {
throw new Error('Message body is empty')
}
const body = JSON.parse(message.Body) as MyQueueMessage
await this.testService.processHandleMessage(body)
}
@SqsConsumerEventHandler(process.env.AWS_SQS_QUEUE_NAME, 'processing_error')
async handleError(message: Sqs.Message) {
console.error('Error processing message', message)
await this.testService.processHandleError(JSON.parse(message.Body) as MyQueueMessage)
throw new Error('Error processing message')
}
@SqsConsumerEventHandler(process.env.AWS_SQS_QUEUE_NAME, 'timeout_error')
async handleTimeoutError(message: Sqs.Message) {
console.error('Timeout error processing message', message)
await this.testService.processHandleError(JSON.parse(message.Body) as MyQueueMessage)
throw new Error('Timeout error processing message')
}
}
Service
sendAsyncMessageがキューのメッセージ登録、processHandleMessageが実装の処理になります。
今回はサンプルなので処理自体はただ10秒スリープさせるだけにしました。
処理は進行状況はRedisに保存するようにしてみました。
async sendAsyncMessage(message: string) {
const id = crypto.randomUUID()
await this.redisService.set(id, 'waiting')
await this.sqsService.send(this.configService.get('AWS_SQS_QUEUE_NAME'), {
id,
body: {
id,
message,
timestamp: Date.now(),
},
})
return {
id,
}
}
async processHandleMessage(body: MyQueueMessage) {
await this.redisService.set(body.id, AsyncMessageStatus.PROCESSING)
// 重たい処理
await new Promise((resolve) => setTimeout(resolve, 10000))
await this.redisService.set(body.id, AsyncMessageStatus.DONE)
}
async processHandleError(body: MyQueueMessage) {
await this.redisService.set(body.id, AsyncMessageStatus.ERROR)
}
async getStatus(id: string) {
return await this.redisService.get(id)
}
Controller
Controllerは非同期処理のリクエストとステータス取得のエンドポイントの実装になります。
@ApiOperation({
operationId: 'test-send-async-message',
summary: '非同期メッセージ送信',
description: '非同期メッセージ送信APIの説明',
})
@ApiResponse({
status: 200,
type: SendAsyncMessageResponseDto,
})
@Post('send-async-message')
@HttpCode(HttpStatus.OK)
async sendAsyncMessage(@Body() body: SendAsyncMessageRequestDto): Promise<SendAsyncMessageResponseDto> {
return await this.testService.sendAsyncMessage(body.message)
}
@ApiOperation({
operationId: 'test-get-async-status',
summary: '非同期メッセージステータス取得',
description: '非同期メッセージステータス取得APIの説明',
})
@ApiResponse({
status: 200,
type: GetAsyncStatusResponseDto,
})
@Get(':id/status')
@HttpCode(HttpStatus.OK)
async getAsyncStatus(@Param('id') id: string): Promise<GetAsyncStatusResponseDto> {
const status = await this.testService.getStatus(id)
return plainToInstance(GetAsyncStatusResponseDto, { status })
}
Swagger上で実行してみる
非同期処理のリクエストを実行しIDを取得します。
取得したIDをパスパラメーターで渡す形で状態を監視します。
呼び出すタイミングの時間経過でよってwaitting、processing、DONEがレスポンスで返却されます。
最後に
今回のユースケースとしては非同期処理させたいリクエストをフロントエンド側で実行し、
ステータス取得APIを実行して状態を監視し画面切り替えるような想定にしています。
nestjs-sqsを使用すれば非同期処理を比較的簡単に実装できました。
今回は下記については省きましたが考慮することでより安定した非同期処理が実現可能かと思います。
- fifoキューにして順序の担保、失敗したキューをデッドレターキューへ送信
- 可視性タイムアウトの考慮(デフォルト設定のまま)
- NestアプリケーションでのjobConsumerとAPIとのリソース分割
- yarn start:devで同時に起動するようになっています。
参考になれば幸いです。
Discussion