Open2

rustの非同期処理 メモ

yunayuna

tokio::spawnとtokio::task::spawn_blockingの使い分け

chatGPTの回答メモ
たぶんだいたい合ってる。

アップロード処理のように、cpuは消費しないが時間がかかる処理は、tokio::spawnで、cpuを消費する重い処理はtokio::task::spawn_blockingで良いですか

chatGPT
はい、あなたの理解は正しいです。

tokio::spawn は非同期タスクをスケジューリングし、それを現在のランタイムのタスクスケジューラによって管理されるキューにプッシュします。これは特に非同期I/O操作(ネットワークリクエスト、ファイルI/Oなど)や、長時間実行する必要があるが CPU を全くまたは非常に少量しか消費しないタスクに適しています。

一方、tokio::task::spawn_blocking は、CPU集約的なワークロードやブロッキングIOを扱うためのもので、これによりそのようなタスクがランタイムの他のすべての非同期タスクに影響を与えることなく実行できます。ブロッキングタスクは専用のスレッドプールで実行され、CPUを大量に消費する可能性があるため、他の非ブロッキングタスクのパフォーマンスに影響を与えません。

したがって、あなたの場合ではアップロード処理は非同期I/O操作なので tokio::spawn を、CPUを消費する重い処理は tokio::task::spawn_blocking を使うという使い分けが適切です。

tokio::spawnはスレッドを建てずに非同期タスクを停止させずにコントロールし、tokio::task::spawn_blockingはスレッドを新たに立てて、そのスレッドをブロッキングして処理に集中させるイメージです。合ってますか?もしニュアンスに誤りがあれば、正確に教えて

chatGPT
その理解は基本的には正しいですが、少し具体的に補足すると以下のようになります。

tokio::spawnは非同期タスクを作成し、それを実行するためにはtokioランタイムが管理するスレッドプールが使われます。これらのスレッドは非ブロッキングで、タスクが非同期I/O操作などで一時的に停止すると他のタスクにスイッチします。これにより、一つのスレッド上で複数の非同期タスクを効率的に並行実行することができます。

一方、tokio::task::spawn_blockingはブロッキング操作を行うための専用のスレッドプールを使い、その操作が完了するまでそのスレッドはブロック(待機)状態になります。この専用のスレッドプールは、そのブロッキング操作が他の非同期タスクの実行を妨げないようにするために存在します。つまり、そのスレッドがブロック状態になっていても、非同期タスクが実行されている他のスレッドには影響を与えません。

したがって、CPUを大量に消費するような処理や時間のかかる同期的な処理(ブロッキングI/Oなど)はtokio::task::spawn_blockingを使用して、その影響を非同期タスクに及ぼさないようにします。

ありがとう

chatGPT
どういたしまして! 他にも何か質問がありましたら、いつでもどうぞ。 ハッピーコーディング!

yunayuna

interval.tick() で発生した課題

win,linux,macなどデスクトップPCで動作アプリで、一定間隔でキックする処理を実装したときにハマった問題と、
解決策をメモします。

tokio::time::interval を使って指定時間ごとに発火するように素直に書くと以下のようになる。

sample.rs

let mut tick_minute= tokio::time::interval(Duration::from_secs(60));
let tick_update_check =  tokio::time::interval(Duration::from_secs(300));

loop {
        tokio::select! {
            _ = tick_minute.tick() => {
                show(start.elapsed());
            },
            _ = tick_update_check.tick() => {
                let result = catch_unwind(|| tokio::spawn(async {
                    run_update().await;
                }));
                capture_result(result);
            },
            _ = shutdown_rx_broadcast_main_loop.recv() => {
                info!("Goodbye.");
                show(start.elapsed());
                break;
            },
        }
    }

tokio::select!は、それ以下に並列に非同期処理(Futureを返す処理)を書いて、
始めに終了したものだけが、=> の先の処理を実行することができるもの。

ここで、問題が2つ発生した。

1. tokio::time::intervalで作ったintervalの初回のtickは、作成直後に発火する。
 つまり、 tokio::time::interval(Duration::from_secs(60)); この最初の発火は60秒後ではなく、0秒後になる。

2.アプリが、デスクトップPCで動くようなケースで、例えばスリープモードなどで一定期間停止した場合、その間もintervalは時間を停止せず動き続ける。

これは、以下のサンプルコードを実行すると、挙動が良くわかる

sample2.rs
use tokio::time::interval;
use std::time::Duration;

