📖

Rustで常駐プログラムでタスクがある限り仕事する

2024/02/21に公開

目的

Rustで常駐プログラムでバッチの起動を1つに制御するでバッチ処理ができるようになりました。
今回はデータがある間はタスクをこなして、なくなったらしばらくタスクが貯まるまで待つワーカー処理を作ります。

具体的には10秒ごとにバッチ処理でワーカーに処理するデータを作成して、DBに保存していきます。ワーカーはデータがなければ60秒スリープした後にデータがある間、DBから取得して処理を続けます。

寄り道

make_looper

グレースフルストップの確認のために60秒ごとに待っていましたが、関数の引数で渡せるようにしました。

pub fn make_looper<Fut1, Fut2>(
    pg_pool: PgPool,
    token: CancellationToken,
    expression: &str,
    stop_check_second_count: u64,
    f: impl Fn(&DateTime<Utc>, PgClient) -> Fut1 + Send + Sync + 'static,
    g: impl Fn() -> Fut2 + Send + Sync + 'static,
) -> JoinHandle<()>
where
    Fut1: Future<Output = ()> + Send,
    Fut2: Future<Output = ()> + Send,
{
...
            // 次の時間計算
            sleep(Duration::from_secs(std::cmp::min(
                (next_tick - now).num_seconds() as u64,
                stop_check_second_count,
            )))
            .await;
        }
    })
}

コード

make_worker

make_looperに似ていますが、違いはcronの表記がなくなり関数fの戻り値でsleepしています。
戻り値が0の場合は直ぐに処理を再開します。
error_sleep_countはDBのコネクションが取れなかったときのsleep時間になります。

pub fn make_worker<Fut1, Fut2>(
    pg_pool: PgPool,
    token: CancellationToken,
    stop_check_second_count: u64,
    error_sleep_count: u64,
    f: impl Fn(&DateTime<Utc>, PgClient) -> Fut1 + Send + Sync + 'static,
    g: impl Fn() -> Fut2 + Send + Sync + 'static,
) -> JoinHandle<()>
where
    Fut1: Future<Output = u64> + Send,
    Fut2: Future<Output = ()> + Send,
{
    spawn(async move {
        // 動き出した瞬間は実行する
        let mut next_tick: DateTime<Utc> = Utc::now();
        loop {
            // グレースフルストップのチェック
            if token.is_cancelled() {
                g().await;
                break;
            }

            // 現在時間と次実行する処理の時間をチェックする
            let now = Utc::now();
            if now >= next_tick {
                // 定期的に行う処理実行
                let next_tick_count = match get_postgres_client(&pg_pool).await {
                    Ok(pg_conn) => {
                        f(&now, pg_conn).await
                    }
                    Err(e) => {
                        // エラーが出たので、ここでは何もしないで次に期待する
                        warn!("get_postgres_client error={}", e);
                        error_sleep_count
                    }
                };

                // 待つ必要が無いなら次のループに入る
                if next_tick_count == 0 {
                    continue;
                }

                // 次の時間取得
                next_tick = now + Duration::from_secs(next_tick_count);
            }

            // 次の時間計算
            sleep(Duration::from_secs(std::cmp::min(
                (next_tick - now).num_seconds() as u64,
                stop_check_second_count,
            )))
            .await;
        }
    })
}

DB

処理するタスクを保存するテーブルを作成します。バッチでデータを登録してワーカーでデータを回収します。

テーブル

02_workers.sql
CREATE TABLE IF NOT EXISTS public.workers  (
  uuid UUID NOT NULL DEFAULT gen_random_uuid()
  ,data_json JSONB NOT NULL DEFAULT '{}'
  ,PRIMARY KEY(uuid)
);

ストアードプロシージャー

workersのデータを取得して削除します。データがなければ直ぐに戻ります。複数ワーカーがいて取り合いになった場合10回取れなかったら抜けます。

