🐳

AWS SQSのローカル開発環境をDockerで構築する

2022/07/17に公開

はじめに

開発をスムーズに行うためにSQSのローカル開発環境が欲しくなったので、
Dockerを利用して構築してみます
ElasticMQというSQS互換のインメモリメッセージキューシステムがあるようなのでこれを利用します

※ソースコード全文はこちら
https://github.com/TadayoshiOtsuka/sqs_local

利用技術

・Docker, docker-compose
・golang 1.18.3
aws-sdk-go-v2
・ElasticMQ => 今回はroribio16/alpine-sqsのimageを利用

最終的なディレクトリ構成はこんな感じです

├── Dockerfile
├── .air.toml
├── docker-compose.yaml
├── go.mod
├── go.sum
└── src
    ├── main.go
    ├── publisher
    │   └── publisher.go
    ├── services
    │   └── queue.go
    └── subscriber
        └── subscriber.go

GoとElasticMQの環境構築

Go

go.modはこんな感じです

go.mod
module github.com/TadayoshiOtsuka/sqs_local

go 1.18

require (
	github.com/aws/aws-sdk-go-v2 v1.16.7
	github.com/aws/aws-sdk-go-v2/config v1.15.14
	github.com/aws/aws-sdk-go-v2/service/sqs v1.19.0
)

require (
	github.com/aws/aws-sdk-go-v2/credentials v1.12.9 // indirect
	github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.8 // indirect
	github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.14 // indirect
	github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.8 // indirect
	github.com/aws/aws-sdk-go-v2/internal/ini v1.3.15 // indirect
	github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.8 // indirect
	github.com/aws/aws-sdk-go-v2/service/sso v1.11.12 // indirect
	github.com/aws/aws-sdk-go-v2/service/sts v1.16.9 // indirect
	github.com/aws/smithy-go v1.12.0 // indirect
)

Dockerfileはこんな感じです

Dockerfile
FROM golang:1.18.3

ENV TZ=Asia/Tokyo
ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=amd64
ENV ROOTPATH=/go/app
ENV QUEUE_URL=http://queue:9324/queue/default
ENV AWS_REGION=ap-northeast-1
ENV AWS_ACCESS_KEY_ID=dummy
ENV AWS_SECRET_ACCESS_KEY=dummy

WORKDIR ${ROOTPATH}

RUN go install github.com/cosmtrek/air@v1.29.0
COPY go.mod go.sum .air.toml ./
RUN go mod download

CMD ["air", "-c", ".air.toml"]
  • 環境変数について
    • QUEUE_URLは、のちほど作成するalpine-sqsのコンテナがデフォルトで作成するQueueのエンドポイント(/queue/default)とPort(9324)からなっています
    • AWS_REGION,AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEYは、
      のちほど利用するaws-sdk-go-v2がデフォルトでこれらの環境変数を読みにいくため、
      何かしら値を設定しておく必要があります(ダミーでOK)
  • CMDについて
    • ホットリロードが便利なのでairを導入しています。必要なければ単にgo run src/main.goとしても大丈夫です
airの設定ファイルはこちら
.air.toml
# Config file for [Air](https://github.com/cosmtrek/air) in TOML format

# Working directory
# . or absolute path, please note that the directories following must be under root.
root = "."
tmp_dir = "tmp"

[build]
# Just plain old shell command. You could use `make` as well.
cmd = "go build -o ./tmp/main ./src/main.go"
# Binary file yields from `cmd`.
bin = "tmp/main"
# Customize binary, can setup environment variables when run your app.
full_bin = "APP_ENV=dev APP_USER=air ./tmp/main"
# Watch these filename extensions.
include_ext = ["go", "tpl", "tmpl", "html"]
# Ignore these filename extensions or directories.
exclude_dir = ["assets", "tmp", "vendor", "frontend/node_modules"]
# Watch these directories if you specified.
include_dir = []
# Exclude files.
exclude_file = []
# Exclude specific regular expressions.
exclude_regex = ["_test.go"]
# Exclude unchanged files.
exclude_unchanged = true
# Follow symlink for directories
follow_symlink = true
# This log file places in your tmp_dir.
log = "air.log"
# It's not necessary to trigger build each time file changes if it's too frequent.
delay = 1000 # ms
# Stop running old binary when build errors occur.
stop_on_error = true
# Send Interrupt signal before killing process (windows does not support this feature)
send_interrupt = false
# Delay after sending Interrupt signal
kill_delay = 500 # ms

