🦀

RustのTokioで非同期とグリーンスレッドを理解する

2022/12/12に公開約20,500字

この記事はRust Advent Calendar 2022 - Qiitaの13日目の記事です。

はじめに

Rustの非同期I/Oライブラリ tokio の解説記事になります。初めに概要説明をします。その後でちょっとしたソースコードを見せながら解説をしていきます。理論より実践、実際にどう書けば良いのか知りたい人向けの記事になります。

tokioはライブラリの一つに過ぎませんが、hyperactix-webなどのWebフレームワーク、Denoのイベントループ、Solanaのツールなどで利用されており、デファクトスタンダード的な重要なライブラリになっています。

またDiscordtokioのスポンサーをしており、GoからRust(Tokio)で書き換えた話は小説を一冊読み終えたかのようなブラボーな興奮がありました。

https://discord.com/blog/why-discord-is-switching-from-go-to-rust

ちなみに、tokioのネーミングは開発者のCarl Lercheが東京観光を楽しんだことや、ローレベルI/OライブラリMioとの語呂合わせから名付けられたそうです。

I enjoyed visiting Tokio (Tokyo) the city and I liked the "io" suffix and how it plays w/ Mio as well. I don't know... naming is hard so I didn't spend too much time thinking about it.
Reddit

1 非同期I/Oとグリーンスレッド

1-1 tokioとは?

乱暴な言い方をすれば、Node.jsとGolangを合体させたのがtokioです。

Node.jsとはC10Kのような膨大なリクエストをシングルスレッド&イベントループ&非同期I/Oで効率良く処理するランタイムです。実際、Denoにはtokioが使用されています。tokioでもasync/awaitを使用して非同期プログラミングをします。
Pythonでもaysncioのような非同期プログラミングが導入されています。Kotlinにもコルーチンというグリーンスレッドとasync/awaitを使用した非同期プログラミングがあります。

Golangではgoroutineという超軽量なスレッド(OSスレッドではない)とチャネルを使用してパラレルに処理するランタイムです。最近のCPUはマルチコア、ハイパースレッディングなどの技術により、パラレルに複数の命令を実行できるようになっています。インテルCorei9だと論理コア数が20もありますが、シングルスレッドでは宝の持ち腐れになります。tokioでもtokio::spawnを使用してマルチスレッド・プログラミングをします。

A task is a light weight, non-blocking unit of execution. A task is similar to an OS thread, but rather than being managed by the OS scheduler, they are managed by the Tokio runtime. Another name for this general pattern is green threads. If you are familiar with Go's goroutines, Kotlin's coroutines, or Erlang's processes, you can think of Tokio's tasks as something similar...
Rust公式

つまり、tokioでは非同期プログラミングとグリーンスレッドの両方の理解が必要になるためハードルは高いです。そして、Rust言語固有の所有権が加わりカオスさを増します。

CPUバウンドの場合はOSネイティブスレッドを使用する必要があります。こちらの私が以前書いた記事も参考にしてください。

https://zenn.dev/tfutada/articles/16766e3b4560db

1-2 非同期I/Oとは?

AjaxとはAsynchronous JavaScript and XMLの略ですが、サーバとの通信を非同期(asynchronous)に行うことでブラウザの画面が固まらないようにする仕組みです。Node.jsはクライアントからのリクエストを非同期に処理することでC10Kという膨大なリクエストでもサーバがダウンしないようにする仕組みです。

外部APIの呼び出しやデータベース接続など、ネットワークが絡む処理は速くても1ms、ルーターなどを経由すれば最悪1秒近くかかることもあります。その一方でCPUの処理速度はナノ秒単位と超高速です。人間の時間感覚(1秒基準)に例えると、CPUにとっての1秒は80年になります。つまり人生が終わってしまいます。

つまり、ネットワークからレスポンスを受け取るまでの間、CPU使用率はほぼゼロに近い状態になるわけです。そうであるならば、レスポンス待ちの場合は一旦処理を中断し、別のタスクの処理をした方がお得なわけです。レスポンスが返りしだい再開することで、システムのスループットを改善できるわけです。

その一方で短所もあります。協調的スケジューリングであるため、プログラマーの責任でタスクススワップする必要があります。同期処理やCPUヘビーなロジックがあるとイベントループの流れが悪くなり渋滞が発生し、テールレイテンシー、後続のリクエストの遅延が大きくなります。

When writing async Rust, the phrase “blocking the thread” means “preventing the runtime from swapping the current task”. This can be a major issue because it means that other tasks on the same runtime will stop running until the thread is no longer being blocked. To prevent this, we should write code that can be swapped quickly, which you do by never spending a long time away from an .await.
Alice Ryhl

