📖
Rustで常駐プログラムでタスクがある限り仕事する
目的
Rustで常駐プログラムでバッチの起動を1つに制御するでバッチ処理ができるようになりました。
今回はデータがある間はタスクをこなして、なくなったらしばらくタスクが貯まるまで待つワーカー処理を作ります。
具体的には10秒ごとにバッチ処理でワーカーに処理するデータを作成して、DBに保存していきます。ワーカーはデータがなければ60秒スリープした後にデータがある間、DBから取得して処理を続けます。
寄り道
make_looper
グレースフルストップの確認のために60秒ごとに待っていましたが、関数の引数で渡せるようにしました。
pub fn make_looper<Fut1, Fut2>(
pg_pool: PgPool,
token: CancellationToken,
expression: &str,
stop_check_second_count: u64,
f: impl Fn(&DateTime<Utc>, PgClient) -> Fut1 + Send + Sync + 'static,
g: impl Fn() -> Fut2 + Send + Sync + 'static,
) -> JoinHandle<()>
where
Fut1: Future<Output = ()> + Send,
Fut2: Future<Output = ()> + Send,
{
...
// 次の時間計算
sleep(Duration::from_secs(std::cmp::min(
(next_tick - now).num_seconds() as u64,
stop_check_second_count,
)))
.await;
}
})
}
コード
make_worker
make_looperに似ていますが、違いはcronの表記がなくなり関数fの戻り値でsleepしています。
戻り値が0の場合は直ぐに処理を再開します。
error_sleep_countはDBのコネクションが取れなかったときのsleep時間になります。
pub fn make_worker<Fut1, Fut2>(
pg_pool: PgPool,
token: CancellationToken,
stop_check_second_count: u64,
error_sleep_count: u64,
f: impl Fn(&DateTime<Utc>, PgClient) -> Fut1 + Send + Sync + 'static,
g: impl Fn() -> Fut2 + Send + Sync + 'static,
) -> JoinHandle<()>
where
Fut1: Future<Output = u64> + Send,
Fut2: Future<Output = ()> + Send,
{
spawn(async move {
// 動き出した瞬間は実行する
let mut next_tick: DateTime<Utc> = Utc::now();
loop {
// グレースフルストップのチェック
if token.is_cancelled() {
g().await;
break;
}
// 現在時間と次実行する処理の時間をチェックする
let now = Utc::now();
if now >= next_tick {
// 定期的に行う処理実行
let next_tick_count = match get_postgres_client(&pg_pool).await {
Ok(pg_conn) => {
f(&now, pg_conn).await
}
Err(e) => {
// エラーが出たので、ここでは何もしないで次に期待する
warn!("get_postgres_client error={}", e);
error_sleep_count
}
};
// 待つ必要が無いなら次のループに入る
if next_tick_count == 0 {
continue;
}
// 次の時間取得
next_tick = now + Duration::from_secs(next_tick_count);
}
// 次の時間計算
sleep(Duration::from_secs(std::cmp::min(
(next_tick - now).num_seconds() as u64,
stop_check_second_count,
)))
.await;
}
})
}
DB
処理するタスクを保存するテーブルを作成します。バッチでデータを登録してワーカーでデータを回収します。
テーブル
02_workers.sql
CREATE TABLE IF NOT EXISTS public.workers (
uuid UUID NOT NULL DEFAULT gen_random_uuid()
,data_json JSONB NOT NULL DEFAULT '{}'
,PRIMARY KEY(uuid)
);
ストアードプロシージャー
workersのデータを取得して削除します。データがなければ直ぐに戻ります。複数ワーカーがいて取り合いになった場合10回取れなかったら抜けます。
resident_set_delete_worker.sql
DROP TYPE IF EXISTS type_resident_set_delete_worker CASCADE;
CREATE TYPE type_resident_set_delete_worker AS (
data_json JSONB
);
-- 処理を削除する
-- 引数
-- 戻り値
-- data_json : データJSON
CREATE OR REPLACE FUNCTION resident_set_delete_worker(
) RETURNS SETOF type_resident_set_delete_worker AS $FUNCTION$
DECLARE
w_record RECORD;
w_count BIGINT := 0;
BEGIN
LOOP
w_count := w_count + 1;
-- 処理対象を取得
SELECT
t1.uuid
,t1.data_json
INTO
w_record
FROM
public.workers AS t1
LIMIT
1
;
-- 存在しなければエラー
IF w_record.uuid IS NULL THEN
RETURN;
END IF;
DELETE FROM public.workers
WHERE
uuid = w_record.uuid
;
-- 更新チェック
IF FOUND THEN
RETURN QUERY SELECT
w_record.data_json
;
RETURN;
END IF;
-- ループストッパー
IF w_count >= 10 THEN
RETURN;
END IF;
END LOOP;
END;
$FUNCTION$ LANGUAGE plpgsql;
メイン処理
async fn add_task(data_json: &serde_json::Value, pg_conn: &PgClient) -> anyhow::Result<()> {
let stmt = pg_conn
.prepare("INSERT INTO public.workers (data_json) VALUES ($1)")
.await?;
let _ = pg_conn.execute(&stmt, &[&data_json]).await?;
Ok(())
}
async fn get_task(pg_conn: &PgClient) -> anyhow::Result<Option<serde_json::Value>> {
let stmt = pg_conn
.prepare("SELECT data_json FROM resident_set_delete_worker()")
.await?;
let res = pg_conn.query(&stmt, &[]).await?;
Ok(if res.is_empty() {
None
} else {
let data_json = res[0].get(0);
Some(data_json)
})
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
prepare_log("resident");
let pg_url =
std::env::var("PG_URL").unwrap_or("postgres://user:pass@localhost:5432/web".to_owned());
let pg_pool = get_postgres_pool(&pg_url)?;
let token: CancellationToken = CancellationToken::new();
let handles = vec![
make_looper(
pg_pool.clone(),
token.clone(),
"*/10 * * * * *",
60,
|&now: &_, pg_conn: _| async move {
info!("定期的に処理する何か1 {}", now);
match is_batch("minutely_batch", &pg_conn).await {
Ok(res) => {
if res {
// 本当にやりたいバッチ処理
add_task(&serde_json::json!({"now": now}), &pg_conn).await.unwrap();
} else {
info!("is_batch is false");
}
}
Err(e) => {
warn!("is_batch error={}", e);
}
}
},
|| async move {
info!("graceful stop looper 1");
},
),
make_worker(
pg_pool.clone(),
token.clone(),
60,
60,
|&now: &_, _pg_conn: _| async move {
info!("データがあれば処理する何か1 {}", now);
match get_task(&_pg_conn).await {
Ok(Some(data_json)) => {
info!("data_json={}", data_json);
0
}
Ok(None) => {
info!("no data");
60
}
Err(e) => {
warn!("get_task error={}", e);
60
}
}
},
|| async move {
info!("graceful stop worker 1");
},
)
];
#[allow(clippy::let_underscore_future)]
let _ = ctrl_c_handler(token);
for handle in handles {
handle.await.unwrap();
}
Ok(())
}
まとめ
バッチ処理とワーカー処理が共同して処理できるようになりました。
コードを以下に置いておきます。
ソース
Discussion