[log]
# Show log time
time = false

[color]
# Customize each part's color. If no color found, use the raw app log.
main = "magenta"
watcher = "cyan"
build = "yellow"
runner = "green"

[misc]
# Delete tmp directory on exit
clean_on_exit = true

動作確認用にmain.goは以下のようにしておきます

src/main.go
package main

import "fmt"

func main() {
	fmt.Println("it's works!")
}

ElasticMQ & docker-compose.yaml

alpine-sqsのimageはdocker-compose.yamlから直で利用します
今回はデフォルトのQueueをそのまま利用するため行いませんが、
デッドレターキューや可視性タイムアウトなど設定のカスタムをする場合はここにある通り、
elasticmq.confを作成してマウントすることで実現できます

docker-compose.yaml
version: "3.8"

services:
  subscriber:
    build:
      context: .
      dockerfile: Dockerfile
    tty: true
    stdin_open: true
    volumes:
      - type: bind
        source: "src"
        target: "/go/app/src"
    depends_on:
      - queue

  queue:
    image: roribio16/alpine-sqs
    ports:
      - 9324:9324
      - 9325:9325
# デフォルトの設定を書き換えるならこんなかんじ
#   volumes:
#      - type: bind
#        source: "elasticmq.conf"
#        target: "/opt/elasticmq.conf"
  • ポートマッピングについて
    • 9324はQueueのport
    • 9325alpine-sqsの管理画面のportです

動かしてみる

ここまでできたらコンテナを動かしてみます
docker-compose up --buildで、
・コンソールにit's works!が出力
・ブラウザからhttp://localhost:9325へアクセスしてalpine-sqsの管理画面が表示
されれば成功です🎉

Publisher/Subscriberの実装

環境構築ができたので簡易的なPublisher/Subscriberを実装していきます

QueueService(SQSClient)の実装

・QueueへMessageを送るSendメソッド
・QueueからMessageを受け取るRecieveメソッド
・QueueのMessageを削除するDeleteメソッド
を実装します
のちに実装するPublisher, Subscriberから利用されます

src/services/queue.go
package services

import (
	"context"
	"log"
	"os"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

type QueueService struct {
	client   *sqs.Client
	queueUrl string
}

func NewQueueService() *QueueService {
	cfg, err := initAwsConfig()
	if err != nil {
		log.Panicln("Failed To Load Configuration ", err)
	}
	c := sqs.NewFromConfig(*cfg)
	url := os.Getenv("QUEUE_URL")
	return &QueueService{client: c, queueUrl: url}
}

func (s *QueueService) Send(ctx context.Context, body string) (*string, error) {
	params := &sqs.SendMessageInput{
		MessageBody:  aws.String(body),
		QueueUrl:     aws.String(s.queueUrl),
		DelaySeconds: 5,
	}
	res, err := s.client.SendMessage(ctx, params)
	if err != nil {
		return nil, err
	}

	return res.MessageId, nil
}

func (s *QueueService) Receive(ctx context.Context) (*sqs.ReceiveMessageOutput, error) {
	params := &sqs.ReceiveMessageInput{
		QueueUrl:        aws.String(s.queueUrl),
		WaitTimeSeconds: 20,
	}
	res, err := s.client.ReceiveMessage(ctx, params)
	if err != nil {
		return nil, err
	}

	return res, nil
}

func (s *QueueService) Delete(ctx context.Context, receiptHandle *string) error {
	params := &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(s.queueUrl),
		ReceiptHandle: receiptHandle,
	}
	if _, err := s.client.DeleteMessage(ctx, params); err != nil {
		return err
	}

	return nil
}

