📚

Rustで常駐プログラムでバッチの起動を1つに制御する

2024/02/20に公開

目的

Rustで常駐プログラムでDBアクセスでDBアクセスができるようになりました。
ここでは指定時間に起動するバッチ処理を考えていきます。

プロセス複数起動

バッチ処理をするプロセスは複数立ち上げます。複数起動する理由は冗長化のためです。例えばAWSで別々のavailability zoneに配置することで、1つのゾーンで落ちても稼働し続けることができます。

バッチ処理の排他

複数プロセスが起動すると、それぞれが同じ時間にバッチを実行しようとします。これはバッチ処理では困ることがあります。どちらかのプロセスだけが実行してほしいです。同じサーバーならプロセス同期する方法もありますが、プロセスはavailability zoneが違うことを想定しています。よってDBを使って排他制御をしていきます。

バッチの追い越し

ここでは定期的に実行されるバッチの追い越しは想定しません。そこまで考えると複雑になってしまうので。
例えば1分毎のバッチでは、必ず1分以内で終わることを想定しています。もしバッチ間隔を超える処理時間ならバッチの起動間隔を長くするか、別のバッチに分割することを検討してください。

寄り道

バッチの排他処理の前に2つ寄り道をします。

make_looperの修正

以前のmake_looperですがcronの文字列表現の引数がstaticになっていました。これは当初コンパイラに怒られてしかたなく付けてましたが、staticを外すことができました。

具体的にはspawnする前に所有権を取得します。