また、こちらのYahoo! Japanの記事はNode.jsですが非同期の難しさをよく語っています。
https://eh-career.com/engineerhub/entry/2019/08/08/103000#パフォーマンス向上同期処理のAPIをなるべく使用しない

1-3 グリーンスレッドとは?

tokioではOSではなくランタイムがタスクのスケジューリングをします。つまり、グリーンスレッド(M:Nスレッド)になります。グリーンスレッドは64バイトと軽量であり、goroutineのようにたくさんスレッドを生成しても問題ありません。

Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory.
Tokio公式チュートリアル

タスクの生成は、tokio::spawn()を呼び出します。タスクが一つの処理単位になります。イベントキューが論理CPUの数だけ作成されます。つまり、1論理コアに1OSスレッドが張り付きます。生成されたタスクが、イベントキューに放り込まれ処理されていきます。

Spawning a task enables the task to execute concurrently to other tasks. The spawned task may execute on the current thread, or it may be sent to a different thread to be executed.
RustDoc

tokioのランタイム・スケジューラはタスクをロードバランスしながら各イベントキューに割り振ります。

2 実際に動かして学ぶ

実際のコードを例に説明します。非同期処理、グリーンスレッド(タスク)の理解を深めるため、あえて変なことをしていますので真似しないように。

2-1 手始めにHello World的なやつ

まずはHello World的なコードから始めましょう。
おまじない的に main()#[tokio::main]マクロを指定します。tokioのランタイムが生成されます。

次に、tokio::task::spawn(async {タスク})でタスク(グリーンスレッド)を生成します。asyncブロックが1つのタスクになります。(実際にはここでREST APIを叩くなど、I/Oバウンドな処理をします。)

次のコードは3秒間スリープしたあとで、"woke up!"と出力する例です。

ex1.rs
#[tokio::main] // 忘れないで!
async fn main() {
    tokio::task::spawn(async {  // タスクを記述する
        tokio::time::sleep(Duration::from_secs(3)).await; // ネットワーク呼び出しのフリ
        println!("woke up!"); // 3秒後に実行されるよ
    });
    // この時点でタスクはすでに走り始めている
    
    std::thread::sleep(Duration::from_secs(5)); // プログラムが終了しないように
    println!("Done");
}
実行結果
(3秒後)
woke up!
(2秒後 5s - 3s = 2s)
Done

tokio::task::spawnした時点でタスク(グリーンスレッド)は実行されます。goroutineと似たような感じです。tokio::spawnと省略して書けます。

2-2 シングルスレッドにしてみよう

さて次に面白い実験をしてみましょう。先ほどのプログラムをシングルスレッド環境で実行してみましょう。

#[tokio::main(flavor = "current_thread")] と指定すると、Node.jsのようにOSスレッドが1つだけのランタイムになります。

ex2.rs
#[tokio::main(flavor = "current_thread")]
async fn main() {
    tokio::spawn(async {
        tokio::time::sleep(Duration::from_secs(3)).await;
        println!("woke up!"); // 表示されない!
    });

    std::thread::sleep(Duration::from_secs(5));  // 譲らない!
    println!("Done");
}
結果
(5秒後)
Done
(終了。タスクが実行されることはない)

なんということでしょう!タスクは実行されることなく、プログラムは終了してしまうのです。

答え: 同期スリープだとOSスレッドをがっつり掴んだまま寝てしまいます。タスクにスレッドを譲ることなく、5秒後に目覚めて、print出力しメインプログラムは終了してしまいます。

修正方法

タスクにOSスレッドを譲るには同期スリープを非同期スリープに変更します。tokioの非同期プログラミングではプログラマーが明示的にawaitを入れ、協調的(cooperative)にタスクをスケジューリングする必要があります。

修正例
    tokio::time::sleep(Duration::from_secs(5)).await;  // 譲るよ!

2-3 タスクを2つ生成してみるよ

さて、次のプログラムの実行順序はどうなるでしょうか?

ex3.rs
use std::time::Duration;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() {
    let cpus = num_cpus::get();
    println!("logical cores: {}", cpus);

    tokio::spawn(async move {
        println!("task 1 started...");
        std::thread::sleep(Duration::from_secs(3)); // 3秒寝る(同期スリープ)
        println!("task 1 woke up!");
    });
    // この時点でtask1はすでに走っている。
    
    tokio::spawn(async move {
        println!("task 2 started...");
        std::thread::sleep(Duration::from_secs(1));
        println!("task 2 woke up!");
    });
    // この時点でtask2は走らない。空きスレッドがないから。
    std::thread::sleep(Duration::from_secs(5)); // 3秒後にtask2は走る

    println!("Done");
}