func initAwsConfig() (*aws.Config, error) {
	resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
		if os.Getenv("ENV") != "production" {
			return aws.Endpoint{
				PartitionID:   "aws",
				URL:           os.Getenv("QUEUE_URL"),
				SigningRegion: os.Getenv("AWS_REGION"),
			}, nil
		}
		return aws.Endpoint{}, &aws.EndpointNotFoundError{}
	})

	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithEndpointResolverWithOptions(resolver))
	if err != nil {
		return nil, err
	}

	return &cfg, nil
}

Publisherの実装

stringのsliceを渡すとそれを一つづつPublishするSendMessagesメソッドを実装します

src/publisher/publisher.go
package publisher

import (
	"context"
	"log"

	"github.com/TadayoshiOtsuka/sqs_local/src/services"
)

type Publisher struct {
	queueService services.QueueService
}

func NewPublisher(queue services.QueueService) *Publisher {
	return &Publisher{queueService: queue}
}

func (s *Publisher) SendMessages(ctx context.Context, messages []string) {
	for _, v := range messages {
		_, err := s.queueService.Send(ctx, v)
		if err != nil {
			log.Panicln("Send Message Error: ", err)
		}
	}
}

Subscriberの実装

無限loopによってSQSのQueueへMessageを受け取るためのリクエストをし続け、
・Messageを受け取ったらそのBodyを出力しQueueからMessageを削除
・MessageがなければNo Message Containsを出力
するstartメソッドを実装します

src/subscriber/subscriber.go
package subscriber

import (
	"context"
	"log"

	"github.com/TadayoshiOtsuka/sqs_local/src/services"
)

type Subscriber struct {
	queueService services.QueueService
}

func NewSubscriber(queue services.QueueService) *Subscriber {
	return &Subscriber{queueService: queue}
}

func (s *Subscriber) Start(ctx context.Context) {
	for {
		res, err := s.queueService.Receive(ctx)
		if err != nil {
			log.Panicln("Receive Message Error: ", err)
		}
		if len(res.Messages) <= 0 {
			log.Println("No Message Contains")
			continue
		}
		log.Println("Receive Message Body is:", *res.Messages[0].Body)
		s.queueService.Delete(ctx, res.Messages[0].ReceiptHandle)
	}
}


main.goの更新

QueueServicePublisher, Subscribermain.goで初期化/呼び出しします

src/main.go
package main

import (
	"context"

	"github.com/TadayoshiOtsuka/sqs_local/src/publisher"
	"github.com/TadayoshiOtsuka/sqs_local/src/services"
	"github.com/TadayoshiOtsuka/sqs_local/src/subscriber"
)

func main() {
	ctx := context.Background()
	queueService := services.NewQueueService()
	subscriber := subscriber.NewSubscriber(*queueService)
	publisher := publisher.NewPublisher(*queueService)
	publisher.SendMessages(ctx, []string{"hello", "world"})
	subscriber.Start(ctx)
}

動作確認

docker-compose up --build

sqs_local-pub-sub-1  | 2022/07/17 00:11:31 Receive Message Body is: hello
sqs_local-pub-sub-1  | 2022/07/17 00:11:31 Receive Message Body is: world
sqs_local-pub-sub-1  | 2022/07/17 00:11:51 No Message Contains

コンソールに上記のように表示されれば、正常にPub/Subできています🎉

alpine-sqsの管理画面も見てみます、http://localhost:9325をブラウザで開きます

PublishしたMessageが表示されています!

以上です!
これでSQSを利用した開発をする際にローカルで色々試しながらやっていけそうです🎉

参考にさせていただいたURL

https://docs.aws.amazon.com/code-samples/latest/catalog/code-catalog-gov2-sqs.html
https://k5-n.com/elasticmq/
https://github.com/roribio/alpine-sqs
https://github.com/aws/aws-sdk-go-v2

Discussion