🐁

Go × PostgreSQL で実装できる キューイングシステム River を触ってみる

2024/09/14に公開

はじめに

キューイングシステムを構築する選択肢として Amazon SQSGoogle Cloud Pub/Sub などが挙げられると思います。
そんな数あるキューイングシステムを構築する選択肢として Go × PostgreSQL で実装できる River というライブラリの存在を知りました。

本記事は River を実際にちょっと触って遊んでみたのでその共有となります。

※ 記事執筆時の River のバージョンは v0.11.4 です。

River とは

https://riverqueue.com/

Fast and reliable background jobs in Go.

Atomic, transaction-safe, robust job queueing for Go applications. Backed by PostgreSQL and built to scale.

速くて信頼性の高い Go のバックグラウンドジョブ

Go アプリケーション向けのアトミックでトランザクションセーフな堅牢なジョブキューイング。 PostgreSQL によってサポートされ、スケールに合わせて構築されています。

Go × PostgreSQL にてキューイングシステムを構築することができます。
永続化層に PostgreSQL を使うのであれば簡単に導入することができます。
River は3つのコアコンセプトが記載されています。

ジョブはリトライ戦略を簡単に構築でき、データの変更とジョブの登録は一貫して行うことができます。操作は一貫して PostgreSQL で行われるので Worker はコミットされたデータを前提に実行することが可能です。

キューイングシステムを構築してみる

docker compose を使ってローカル環境に以下のようなシステムを構築します。

https://github.com/riverqueue/river

リポジトリに example_ prefix で記述されているサンプルコードがいくつか存在しますがシステムチックなサンプルコードがなさそうだったのでお試しで実装してみます。

API にて Job への登録を実行し Worker にて時間がかかる処理を行います。
Transactional enqueueing にて説明があるようなユーザーの登録を行いメールを送信するようなものを想定します。シーケンスは下記のようなものになります。

※ Worker が PostgreSQL から Job を取得する処理、Job のステータスを更新する処理は正確には把握していないです。 PostgreSQL のログを見るに SELECT でポーリングして実行した Job は UPDATE で更新していそうでした。

準備

ORM として sqlc-dev/sqlc を用います。(私が好きってだけです。)
sqlc はドライバーとして jackc/pgx にて利用しますが River もドライバーとして pgx を使う方法が記載されているので使い勝手も悪くないかと思います。[1]

バージョン情報は以下の通りです。

go version
go version go1.23.1 darwin/arm64
sqlc version
v1.27.0

また、River を PostgreSQL で扱うためのマイグレーションを実施します。
CLI を下記コマンドでインストールして実行します。

go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"

JobArgs の実装

JobArgs はキューイングシステムでメッセージのやり取りを行うための実装となります。
interface が定義されているのでおまじないを記述して実装します。

package riverx

import (
	"github.com/google/uuid"
	"github.com/riverqueue/river"
)

// おまじない
var _ river.JobArgs = (*JobArgs)(nil)

type JobArgs struct {
	UserID uuid.UUID
}

// Kind implements river.JobArgs.
func (ja JobArgs) Kind() string {
	return "job"
}

Kind() メソッドの戻り値にて Job の種類を決定するため他に JobArgs を定義する場合は注意が必要です。

Worker の実装

Worker はキューイングシステムで実際に Job を実行するための実装となります。
こちらも interface が定義されているのでおまじないを記述して実装します。

package riverx

import (
	"context"
	"fmt"
	"log/slog"
	"strconv"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"

	"github.com/otakakot/sample-go-river/pkg/schema"
)

// おまじない
var _ river.Worker[JobArgs] = (*Worker)(nil)

type Worker struct {
	pool *pgxpool.Pool // Worker の処理で PostgreSQL へと接続するために依存を含める
	river.WorkerDefaults[JobArgs] // この依存を含めることで NextRetry と Timeout がデフォルトで実装が可能
}

func New(pool *pgxpool.Pool) *Worker {
	return &Worker{
		pool: pool,
	}
}

// Work implements river.Worker.
func (w *Worker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
	slog.Info("start work" + strconv.Itoa(int(job.ID)))
	defer slog.Info("end work" + strconv.Itoa(int(job.ID)))

	us, err := schema.New(w.pool).FindUserByID(ctx, job.Args.UserID)
	if err != nil {
		return fmt.Errorf("find user by id: %w", err)
	}

	slog.Info("start send email: " + us.Email)

	time.Sleep(5 * time.Second)

	slog.Info("done send email: " + us.Email)

	return nil
}

ここで実装した Worker をキューイングシステムとして以下の記述にて登録します。

conn := cmp.Or(os.Getenv("DSN"), "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")

pool, err := pgxpool.New(context.Background(), conn)
if err != nil {
    panic(err)
}
defer pool.Close()

if err := pool.Ping(context.Background()); err != nil {
    panic(err)
}

worker := river.NewWorkers()

// ここで定義した Worker を登録する
river.AddWorker(worker, riverx.New(pool))

cli, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
    Queues: map[string]river.QueueConfig{
        river.QueueDefault: {
            MaxWorkers: 1,
        },
    },
    Workers: worker,
})
if err != nil {
    panic(err)
}

Client(API) の実装

Client は Job を登録するための実装となります。
PostgreSQL との接続および Client を以下のように実装します。

conn := cmp.Or(os.Getenv("DSN"), "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")

pool, err := pgxpool.New(context.Background(), conn)
if err != nil {
    panic(err)
}
defer pool.Close()

if err := pool.Ping(context.Background()); err != nil {
    panic(err)
}

riverCli, err := river.NewClient(riverpgxv5.New(pool), &river.Config{})
if err != nil {
    panic(err)
}

River のキューイングシステムに Job を登録する実装は以下のようになります。

hdl.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    // ユーザーからの入力をごにょごにょする実装はサボりました

    ctx := r.Context()

    cli, err := river.NewClient(riverpgxv5.New(pool), &river.Config{})
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)

        return
    }

    // データの保存と Job の登録を一貫させるためにトランザクションを開始
    tx, err := pool.Begin(ctx)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)

        return
    }
    defer tx.Rollback(ctx)

    email := uuid.New().String() + "@example.com"

    // User のデータを保存
    us, err := schema.New(tx).InsertUser(ctx, email)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)

        return
    }

    // Job を登録
    res, err := cli.InsertTx(
        ctx,
        tx,
        riverx.JobArgs{ // 自分で定義した JobArgs を指定
            UserID: us.ID,
        },
        &river.InsertOpts{},
    )
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)

        return
    }

    if err := tx.Commit(ctx); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)

        return
    }

    w.WriteHeader(http.StatusAccepted)

    w.Write([]byte(strconv.Itoa(int(res.Job.ID))))
})

動作確認

Job の登録を API として雑に機能提供する形で実装したので curl にて実行します。

curl http://localhost:8080 

Worker は Job が登録されると起動し Worker 側ではドキュメントに記載がある通り User データにアクセスし取得することができました。

おわりに

Go の実装だけでキューイングシステムが完結するのはなかなかおもしろいと思います。
キューイングシステムでベンダーロックインおよび外部のミドルウェアに依存したくない方は検討の価値があるかと思います ... !!!

今回実装したコードは以下に置いておきます。

https://github.com/otakakot/sample-go-river

脚注
  1. : River の実装を眺めていたら River の内部実装も sqlc を採用していました。https://github.com/riverqueue/river/blob/master/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml ↩︎

Discussion