🦀

tokio-console: topみたいなRustのコマンドラインツール

2021/12/20に公開

この記事はRust Advent Calendar 2021 21日目の記事です。

tokio-console

この記事では3日ほど前(2021-12-17)にリリースされたtokio-consoleというコマンドツールの使い方とtokioタスクの説明をします。tokio自体はなんとなく理解しているという想定で説明します。


実行イメージ

topコマンドのようにtokioのタスクを一覧表示するコマンドラインツールです。ターゲットのプログラムに少しだけ小細工が必要になります。

Tokio Console is a diagnostics and debugging tool for asynchronous Rust programs. It gives you a live, easy-to-navigate view into the program's tasks and resources, summarizing both their current status and their historical behavior.

https://tokio.rs/blog/2021-12-announcing-tokio-console

インストール手順

  1. ターゲットプログラムへの仕込み作業
  2. tokio-consoleツールのインストール

1.ターゲットプログラムへの仕込み作業

ターゲットのプログラムのCargo.tomlにconsole-subscriberを追加します。

Cargo.toml
[dependencies]
console-subscriber = "0.1.0"

.cargo/configファイルを作成します。

.cargo/config
[build]
rustflags = ["--cfg", "tokio_unstable"]

main() の初めにconsole_subscriber::init();を追加します。

your_app.rs
#[tokio::main]
async fn main() {
    console_subscriber::init(); // 追加する。

プログラムをビルド&実行します。

cargo clean
cargo run

2.tokio-consoleツールのインストール

tokio-consoleをインストールして実行します。

cargo install tokio-console
tokio-console

記事の最初のスクリーンショットのtopのような画面が表示されます。真っ黒で何もでない場合は、仕込み作業のところを再度確認してください。また、ターゲットプログラムでtokio::spawn()が使われてなければtokioタスクは無いため何も表示されません。

tokioのタスクとは?

tokioのタスクはGoのgoroutine(グリーンスレッド)に似ています。OSスケジューラではなくランタイムによってスケジューリングされます。

A task is a light weight, non-blocking unit of execution. A task is similar to an OS thread, but rather than being managed by the OS scheduler, they are managed by the Tokio runtime. Another name for this general pattern is green threads. If you are familiar with Go's goroutines, Kotlin's coroutines, or Erlang's processes, you can think of Tokio's tasks as something similar...

Goはpreemptive(Go1.14から)ですが、tokioのタスクは協調型であるためスレッドがCPUを掴んだまま走り続ける可能性があります。

Tasks are scheduled cooperatively. Most operating systems implement preemptive multitasking. This is a scheduling technique where the operating system allows each thread to run for a period of time, and then preempts it, temporarily pausing that thread and switching to another. Tasks, on the other hand, implement cooperative multitasking. In cooperative multitasking, a task is allowed to run until it yields, ...
原文

サンプルプログラム

tokioのタスクがCPUを掴んで走り続けてしまうダメなコードを書きます。rust-consoleで確認してみましょう。

sample.rs
use futures::future;
use std::thread;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
// #[tokio::main(flavor = "current_thread")]
// #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() {
    console_subscriber::init(); // for tokio-console

    // 1. 論理CPU数の取得
    let cpus = num_cpus::get();
    println!("logical cores: {}", cpus);

    let mut handles = Vec::new();

    // 2. Task A: ノンブロッキング・スリープ
    let task_a = tokio::task::Builder::new().name("Task A").spawn(async {
        loop {
            println!("   =Task A sleeping... {:?}", thread::current().id());
            sleep(Duration::from_secs(1)).await; // ノンブロッキング・スリープ
            println!("   =Task A woke up.");
        }
    });
    handles.push(task_a);

    // 3. Task B-n: ブロッキング・スリープ
    for i in 0..cpus + 1 {
        let task_b = tokio::task::Builder::new()
            .name(&format!("Task B-{}", i))
            .spawn(async move {
                loop {
                    println!("#Task B-{} sleeping... {:?}", i, thread::current().id());
                    thread::sleep(Duration::from_secs(5)); // ブロッキング・スリープ
                    println!("#Task B-{} woke up.", i);
                    // sleep(Duration::from_nanos(1)).await;
                }
            });
        handles.push(task_b)
    }

    future::join_all(handles).await;
}

コードの説明

  1. 論理CPU数を取得します。私の環境ではcpus=4
  2. Tokioタスク(Task A)を生成します。ノンブロッキングで1秒スリープします。
  3. Tokioタスク(Task B)を論理CPU数+1生成します。ブロッキングで5秒スリープします。CPUサイクルを消費するヘビィな処理をしてるとイメージして下さい。

tokio-consoleで見たときにわかりやすいようタスクに名前をつけます。

tokio::task::Builder::new().name("Task A").spawn(

実行してみましょう

サンプルプログラムを実行し、tokio-consoleを起動します。
あれれ、Task Aが寝たままで起きることがありませんね。


cargo run

Task B-nのBusyとIdleに注目してください。CPUリソースをがっつり掴んでいるのが分かりますね。一方でTask-AはずっとIdleなのが分かります。またPollsが1のママです。

tokio-console

修正しましょう

Task Bのコードを修正します。ノンブロッキング・スリープを入れます。これによりTask BがyieldしtokioランタイムがTask Aをpollします。

println!("#Task B-{} woke up.", i);
sleep(Duration::from_nanos(1)).await; // コメントを外す

実行してみましょう。


無事にTask Aが目覚めました!
でも少し変ですね。Task Aは1秒だけスリープする設定ですがTask Bが目覚める5秒後に目覚めています。

tokio::main

tokioはデフォルトで論理CPU数分のワーカースレッドを生成します。

macOSの物理、論理CPU数
sysctl hw.physicalcpu hw.logicalcpu
hw.physicalcpu: 2
hw.logicalcpu: 4

ワーカースレッド数をtokio::mainで変更することが可能です。

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]

再度実行してみましょう。

Task Aが1秒後に目覚めました!パチパチ

Idle時間 = polls数 x 1秒スリープになってます。

ちなみにNode.jsのようにシングルスレッドにすることもできます。

#[tokio::main(flavor = "current_thread")]

シングルスレッドであるためタスクが逐次処理のようになります。

参考資料

tokio-console サンプルコード
チュートリアルビデオ
async, tokio
Deno Node.jsの開発者のRyan Dahlによって開発されたJSのWebサーバ(V8)。RustのTokioを使用しています。

Discussion