タスク1とタスク2は逐次的に実行します。つまり、このプログラムの全体の実行時間は4秒になります。

実行結果
logical cores: 4 (論理コア数。マシンによって変わります。)
task 1 started...
(3秒後)
task 1 woke up!
task 2 started...
(1秒後)
task 2 woke up!
(即時)
Done

worker_threads = 1 と指定すると、メインスレッド+ワーカースレッド1になります。
タスク1は同期スリープであるため、タスク2にスレッドを譲りません。

ではここで、worker_threads = 2にして実行してみましょう。

実行結果
task 1 started...
task 2 started...
task 2 woke up!
task 1 woke up!
Done

このようにタスク1と2は別々のスレッドでパラレルに実行します。

とは言うものの、Tokioではタスクにバジェットを持たせることで、yieldさせる仕掛けはあり、そこまでnaive(単純)でもありません。あの、Ryan Dahlとの会話で誕生したアイデアのようです。

Even though Tokio is not able to preempt, there is still an opportunity to nudge a task to yield back to the scheduler. As of 0.2.14, each Tokio task has an operation budget. This budget is reset when the scheduler switches to the task. Each Tokio resource (socket, timer, channel, ...) is aware of this budget.
Tokio公式ブログ

2-4 タスクをいっぱい生成してみるよ

tokioのタスクを数百、数千と生成してもグリーンスレッドですからパフォーマンス的に問題はありません。だがしかし、同期呼び出しをしたら大変なことになりますよ、というコードをご紹介します。

タスクAを1つ、タスクBを1000個生成します。タスクAは10秒同期スリープします。

ex4
use std::time::Duration;
use futures::{stream::FuturesUnordered, StreamExt};

// #[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() {
    console_subscriber::init();

    let mut tasks = FuturesUnordered::new();

    let h1 = tokio::spawn(async move {
        std::thread::sleep(Duration::from_secs(10)); // ここでつまるよ!
        println!("0 woke up!");
    });
    tasks.push(h1);
    println!("0 started...");

    for i in 1..1000 {
        let h2 = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("{i} woke up!");
        });
        tasks.push(h2);
        println!("{i} ...");
    }

    // join_allは遅いよ。https://github.com/tokio-rs/tokio/issues/2401
    while let Some(item) = tasks.next().await {
        let () = item.unwrap();
    }
    println!("Done");
}

2-3と同様に、タスクAが大渋滞を引き起こします。仮にこれがWebサーバであれば、タスクBのレスポンスタイムは11s+になります。シングルスレッド、イベントループではたった一つのリクエストが後続のリクエストを全滅させるという恐怖があります。

実行結果
(省略)
998 started...
999 started...
(10秒後)
0 woke up! 
2 woke up! (この子のレスポンスタイムは11秒になる)
1 woke up!
(省略)

tokio-consoleでデバッグするとよく分かります。一番上が同期スリープしているタスクです。Busy状態になっており、OSスレッドをつかんでいるのがよく分かります。その他のタスクはアイドル状態になっています。(この例では分かりやすいようにスリープを30秒にしています。)

https://zenn.dev/tfutada/articles/4dbb9659bb8102

2-5 チャネル

一般的にコンカレントなプログラミングでは、複数のスレッド間のデータの受け渡しにチャネルを使用するのがエレガントなやり方です。tokioでもOSスレッド(thread::spawn)と同じようにチャネル通信ができます。OSスレッドに関しては私のこちらの記事を参考にしていただければと思います。

https://zenn.dev/tfutada/articles/16766e3b4560db

さて、tokioのチャネルはOSスレッドのチャネルとほぼ使い方は同じです。大きな違いは、send().await, recv().awaitとawaitをつけること、つまり非同期呼び出しであるということです。

チャネルにはバッファーサイズを指定できます。バッファーがいっぱいになると、send().awaitはサスペンドし次の処理に進めません。バックプレッシャーと言いますが、送りすぎを防止する機能になります。

アナロジーで言うと、胃がchannel、口がsend()、小腸がrecv()になります。消化が間に合わず、胃が一杯(buffer full)になると、もう食べること(send)はできません。小腸での消化を待ち、胃にスペースができるとまた食べることができます。

