RustのTokioで非同期とグリーンスレッドを理解する
この記事はRust Advent Calendar 2022 - Qiitaの13日目の記事です。
はじめに
Rustの非同期I/Oライブラリ tokio
の解説記事になります。初めに概要説明をします。その後でちょっとしたソースコードを見せながら解説をしていきます。理論より実践、実際にどう書けば良いのか知りたい人向けの記事になります。
tokioはライブラリの一つに過ぎませんが、hyper
、actix-web
などのWebフレームワーク、Deno
のイベントループ、Solana
のツールなどで利用されており、デファクトスタンダード的な重要なライブラリになっています。
またDiscord
はtokio
のスポンサーをしており、GoからRust(Tokio)で書き換えた話は小説を一冊読み終えたかのようなブラボーな興奮がありました。
ちなみに、tokio
のネーミングは開発者のCarl Lerche
が東京観光を楽しんだことや、ローレベルI/OライブラリMioとの語呂合わせから名付けられたそうです。
I enjoyed visiting Tokio (Tokyo) the city and I liked the "io" suffix and how it plays w/ Mio as well. I don't know... naming is hard so I didn't spend too much time thinking about it.
1 非同期I/Oとグリーンスレッド
1-1 tokioとは?
乱暴な言い方をすれば、Node.jsとGolangを合体させたのがtokio
です。
Node.jsとはC10Kのような膨大なリクエストをシングルスレッド&イベントループ&非同期I/Oで効率良く処理するランタイムです。実際、Denoにはtokioが使用されています。tokioでもasync/await
を使用して非同期プログラミングをします。
Pythonでもaysncio
のような非同期プログラミングが導入されています。Kotlinにもコルーチンというグリーンスレッドとasync/await
を使用した非同期プログラミングがあります。
Golangではgoroutineという超軽量なスレッド(OSスレッドではない)とチャネルを使用してパラレルに処理するランタイムです。最近のCPUはマルチコア、ハイパースレッディングなどの技術により、パラレルに複数の命令を実行できるようになっています。インテルCorei9だと論理コア数が20もありますが、シングルスレッドでは宝の持ち腐れになります。tokioでもtokio::spawn
を使用してマルチスレッド・プログラミングをします。
A task is a light weight, non-blocking unit of execution. A task is similar to an OS thread, but rather than being managed by the OS scheduler, they are managed by the Tokio runtime. Another name for this general pattern is green threads. If you are familiar with Go's goroutines, Kotlin's coroutines, or Erlang's processes, you can think of Tokio's tasks as something similar...
Rust公式
つまり、tokioでは非同期プログラミングとグリーンスレッドの両方の理解が必要になるためハードルは高いです。そして、Rust言語固有の所有権が加わりカオスさを増します。
CPUバウンドの場合はOSネイティブスレッドを使用する必要があります。こちらの私が以前書いた記事も参考にしてください。
1-2 非同期I/Oとは?
AjaxとはAsynchronous JavaScript and XML
の略ですが、サーバとの通信を非同期(asynchronous)に行うことでブラウザの画面が固まらないようにする仕組みです。Node.jsはクライアントからのリクエストを非同期に処理することでC10Kという膨大なリクエストでもサーバがダウンしないようにする仕組みです。
外部APIの呼び出しやデータベース接続など、ネットワークが絡む処理は速くても1ms、ルーターなどを経由すれば最悪1秒近くかかることもあります。その一方でCPUの処理速度はナノ秒単位と超高速です。人間の時間感覚(1秒基準)に例えると、CPUにとっての1秒は80年になります。つまり人生が終わってしまいます。
つまり、ネットワークからレスポンスを受け取るまでの間、CPU使用率はほぼゼロに近い状態になるわけです。そうであるならば、レスポンス待ちの場合は一旦処理を中断し、別のタスクの処理をした方がお得なわけです。レスポンスが返りしだい再開することで、システムのスループットを改善できるわけです。
その一方で短所もあります。協調的スケジューリングであるため、プログラマーの責任でタスクススワップする必要があります。同期処理やCPUヘビーなロジックがあるとイベントループの流れが悪くなり渋滞が発生し、テールレイテンシー、後続のリクエストの遅延が大きくなります。
When writing async Rust, the phrase “blocking the thread” means “preventing the runtime from swapping the current task”. This can be a major issue because it means that other tasks on the same runtime will stop running until the thread is no longer being blocked. To prevent this, we should write code that can be swapped quickly, which you do by never spending a long time away from an .await.
Alice Ryhl
また、こちらのYahoo! Japan
の記事はNode.jsですが非同期の難しさをよく語っています。
1-3 グリーンスレッドとは?
tokioではOSではなくランタイムがタスクのスケジューリングをします。つまり、グリーンスレッド(M:Nスレッド)になります。グリーンスレッドは64バイトと軽量であり、goroutineのようにたくさんスレッドを生成しても問題ありません。
Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory.
Tokio公式チュートリアル
タスクの生成は、tokio::spawn()
を呼び出します。タスクが一つの処理単位になります。イベントキューが論理CPUの数だけ作成されます。つまり、1論理コアに1OSスレッドが張り付きます。生成されたタスクが、イベントキューに放り込まれ処理されていきます。
Spawning a task enables the task to execute concurrently to other tasks. The spawned task may execute on the current thread, or it may be sent to a different thread to be executed.
RustDoc
tokioのランタイム・スケジューラはタスクをロードバランスしながら各イベントキューに割り振ります。
2 実際に動かして学ぶ
実際のコードを例に説明します。非同期処理、グリーンスレッド(タスク)の理解を深めるため、あえて変なことをしていますので真似しないように。
2-1 手始めにHello World的なやつ
まずはHello World的なコードから始めましょう。
おまじない的に main()
に#[tokio::main]
マクロを指定します。tokioのランタイムが生成されます。
次に、tokio::task::spawn(async {タスク})
でタスク(グリーンスレッド)を生成します。asyncブロックが1つのタスクになります。(実際にはここでREST APIを叩くなど、I/Oバウンドな処理をします。)
次のコードは3秒間スリープしたあとで、"woke up!"
と出力する例です。
#[tokio::main] // 忘れないで!
async fn main() {
tokio::task::spawn(async { // タスクを記述する
tokio::time::sleep(Duration::from_secs(3)).await; // ネットワーク呼び出しのフリ
println!("woke up!"); // 3秒後に実行されるよ
});
// この時点でタスクはすでに走り始めている
std::thread::sleep(Duration::from_secs(5)); // プログラムが終了しないように
println!("Done");
}
(3秒後)
woke up!
(2秒後 5s - 3s = 2s)
Done
tokio::task::spawn
した時点でタスク(グリーンスレッド)は実行されます。goroutine
と似たような感じです。tokio::spawn
と省略して書けます。
2-2 シングルスレッドにしてみよう
さて次に面白い実験をしてみましょう。先ほどのプログラムをシングルスレッド環境で実行してみましょう。
#[tokio::main(flavor = "current_thread")]
と指定すると、Node.jsのようにOSスレッドが1つだけのランタイムになります。
#[tokio::main(flavor = "current_thread")]
async fn main() {
tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(3)).await;
println!("woke up!"); // 表示されない!
});
std::thread::sleep(Duration::from_secs(5)); // 譲らない!
println!("Done");
}
(5秒後)
Done
(終了。タスクが実行されることはない)
なんということでしょう!タスクは実行されることなく、プログラムは終了してしまうのです。
答え: 同期スリープだとOSスレッドをがっつり掴んだまま寝てしまいます。タスクにスレッドを譲ることなく、5秒後に目覚めて、print出力しメインプログラムは終了してしまいます。
修正方法
タスクにOSスレッドを譲るには同期スリープを非同期スリープに変更します。tokioの非同期プログラミングではプログラマーが明示的にawait
を入れ、協調的(cooperative)にタスクをスケジューリングする必要があります。
tokio::time::sleep(Duration::from_secs(5)).await; // 譲るよ!
2-3 タスクを2つ生成してみるよ
さて、次のプログラムの実行順序はどうなるでしょうか?
use std::time::Duration;
#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() {
let cpus = num_cpus::get();
println!("logical cores: {}", cpus);
tokio::spawn(async move {
println!("task 1 started...");
std::thread::sleep(Duration::from_secs(3)); // 3秒寝る(同期スリープ)
println!("task 1 woke up!");
});
// この時点でtask1はすでに走っている。
tokio::spawn(async move {
println!("task 2 started...");
std::thread::sleep(Duration::from_secs(1));
println!("task 2 woke up!");
});
// この時点でtask2は走らない。空きスレッドがないから。
std::thread::sleep(Duration::from_secs(5)); // 3秒後にtask2は走る
println!("Done");
}
タスク1とタスク2は逐次的に実行します。つまり、このプログラムの全体の実行時間は4秒になります。
logical cores: 4 (論理コア数。マシンによって変わります。)
task 1 started...
(3秒後)
task 1 woke up!
task 2 started...
(1秒後)
task 2 woke up!
(即時)
Done
worker_threads = 1
と指定すると、メインスレッド+ワーカースレッド1
になります。
タスク1は同期スリープであるため、タスク2にスレッドを譲りません。
ではここで、worker_threads = 2
にして実行してみましょう。
task 1 started...
task 2 started...
task 2 woke up!
task 1 woke up!
Done
このようにタスク1と2は別々のスレッドでパラレルに実行します。
とは言うものの、Tokioではタスクにバジェットを持たせることで、yieldさせる仕掛けはあり、そこまでnaive(単純)でもありません。あの、Ryan Dahl
との会話で誕生したアイデアのようです。
Even though Tokio is not able to preempt, there is still an opportunity to nudge a task to yield back to the scheduler. As of 0.2.14, each Tokio task has an operation budget. This budget is reset when the scheduler switches to the task. Each Tokio resource (socket, timer, channel, ...) is aware of this budget.
Tokio公式ブログ
2-4 タスクをいっぱい生成してみるよ
tokioのタスクを数百、数千と生成してもグリーンスレッドですからパフォーマンス的に問題はありません。だがしかし、同期呼び出しをしたら大変なことになりますよ、というコードをご紹介します。
タスクAを1つ、タスクBを1000個生成します。タスクAは10秒同期スリープします。
use std::time::Duration;
use futures::{stream::FuturesUnordered, StreamExt};
// #[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() {
console_subscriber::init();
let mut tasks = FuturesUnordered::new();
let h1 = tokio::spawn(async move {
std::thread::sleep(Duration::from_secs(10)); // ここでつまるよ!
println!("0 woke up!");
});
tasks.push(h1);
println!("0 started...");
for i in 1..1000 {
let h2 = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("{i} woke up!");
});
tasks.push(h2);
println!("{i} ...");
}
// join_allは遅いよ。https://github.com/tokio-rs/tokio/issues/2401
while let Some(item) = tasks.next().await {
let () = item.unwrap();
}
println!("Done");
}
2-3と同様に、タスクAが大渋滞を引き起こします。仮にこれがWebサーバであれば、タスクBのレスポンスタイムは11s+になります。シングルスレッド、イベントループではたった一つのリクエストが後続のリクエストを全滅させるという恐怖があります。
(省略)
998 started...
999 started...
(10秒後)
0 woke up!
2 woke up! (この子のレスポンスタイムは11秒になる)
1 woke up!
(省略)
tokio-console
でデバッグするとよく分かります。一番上が同期スリープしているタスクです。Busy状態になっており、OSスレッドをつかんでいるのがよく分かります。その他のタスクはアイドル状態になっています。(この例では分かりやすいようにスリープを30秒にしています。)
2-5 チャネル
一般的にコンカレントなプログラミングでは、複数のスレッド間のデータの受け渡しにチャネルを使用するのがエレガントなやり方です。tokioでもOSスレッド(thread::spawn)と同じようにチャネル通信ができます。OSスレッドに関しては私のこちらの記事を参考にしていただければと思います。
さて、tokioのチャネルはOSスレッドのチャネルとほぼ使い方は同じです。大きな違いは、send().await
, recv().await
とawaitをつけること、つまり非同期呼び出しであるということです。
チャネルにはバッファーサイズを指定できます。バッファーがいっぱいになると、send().await
はサスペンドし次の処理に進めません。バックプレッシャーと言いますが、送りすぎを防止する機能になります。
アナロジーで言うと、胃がchannel、口がsend()、小腸がrecv()になります。消化が間に合わず、胃が一杯(buffer full)になると、もう食べること(send)はできません。小腸での消化を待ち、胃にスペースができるとまた食べることができます。
use std::time::Duration;
use tokio::sync::mpsc;
#[tokio::main()]
async fn main() {
// チャネル (胃)
let (tx1, mut rx1) = mpsc::channel(2);
// 送信側タスク (口)
let handle1 = tokio::spawn(async move {
let mut count: i32 = 0;
while count < 5 {
if let Err(e) = tx1.send(count).await { // 食べる
eprintln!("{}", e);
break;
}
println!("sent {}", count);
count += 1;
}
});
// この時点で食事はすでに進んでいます。
// 受信側タスク (小腸)
let handle2 = tokio::spawn(async move {
while let Some(msg) = rx1.recv().await { // 消化
tokio::time::sleep(Duration::from_secs(1)).await;
println!("get {}", msg);
}
});
let (_r1, _r2) = (handle1.await.unwrap(), handle2.await.unwrap());
}
sent 0
sent 1
sent 2
get 0 (1秒後)
sent 3
get 1
sent 4
get 2
get 3
get 4
実際にどのようなユースケースがあるかと言えば、例えばデータベースのマイグレーション処理に使えます。ターゲットのDBからデータを取得しながら、データの加工処理を施し、別のデータベースに登録するなどです。
2-6 排他ロックとawait
こちらのプログラムはコンパイルエラーになります。tokioのプログラミングをすると必ずと言っていいほどお目にかかるエラーです。
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(0));
let d = Arc::clone(&data);
let handle = tokio::spawn(async move {
let mut guard: MutexGuard<i32> = d.lock().unwrap();
*guard += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
});
handle.await.unwrap();
println!("{:?}", data);
}
error: future cannot be sent between threads safely
--> src/zenn/ex5.rs:10:18
|
10 | let handle = tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/zenn/ex5.rs:13:51
|
11 | let mut d = d.lock().unwrap();
| ----- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
12 | *d += 1;
13 | tokio::time::sleep(Duration::from_secs(1)).await;
| ^^^^^^ await occurs here, with `mut d` maybe used later
先に答えです。ブロックスコープを使用します。guard
のライフタイムがawait
を超えないように明示するとコンパイラーが許してくれます。
let handle = tokio::spawn(async move {
{
let mut guard: MutexGuard<i32> = d.lock().unwrap();
*guard += 1;
}
tokio::time::sleep(Duration::from_secs(1)).await;
});
実は、tokioのタスクはawait処理の前後で別のOSスレッドに引っ越しする可能性があります。そして、Mutexとはmutual exclusion
の略ですが、複数のスレッドが同時に一つのリソースにアクセスrace condition
しないように排他制御するものです。
つまり、awaitの前後でロックを保持するOSスレッドが変わる可能性があるわけです。
しかしながら、Mutex
は通常の同期プログラミングでの使用を前提に設計されており、非同期プログラミングでの使用はできません。つまり、mutexガードをスレッド間で共有できないことにあります。
note: future is not
Send
as this value is used across an await
とエラーメッセージにあるように、this value(MutexGuard)
がawaitをまたがないようにすれば同一スレッドでライフタイムは終了するためコンパイルが通ります。
There's two problems here, both stemming from the fact that the standard library Mutex is designed to be used with regular blocking code, rather than async code:
Locking the mutex will block the thread, which is generally something you want to avoid in async code as it can prevent other tasks from running.
As seen in the compiler error, the mutex guard that you get by locking it can't be shared between threads safely, so it won't compile anyways.
引用StackOverflow
3 補足
3-1 Tokioランタイム
tokioランタイムの設定をカスタマイズできます。
マクロを使わずに、直接ビルダーでランタイムを生成します。block_on
にプログラム本体を渡します。
// #[tokio::main] 以下のように展開される
fn main() {
let cpus = num_cpus::get();
println!("logical cores: {}", cpus);
tokio::runtime::Builder::new_multi_thread() // or Builder::new_current_thread
.worker_threads(5) // ワーカースレッド数
.max_blocking_threads(1) // ブロッキングスレッド数。次で説明。
.enable_all() // ネットワークI/Oなどを有効化
.on_thread_start(|| {
println!("thread started");
})
.build()
.unwrap()
.block_on(async { // ルートのタスク
println!("Hello world");
3-2 spawn_bloking
実はtokioには、ワーカースレッドとブロッキングスレッドの2種類のスレッドがあります。
ワーカースレッドとはこれまで紹介したasyncタスクを実行するもので論理コア数分作成されます。tokioの開発でお馴染みのAlice Ryhlによると、awaitからawaitの実行時間は10μsから100μsが望ましく、それ以上の時間がかかる場合にはブロッキングスレッドspawn_blocking()
の使用が推奨されています。
このサンプルコードではシングルスレッドでメインを同期スリープさせています。
tokio::spawn
はこれまで何度も説明したようにワーカースレッドで処理されるためブロックされます。一方で、spawn_blocking
はブロッキングスレッドで処理されるためブロックされません。(このスレッド内では同期的な記述をします。)
use std::time::Duration;
use tokio::task;
#[tokio::main(flavor = "current_thread")]
async fn main() {
tokio::spawn(async {
// 順番が回ってこない。。。
tokio::time::sleep(Duration::from_secs(1)).await;
println!("woke up! 1");
});
let handle = task::spawn_blocking(move || { // ブロッキングスレッドで動くよ
std::thread::sleep(Duration::from_secs(1)); // 同期的に記述する
println!("woke up! 2");
});
std::thread::sleep(Duration::from_secs(3));
println!("woke up! 3");
handle.await.expect("TODO: panic message");
}
woke up! 2
woke up! 3
(woke up! 1は実行されない)
3-3 join!
JavaScriptのPromise.all()
のように、複数のasyncブロックをコンカレントに実行するマクロです。渡した全てのasyncブロックは同一のOSスレッドで実行します。言い方を変えるとマルチコア上でも並列処理(parallel)には実行されません。パラレルに実行したい場合は、tokio::spawnして別々のタスクとして実行する必要があります。
By running all async expressions on the current task, the expressions are able to run concurrently but not in parallel. This means all expressions are run on the same thread and if one branch blocks the thread, all other expressions will be unable to continue. If parallelism is required, spawn each async expression using tokio::spawn and pass the join handle to join!.
Rust公式
ぱっと見で、このサンプルコードのprintlnの出力順がわかった人はすごいです。
use futures::future;
use futures::future::join_all;
use std::thread;
use std::time::Duration;
#[tokio::main()]
async fn main() {
let logic1 = async {
println!("logic1...");
std::thread::sleep(Duration::from_secs(5));
};
let logic2 = async {
println!("logic2... will start once logic1 is done.");
std::thread::sleep(Duration::from_secs(5));
};
println!("waiting at join!");
tokio::join!(logic1, logic2);
println!("all done");
}
waiting at join!
logic1...
logic2... will start once logic1 is done.
all done
asyncブロックはすぐには実行されません。join!
で始めて開始します。ここがJavaScriptと違うところです。繰り返しになりますが、2つのasyncブロックは同一のOSスレッドで処理されます。例えマルチスレッドであっても!logic1が同期スリープをしているため、logic2はlogic1の終了待ちになります。
もうお分かりですね。spawn
にすることで、マルチスレッド上でパラレルに実行できます。
async fn main() {
let handle1 = tokio::spawn(async {
println!("logic1...");
std::thread::sleep(Duration::from_secs(5));
});
let handle2 = tokio::spawn(async {
println!("logic2...");
std::thread::sleep(Duration::from_secs(5));
});
println!("waiting both of handles");
let (_r1, _r2) = (handle1.await.unwrap(), handle2.await.unwrap());
}
asyncのメリットはasyncブロックでボローしてもコンパイルエラーにはなりません。spawnでは`staticである必要があるためライフサイクルでコンパイルエラーになります。
async fn main() {
let url = String::from("https://example.com");
let logic1 = async {
println!("logic1... get {}", &url); // ボローできる
std::thread::sleep(Duration::from_secs(5));
};
let logic2 = async {
println!("logic2... get {}", &url);
std::thread::sleep(Duration::from_secs(5));
};
4 Vegetaアプリを作ってみた
理解を深めるため、負荷テストツールのvegeta
をtokioで実装してみました。膨大なリクエストをサーバに送りレスポンスタイムを測定、統計処理するものです。通常、負荷テスtではレスポンスの中身はパースせずに捨ててしまいますので、CPUを全く消費しないI/Oバウンドなプログラムになります。
spawnとチャネルを実際にどう使えば良いかの参考になるかと思います。
$ ./target/release/vegeta -u http://localhost:8000/foo -r 5 -d 30
ソースコードはGitHubにあります。ご自由にご利用、改造してください。
5 参考資料
Rust界隈では有名なJonのビデオを参考にさせていただきました。彼の英語は聞き取りやすく、何より説明がうまいです。
Discordでは、Aliceが質問に丁寧に答えています。
Discussion