Rust力を上げるために何か作る 〜CRONDパーサーとジョブスケジューラ編〜
最近はプライベートではほぼRustかGoしか書いてない。Goを書く比率が上がっているおかしい。Rustを書く比率を上げていきたい。
というわけで、これはRust力を上げるために何か作っていくシリーズの記事です。
以前はこの記事にあるようにULID生成器を作ったが、引き続き作りたいものを作る。技術が手段として使えるように、ちゃんと目的化しなければ(ポジショントーク
作りたいものを考える
さて次は何を作るか。
指定の日時になったら登録した関数を実行するジョブスケジューラを作ってみよう。ジョブスケジューラといえばQuartzですが、それには遠く及ばないしょぼものを作ってみよう。
要件は以下のようなもの。
- ジョブスケジューラにジョブを登録できる
- ジョブにはトリガーとしてCROND文字列を指定でき、ジョブとしての関数も登録できる
- 雑に作るのでジョブの削除・確認は後回しにできるかも
- ジョブのトリガーが有効になったら、ジョブスケジューラはジョブを実行する
- CROND文字列を指定して周期的に実行できるようにしたい
- ジョブスケジューラは開始・停止できる
- 開始・停止以外に周期的にジョブスケジューラのメソッドをコールバックするやり方がある。そっちのほうが簡単かも
まず作るな、リサーチせよ
矛盾しているようなことをいうが、「まず作るな、リサーチせよ」という誰かの格言があるが正しいと思う。
ゼロから作るにしても、この手のソリューションはきっと誰かが作っている、絶対に。よい素材のオープンソースがあれば、そこから学ぶほうが効率がよい。
検索すると以下がよく知られているらしい。ちゃんとスター数と最近更新されているかみよう。
lholden/job_schedulerは設計がシンプル。ただし非同期の考慮が全くない。tickメソッドが呼ばれることでジョブが実行される。しかしジョブの処理が長い場合は登録されている他のジョブの起動が遅れてしまうかもしれない。mvniekerk/tokio-cron-schedulerはそのあたりの問題を解消しているようだ。だが、シンプルなものがよいのでlholden/job_schedulerを参考にしよう。
lholden/job_schedulerはCROND文字例のパースのためにzslayton/cron
を利用している模様。CROND文字列を与えると指定した日時以降の実行日時を返すイテレータが実装されている。これはとても使いやすそう。こういう設計がよさそう。
まずCROND文字列のパーサーを作った
もともとLL(k)のパーサコンビネータライブラリであるoni-comb-rsは作っていたので、それを使って実装することにした。作ったものはこれ。
zslayton/cron
と同じように指定した日時以降の実行日時を返すイテレータが使える。
CROND文字列を解析してツリー構造のデータ(AST)に変換する。さらに、指定日時がCRONDのASTに該当するかを判定するCronSpecificationを実装した。DDDにも登場する仕様パターンというやつ。判定にはis_satisfied_by
メソッドを使う。
pub trait Specification<T>: Clone {
fn is_satisfied_by(&self, arg: &T) -> bool;
}
#[derive(Debug, Clone)]
pub struct CronSpecification {
expr: Expr,
}
impl CronSpecification {
pub fn new(expr: Expr) -> Self {
Self { expr }
}
}
impl<Tz: TimeZone> Specification<DateTime<Tz>> for CronSpecification {
fn is_satisfied_by(&self, datetime: &DateTime<Tz>) -> bool {
CronEvaluator::new(datetime).eval(&self.expr)
}
}
次に日時の区間を表すオブジェクト CronIntervalを実装する。
ちなみに区間ライブラリは以前自作したj5ik2o/intervals-rsを使う。開区間、閉区間などを簡単に扱えるもの。
CronSpecificationとCronIntervalを合成したCronIntervalIteratorを作ることで、指定した時刻内の実行日時を返す。
let dt = Utc.with_ymd_and_hms(2021, 1, 1, 1, 1, 0).unwrap();
let expr = CronParser::parse("0-59/30 0-23/2 * * *").to_result().unwrap();
let interval = CronInterval::new(
LimitValue::Limit(dt),
LimitValue::Limitless,
CronSpecification::new(expr),
);
let itr = interval.iter(Utc);
// イテレータから次の実行日時が好きなだけ取れる。
itr.take(5).for_each(|e| println!("{:?}", e));
機能しては区間オブジェクトが扱う範囲内で生み出される日時情報をCronSpecification
に渡し該当した場合はイテレータがその実行日時を返す。今回の実装では終点がLimitless
になっており、実質無限に実行日時を返すことができる(無限リスト)。まぁ、それはあまり実用的ではないので、take(n)
で必要な分だけ取り出すことができる。これらの機能はファザードであるCronSchedule経由で利用する。
let dt: chrono::DateTime<Utc> = Utc.with_ymd_and_hms(2021, 1, 1, 1, 1, 0).unwrap();
let itr = CronSchedule::new("0-59/30 0-23/2 * * *").unwrap().upcoming(dt);
let dt_vec = itr.take(5).collect::<Vec<_>>();
assert_eq!(dt_vec[0], Utc.with_ymd_and_hms(2021, 1, 1, 2, 0, 0).unwrap());
assert_eq!(dt_vec[1], Utc.with_ymd_and_hms(2021, 1, 1, 2, 30, 0).unwrap());
assert_eq!(dt_vec[2], Utc.with_ymd_and_hms(2021, 1, 1, 4, 0, 0).unwrap());
assert_eq!(dt_vec[3], Utc.with_ymd_and_hms(2021, 1, 1, 4, 30, 0).unwrap());
assert_eq!(dt_vec[4], Utc.with_ymd_and_hms(2021, 1, 1, 6, 0, 0).unwrap());
// 2021-01-01T02:00:00Z
// 2021-01-01T02:30:00Z
// 2021-01-01T04:00:00Z
// 2021-01-01T04:30:00Z
// 2021-01-01T06:00:00Z
結局 zslayton/cron のインターフェイスしか参考にしなかった。イテレータ周りのライフタイムに若干手こずったが、コードを読んでもらうとあかるが、かなりシンプルに書けた。満足。
ジョブスケジューラの実装
CROND文字列のパーサーの説明で、食傷気味だと思うが、ジョブスケジューラ。
前述したCronScheduleとクロージャーを内包するJobを実装した。実装は大したことないのですぐ理解できると思う。
Jobにはtickメソッドがあり、この中で現在日時がCronSchedule
の返す実行日時を過ぎていたら、Job
が保持するクロージャをコールバックする。このあたりはlholden/job_schedulerを参考にした。
#[derive(Debug, Clone)]
pub struct Job<F, T>
where
F: FnMut(JobContext<T>), {
id: ULID,
crond_expr: String,
tick_interval: Duration,
limit_missed_runs: usize,
func: F,
data: Rc<RefCell<Option<T>>>,
cond_schedule: CronSchedule<Utc>,
last_tick: Option<DateTime<Utc>>,
_phantom: std::marker::PhantomData<T>,
}
impl<F, T> Job<F, T>
where
F: FnMut(JobContext<T>),
{
// ...
pub fn tick(&mut self) {
let now = Utc::now();
match self.last_tick {
None => {
self.last_tick = Some(now);
}
Some(lt) if lt + self.tick_interval < now => {
let itr = self.cond_schedule.upcoming(lt).take(self.limit_missed_runs);
for next_trigger in itr {
if next_trigger > now {
self.run(JobContext::new(
&self.crond_expr,
&next_trigger,
&now,
self.data.clone(),
));
break;
}
}
self.last_tick = Some(now);
}
_ => {}
}
}
// ...
}
JobSchedulerもJobを集合で持っているだけ。add_job
メソッドを呼ぶとJob
を登録できる。
JobScheduler
にもtickメソッドがある。それは保持するJob
集合のtickメソッドを呼び出すだけ。
pub struct JobScheduler<F, T>
where
F: FnMut(JobContext<T>), {
id: ULID,
jobs: Vec<Job<F, T>>,
}
impl<F, T> JobScheduler<F, T>
where
F: FnMut(JobContext<T>),
{
pub fn new() -> Self {
let mut generator = ULIDGenerator::new();
let id = generator.generate().unwrap();
JobScheduler { id, jobs: Vec::new() }
}
pub fn add_job(&mut self, job: Job<F, T>) {
self.jobs.push(job);
}
pub fn tick(&mut self) {
for job in self.jobs.iter_mut() {
job.tick();
}
}
}
最終的な使い方はこれ。Job
が持つクロージャをFnMut
にしたので、Job
に引き渡すJobContext
内のデータの型がRc<RefCell<T>>
になった。ここはもう少し考えようがあるかなとは思っている…。
// Create a new job scheduler
let mut job_scheduler = JobScheduler::new();
// Set the interval for the scheduler's tick to 1 minute
let tick_interval = Duration::minutes(1);
// Initialize a counter to track the number of job executions
let mut counter = 0;
// Define a new job that runs every minute
let job = Job::new(
"*/1 * * * *".to_string(), // Cron expression for every minute
|job_context| {
// Borrow the data passed to the job context and unwrap it
let data = job_context.data().borrow().unwrap();
// Log the details of the job execution including the schedule, current time, counter, and data
log::debug!(
"schedule_datetime = {}, now = {}: {}) {}",
job_context.trigger(), // The scheduled datetime
job_context.now(), // The current datetime
counter, // The execution counter
data // The data passed to the job context
);
// Increment the counter after each job execution
counter += 1;
},
Some("Hello, world!"), // Optional data passed to the job context
);
// Add the defined job to the job scheduler
job_scheduler.add_job(job);
// Enter an infinite loop to continuously check and run scheduled jobs
loop {
// Check and execute jobs based on the current datetime
job_scheduler.tick();
// Log the waiting period until the next tick
log::debug!("waiting for {} seconds...", tick_interval.num_seconds());
// Sleep for the duration of the tick interval
sleep(tick_interval.to_std().unwrap());
}
非同期ランタイムと密結合していないので、どんなランタイムからも使える。例えばtikioのランタイムやactixのアクターからも呼べる。
しかし、最初に懸念したとおり、Job#tick
が遅くなると、後続のすべてのJob
の起動が遅くなるので流石にasync
にしないとまずい。が、力尽きたので、暇を見つけてやっておこう。
というわけで、書き散らかしながら学んでいけるとよいのではないかと思う。近々に、組込みRustデビューの予定なので、そういう記事も書いていきたい。
Discussion