channel
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main()]
async fn main() {
    // チャネル (胃)
    let (tx1, mut rx1) = mpsc::channel(2);

    // 送信側タスク (口)
    let handle1 = tokio::spawn(async move {
        let mut count: i32 = 0;

        while count < 5 {
            if let Err(e) = tx1.send(count).await { // 食べる
                eprintln!("{}", e);
                break;
            }
            println!("sent {}", count);
            count += 1;
        }
    });
    // この時点で食事はすでに進んでいます。

    // 受信側タスク (小腸)
    let handle2 = tokio::spawn(async move {
        while let Some(msg) = rx1.recv().await { // 消化
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("get {}", msg);
        }
    });

    let (_r1, _r2) = (handle1.await.unwrap(), handle2.await.unwrap());
}
実行結果
sent 0
sent 1
sent 2
get 0 (1秒後)
sent 3
get 1
sent 4
get 2
get 3
get 4

実際にどのようなユースケースがあるかと言えば、例えばデータベースのマイグレーション処理に使えます。ターゲットのDBからデータを取得しながら、データの加工処理を施し、別のデータベースに登録するなどです。

2-6 排他ロックとawait

こちらのプログラムはコンパイルエラーになります。tokioのプログラミングをすると必ずと言っていいほどお目にかかるエラーです。

ex5
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
    let d = Arc::clone(&data);

    let handle = tokio::spawn(async move {
        let mut guard: MutexGuard<i32> = d.lock().unwrap();
        *guard += 1;
        tokio::time::sleep(Duration::from_secs(1)).await;
    });

    handle.await.unwrap();
    println!("{:?}", data);
}
コンパイルエラー
error: future cannot be sent between threads safely
   --> src/zenn/ex5.rs:10:18
    |
