📖

go-sqs-worker というライブラリを作った

2025/01/23に公開

きらぼしシステム株式会社でエンジニアをしている mickamy です。
今期の目標で、当社のプレゼンスを高める、というものをおいたので、開発を行う上で得た知見をご紹介したいと思います。
以下、都合により言い切り(ですます調にあらず)です。


TL;DR

  • go-sqs-worker という AWS SQS バックエンドの非同期ジョブライブラリを作った
  • それで管理されるジョブを閲覧する viewer も作った。docker hub で公開している

motivation

周知の通り、バックエンドアプリケーションにおいては、非同期ジョブというものが求められるシチュエーションが多くある。具体的には、メールやプッシュ通知の送信処理の送信など、API などのレスポンスに直接関わらないが、一定度時間がかかるか失敗する可能性がある処理が該当する。これらの処理を非同期に実行し、リトライも考慮した非同期ジョブ管理システムが求められる。
大規模システムにおけるソリューションとしては、AWS Batch/Step Functions や Apache Kafka などが挙げられる。今回は、ジョブ間での依存(あるジョブが別のジョブの完了を待つなど)が少ないことからそれらの選択はとらなかった。Lambda は関数が増えたときにその管理が煩雑になる。
以上を考慮し、今回はちょうどいい選択肢として単一のコマンドからキックされる SQS バックエンドなジョブシステムを開発することとした。

specification

簡単なフロー図を以下に記載する。

フロー図

Producer

ジョブの発行を担う。これが発出するペイロードがワーカキューに積まれ、後述の Consumer により実行される。具体的なペイロードは以下だ。

// Message represents a job message.
// The Message struct is used to pass job information between components of the job processing system.
type Message struct {
	// ID is the unique identifier of the job
	ID uuid.UUID `json:"id" validate:"required,uuid"`

	// Type is the type of the job. It is used to determine the job handler
	Type string `json:"type" validate:"required"`

	// Payload is the data to be passed to the job
	Payload string `json:"payload" validate:"-"`

	// Status is the current status of the job
	Status Status `json:"status" validate:"required"`

	// OldStatus is the previous status of the job
	OldStatus Status `json:"-" validate:"-"`

	// RetryCount is the number of times the job has been retried
	RetryCount int `json:"retry_count" validate:"-"`

	// Caller is the information about the caller of the job
	Caller string `json:"caller" validate:"required"`

	// CreatedAt is the time the job was created
	CreatedAt time.Time `json:"created_at" validate:"required"`

	// UpdatedAt is the time the job was last updated
	UpdatedAt time.Time `json:"updated_at" validate:"required"`
}

Consumer

Producer によって発行・キューイングされたジョブを処理する存在者。実行時には、以下のフローを辿る。

  1. ジョブメッセージの受信
  2. ジョブハンドラの取得
    a. これは Consumer をインスタンス化する際に注入される関数によって行われる
  3. ジョブの実行
    a. 最大リトライ回数に満たない場合は再度同じキューにエンキューされる
    b. その際には、指数関数バックオフによって間隔の空いたリトライが実施されることになる(ここに、SQS にある DLQ 機能を使わなかった理由がある)
    c. 最大リトライ回数に達した場合には、DLQ にエンキューされる
  4. ユーザ指定のコールバック関数が存する場合は、それを実行

ざっくりとした流れは以上だ。実際にはジョブのステータス閲覧のために Redis への Write が走ることになる。

viewer

先述した Redis のデータを元に、各ジョブのステータスを閲覧するライブラリも開発した。管理画面の機能については、Sidekiq のそれを参考にした。ジョブの詳細が閲覧できる分、Sidekiq より機能が充実していると自負している(おれが知らないだけで、方法はあるのかもしれない)

how to use

ジョブの生成は、以下のコードになる。

func main() {
	ctx := context.Background()

	cfg := producer.Config{
		WorkerQueueURL: "http://localhost.localstack.cloud:4566/000000000000/worker-queue",
	}

	p, err := producer.New(cfg, aws.NewSQSClient(ctx))
	if err != nil {
		fmt.Println("failed to create producer", "error", err)
		return
	}

	msg, err := message.New(ctx, job.SuccessfulJobType.String(), job.SuccessfulJobPayload{
		Message: "hello",
	})
	if err != nil {
		fmt.Println("failed to create successful job message", "error", err)
		return
	}
	if err := p.Do(ctx, msg); err != nil {
		fmt.Println("failed to produce successful job", "error", err)
		return
	}
}

また、Consumer を実行は、以下のようになるだろう。

func main() {
	ctx := context.Background()

	cfg := consumer.Config{
		WorkerQueueURL:     "http://localhost.localstack.cloud:4566/000000000000/worker-queue",
		DeadLetterQueueURL: "http://localhost.localstack.cloud:4566/000000000000/dead-letter-queue",
	}

	c, err := consumer.New(cfg, aws.NewSQSClient(ctx), job.GetJobHandler)

	if err != nil {
		fmt.Println("failed to create consumer", "error", err)
		return
	}

	c.Do(ctx)
}

詳しくは example をご覧いただきたい。

summary

go-sqs-workerは、AWS SQSを利用したシンプルかつ拡張性の高い非同期ジョブ管理ライブラリとして開発された。このライブラリを使用することで、以下のような利点が得られる。

  • 簡潔なインターフェース: Producer と Consumer の明確な分離により、ジョブの生成と処理が直感的に実装可能
  • 信頼性の高い処理: 最大リトライ回数や指数関数バックオフなどの機能により、ジョブ失敗時の対応が標準化
  • 視覚的なステータス管理: Redis との連携と go-sqs-worker-viewer を通じて、ジョブの状態を視覚的に確認可能

本ライブラリは、メール送信やプッシュ通知といった一般的なバックグラウンドタスクだけでなく、一定程度の規模のジョブ管理にも対応できると考える。

ライブラリの詳細や導入方法については、GitHub リポジトリをぜひ参照してほしい。

Discussion