🗿

PostgreSQL で SKIP LOCKED を利用する

2023/01/03に公開

PostgreSQL (TimescaleDB) で SKIP LOCKED という機能がある事を知ったため、自分用のメモ。

目的

最大処理時間が数十時間かかる可能性があるワーカーの管理に、MQ 的な仕組みを使いたいと考え、できるだけ楽に使える仕組みを検討することにした。

  • クラウドのサービスは利用しない
    • AWS / GCP / Azure といったサービスを普段から利用していない
  • Go で利用できること
  • 普段の負荷は 1 時間 10 リクエストもない
    • 負荷対策での利用ではない
  • MQ をもし使う場合は RabbitMQ だが、今回はオーバーキル
    • Erlang VM で、ソースも一通り読んだことあるので、何かあってもなんとかなる
  • 一つの処理が数十時間かかることもあり、ジョブにはキャンセルをできるようにしたい
    • 処理が終わるまでは Ack を返さない仕組みが必要

Redis の検討

MQ を除くと色々調べてみるとどうやらバックエンドに Redis を使うというのが多かったので、一旦そちらの方向で検討してみた。

Redis 自体は公式が提供しているクラウドサービスを利用する前提。

https://redis.com/redis-enterprise-cloud/overview/

日本リージョンがあり、容量が少なければかなり安い。

ただ永続性という点では Redis を使うのに抵抗があることや、新しくサービスを増やしたくなかったので、Redis の採用は見送った。

PostgreSQL の検討

普段から利用している TimescaleDB は PostgreSQL の拡張ということもあり、PostgreSQL で何か実現できないか検討してみた。

特に sqlc を利用している事もあり SQL にまとめられるのは理想だし、永続化の事を考えるとできる限り PostgreSQL を利用したい。

SKIP LOCKED

色々調べてみるとどうやら PostgreSQL 9.5 で SKIP LOCKED という仕組みが入ったらしい。
これは簡単に言うと 行ロックされている行をスキップできる という仕組みの模様。

ワーカーがジョブを取り出す時、ロックを取ってそのジョブの状態を変更するという SQL を書くことになるだろうと思っていたので、これがあれば、どうやらやりたいことができそうと判断した。

性能が要件に一切ない事もあり、PostgreSQL でまかなえるならなんとかまかないたい。
利用している TimescaleDB は開発元が提供しているマネージドサービスを利用している事もあり、運用コストがかかっていない。

実際 1 年以上稼働しっぱなしかつ、アップデートも定期的に行っているが問題は一切起きていない。

イメージコード

  • SQL は sqlc 文法を利用している

ジョブテーブル

テーブル的には job というのを用意して、このテーブルにジョブを追加する。

CREATE TYPE job_status AS ENUM (
  -- ジョブが追加された
  'queued',
  -- ジョブ実行中
  'in_progress',
  -- キャンセル処理待ちのジョブ
  'waiting',
  -- ジョブ完了
  'completed',
  'success',
  'failure',
  -- キャンセルされたジョブ
  'cancelled',
  'time_out',
  'skipped'
);

CREATE TABLE job (
  pk BIGSERIAL PRIMARY KEY,

  -- ジョブの状態
  job_status job_status NOT NULL,

  -- ジョブ追加時のタイムスタンプ
  job_add_timestamp TIMESTAMPTZ NOT NULL,

  -- ジョブ開始時のタイムスタンプ
  job_start_timestamp TIMESTAMPTZ,

  -- ジョブキャンセル時のタイムスタンプ
  job_cancel_timestamp TIMESTAMPTZ,

  -- ジョブ完了時のタイムスタンプ
  job_complete_timestamp TIMESTAMPTZ,

  -- ジョブ失敗時のタイムスタンプ
  job_failure_timestmap TIMESTAMPTZ,

  -- ジョブの中身
  body JSONB NOT NULL,

  -- ジョブがキャンセルされたかどうか
  canceled BOOLEAN DEFAULT FALSE NOT NULL,
);

ジョブ追加

-- name: AddJob :exec
INSERT INTO job (
    job_status,
    job_add_timestamp,
    body
  )
VALUES (
    'queued',
    @job_add_timestamp,
    @body
  );

ジョブ取得

-- name: GetJob :one
SELECT *
FROM job
WHERE job_status = 'queued'
ORDER BY job_add_timestamp ASC
LIMIT 1
FOR UPDATE
SKIP LOCKED;

TX で別のワーカーがロックしていない queued 状態のジョブを取得して、job_status を in_progress に変更し、job_start_timestamp を追加しておく。

見つからなければそのまま何もしない。

ジョブのキャンセルチェック

ワーカー側では定期的にジョブの状態を確認しにいき、キャンセルになっていたら、ワーカー側の処理を停止してジョブをキャンセルする。

ワーカーは Systemd Timer ユニットを利用

一定間隔で起動し、GetJob を行い、ジョブがあれば処理を行い、ジョブがなければ処理は行わない。
Systemd Timer ユニットは発火中に、再度タイマーが発火してもスキップしてくれる。

まとめ

sqlc を利用して Go でテストコードを書いて、dockertest で TimescaleDB で E2E テストを行ったところ、期待通りの動作をすることを確認できた。

性能が要件になければ、 SKIP LOCKED を利用したジョブキューは自分の用途にはぴったりだった。

参考資料

Discussion