#[tokio::main]
async fn main() {
    //interval.tick()が、スリープ中に処理を貯めるか否かを確認
    let mut interval_value = interval(Duration::from_secs(1));

    //処理を5秒間停止
    tokio::time::sleep(Duration::from_secs(5)).await;

    //3回tick()を行う
    let first_tick = interval_value.tick().await;
    println!("first_tick: {:?}, now:{:?}", first_tick, tokio::time::Instant::now());
    let second_tick = interval_value.tick().await;
    println!("second_tick: {:?}, now:{:?}", second_tick, tokio::time::Instant::now());
    let third_tick = interval_value.tick().await;
    println!("third_tick: {:?}, now:{:?}", third_tick, tokio::time::Instant::now());
}

途中、tokio::time::sleep(Duration::from_secs(5)).await; で処理が停止している間も、intervalの時刻は進んでいき、その後の3回のtick().awaitは瞬時に終了する。
これは、tokio::time::Intervalにおいて、"MissedTickBehavior(何らかの理由で遅れた場合の挙動)"が、デフォルトでMissedTickBehavior::Burst(遅延したtickを一度に全て処理する)になっているため。
(なお、MissedTickBehavior::Skipに設定しておくと、遅延した処理はスキップされる)

この挙動で問題が生じる場合は、手を加える必要がある。

今回は、スリープした場合、最初の1回だけを実行するという仕様なので、
その対応を行う必要がある。

どのように対応したか

方針としては、以下の通り
tokio::time::interval(Duration::from_secs(60));で指定した時間が来るまでは、tick()を返さない。(60秒後に初めてtick()を返すようにする

・tick()が実行されず、複数のtick()が連続して返されるような状況の場合、最初の1回だけ処理されるようにする(かつ、複数の処理が溜まっていた場合、一度に全てが実行されてしまうので、溜まっていた時だけ、指定の時間だけ、遅延させて処理させるようにする)

sample3.rs
use tokio::time::{interval, Instant};
use std::time::Duration;
use async_recursion::async_recursion;


impl Interval {
    pub async fn new(duration: Duration, skip_first_tick: bool) -> Self {
        let mut i = interval(duration);
        if skip_first_tick {
            i.tick().await; //skip first immediately tick
        }
        Self {
            i: i,
            duration: duration,
        }
    }
    /**
     *  tickされた時刻が、現在時刻からdurationで指定した時間以上前の場合、
     *  これ以降に、まだtickが溜まっていると判断できるのでスキップする
     *  例:tick: 12:00:00, now: 12:00:05, duration: 1s の場合、
     *  tick~nowまで5秒。durationは 1s なので、あと4回tickが溜まっているはず
     *   
     *  ※秒単位の誤差が生じる可能性はあるが、そのような処理はすぐ次のtickが発生するので無視して構わない
     */

     #[async_recursion]
     pub async fn tick(&mut self) -> Instant {
         
         let now = Instant::now();
         let instant = self.i.tick().await;
 
         if now.duration_since(instant).cmp(&self.duration).is_gt() {
             self.tick().await
         } else {
             self.i.reset(); //tickした時点でdurationをリセットする
             instant
         }
     }

    /**
     *  tickされた時刻が、現在時刻からdurationで指定した時間以上前の場合、
     *  これ以降に、まだtickが溜まっていると判断できるのでスキップする
     *  例:tick: 12:00:00, now: 12:00:05, duration: 1s の場合、
     *  tick~nowまで5秒。durationは 1s なので、あと4回tickが溜まっているはず
     *   
     *  ※秒単位の誤差が生じる可能性はあるが、そのような処理はすぐ次のtickが発生するので無視して構わない
     * 
     *  wait_at_burst: 一気にたくさんのtickが溜まった時、そのtickを処理する前に待つ時間
     */

    #[async_recursion]
    pub async fn tick_wait_at_burst(&mut self, wait_at_burst: Duration) -> Instant {
        
        let now = Instant::now();
        let instant = self.i.tick().await;

        if now.duration_since(instant).cmp(&self.duration).is_gt() {
            self.tick_wait_at_burst(wait_at_burst).await
        } else {
            tokio::time::sleep(wait_at_burst).await;
            self.i.reset(); //tickした時点でdurationをリセットする
            instant
        }
    }
}



#[tokio::main]
async fn main() {
    //interval.tick()が、スリープ中に処理を貯めるか否かを確認する
    let mut interval_value = Interval::new(Duration::from_secs(10), true).await;
    
    tokio::time::sleep(Duration::from_secs(30)).await;

    let first_tick = interval_value.tick_wait_at_burst(Duration::from_secs(1)).await;
    println!("first_tick: {:?}, now:{:?}", first_tick, tokio::time::Instant::now());
    let second_tick = interval_value.tick().await;
    println!("second_tick: {:?}, now:{:?}", second_tick, tokio::time::Instant::now());
    let third_tick = interval_value.tick().await;
    println!("third_tick: {:?}, now:{:?}", third_tick, tokio::time::Instant::now());
}