Rustで常駐プログラムの中で定期的に処理したい

2023/12/14に公開

目的

Rustで常駐プログラムを作りたいです。以前はグレースフルストップについて検討しました。今回はcronのように定期的に動く仕組みを考えます。
わざわざ常駐プログラムの中でcronのような仕組みを行うのは、1つのプロセスで完結したいからです。特に頻繁に立ち上がるように間隔のプログラムだと、インフラ構成によってはその都度コンテナーを立ち上げる必要があって、起動時間や起動の成否などを考えるのが大変です。

プログラム

cron

いいかんじのcrate cronがあります。

main.rs
fn main() {
    let schedule = Schedule::from_str("*/10 * * * * *").unwrap();
    let mut next_tick = schedule.upcoming(Utc).next().unwrap();
    println!("{}", next_tick);
}

nextすると現在時間から指定している時間の次の時間を返してくれます。
ちなみに普通のcronに比べて指定項目が多いのは年があるからです。

定期的に処理

ではcronを使って定期的に処理をするプログラムを書きます。
次の時間を取得して、現在時間が経過していたら処理をして経過していなかったら、最大60秒間sleepします。最大60秒にしてるのはグレースフルストップのチェックのためです。

main.rs
use chrono::prelude::*;
use cron::Schedule;
use std::{str::FromStr, time::Duration};
use tokio::{signal::ctrl_c, spawn, task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken;

fn ctrl_c_handler(token: CancellationToken) -> JoinHandle<()> {
    spawn(async move {
        ctrl_c().await.unwrap();
        println!("received ctrl-c");
        token.cancel();
    })
}

#[tokio::main]
async fn main() {
    let token: CancellationToken = CancellationToken::new();
    let token2 = token.clone();
    let handle = spawn(async move {
        let schedule = Schedule::from_str("*/10 * * * * *").unwrap();
        let mut next_tick = schedule.upcoming(Utc).next().unwrap();
        loop {
            // グレースフルストップのチェック
            if token2.is_cancelled() {
                println!("graceful stop handle looper 1");
                break;
            }

            let now = Utc::now();
            if now >= next_tick {
                // 定期的に行う処理実行
                println!("定期的に処理する何か{}", now);

                // 次の時間取得
                next_tick = schedule.upcoming(Utc).next().unwrap();
            }

            // 次の時間計算
            sleep(Duration::from_secs(std::cmp::min(
                (next_tick - now).num_seconds() as u64,
                60,
            )))
            .await;
        }
    });

    #[allow(clippy::let_underscore_future)]
    let _ = ctrl_c_handler(token);
    handle.await.unwrap();
}

完成形

複数の処理を動かしたいので、定期的処理の部分を関数化します。
実際の処理の部分はクロージャーを引き渡すようにしました。
定期的に呼び出す部分は外部に通信してみました。

Cargo.toml
[package]
name = "resident"
version = "0.1.0"
edition = "2021"

[dependencies]
chrono = "0.4"
cron = "0.12"
reqwest = { version = "0.11", features = ["json", "rustls-tls"], default-features = false }
serde_json = "1"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "signal"] }
tokio-util = "0.7"
main.rs
use chrono::prelude::*;
use cron::Schedule;
use std::{
    future::Future,
    marker::{Send, Sync},
    str::FromStr,
    time::Duration,
};
use tokio::{signal::ctrl_c, spawn, task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken;

fn ctrl_c_handler(token: CancellationToken) -> JoinHandle<()> {
    spawn(async move {
        ctrl_c().await.unwrap();
        println!("received ctrl-c");
        token.cancel();
    })
}

fn make_looper<Fut1, Fut2>(
    token: CancellationToken,
    expression: &'static str,
    f: impl Fn(&DateTime<Utc>) -> Fut1 + Send + Sync + 'static,
    g: impl Fn() -> Fut2 + Send + Sync + 'static,
) -> JoinHandle<()>
where
    Fut1: Future<Output = ()> + Send,
    Fut2: Future<Output = ()> + Send,
{
    spawn(async move {
        let schedule = Schedule::from_str(expression).unwrap();
        let mut next_tick = schedule.upcoming(Utc).next().unwrap();
        loop {
            // グレースフルストップのチェック
            if token.is_cancelled() {
                g().await;
                break;
            }

            let now = Utc::now();
            if now >= next_tick {
                // 定期的に行う処理実行
                f(&now).await;

                // 次の時間取得
                next_tick = schedule.upcoming(Utc).next().unwrap();
            }

            // 次の時間計算
            sleep(Duration::from_secs(std::cmp::min(
                (next_tick - now).num_seconds() as u64,
                60,
            )))
            .await;
        }
    })
}

#[tokio::main]
async fn main() {
    let token: CancellationToken = CancellationToken::new();
    let handles = vec![make_looper(
        token.clone(),
        "*/10 * * * * *",
        |&now: &_| async move {
            println!("定期的に処理する何か{}", now);
            let resp = reqwest::get("https://httpbin.org/get")
                .await
                .unwrap()
                .json::<serde_json::Value>()
                .await
                .unwrap();
            println!("{:#?}", resp);
        },
        || async move {
            println!("graceful stop looper 1");
        },
    )];

    #[allow(clippy::let_underscore_future)]
    let _ = ctrl_c_handler(token);
    for handle in handles {
        handle.await.unwrap();
    }
}

まとめ

これでグレースフルストップと定期的に処理する常駐プログラムが完成しました。

Discussion