PL/pgSQLでQueueに模したテーブルから非同期に取得する
目的
インフラにPostgreSQLがいるけど適当なQueueが無い場合、テーブルでQueueの代わりを作ろうと思います。この時アプリケーションは非同期にキューからデータを引き抜いて処理していくことを想定します。
コード
workers
処理する対象を保存するテーブルです。
DROP TABLE IF EXISTS workers;
CREATE TABLE workers (
uuid UUID NOT NULL
,created_at TIMESTAMPTZ NOT NULL
,start_at TIMESTAMPTZ
,PRIMARY KEY (uuid)
);
処理が開始されるとstart_atに開始時間が設定されます。データはcreated_atの一番古いものから処理をしていきます。
PL/pgSQL
DROP TYPE IF EXISTS type_my_set_workers_for_start CASCADE;
CREATE TYPE type_my_set_workers_for_start AS (
uuid UUID
);
CREATE OR REPLACE FUNCTION my_set_workers_for_start (
p_now TIMESTAMPTZ DEFAULT NULL
) RETURNS SETOF type_my_set_workers_for_start AS $FUNCTION$
DECLARE
w_now TIMESTAMPTZ := COALESCE(p_now, NOW());
w_row RECORD;
BEGIN
FOR i IN 1..10 LOOP
SELECT
uuid
INTO
w_row
FROM
workers
WHERE
start_at IS NULL
ORDER BY
t1.created_at ASC
LIMIT 1;
IF NOT FOUND THEN
RETURN;
END IF;
UPDATE workers SET
start_at = w_now
WHERE
uuid = w_row.uuid
AND start_at IS NULL
;
IF FOUND THEN
RETURN QUERY SELECT w_row.uuid;
RETURN;
END IF;
END LOOP;
END;
$FUNCTION$ LANGUAGE plpgsql;
SELECT文
最初のSELECT文では、まだ処理されていない一番古いデータを1つ取得しています。取れなければキューにデータが無いので終了します。
UPDATE文
開始時間を更新します。ここでわざわざstart_at IS NULLをチェックしているのは非同期がからんできます。同時にこのPL/pgSQLが実行されることがあるので、SELECT文の時点では同じレコードを見ている可能性があります。
UPDATE文は処理がロックされるので、各トランザクションは同時実行ができずに早い者勝ちになります。最初のトランザクションだけがstart_atがNULLで一度更新が成功するとstart_atがNOT NULLになります。
FOR文
UPDATEができなかったトランザクションたちは、次のレコードを探しに行きます。そのため全体をFOR文で囲っています。適当に10回くらい失敗したら、レコードはなかったことにしています。
まとめ
今回のコードはPostgreSQLのデフォルトのトランザクション分離レベルのリードコミッティドであることを利用しています。ダーティリードが発生しないので、UPDATE文では他のトランザクションがコミットされるまで待たされ、ファントムリードが発生するのでUPDATE文でstart_atをチェックしています。
詳細は本家のドキュメントトランザクションの分離を参照してください。
Discussion