10  |     let handle = tokio::spawn(async move {
    |                  ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/zenn/ex5.rs:13:51
    |
11  |         let mut d = d.lock().unwrap();
    |             ----- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
12  |         *d += 1;
13  |         tokio::time::sleep(Duration::from_secs(1)).await;
    |                                                   ^^^^^^ await occurs here, with `mut d` maybe used later

先に答えです。ブロックスコープを使用します。guardのライフタイムがawaitを超えないように明示するとコンパイラーが許してくれます。

コンパイルできるよ
    let handle = tokio::spawn(async move {
        {
            let mut guard: MutexGuard<i32> = d.lock().unwrap();
            *guard += 1;
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    });

実は、tokioのタスクはawait処理の前後で別のOSスレッドに引っ越しする可能性があります。そして、Mutexとはmutual exclusionの略ですが、複数のスレッドが同時に一つのリソースにアクセスrace conditionしないように排他制御するものです。

つまり、awaitの前後でロックを保持するOSスレッドが変わる可能性があるわけです。

しかしながら、Mutexは通常の同期プログラミングでの使用を前提に設計されており、非同期プログラミングでの使用はできません。つまり、mutexガードをスレッド間で共有できないことにあります。

note: future is not Send as this value is used across an await

とエラーメッセージにあるように、this value(MutexGuard)がawaitをまたがないようにすれば同一スレッドでライフタイムは終了するためコンパイルが通ります。

There's two problems here, both stemming from the fact that the standard library Mutex is designed to be used with regular blocking code, rather than async code:
Locking the mutex will block the thread, which is generally something you want to avoid in async code as it can prevent other tasks from running.
As seen in the compiler error, the mutex guard that you get by locking it can't be shared between threads safely, so it won't compile anyways.
引用StackOverflow

3 補足

3-1 Tokioランタイム

tokioランタイムの設定をカスタマイズできます。
マクロを使わずに、直接ビルダーでランタイムを生成します。block_onにプログラム本体を渡します。

ランタイム
// #[tokio::main] 以下のように展開される
fn main() {
    let cpus = num_cpus::get();
    println!("logical cores: {}", cpus);

    tokio::runtime::Builder::new_multi_thread() // or Builder::new_current_thread
        .worker_threads(5) // ワーカースレッド数
        .max_blocking_threads(1) // ブロッキングスレッド数。次で説明。
        .enable_all() // ネットワークI/Oなどを有効化
        .on_thread_start(|| {
            println!("thread started");
        })
        .build()
        .unwrap()
        .block_on(async { // ルートのタスク
            println!("Hello world");

3-2 spawn_bloking

実はtokioには、ワーカースレッドとブロッキングスレッドの2種類のスレッドがあります。

ワーカースレッドとはこれまで紹介したasyncタスクを実行するもので論理コア数分作成されます。tokioの開発でお馴染みのAlice Ryhlによると、awaitからawaitの実行時間は10μsから100μsが望ましく、それ以上の時間がかかる場合にはブロッキングスレッドspawn_blocking()の使用が推奨されています。

このサンプルコードではシングルスレッドでメインを同期スリープさせています。
tokio::spawnはこれまで何度も説明したようにワーカースレッドで処理されるためブロックされます。一方で、spawn_blockingはブロッキングスレッドで処理されるためブロックされません。(このスレッド内では同期的な記述をします。)

spawn_blocking
use std::time::Duration;
use tokio::task;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    tokio::spawn(async {
        // 順番が回ってこない。。。
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("woke up! 1");
    });

    let handle = task::spawn_blocking(move || { // ブロッキングスレッドで動くよ
        std::thread::sleep(Duration::from_secs(1)); // 同期的に記述する
        println!("woke up! 2");
    });

    std::thread::sleep(Duration::from_secs(3));
    println!("woke up! 3");

    handle.await.expect("TODO: panic message");
}
実行結果
woke up! 2
woke up! 3
(woke up! 1は実行されない)

3-3 join!

JavaScriptのPromise.all()のように、複数のasyncブロックをコンカレントに実行するマクロです。渡した全てのasyncブロックは同一のOSスレッドで実行します。言い方を変えるとマルチコア上でも並列処理(parallel)には実行されません。パラレルに実行したい場合は、tokio::spawnして別々のタスクとして実行する必要があります。

By running all async expressions on the current task, the expressions are able to run concurrently but not in parallel. This means all expressions are run on the same thread and if one branch blocks the thread, all other expressions will be unable to continue. If parallelism is required, spawn each async expression using tokio::spawn and pass the join handle to join!.
Rust公式

ぱっと見で、このサンプルコードのprintlnの出力順がわかった人はすごいです。

async-join
use futures::future;
use futures::future::join_all;
use std::thread;
use std::time::Duration;

#[tokio::main()]
async fn main() {
    let logic1 = async {
        println!("logic1...");
        std::thread::sleep(Duration::from_secs(5));
    };

    let logic2 = async {
        println!("logic2... will start once logic1 is done.");
        std::thread::sleep(Duration::from_secs(5));
    };

    println!("waiting at join!");
    tokio::join!(logic1, logic2);

    println!("all done");
}
実行結果
waiting at join!
logic1...
logic2... will start once logic1 is done.
all done

asyncブロックはすぐには実行されません。join!で始めて開始します。ここがJavaScriptと違うところです。繰り返しになりますが、2つのasyncブロックは同一のOSスレッドで処理されます。例えマルチスレッドであっても!logic1が同期スリープをしているため、logic2はlogic1の終了待ちになります。

もうお分かりですね。spawnにすることで、マルチスレッド上でパラレルに実行できます。

spawn-join
async fn main() {
    let handle1 = tokio::spawn(async {
        println!("logic1...");
        std::thread::sleep(Duration::from_secs(5));
    });

    let handle2 = tokio::spawn(async {
        println!("logic2...");
        std::thread::sleep(Duration::from_secs(5));
    });
    println!("waiting both of handles");
    let (_r1, _r2) = (handle1.await.unwrap(), handle2.await.unwrap());
}

asyncのメリットはasyncブロックでボローしてもコンパイルエラーにはなりません。spawnでは`staticである必要があるためライフサイクルでコンパイルエラーになります。

async comes in handy
async fn main() {
    let url = String::from("https://example.com");

    let logic1 = async {
        println!("logic1... get {}", &url); // ボローできる
        std::thread::sleep(Duration::from_secs(5));
    };

    let logic2 = async {
        println!("logic2... get {}", &url);
        std::thread::sleep(Duration::from_secs(5));
    };

https://stackoverflow.com/questions/69638710/when-should-you-use-tokiojoin-over-tokiospawn

4 Vegetaアプリを作ってみた

理解を深めるため、負荷テストツールのvegetaをtokioで実装してみました。膨大なリクエストをサーバに送りレスポンスタイムを測定、統計処理するものです。通常、負荷テスtではレスポンスの中身はパースせずに捨ててしまいますので、CPUを全く消費しないI/Oバウンドなプログラムになります。

spawnとチャネルを実際にどう使えば良いかの参考になるかと思います。

使い方
$ ./target/release/vegeta -u http://localhost:8000/foo -r 5 -d 30

ソースコードはGitHubにあります。ご自由にご利用、改造してください。

5 参考資料

Rust界隈では有名なJonのビデオを参考にさせていただきました。彼の英語は聞き取りやすく、何より説明がうまいです。

https://www.youtube.com/watch?v=ThjvMReOXYM&

Discordでは、Aliceが質問に丁寧に答えています。

https://discord.com/invite/tokio

Discussion

ログインするとコメントできます