looper.rs
pub fn make_looper<Fut1, Fut2>(
    pg_pool: PgPool,
    token: CancellationToken,
    expression: &str,
    f: impl Fn(&DateTime<Utc>, PgClient) -> Fut1 + Send + Sync + 'static,
    g: impl Fn() -> Fut2 + Send + Sync + 'static,
) -> JoinHandle<()>
where
    Fut1: Future<Output = ()> + Send,
    Fut2: Future<Output = ()> + Send,
{
    let expression = expression.to_owned();
    spawn(async move {
...

tracing

printlnでログを出すのをやめました。より実践的なtracingを使います。ただの情報はinfoでもう少し重めのエラーはwarnで表示することにしました。
以下のコードはログの初期化コードです。

fn prepare_log(app_name: &str) {
    let formatting_layer = BunyanFormattingLayer::new(app_name.into(), std::io::stdout);
    let filter = Targets::new().with_target(app_name, Level::INFO);
    let subscriber = Registry::default()
        .with(filter)
        .with(JsonStorageLayer)
        .with(formatting_layer);
    tracing::subscriber::set_global_default(subscriber).unwrap();
}

DBによる排他処理

閑話休題

テーブル

batch処理を制御するテーブルを作成します。

batches.sql
CREATE TABLE IF NOT EXISTS public.batches  (
  batch_code TEXT NOT NULL
  ,batch_start_at TIMESTAMPTZ
  ,batch_enable_second_count BIGINT NOT NULL DEFAULT 0
  ,PRIMARY KEY(batch_code)
);
  • batch_code : バッチの種類を特定するコード。例えば「daily_batch」、「hourly_batch」など。
  • batch_start_at : バッチの開始時間。バッチの処理が開始されて時間を保持します。
  • batch_enable_second_count : バッチが起動した後でこの時間経過した時に次のバッチが実行可能になる秒数

ここでは1分間に1回実行されるバッチを想定します。
55秒なのは60秒ぎりぎりだと起動したタイミングがたまたま同時だったりすると、うまく動かないかもしれないのでちょっと短くしています。

INSERT INTO batches(
    batch_code
    ,batch_enable_second_count
) VALUES (
    'minutely_batch'
    ,55
);

ストアードプロシージャー

排他制御するためのストアードプロシージャーを書きます。このストアードプロシージャーが行を返したらバッチを実行する権限を得られたとします。
行を返さなかったらプロセスは実行権限が無いとみなします。

resident_set_update_batch.sql
DROP TYPE IF EXISTS type_resident_set_update_batch CASCADE;
CREATE TYPE type_resident_set_update_batch AS (
  batch_code TEXT
);

-- バッチを開始する
-- 引数
--   p_batch_code : バッチコード
--   p_now : 現在時刻
-- 戻り値
--   batch_code : バッチUUID
CREATE OR REPLACE FUNCTION resident_set_update_batch(
  p_batch_code TEXT DEFAULT NULL
  ,p_now TIMESTAMP DEFAULT NULL
) RETURNS SETOF type_resident_set_update_batch AS $FUNCTION$
DECLARE
  w_now TIMESTAMP := COALESCE(p_now, NOW());
  w_batch RECORD;
BEGIN
  -- パラメーターチェック
  IF p_batch_code IS NULL OR '' = p_batch_code THEN
    RAISE SQLSTATE 'U0002' USING MESSAGE = 'p_batch_code is null';
  END IF;

  -- 有効なバッチレコードを取得する
  SELECT
    t1.batch_code
    ,t1.batch_start_at
    ,t1.batch_enable_second_count
  INTO
    w_batch
  FROM
    public.batches AS t1
  WHERE
    t1.batch_code = p_batch_code
  LIMIT
    1
  ;

  -- 存在しなければエラー
  IF w_batch.batch_code IS NULL THEN
    RAISE SQLSTATE 'U0003' USING MESSAGE = 'p_batch_code not found ' || p_batch_code;
  END IF;

  -- 開始チェック
  IF w_batch.batch_start_at IS NOT NULL 
    AND w_batch.batch_start_at >= w_now - (w_batch.batch_enable_second_count || 'second')::interval
  THEN 
    RETURN;
  END IF;

  -- バッチの更新
  UPDATE batches SET
    batch_start_at = w_now
  WHERE
    batch_code = w_batch.batch_code
    AND (
      batch_start_at IS NULL
      OR batch_start_at = w_batch.batch_start_at
    )
  ;

  -- 更新チェック
  IF NOT FOUND THEN
    RETURN;
  END IF;

  RETURN QUERY SELECT
    w_batch.batch_code
  ;
END;
$FUNCTION$ LANGUAGE plpgsql;

メイン処理

バッチの実行資格の取得

バッチが実行できる場合はtrueを返します。

async fn is_batch(batch_code: &str, pg_conn: &PgClient) -> anyhow::Result<bool> {
    let stmt = pg_conn
        .prepare("SELECT batch_code FROM resident_set_update_batch(p_batch_code := $1)")
        .await?;
    let res = pg_conn.query(&stmt, &[&batch_code]).await?;
    Ok(!res.is_empty())
}

メイン

以前は直接実行していたSQLの処理の部分をis_batchの結果で動作を制御しています。

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    prepare_log("resident");
    let pg_url =
        std::env::var("PG_URL").unwrap_or("postgres://user:pass@localhost:5432/web".to_owned());
    let pg_pool = get_postgres_pool(&pg_url)?;
    let token: CancellationToken = CancellationToken::new();

    let handles = vec![make_looper(
        pg_pool.clone(),
        token.clone(),
        "*/10 * * * * *",
        |&now: &_, pg_conn: _| async move {
            info!("定期的に処理する何か1 {}", now);
            match is_batch("minutely_batch", &pg_conn).await {
                Ok(res) => {
                    if res {
                        // 本当にやりたいバッチ処理
                        let _result = pg_conn.query("SELECT pg_sleep(5)", &[]).await.unwrap();
                    } else {
                        info!("is_batch is false");
                    }
                }
                Err(e) => {
                    warn!("is_batch error={}", e);
                }
            }
        },
        || async move {
            info!("graceful stop looper 1");
        },
    )];

    #[allow(clippy::let_underscore_future)]
    let _ = ctrl_c_handler(token);
    for handle in handles {
        handle.await.unwrap();
    }
    Ok(())
}

まとめ

コード一式はGithubに起きました。regident_sample

今回の対応で複数プロセスを起動して冗長化をしつつ、定期的にバッチ処理するプログラムができました。

Discussion