🏐

[capnp][kj] 一定時間ごとに発火するタイマー(キャンセル機能付き)

に公開

概要

一定時間ごとに 繰り返し処理を行った上で、途中キャンセルを行えるような仕組みを作りたかった
タイマーだけでは、キャンセルをすることが キャンセラーをラップすることでキャンセルすることができる. タイマーをセットしてない状態でキャンセルできることを確認した

作成したコード

https://github.com/nagato0614/capnp/blob/main/src/timer_example.cpp

実行ログ

[11:11:05.146][timer_example.cpp:116]Timer fired!
[11:11:05.247][timer_example.cpp:116]Timer fired!
[11:11:05.348][timer_example.cpp:116]Timer fired!
[11:11:05.449][timer_example.cpp:116]Timer fired!
[11:11:05.550][timer_example.cpp:116]Timer fired!
[11:11:05.651][timer_example.cpp:116]Timer fired!
[11:11:05.752][timer_example.cpp:116]Timer fired!
[11:11:05.853][timer_example.cpp:116]Timer fired!
[11:11:05.954][timer_example.cpp:116]Timer fired!
[11:11:06.046][timer_example.cpp:121]Stopping timer
[11:11:06.046][timer_example.cpp:86]Timer canceled: Manual cancel after 1 second
[11:11:06.147][timer_example.cpp:116]Timer fired!
[11:11:06.248][timer_example.cpp:116]Timer fired!
[11:11:06.348][timer_example.cpp:116]Timer fired!
[11:11:06.449][timer_example.cpp:116]Timer fired!
[11:11:06.550][timer_example.cpp:116]Timer fired!
[11:11:06.651][timer_example.cpp:116]Timer fired!
[11:11:06.752][timer_example.cpp:116]Timer fired!
[11:11:06.853][timer_example.cpp:116]Timer fired!
[11:11:06.954][timer_example.cpp:116]Timer fired!
[11:11:07.047][timer_example.cpp:121]Stopping timer
[11:11:07.047][timer_example.cpp:86]Timer canceled: Manual cancel after 1 second

Process finished with exit code 0

コードの解説

リピート部分

リピート部分
  void scheduleNext() {
    // promise を作るときcanceler をラップしておくことで後でキャンセルできるようになる.
    auto promise = canceler.wrap(timer.afterDelay(interval))
                       .then([this]() {
                         if (callback) callback();
                       })
                       .then([this]() {
                         scheduleNext();  // 再帰的に次をスケジュール
                       })
                       .catch_([](kj::Exception&& e) {
                         LOG_COUT
                             << "Timer canceled: " << e.getDescription().cStr()
                             << std::endl;
                       });

    taskSet.add(kj::mv(promise));
  }
};

リピート時に callback() が呼び出され繰り返し処理を実行できるようになっている.
更に, catch_ に登録しておいた例外ハンドラはキャンセル時に呼び出しされる.
最後に作成した promise をtaskSet に追加することでタイマーが起動される

スタート部分

start()
  /**
   * @brief タイマーの起動(指定間隔で繰り返しコールバックを呼び出す)
   *
   * @param interval 実行間隔(kj::Duration)
   * @param callback 実行する処理(std::function<void()>)
   */
  void start(kj::Duration interval, std::function<void()> callback) {
    this->interval = interval;
    this->callback = callback;
    scheduleNext();  ///< 最初の1回目のスケジュールを行う
  }

スタートはコールバックの登録と, interval の設定を行う.
その後即座に一回目の起動をする.

キャンセル処理

cancel()
  /**
   * @brief 実行中のタイマーをキャンセルする
   *
   * @param reason キャンセル理由(ログ出力用)
   */
  void cancel(const char* reason) {
    canceler.cancel(reason);  ///< すべての未完了タスクにキャンセル通知
  }

キャンセルはcanceler の cancel() 関数を呼び出すだけでできる.
reason に停止理由を渡せばリピート部分作成時に登録した例外ハンドラに理由が渡される.

main 関数

    // 100ミリ秒間隔でタイマー発火させる
    repeatingTimer.start(100 * kj::MILLISECONDS,
                         []() { LOG_COUT << "Timer fired!" << std::endl; });

    // 1秒後にキャンセルを実行
    timer.afterDelay(1 * kj::SECONDS)
        .then([&]() {
          LOG_COUT << "Stopping timer" << std::endl;
          repeatingTimer.cancel("Manual cancel after 1 second");
        })
        .wait(ws);  ///< Promise の完了を待機

    // すべてのタスクが終了するのを待つ
    taskSet.onEmpty().wait(ws);

main 関数でリピートタイマーの起動と停止タイマーの起動を行っている.
100msec 事にリピートして処理を行う. 同時に1秒後にキャンセルが実行されリピートタイマーが停止される.

Canceler の実装

canceler を使うときはキャンセルしたいプロミスをラップする

canceler.wrap(timer.afterDelay(interval))

wrap関数でnewAdaptedPromiseでラップされたPromiseが返される

  template <typename T>
  Promise<T> wrap(Promise<T> promise) {
    return newAdaptedPromise<T, AdapterImpl<T>>(*this, kj::mv(promise));
  }

AdapterImpl が呼ばれているが, AdapterBase を継承したクラスで内部に双方向リストを持っている.
リストがあるためにキャンセルしたい promise を複数登録することができる.

Canceler::cancel() を実行するとリスト内部の promise に対して unlink と AdapterImpl::cancel() を実行する.
この AdapterImpl::cancel() では タスク (promise) を失敗させて強制終了させている.

    void AdapterImpl:::cancel(Exception&& e) override {
      fulfiller.reject(kj::mv(e));
      inner = nullptr;
    }

参考

https://github.com/capnproto/capnproto

Discussion