📩

nestjs-sqsをLocalStackのSQS環境で試してみる

2022/03/12に公開約7,200字

はじめに

  • NestJSAWS SQSのProducer[1]とConsumer[2]側のソースコードのサンプルを実装してみたいと思います。
  • AWS SQSのテスト環境はLocalStackで構築します。
  • nestjs-sqsモジュールを使用して今回は作業します。

対象読者

  • AWS SQSのProducerとConsumer側の実装をしてみたい方
  • NestJSを勉強中の方

nestjs-sqsとは

https://www.npmjs.com/package/@ssut/nestjs-sqs

動作環境

  • Docker Desktop - 4.2.0以上
  • AWS CLI - 2.2.x
  • Node.js - 16.x
  • Yarn - 1.22.x

使用ライブラリ

  • nestjs - 8.x
  • @ssut/nestjs-sqs - 1.2.x

構成イメージ

  • Docker内にLocalStackを使用してSQSを立てます。
  • NestJSでProducerとConsumer側を実装します。
    • Producer側はコントローラーを定義して、該当URLを起動したらSQSメッセージを送信するようにします。
    • Consumer側は常駐起動させて、SQSのキューにメッセージが入ったら処理うするようにします。

LocalStack内のSQS環境構築

docker-composeで環境構築

docker-compose.yml
version: "3.8"

services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack:0.14.0
    network_mode: bridge
    ports:
      - "127.0.0.1:4510-4559:4510-4559"  # external service port range
      - "127.0.0.1:4566:4566"            # LocalStack Edge Proxy
    environment:
      - SERVICES=sqs
      - DEBUG=${DEBUG-}
      - DATA_DIR=${DATA_DIR-}
      - HOST_TMP_FOLDER=${TMPDIR:-/tmp/}localstack
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${TMPDIR:-/tmp}/localstack:/tmp/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
# docker-compose実行
docker-compose up -d

  • SQS Serviceが起動していれば成功です。

AWS CLIでキューを作成

事前にAWS CLIのプロファイルにlocalstackを作成します。
以下のコマンドを実行してLocalStackのSQSにキューを作成してください。

aws sqs create-queue --endpoint-url http://localhost:4566 --queue-name sample-queue --profile localstack

Producer側実装

Module実装

apps/sqs-producer/src/sqs-producer.module.ts
import { Module } from '@nestjs/common';
import { SqsModule } from '@ssut/nestjs-sqs';
import { SqsProducerController } from './sqs-producer.controller';

@Module({
  imports: [
    // SqsModule設定
    SqsModule.register({
      consumers: [],
      // Producer設定
      producers: [
        {
	  // test-queueとして内部で使用できるように定義
          name: 'test-queue',
	  // localstackのキューのURLを指定
          queueUrl: 'http://localhost:4566/000000000000/sample-queue',
          region: 'us-east-1',
        },
      ],
    }),
  ],
  // Controller設定
  controllers: [SqsProducerController],
  providers: [],
})
export class SqsProducerModule {}

Controller実装

apps/sqs-producer/src/sqs-producer.controller.ts
import { Controller, Get, Query } from '@nestjs/common';
import { SqsService } from '@ssut/nestjs-sqs';

@Controller()
export class SqsProducerController {
  // SqsServiceをインジェクション
  constructor(private readonly sqsService: SqsService) {}

  // ?message= でメッセージを送信するAPIを定義
  @Get()
  async sendQueue(@Query('message') message: string) {
    const msg = message || 'hoge';
    const date = (+new Date()).toString();

    // 内部で定義したtest-queueに対してキューを送信
    await this.sqsService.send('test-queue', {
      id: date,
      body: { message: msg, date: date },
    });

    return 'ok';
  }
}
  • http://localhost:3000/?message=をURL起動することでLocalStackのSQSに対してメッセージを送信できるようになります。

エントリーポイント実装

apps/sqs-producer/src/main.ts
import { NestFactory } from '@nestjs/core';
import { SqsProducerModule } from './sqs-producer.module';

async function bootstrap() {
  const app = await NestFactory.create(SqsProducerModule);
  await app.listen(3000);
}
bootstrap();

Consumer側実装

Module実装

