😽

Rust力を上げるために何か作る 〜CRONDパーサーとジョブスケジューラ編〜

2023/12/01に公開

最近はプライベートではほぼRustかGoしか書いてない。Goを書く比率が上がっているおかしい。Rustを書く比率を上げていきたい。

というわけで、これはRust力を上げるために何か作っていくシリーズの記事です。

以前はこの記事にあるようにULID生成器を作ったが、引き続き作りたいものを作る。技術が手段として使えるように、ちゃんと目的化しなければ(ポジショントーク

https://zenn.dev/j5ik2o/articles/e2e3e30b47a6537cd5a2

作りたいものを考える

さて次は何を作るか。

指定の日時になったら登録した関数を実行するジョブスケジューラを作ってみよう。ジョブスケジューラといえばQuartzですが、それには遠く及ばないしょぼものを作ってみよう。

要件は以下のようなもの。

  • ジョブスケジューラにジョブを登録できる
    • ジョブにはトリガーとしてCROND文字列を指定でき、ジョブとしての関数も登録できる
    • 雑に作るのでジョブの削除・確認は後回しにできるかも
  • ジョブのトリガーが有効になったら、ジョブスケジューラはジョブを実行する
    • CROND文字列を指定して周期的に実行できるようにしたい
  • ジョブスケジューラは開始・停止できる
    • 開始・停止以外に周期的にジョブスケジューラのメソッドをコールバックするやり方がある。そっちのほうが簡単かも

まず作るな、リサーチせよ

矛盾しているようなことをいうが、「まず作るな、リサーチせよ」という誰かの格言があるが正しいと思う。

ゼロから作るにしても、この手のソリューションはきっと誰かが作っている、絶対に。よい素材のオープンソースがあれば、そこから学ぶほうが効率がよい。

検索すると以下がよく知られているらしい。ちゃんとスター数と最近更新されているかみよう。

https://github.com/mvniekerk/tokio-cron-scheduler

https://github.com/lholden/job_scheduler

lholden/job_schedulerは設計がシンプル。ただし非同期の考慮が全くない。tickメソッドが呼ばれることでジョブが実行される。しかしジョブの処理が長い場合は登録されている他のジョブの起動が遅れてしまうかもしれない。mvniekerk/tokio-cron-schedulerはそのあたりの問題を解消しているようだ。だが、シンプルなものがよいのでlholden/job_schedulerを参考にしよう。

lholden/job_schedulerはCROND文字例のパースのためにzslayton/cronを利用している模様。CROND文字列を与えると指定した日時以降の実行日時を返すイテレータが実装されている。これはとても使いやすそう。こういう設計がよさそう。

https://github.com/zslayton/cron

まずCROND文字列のパーサーを作った

もともとLL(k)のパーサコンビネータライブラリであるoni-comb-rsは作っていたので、それを使って実装することにした。作ったものはこれ。

https://github.com/j5ik2o/chronos-parser-rs

zslayton/cronと同じように指定した日時以降の実行日時を返すイテレータが使える。

CROND文字列を解析してツリー構造のデータ(AST)に変換する。さらに、指定日時がCRONDのASTに該当するかを判定するCronSpecificationを実装した。DDDにも登場する仕様パターンというやつ。判定にはis_satisfied_byメソッドを使う。

pub trait Specification<T>: Clone {
  fn is_satisfied_by(&self, arg: &T) -> bool;
}

#[derive(Debug, Clone)]
pub struct CronSpecification {
  expr: Expr,
}

impl CronSpecification {
  pub fn new(expr: Expr) -> Self {
    Self { expr }
  }
}

impl<Tz: TimeZone> Specification<DateTime<Tz>> for CronSpecification {
  fn is_satisfied_by(&self, datetime: &DateTime<Tz>) -> bool {
    CronEvaluator::new(datetime).eval(&self.expr)
  }
}

次に日時の区間を表すオブジェクト CronIntervalを実装する。
ちなみに区間ライブラリは以前自作したj5ik2o/intervals-rsを使う。開区間、閉区間などを簡単に扱えるもの。

CronSpecificationCronIntervalを合成したCronIntervalIteratorを作ることで、指定した時刻内の実行日時を返す。

let dt = Utc.with_ymd_and_hms(2021, 1, 1, 1, 1, 0).unwrap();

let expr = CronParser::parse("0-59/30 0-23/2 * * *").to_result().unwrap();
let interval = CronInterval::new(
  LimitValue::Limit(dt),
  LimitValue::Limitless,
  CronSpecification::new(expr),
);
let itr = interval.iter(Utc);

// イテレータから次の実行日時が好きなだけ取れる。
itr.take(5).for_each(|e| println!("{:?}", e));

機能しては区間オブジェクトが扱う範囲内で生み出される日時情報をCronSpecificationに渡し該当した場合はイテレータがその実行日時を返す。今回の実装では終点がLimitlessになっており、実質無限に実行日時を返すことができる(無限リスト)。まぁ、それはあまり実用的ではないので、take(n)で必要な分だけ取り出すことができる。これらの機能はファザードであるCronSchedule経由で利用する。

let dt: chrono::DateTime<Utc> = Utc.with_ymd_and_hms(2021, 1, 1, 1, 1, 0).unwrap();
let itr = CronSchedule::new("0-59/30 0-23/2 * * *").unwrap().upcoming(dt);

let dt_vec = itr.take(5).collect::<Vec<_>>();
assert_eq!(dt_vec[0], Utc.with_ymd_and_hms(2021, 1, 1, 2, 0, 0).unwrap());
assert_eq!(dt_vec[1], Utc.with_ymd_and_hms(2021, 1, 1, 2, 30, 0).unwrap());
assert_eq!(dt_vec[2], Utc.with_ymd_and_hms(2021, 1, 1, 4, 0, 0).unwrap());
assert_eq!(dt_vec[3], Utc.with_ymd_and_hms(2021, 1, 1, 4, 30, 0).unwrap());
assert_eq!(dt_vec[4], Utc.with_ymd_and_hms(2021, 1, 1, 6, 0, 0).unwrap());
// 2021-01-01T02:00:00Z
// 2021-01-01T02:30:00Z
// 2021-01-01T04:00:00Z
// 2021-01-01T04:30:00Z
// 2021-01-01T06:00:00Z

結局 zslayton/cron のインターフェイスしか参考にしなかった。イテレータ周りのライフタイムに若干手こずったが、コードを読んでもらうとあかるが、かなりシンプルに書けた。満足。

ジョブスケジューラの実装

CROND文字列のパーサーの説明で、食傷気味だと思うが、ジョブスケジューラ。

https://github.com/j5ik2o/chronos-scheduler-rs

前述したCronScheduleとクロージャーを内包するJobを実装した。実装は大したことないのですぐ理解できると思う。

Jobにはtickメソッドがあり、この中で現在日時がCronScheduleの返す実行日時を過ぎていたら、Jobが保持するクロージャをコールバックする。このあたりはlholden/job_schedulerを参考にした。

#[derive(Debug, Clone)]
pub struct Job<F, T>
where
  F: FnMut(JobContext<T>), {
  id: ULID,
  crond_expr: String,
  tick_interval: Duration,
  limit_missed_runs: usize,
  func: F,
  data: Rc<RefCell<Option<T>>>,
  cond_schedule: CronSchedule<Utc>,
  last_tick: Option<DateTime<Utc>>,
  _phantom: std::marker::PhantomData<T>,
}

impl<F, T> Job<F, T>
where
  F: FnMut(JobContext<T>),
{
// ...
  pub fn tick(&mut self) {
    let now = Utc::now();
    match self.last_tick {
      None => {
        self.last_tick = Some(now);
      }
      Some(lt) if lt + self.tick_interval < now => {
        let itr = self.cond_schedule.upcoming(lt).take(self.limit_missed_runs);
        for next_trigger in itr {
          if next_trigger > now {
            self.run(JobContext::new(
              &self.crond_expr,
              &next_trigger,
              &now,
              self.data.clone(),
            ));
            break;
          }
        }
        self.last_tick = Some(now);
      }
      _ => {}
    }
  }
// ...
}

JobSchedulerJobを集合で持っているだけ。add_jobメソッドを呼ぶとJobを登録できる。
JobSchedulerにもtickメソッドがある。それは保持するJob集合のtickメソッドを呼び出すだけ。

pub struct JobScheduler<F, T>
where
  F: FnMut(JobContext<T>), {
  id: ULID,
  jobs: Vec<Job<F, T>>,
}

impl<F, T> JobScheduler<F, T>
where
  F: FnMut(JobContext<T>),
{
  pub fn new() -> Self {
    let mut generator = ULIDGenerator::new();
    let id = generator.generate().unwrap();
    JobScheduler { id, jobs: Vec::new() }
  }

  pub fn add_job(&mut self, job: Job<F, T>) {
    self.jobs.push(job);
  }

  pub fn tick(&mut self) {
    for job in self.jobs.iter_mut() {
      job.tick();
    }
  }
}

最終的な使い方はこれ。Jobが持つクロージャをFnMutにしたので、Jobに引き渡すJobContext内のデータの型がRc<RefCell<T>>になった。ここはもう少し考えようがあるかなとは思っている…。

// Create a new job scheduler
let mut job_scheduler = JobScheduler::new();

// Set the interval for the scheduler's tick to 1 minute
let tick_interval = Duration::minutes(1);
// Initialize a counter to track the number of job executions
let mut counter = 0;

// Define a new job that runs every minute
let job = Job::new(
  "*/1 * * * *".to_string(), // Cron expression for every minute
  |job_context| {
    // Borrow the data passed to the job context and unwrap it
    let data = job_context.data().borrow().unwrap();
    // Log the details of the job execution including the schedule, current time, counter, and data
    log::debug!(
      "schedule_datetime = {}, now = {}: {}) {}",
      job_context.trigger(), // The scheduled datetime
      job_context.now(),     // The current datetime
      counter,               // The execution counter
      data                   // The data passed to the job context
    );
    // Increment the counter after each job execution
    counter += 1;
  },
  Some("Hello, world!"), // Optional data passed to the job context
);
// Add the defined job to the job scheduler
job_scheduler.add_job(job);

// Enter an infinite loop to continuously check and run scheduled jobs
loop {
  // Check and execute jobs based on the current datetime
  job_scheduler.tick();
  // Log the waiting period until the next tick
  log::debug!("waiting for {} seconds...", tick_interval.num_seconds());
  // Sleep for the duration of the tick interval
  sleep(tick_interval.to_std().unwrap());
}

非同期ランタイムと密結合していないので、どんなランタイムからも使える。例えばtikioのランタイムやactixのアクターからも呼べる。

しかし、最初に懸念したとおり、Job#tickが遅くなると、後続のすべてのJobの起動が遅くなるので流石にasyncにしないとまずい。が、力尽きたので、暇を見つけてやっておこう。


というわけで、書き散らかしながら学んでいけるとよいのではないかと思う。近々に、組込みRustデビューの予定なので、そういう記事も書いていきたい。

Discussion