🧙‍♂️

NestJS+SQSで非同期処理を実装してみる

2024/12/24に公開

はじめに

NestJSを使った実践的な実装、第2段としてNestJS+SQSを使用した、
非同期処理を実装してみました。

非同期処理について

ChatGPTに聞いてみたら下記回答が返ってきました。

API を呼び出したときに「重たい処理(時間がかかる処理)」をその場で全部やろうとすると、ユーザーは処理が終わるまで待たされてしまい、アプリケーションも他の仕事ができなくなってしまいます。
そこで「重たい処理だけをキューに渡して、バックグラウンドで処理させる」方法を取ると、API 呼び出しは「作業をキューに依頼した」段階ですぐに返事を返せるため、ユーザーは待たずに先に進められます。このように**「実際の処理はあとで進めてもらい、呼び出し元はそれを待たずに先に進む」**のが非同期処理の考え方です。

APIリクエストではタイムアウトしてしまうような重たい処理やAPIで同時更新させたくないような処理をバックグラウンドで処理させるための仕組みというのが私の認識です。

使用パッケージ

今回以下を使って実装してみました。

  • nestjs-sqs
  • nestjs-modules/ioredis

処理の流れ

フローは下記のような想定です。

実装

Docker

ローカルでSQSを扱えるようにlocalstackを使用しました。
docker compose upでlocalstackを起動ごlocalstack-initでキューの作成を行うようにしました。

localstack:
    image: localstack/localstack:latest
    ports:
      - 4566:4566
    environment:
      - SERVICES=sqs
      - DEFAULT_REGION=ap-northeast-1
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - '${LOCALSTACK_VOLUME_DIR:-./my-localstack-store}:/var/lib/localstack'
      - '/var/run/docker.sock:/var/run/docker.sock'
    healthcheck:
      test: ['CMD', 'curl', '-f', 'http://localhost:4566/_localstack/health']
      interval: 5s
      timeout: 5s
      retries: 3
  localstack-init:
    image: amazon/aws-cli
    environment:
      - AWS_ACCESS_KEY_ID=dummy
      - AWS_SECRET_ACCESS_KEY=dummy
      - AWS_DEFAULT_REGION=ap-northeast-1
      - AWS_DEFAULT_OUTPUT=json
    entrypoint: /bin/sh
    volumes:
      - ./init-aws.sh:/init-aws.sh
    command: /init-aws.sh
    depends_on:
      localstack:
        condition: service_healthy
  redis:
    image: redis:latest
    ports:
      - 6379:6379
    restart: always
    networks:
      - my-swagger
    volumes:
      - ./my-redis-store:/data
    command: redis-server --appendonly yes

Producerの実装

SQSへDispatchするためのモジュールの定義になります。

import { SqsModule } from '@ssut/nestjs-sqs'
import { Module } from '@nestjs/common'
import { ConfigModule, ConfigService } from '@nestjs/config'

@Module({
  imports: [
    ConfigModule.forRoot(),
    SqsModule.registerAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        producers: [
          {
            name: configService.get('AWS_SQS_QUEUE_NAME'),
            queueUrl: configService.get('AWS_SQS_QUEUE_URL'),
          },
        ],
        region: 'ap-northeast-1',
      }),
      inject: [ConfigService],
    }),
  ],
})
export class SqsProducerModule {}

Consumer

ConsumerでSQSをポーリングしてメッセージを受け取り処理するものになります。
MyQueueHandlerがSQSのイベントを処理する実装になります。

Module定義
モジュールで使用するキューの名前とURLをしてします。

import { ConfigModule, ConfigService } from '@nestjs/config'
import { SqsModule } from '@ssut/nestjs-sqs'
import { Module } from '@nestjs/common'
import { MyQueueHandler } from './consumer/my-queue-handler'
import { TestModule } from 'src/modules/test/test.module'
import { RedisModule } from '@nestjs-modules/ioredis'

@Module({
  imports: [
    TestModule,
    RedisModule,
    ConfigModule.forRoot(),
    SqsModule.registerAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        consumers: [
          {
            name: configService.get('AWS_SQS_QUEUE_NAME'),
            queueUrl: configService.get('AWS_SQS_QUEUE_URL'),
          },
        ],
        region: 'ap-northeast-1',
      }),
      inject: [ConfigService],
    }),
  ],
  providers: [MyQueueHandler],
})
export class SqsConsumerModule {}

Handler
@SqsMessageHandlerデコレーターで使用するキューの名前を指定するだけで、
バックグラウンドでキューをポーリングしてくれます。
@SqsConsumerEventHandlerでそれぞれタイムアウトした時のイベントで処理が失敗した時のイベントもリッスンしています。

