PostgreSQL で SKIP LOCKED を利用する
PostgreSQL (TimescaleDB) で SKIP LOCKED という機能がある事を知ったため、自分用のメモ。
目的
最大処理時間が数十時間かかる可能性があるワーカーの管理に、MQ 的な仕組みを使いたいと考え、できるだけ楽に使える仕組みを検討することにした。
- クラウドのサービスは利用しない
- AWS / GCP / Azure といったサービスを普段から利用していない
- Go で利用できること
- 普段の負荷は 1 時間 10 リクエストもない
- 負荷対策での利用ではない
- MQ をもし使う場合は RabbitMQ だが、今回はオーバーキル
- Erlang VM で、ソースも一通り読んだことあるので、何かあってもなんとかなる
- 一つの処理が数十時間かかることもあり、ジョブにはキャンセルをできるようにしたい
- 処理が終わるまでは Ack を返さない仕組みが必要
Redis の検討
MQ を除くと色々調べてみるとどうやらバックエンドに Redis を使うというのが多かったので、一旦そちらの方向で検討してみた。
Redis 自体は公式が提供しているクラウドサービスを利用する前提。
日本リージョンがあり、容量が少なければかなり安い。
ただ永続性という点では Redis を使うのに抵抗があることや、新しくサービスを増やしたくなかったので、Redis の採用は見送った。
PostgreSQL の検討
普段から利用している TimescaleDB は PostgreSQL の拡張ということもあり、PostgreSQL で何か実現できないか検討してみた。
特に sqlc を利用している事もあり SQL にまとめられるのは理想だし、永続化の事を考えるとできる限り PostgreSQL を利用したい。
SKIP LOCKED
色々調べてみるとどうやら PostgreSQL 9.5 で SKIP LOCKED という仕組みが入ったらしい。
これは簡単に言うと 行ロックされている行をスキップできる という仕組みの模様。
ワーカーがジョブを取り出す時、ロックを取ってそのジョブの状態を変更するという SQL を書くことになるだろうと思っていたので、これがあれば、どうやらやりたいことができそうと判断した。
性能が要件に一切ない事もあり、PostgreSQL でまかなえるならなんとかまかないたい。
利用している TimescaleDB は開発元が提供しているマネージドサービスを利用している事もあり、運用コストがかかっていない。
実際 1 年以上稼働しっぱなしかつ、アップデートも定期的に行っているが問題は一切起きていない。
イメージコード
- SQL は sqlc 文法を利用している
ジョブテーブル
テーブル的には job というのを用意して、このテーブルにジョブを追加する。
CREATE TYPE job_status AS ENUM (
-- ジョブが追加された
'queued',
-- ジョブ実行中
'in_progress',
-- キャンセル処理待ちのジョブ
'waiting',
-- ジョブ完了
'completed',
'success',
'failure',
-- キャンセルされたジョブ
'cancelled',
'time_out',
'skipped'
);
CREATE TABLE job (
pk BIGSERIAL PRIMARY KEY,
-- ジョブの状態
job_status job_status NOT NULL,
-- ジョブ追加時のタイムスタンプ
job_add_timestamp TIMESTAMPTZ NOT NULL,
-- ジョブ開始時のタイムスタンプ
job_start_timestamp TIMESTAMPTZ,
-- ジョブキャンセル時のタイムスタンプ
job_cancel_timestamp TIMESTAMPTZ,
-- ジョブ完了時のタイムスタンプ
job_complete_timestamp TIMESTAMPTZ,
-- ジョブ失敗時のタイムスタンプ
job_failure_timestmap TIMESTAMPTZ,
-- ジョブの中身
body JSONB NOT NULL,
-- ジョブがキャンセルされたかどうか
canceled BOOLEAN DEFAULT FALSE NOT NULL,
);
ジョブ追加
-- name: AddJob :exec
INSERT INTO job (
job_status,
job_add_timestamp,
body
)
VALUES (
'queued',
@job_add_timestamp,
@body
);
ジョブ取得
-- name: GetJob :one
SELECT *
FROM job
WHERE job_status = 'queued'
ORDER BY job_add_timestamp ASC
LIMIT 1
FOR UPDATE
SKIP LOCKED;
TX で別のワーカーがロックしていない queued 状態のジョブを取得して、job_status を in_progress に変更し、job_start_timestamp を追加しておく。
見つからなければそのまま何もしない。
ジョブのキャンセルチェック
ワーカー側では定期的にジョブの状態を確認しにいき、キャンセルになっていたら、ワーカー側の処理を停止してジョブをキャンセルする。
ワーカーは Systemd Timer ユニットを利用
一定間隔で起動し、GetJob を行い、ジョブがあれば処理を行い、ジョブがなければ処理は行わない。
Systemd Timer ユニットは発火中に、再度タイマーが発火してもスキップしてくれる。
まとめ
sqlc を利用して Go でテストコードを書いて、dockertest で TimescaleDB で E2E テストを行ったところ、期待通りの動作をすることを確認できた。
性能が要件になければ、 SKIP LOCKED を利用したジョブキューは自分の用途にはぴったりだった。
Discussion