AWS SQSのローカル開発環境をDockerで構築する
はじめに
開発をスムーズに行うためにSQSのローカル開発環境が欲しくなったので、
Dockerを利用して構築してみます
ElasticMQ
というSQS互換のインメモリメッセージキューシステムがあるようなのでこれを利用します
※ソースコード全文はこちら
利用技術
・Docker, docker-compose
・golang 1.18.3
・aws-sdk-go-v2
・ElasticMQ => 今回はroribio16/alpine-sqsのimageを利用
最終的なディレクトリ構成はこんな感じです
├── Dockerfile
├── .air.toml
├── docker-compose.yaml
├── go.mod
├── go.sum
└── src
├── main.go
├── publisher
│ └── publisher.go
├── services
│ └── queue.go
└── subscriber
└── subscriber.go
GoとElasticMQの環境構築
Go
go.mod
はこんな感じです
module github.com/TadayoshiOtsuka/sqs_local
go 1.18
require (
github.com/aws/aws-sdk-go-v2 v1.16.7
github.com/aws/aws-sdk-go-v2/config v1.15.14
github.com/aws/aws-sdk-go-v2/service/sqs v1.19.0
)
require (
github.com/aws/aws-sdk-go-v2/credentials v1.12.9 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.8 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.14 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.8 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.8 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.12 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.9 // indirect
github.com/aws/smithy-go v1.12.0 // indirect
)
Dockerfile
はこんな感じです
FROM golang:1.18.3
ENV TZ=Asia/Tokyo
ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=amd64
ENV ROOTPATH=/go/app
ENV QUEUE_URL=http://queue:9324/queue/default
ENV AWS_REGION=ap-northeast-1
ENV AWS_ACCESS_KEY_ID=dummy
ENV AWS_SECRET_ACCESS_KEY=dummy
WORKDIR ${ROOTPATH}
RUN go install github.com/cosmtrek/air@v1.29.0
COPY go.mod go.sum .air.toml ./
RUN go mod download
CMD ["air", "-c", ".air.toml"]
- 環境変数について
-
QUEUE_URL
は、のちほど作成するalpine-sqs
のコンテナがデフォルトで作成するQueueのエンドポイント(/queue/default
)とPort(9324
)からなっています -
AWS_REGION,AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY
は、
のちほど利用するaws-sdk-go-v2
がデフォルトでこれらの環境変数を読みにいくため、
何かしら値を設定しておく必要があります(ダミーでOK)
-
- CMDについて
- ホットリロードが便利なのでairを導入しています。必要なければ単に
go run src/main.go
としても大丈夫です
- ホットリロードが便利なのでairを導入しています。必要なければ単に
airの設定ファイルはこちら
# Config file for [Air](https://github.com/cosmtrek/air) in TOML format
# Working directory
# . or absolute path, please note that the directories following must be under root.
root = "."
tmp_dir = "tmp"
[build]
# Just plain old shell command. You could use `make` as well.
cmd = "go build -o ./tmp/main ./src/main.go"
# Binary file yields from `cmd`.
bin = "tmp/main"
# Customize binary, can setup environment variables when run your app.
full_bin = "APP_ENV=dev APP_USER=air ./tmp/main"
# Watch these filename extensions.
include_ext = ["go", "tpl", "tmpl", "html"]
# Ignore these filename extensions or directories.
exclude_dir = ["assets", "tmp", "vendor", "frontend/node_modules"]
# Watch these directories if you specified.
include_dir = []
# Exclude files.
exclude_file = []
# Exclude specific regular expressions.
exclude_regex = ["_test.go"]
# Exclude unchanged files.
exclude_unchanged = true
# Follow symlink for directories
follow_symlink = true
# This log file places in your tmp_dir.
log = "air.log"
# It's not necessary to trigger build each time file changes if it's too frequent.
delay = 1000 # ms
# Stop running old binary when build errors occur.
stop_on_error = true
# Send Interrupt signal before killing process (windows does not support this feature)
send_interrupt = false
# Delay after sending Interrupt signal
kill_delay = 500 # ms
[log]
# Show log time
time = false
[color]
# Customize each part's color. If no color found, use the raw app log.
main = "magenta"
watcher = "cyan"
build = "yellow"
runner = "green"
[misc]
# Delete tmp directory on exit
clean_on_exit = true
動作確認用にmain.go
は以下のようにしておきます
package main
import "fmt"
func main() {
fmt.Println("it's works!")
}
ElasticMQ & docker-compose.yaml
alpine-sqs
のimageはdocker-compose.yaml
から直で利用します
今回はデフォルトのQueueをそのまま利用するため行いませんが、
デッドレターキューや可視性タイムアウトなど設定のカスタムをする場合はここにある通り、
elasticmq.conf
を作成してマウントすることで実現できます
version: "3.8"
services:
subscriber:
build:
context: .
dockerfile: Dockerfile
tty: true
stdin_open: true
volumes:
- type: bind
source: "src"
target: "/go/app/src"
depends_on:
- queue
queue:
image: roribio16/alpine-sqs
ports:
- 9324:9324
- 9325:9325
# デフォルトの設定を書き換えるならこんなかんじ
# volumes:
# - type: bind
# source: "elasticmq.conf"
# target: "/opt/elasticmq.conf"
- ポートマッピングについて
-
9324
はQueueのport -
9325
はalpine-sqs
の管理画面のportです
-
動かしてみる
ここまでできたらコンテナを動かしてみます
docker-compose up --build
で、
・コンソールにit's works!
が出力
・ブラウザからhttp://localhost:9325
へアクセスしてalpine-sqs
の管理画面が表示
されれば成功です🎉
Publisher/Subscriberの実装
環境構築ができたので簡易的なPublisher/Subscriberを実装していきます
QueueService(SQSClient)の実装
・QueueへMessageを送るSend
メソッド
・QueueからMessageを受け取るRecieve
メソッド
・QueueのMessageを削除するDelete
メソッド
を実装します
のちに実装するPublisher
, Subscriber
から利用されます
package services
import (
"context"
"log"
"os"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
type QueueService struct {
client *sqs.Client
queueUrl string
}
func NewQueueService() *QueueService {
cfg, err := initAwsConfig()
if err != nil {
log.Panicln("Failed To Load Configuration ", err)
}
c := sqs.NewFromConfig(*cfg)
url := os.Getenv("QUEUE_URL")
return &QueueService{client: c, queueUrl: url}
}
func (s *QueueService) Send(ctx context.Context, body string) (*string, error) {
params := &sqs.SendMessageInput{
MessageBody: aws.String(body),
QueueUrl: aws.String(s.queueUrl),
DelaySeconds: 5,
}
res, err := s.client.SendMessage(ctx, params)
if err != nil {
return nil, err
}
return res.MessageId, nil
}
func (s *QueueService) Receive(ctx context.Context) (*sqs.ReceiveMessageOutput, error) {
params := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(s.queueUrl),
WaitTimeSeconds: 20,
}
res, err := s.client.ReceiveMessage(ctx, params)
if err != nil {
return nil, err
}
return res, nil
}
func (s *QueueService) Delete(ctx context.Context, receiptHandle *string) error {
params := &sqs.DeleteMessageInput{
QueueUrl: aws.String(s.queueUrl),
ReceiptHandle: receiptHandle,
}
if _, err := s.client.DeleteMessage(ctx, params); err != nil {
return err
}
return nil
}
func initAwsConfig() (*aws.Config, error) {
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
if os.Getenv("ENV") != "production" {
return aws.Endpoint{
PartitionID: "aws",
URL: os.Getenv("QUEUE_URL"),
SigningRegion: os.Getenv("AWS_REGION"),
}, nil
}
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
})
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithEndpointResolverWithOptions(resolver))
if err != nil {
return nil, err
}
return &cfg, nil
}
Publisherの実装
stringのsliceを渡すとそれを一つづつPublishするSendMessages
メソッドを実装します
package publisher
import (
"context"
"log"
"github.com/TadayoshiOtsuka/sqs_local/src/services"
)
type Publisher struct {
queueService services.QueueService
}
func NewPublisher(queue services.QueueService) *Publisher {
return &Publisher{queueService: queue}
}
func (s *Publisher) SendMessages(ctx context.Context, messages []string) {
for _, v := range messages {
_, err := s.queueService.Send(ctx, v)
if err != nil {
log.Panicln("Send Message Error: ", err)
}
}
}
Subscriberの実装
無限loopによってSQSのQueueへMessageを受け取るためのリクエストをし続け、
・Messageを受け取ったらそのBodyを出力しQueueからMessageを削除
・MessageがなければNo Message Contains
を出力
するstart
メソッドを実装します
package subscriber
import (
"context"
"log"
"github.com/TadayoshiOtsuka/sqs_local/src/services"
)
type Subscriber struct {
queueService services.QueueService
}
func NewSubscriber(queue services.QueueService) *Subscriber {
return &Subscriber{queueService: queue}
}
func (s *Subscriber) Start(ctx context.Context) {
for {
res, err := s.queueService.Receive(ctx)
if err != nil {
log.Panicln("Receive Message Error: ", err)
}
if len(res.Messages) <= 0 {
log.Println("No Message Contains")
continue
}
log.Println("Receive Message Body is:", *res.Messages[0].Body)
s.queueService.Delete(ctx, res.Messages[0].ReceiptHandle)
}
}
main.goの更新
QueueService
とPublisher
, Subscriber
をmain.go
で初期化/呼び出しします
package main
import (
"context"
"github.com/TadayoshiOtsuka/sqs_local/src/publisher"
"github.com/TadayoshiOtsuka/sqs_local/src/services"
"github.com/TadayoshiOtsuka/sqs_local/src/subscriber"
)
func main() {
ctx := context.Background()
queueService := services.NewQueueService()
subscriber := subscriber.NewSubscriber(*queueService)
publisher := publisher.NewPublisher(*queueService)
publisher.SendMessages(ctx, []string{"hello", "world"})
subscriber.Start(ctx)
}
動作確認
docker-compose up --build
で
sqs_local-pub-sub-1 | 2022/07/17 00:11:31 Receive Message Body is: hello
sqs_local-pub-sub-1 | 2022/07/17 00:11:31 Receive Message Body is: world
sqs_local-pub-sub-1 | 2022/07/17 00:11:51 No Message Contains
コンソールに上記のように表示されれば、正常にPub/Subできています🎉
alpine-sqs
の管理画面も見てみます、http://localhost:9325
をブラウザで開きます
PublishしたMessageが表示されています!
以上です!
これでSQSを利用した開発をする際にローカルで色々試しながらやっていけそうです🎉
参考にさせていただいたURL
Discussion