🦔

PL/pgSQLでQueueに模したテーブルから非同期に取得する

2024/04/12に公開

目的

インフラにPostgreSQLがいるけど適当なQueueが無い場合、テーブルでQueueの代わりを作ろうと思います。この時アプリケーションは非同期にキューからデータを引き抜いて処理していくことを想定します。

コード

workers

処理する対象を保存するテーブルです。

workers.sql
DROP TABLE IF EXISTS workers;
CREATE TABLE workers (
    uuid UUID NOT NULL
    ,created_at TIMESTAMPTZ NOT NULL
    ,start_at TIMESTAMPTZ
    ,PRIMARY KEY (uuid)
);

処理が開始されるとstart_atに開始時間が設定されます。データはcreated_atの一番古いものから処理をしていきます。

PL/pgSQL

my_set_workers_for_start.sql
DROP TYPE IF EXISTS type_my_set_workers_for_start CASCADE;
CREATE TYPE type_my_set_workers_for_start AS (
  uuid UUID
);

CREATE OR REPLACE FUNCTION my_set_workers_for_start (
  p_now TIMESTAMPTZ DEFAULT NULL
) RETURNS SETOF type_my_set_workers_for_start AS $FUNCTION$
DECLARE
  w_now TIMESTAMPTZ := COALESCE(p_now, NOW());
  w_row RECORD;
BEGIN
  FOR i IN 1..10 LOOP
    SELECT 
      uuid
    INTO 
      w_row
    FROM 
      workers
    WHERE 
      start_at IS NULL
    ORDER BY 
      t1.created_at ASC
    LIMIT 1;
    IF NOT FOUND THEN
      RETURN;
    END IF;
    
    UPDATE workers SET
      start_at = w_now
    WHERE
      uuid = w_row.uuid
      AND start_at IS NULL
    ;
    IF FOUND THEN
      RETURN QUERY SELECT w_row.uuid;
      RETURN;
    END IF;
  END LOOP;
END;
$FUNCTION$ LANGUAGE plpgsql;

SELECT文

最初のSELECT文では、まだ処理されていない一番古いデータを1つ取得しています。取れなければキューにデータが無いので終了します。

UPDATE文

開始時間を更新します。ここでわざわざstart_at IS NULLをチェックしているのは非同期がからんできます。同時にこのPL/pgSQLが実行されることがあるので、SELECT文の時点では同じレコードを見ている可能性があります。

UPDATE文は処理がロックされるので、各トランザクションは同時実行ができずに早い者勝ちになります。最初のトランザクションだけがstart_atがNULLで一度更新が成功するとstart_atがNOT NULLになります。

FOR文

UPDATEができなかったトランザクションたちは、次のレコードを探しに行きます。そのため全体をFOR文で囲っています。適当に10回くらい失敗したら、レコードはなかったことにしています。

まとめ

今回のコードはPostgreSQLのデフォルトのトランザクション分離レベルのリードコミッティドであることを利用しています。ダーティリードが発生しないので、UPDATE文では他のトランザクションがコミットされるまで待たされ、ファントムリードが発生するのでUPDATE文でstart_atをチェックしています。

詳細は本家のドキュメントトランザクションの分離を参照してください。

Discussion