import { Injectable } from '@nestjs/common'
import { SqsConsumerEventHandler, SqsMessageHandler } from '@ssut/nestjs-sqs'
import * as Sqs from '@aws-sdk/client-sqs'
import { MyQueueMessage } from '../types/message'
import { TestService } from 'src/modules/test/test.service'

@Injectable()
export class MyQueueHandler {
  constructor(private readonly testService: TestService) {}
  @SqsMessageHandler(process.env.AWS_SQS_QUEUE_NAME, false)
  async handleMessage(message: Sqs.Message) {
    if (!message.Body) {
      throw new Error('Message body is empty')
    }
    const body = JSON.parse(message.Body) as MyQueueMessage
    await this.testService.processHandleMessage(body)
  }

  @SqsConsumerEventHandler(process.env.AWS_SQS_QUEUE_NAME, 'processing_error')
  async handleError(message: Sqs.Message) {
    console.error('Error processing message', message)
    await this.testService.processHandleError(JSON.parse(message.Body) as MyQueueMessage)
    throw new Error('Error processing message')
  }

  @SqsConsumerEventHandler(process.env.AWS_SQS_QUEUE_NAME, 'timeout_error')
  async handleTimeoutError(message: Sqs.Message) {
    console.error('Timeout error processing message', message)
    await this.testService.processHandleError(JSON.parse(message.Body) as MyQueueMessage)
    throw new Error('Timeout error processing message')
  }
}

Service

sendAsyncMessageがキューのメッセージ登録、processHandleMessageが実装の処理になります。
今回はサンプルなので処理自体はただ10秒スリープさせるだけにしました。
処理は進行状況はRedisに保存するようにしてみました。

async sendAsyncMessage(message: string) {
    const id = crypto.randomUUID()
    await this.redisService.set(id, 'waiting')
    await this.sqsService.send(this.configService.get('AWS_SQS_QUEUE_NAME'), {
      id,
      body: {
        id,
        message,
        timestamp: Date.now(),
      },
    })
    return {
      id,
    }
  }

  async processHandleMessage(body: MyQueueMessage) {
    await this.redisService.set(body.id, AsyncMessageStatus.PROCESSING)
    // 重たい処理
    await new Promise((resolve) => setTimeout(resolve, 10000))
    await this.redisService.set(body.id, AsyncMessageStatus.DONE)
  }

  async processHandleError(body: MyQueueMessage) {
    await this.redisService.set(body.id, AsyncMessageStatus.ERROR)
  }

  async getStatus(id: string) {
    return await this.redisService.get(id)
  }

Controller

Controllerは非同期処理のリクエストとステータス取得のエンドポイントの実装になります。

 @ApiOperation({
    operationId: 'test-send-async-message',
    summary: '非同期メッセージ送信',
    description: '非同期メッセージ送信APIの説明',
  })
  @ApiResponse({
    status: 200,
    type: SendAsyncMessageResponseDto,
  })
  @Post('send-async-message')
  @HttpCode(HttpStatus.OK)
  async sendAsyncMessage(@Body() body: SendAsyncMessageRequestDto): Promise<SendAsyncMessageResponseDto> {
    return await this.testService.sendAsyncMessage(body.message)
  }

  @ApiOperation({
    operationId: 'test-get-async-status',
    summary: '非同期メッセージステータス取得',
    description: '非同期メッセージステータス取得APIの説明',
  })
  @ApiResponse({
    status: 200,
    type: GetAsyncStatusResponseDto,
  })
  @Get(':id/status')
  @HttpCode(HttpStatus.OK)
  async getAsyncStatus(@Param('id') id: string): Promise<GetAsyncStatusResponseDto> {
    const status = await this.testService.getStatus(id)
    return plainToInstance(GetAsyncStatusResponseDto, { status })
  }

Swagger上で実行してみる

非同期処理のリクエストを実行しIDを取得します。

取得したIDをパスパラメーターで渡す形で状態を監視します。
呼び出すタイミングの時間経過でよってwaitting、processing、DONEがレスポンスで返却されます。


最後に

今回のユースケースとしては非同期処理させたいリクエストをフロントエンド側で実行し、
ステータス取得APIを実行して状態を監視し画面切り替えるような想定にしています。
nestjs-sqsを使用すれば非同期処理を比較的簡単に実装できました。

今回は下記については省きましたが考慮することでより安定した非同期処理が実現可能かと思います。

  • fifoキューにして順序の担保、失敗したキューをデッドレターキューへ送信
  • 可視性タイムアウトの考慮(デフォルト設定のまま)
  • NestアプリケーションでのjobConsumerとAPIとのリソース分割
    • yarn start:devで同時に起動するようになっています。

参考になれば幸いです。

今回のリポジトリはこちらになります

Discussion