apps/sqs-consumer/src/sqs-consumer.module.ts
import { Module } from '@nestjs/common';
import { SqsModule } from '@ssut/nestjs-sqs';
import { MessageHandler } from './message.handler';

@Module({
  imports: [
    // SqsModule設定
    SqsModule.register({
      // Consumer設定
      consumers: [
        {
	  // test-queueとして内部で使用できるように定義
          name: 'test-queue',
	  // localstackのキューのURLを指定
          queueUrl: 'http://localhost:4566/000000000000/sample-queue',
          region: 'us-east-1',
        },
      ],
      producers: [],
    }),
  ],
  controllers: [],
  // Handler設定
  providers: [MessageHandler],
})
export class SqsConsumerModule {}

Handler実装

apps/sqs-consumer/src/message.handler.ts
import { Injectable } from '@nestjs/common';
import { SqsMessageHandler } from '@ssut/nestjs-sqs';
import * as AWS from 'aws-sdk';

@Injectable()
export class MessageHandler {
  // 内部で定義したtest-queueに対してメッセージ受信時の処理を定義(SqsMessageHandlerデコレータで定義する)
  @SqsMessageHandler('test-queue', false)
  handleMessage(message: AWS.SQS.Message) {
    // SQSメッセージ内容をオブジェクト化
    const obj = JSON.parse(message.Body) as { message: string; date: string };
    // SQSメッセージ内容をログ出力
    console.log(obj);
  }
}

エントリーポイント実装

apps/sqs-consumer/src/main.ts
import { NestFactory } from '@nestjs/core';
import { SqsConsumerModule } from './sqs-consumer.module';

async function bootstrap() {
  const app = await NestFactory.create(SqsConsumerModule);
  await app.listen(3001);
}
bootstrap();

Producer/Consumer共通設定

nest-cli.json
{
  "collection": "@nestjs/schematics",
  "sourceRoot": "apps/sqs-producer/src",
  "monorepo": true,
  "root": "apps/sqs-producer",
  "compilerOptions": {
    "webpack": true,
    "tsConfigPath": "apps/sqs-producer/tsconfig.app.json"
  },
  "projects": {
    // Producer側設定
    "sqs-producer": {
      "type": "application",
      "root": "apps/sqs-producer",
      "entryFile": "main",
      "sourceRoot": "apps/sqs-producer/src",
      "compilerOptions": {
        "tsConfigPath": "apps/sqs-producer/tsconfig.app.json"
      }
    },
    // Consumer側設定
    "sqs-consumer": {
      "type": "application",
      "root": "apps/sqs-consumer",
      "entryFile": "main",
      "sourceRoot": "apps/sqs-consumer/src",
      "compilerOptions": {
        "tsConfigPath": "apps/sqs-consumer/tsconfig.app.json"
      }
    }
  }
}
  • NestJSのMonorepo機能でProducer/Consumerをひとつのリポジトリで管理します。
package.json
{
  ...省略
  "scripts": {
    "start:producer": "nest start sqs-producer", // Producer側起動
    "start:consumer": "nest start sqs-consumer", // Consumer側起動
  }
  ...省略
}

動作確認

Producer側起動

yarn start:producer

Consumer側起動

yarn start:consumer
  • Producer側とは別ターミナルで起動します。

SQSテスト

curl 'http://localhost:3000/?message=hogeee'

  • curl -> Producer側API -> SQS -> Consumerの流れでメッセージが伝搬することが確認できます。

ソースコード一式

https://github.com/yasu-s/nestjs-sqs-sample/tree/minimum

おわりに

  • 少ないステップ数で簡単にSQSのProducerとConsumer側の実装が出来ました。
  • NestJSを使用していたり、Node.jsで簡単にSQS関連のプログラムを組みたい方におすすめです。

その他のNestJS関連記事

https://kakkoyakakko2.hatenablog.com/entry/nestjs-monorepo

https://kakkoyakakko2.hatenablog.com/entry/nestjs-bull

https://kakkoyakakko2.hatenablog.com/entry/nestjs-graphql-code-first

https://kakkoyakakko2.hatenablog.com/entry/nestjs-grpc-server
脚注
  1. メッセージを送信する側 ↩︎

  2. メッセージを受信する側 ↩︎

Discussion

ログインするとコメントできます