📝

SpringBoot + Kotlin で @SqsListener を用いてSQSのメッセージを処理する

2023/02/08に公開

背景

AWSのSQSにメッセージをキューイングしておき、SpringBootのアプリケーションで処理したいというときに Spring Cloud AWS の @SqsListener というアノテーションで簡単に実装できると聞いたので、試してみた

2023/09/19追記

以下の記事で最新バージョンでやり直しているので、最新バージョンでやりたい場合はこちらで
https://zenn.dev/yyamada12/articles/dd08cda8b7ff01

概要

サンプルコードはこちら

準備

localstack の起動

  • docker-compose.ymmlに以下を記載
version: "3.8"

services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
    environment:
      - DEBUG=${DEBUG-}
      - LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR-}
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"

(localstackの公式のdocker-compose.ymlをそのまま利用している)

  • docker compose で起動
docker compose up
  • 適当なコマンドを叩いて、うまく動いていれば OK

例) 接続しているアカウントの確認コマンド

awslocal sts get-caller-identity

↓ のような結果が取得できれば OK

{
    "UserId": "AKIAIOSFODNN7EXAMPLE",
    "Account": "000000000000",
    "Arn": "arn:aws:iam::000000000000:root"
}

awslocal コマンドを利用した SQS の操作

awslocal sqs コマンドを利用する

  • help の確認
awslocal sqs help
  • キューの作成
awslocal sqs create-queue --queue-name 'sample-queue'
  • キューの一覧を確認
awslocal sqs list-queues
  • キューにメッセージを送信
awslocal sqs send-message --queue-url http://localhost:4566/000000000000/sample-queue --message-body '{"message": "hoge"}'
  • メッセージを確認
awslocal sqs receive-message --queue-url http://localhost:4566/000000000000/sample-queue

message-apiの実装

sqsのメッセージを処理するアプリケーションとして、 message-api と名付けている

spring intializr から、以下の設定で Project を生成

Project: Gradle - Kotlin
Language: Kotlin
Spring Boot: 2.7.8
Project Metadata
  Group: com.example
  Artifact: message-api
  Name: message-api
  Description: Demo project for Spring Boot
  Package name: com.example.message.api
  Packaging: Jar
  Java: 11
Dependencies: No dependency selected

以下の手順で message-api を実装する

1: 必要なライブラリの追加
build.gradle.kts に以下を追加

implementation("io.awspring.cloud:spring-cloud-starter-aws:2.4.2")
implementation("io.awspring.cloud:spring-cloud-aws-messaging:2.4.2")

2: ローカルへの接続設定
application.properties に以下を追加

cloud.aws.sqs.endpoint=http://localhost:4566

3: handler の実装
com.example.message.api 配下に新規で handler という package を追加し、SampleSQSMeesageHandler.kt を作成

package com.example.message.api.handler

import io.awspring.cloud.messaging.listener.annotation.SqsListener
import org.springframework.stereotype.Component

@Component
class SampleSQSMeesageHandler {

    // SqsListener のアノテーションをつけることで、SQSをポーリングしてメッセージを取得するようになる
    // https://spring.pleiades.io/spring-cloud-aws/docs/current/reference/html/index.html#annotation-driven-listener-endpoints
    @SqsListener("sample-queue")
    fun queueListener(data: String) {
        println(data)
    }
}

これだけで OK
ライブラリ側で sample-queue をポーリングして、メッセージをqueueListenerメソッドの引数に渡して実行してくれる

message-api の動作確認

前提: awslocal コマンドで sample-queue を作成していること

1: application を起動
Intellij から起動
or

cd message-api
./gradlew bootRun

すでに awslocal コマンドでメッセージを送信していた場合は、コンソールに以下のように出力されるはず

{"message": "hoge"}

2: SQS にメッセージを送信

awslocal sqs send-message --queue-url http://localhost:4566/000000000000/sample-queue --message-body '{"message": "fuga"}'

3: コンソール出力の確認
message-api で処理されて以下のように出力される

{"message": "fuga"}

message-api の追加実装

先の参考実装では、メッセージを String で受け取っているが、
String では扱いにくいため json メッセージを kotlin の data class で受け取るようにする

1: 必要なライブラリの追加
build.gradle.kts に以下を追加

implementation("org.springframework.boot:spring-boot-starter-json")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

2: メッセージを jackson を利用してマッピングする設定を追加
com.example.message.api 配下に新規で config という package を追加し、 SqsConfig.kt を作成

package com.example.message.api.config

import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.converter.MappingJackson2MessageConverter

@Configuration
class SqsConfig {

    // jackson を利用してメッセージを変換するための設定を追加
    @Bean
    fun mappingJackson2MessageConverter(objectMapper: ObjectMapper): MappingJackson2MessageConverter {
        val messageConverter = MappingJackson2MessageConverter()
        messageConverter.objectMapper = objectMapper
        return messageConverter
    }
}

3: handler の実装

以下の data class を追加

data class SampleData(
  val message: String
)

handler の method の引数の型を String -> SampleData に変更

- fun queueListener(data: String) {
+ fun queueListener(data: SampleData) {

これで先ほど同様に動作確認をすると、 json を data class のオブジェクトとして扱えることが確認できる

まとめ

@SqsListener を利用したSQSのメッセージ処理を実装した
アノテーションをつけるだけで簡単にキューのポーリングをやってくれるので便利

Discussion

ログインするとコメントできます