resident_set_delete_worker.sql
DROP TYPE IF EXISTS type_resident_set_delete_worker CASCADE;
CREATE TYPE type_resident_set_delete_worker AS (
  data_json JSONB
);

-- 処理を削除する
-- 引数
-- 戻り値
--   data_json : データJSON
CREATE OR REPLACE FUNCTION resident_set_delete_worker(
) RETURNS SETOF type_resident_set_delete_worker AS $FUNCTION$
DECLARE
  w_record RECORD;
  w_count BIGINT := 0;
BEGIN
  LOOP
    w_count := w_count + 1;

    -- 処理対象を取得
    SELECT
      t1.uuid
      ,t1.data_json
    INTO
      w_record
    FROM
      public.workers AS t1
    LIMIT
      1
    ;

    -- 存在しなければエラー
    IF w_record.uuid IS NULL THEN
      RETURN;
    END IF;

    DELETE FROM public.workers
    WHERE
      uuid = w_record.uuid
    ;

    -- 更新チェック
    IF FOUND THEN
      RETURN QUERY SELECT
        w_record.data_json
      ;
      RETURN;
    END IF;

    -- ループストッパー
    IF w_count >= 10 THEN
      RETURN;
    END IF;
  END LOOP;
END;
$FUNCTION$ LANGUAGE plpgsql;

メイン処理

async fn add_task(data_json: &serde_json::Value, pg_conn: &PgClient) -> anyhow::Result<()> {
    let stmt = pg_conn
        .prepare("INSERT INTO public.workers (data_json) VALUES ($1)")
        .await?;
    let _ = pg_conn.execute(&stmt, &[&data_json]).await?;
    Ok(())
}

async fn get_task(pg_conn: &PgClient) -> anyhow::Result<Option<serde_json::Value>> {
    let stmt = pg_conn
        .prepare("SELECT data_json FROM resident_set_delete_worker()")
        .await?;
    let res = pg_conn.query(&stmt, &[]).await?;
    Ok(if res.is_empty() {
        None
    } else {
        let data_json = res[0].get(0);
        Some(data_json)
    })
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    prepare_log("resident");
    let pg_url =
        std::env::var("PG_URL").unwrap_or("postgres://user:pass@localhost:5432/web".to_owned());
    let pg_pool = get_postgres_pool(&pg_url)?;
    let token: CancellationToken = CancellationToken::new();

    let handles = vec![
        make_looper(
            pg_pool.clone(),
            token.clone(),
            "*/10 * * * * *",
            60,
            |&now: &_, pg_conn: _| async move {
                info!("定期的に処理する何か1 {}", now);
                match is_batch("minutely_batch", &pg_conn).await {
                    Ok(res) => {
                        if res {
                            // 本当にやりたいバッチ処理
                            add_task(&serde_json::json!({"now": now}), &pg_conn).await.unwrap();
                        } else {
                            info!("is_batch is false");
                        }
                    }
                    Err(e) => {
                        warn!("is_batch error={}", e);
                    }
                }
            },
            || async move {
                info!("graceful stop looper 1");
            },
        ),
        make_worker(
            pg_pool.clone(),
            token.clone(),
            60,
            60,
            |&now: &_, _pg_conn: _| async move {
                info!("データがあれば処理する何か1 {}", now);
                match get_task(&_pg_conn).await {
                    Ok(Some(data_json)) => {
                        info!("data_json={}", data_json);
                        0
                    }
                    Ok(None) => {
                        info!("no data");
                        60
                    }
                    Err(e) => {
                        warn!("get_task error={}", e);
                        60
                    }
                }
            },
            || async move {
                info!("graceful stop worker 1");
            },
        )
    ];

    #[allow(clippy::let_underscore_future)]
    let _ = ctrl_c_handler(token);
    for handle in handles {
        handle.await.unwrap();
    }
    Ok(())
}

まとめ

バッチ処理とワーカー処理が共同して処理できるようになりました。

コードを以下に置いておきます。
ソース

Discussion