Closed7

Tokio (tokio::fs) と io_uring

Kota UENISHIKota UENISHI

tokio::fs::read() を追ってみる

エントリーポイント
https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/fs/mod.rs#L51

基本的には tokio::fs::* は全部この syncify() を通る。中で対応する std::fs::* を呼ぶだけ

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/fs/read.rs#L44-L47

asyncify の中身はこれまたシンプル

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/fs/mod.rs#L114-L126

spawn_blocking(f) の実体は tokio::fs::blocking::spawn_blocking(f) になるのでシンプル・・・なはず

そいつの宣言はここ

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/blocking.rs#L2

なので tokio::runtime に飛ぶ

Kota UENISHIKota UENISHI

tokio::runtime::spawn_blocking の実体はここ

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/blocking/pool.rs#L79-L86

この rt はなんぞや?が難しい。 context を探しに行く。ここでは use crate::runtime::context; となっているので見に行くと current() はすごくSimple

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/context.rs#L10-L12

これがセットされてるのは

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/context.rs#L57-L62

ここなんだけど、これはランタイムの起動時にセットされていて、よくある

#[tokio::main(flavor="multi_thread", worker_threads = 2)]
async fn main() {
    println!("Hello world");
}

fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

と等価であると tokio-macros のドキュメントにかいてある。なので .build() あたりを見に行くとあった

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/builder.rs#L557-L568

この Handle の実体はこれ

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/handle.rs#L16

なので、 .spawn_blocking(f) の実装はここになる

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/handle.rs#L172-L178

_inner(func, None) だと・・・まあすぐ下にある。 runtime::blocking::task::BlockingTask(func, None) をつくって self.blocking_spawner に渡している。これはRuntime初期化時に作られていて

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/builder.rs#L552-L554

これの実体はここで作られている

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/blocking/pool.rs#L90-L100

実際に↑で作ったTaskがQueueに入るのはここ

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/blocking/pool.rs#L175-L187

Queueの実体は↑↑にあるとおりMutexでラップされた VecDeque<Task> になる
ちなみにタスクが埋まってたりするとここでワーカースレッドが起動される。

Mutexについては以下の記事が詳しい。
https://mmi.hatenablog.com/entry/2019/02/13/183104

Kota UENISHIKota UENISHI

ワーカースレッドはすぐ下で起動されている。ワーカースレッドが shared.queue からポップしたタスクを実行するところは簡単なのでここでは割愛する

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/runtime/blocking/pool.rs#L233-L242

この thread::Builderuse crate::loom::thread; で宣言されているがこれはテスト用のモックを std を切り替えるためのやつ。

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/loom/mod.rs#L9

なので実体は std::thread になる。

https://github.com/tokio-rs/tokio/blob/tokio-1.8.5/tokio/src/loom/std/mod.rs#L96

Builder が読んでいる spawn はこれ

https://github.com/rust-lang/rust/blob/1.82.0/library/std/src/thread/mod.rs#L369-L376

いろいろラップしているがどうもここスレッドを作っているようだ

https://github.com/rust-lang/rust/blob/1.82.0/library/std/src/thread/mod.rs#L561

いろいろラップされていて難しいがこういう感じ

   Ok(JoinHandle(Ok(JoinInner{ native: crate::sys::thread::Thread::new(..), ...}))) 

で、つまり pthread

https://github.com/rust-lang/rust/blob/1.82.0/library/std/src/sys/pal/unix/thread.rs#L37-L39

JoinInnerはこう

https://github.com/rust-lang/rust/blob/1.82.0/library/std/src/thread/mod.rs#L1621-L1626

JoinHandleはこう(こんな書き方できるんやな・・・・)

https://github.com/rust-lang/rust/blob/1.82.0/library/std/src/thread/mod.rs#L1697-L1699

さてnewされたスレッドはここで pthread_create() に渡る

https://github.com/rust-lang/rust/blob/1.82.0/library/std/src/sys/pal/unix/thread.rs#L84

ここで呼ばれている thread_start(main, ...) はすぐ下に定義されている(こんな書き方できるんや・・・)

https://github.com/rust-lang/rust/blob/1.82.0/library/std/src/sys/pal/unix/thread.rs#L99-L108

ここで main....() とようやく呼ばれていることがわかる。これで晴れてスレッドが起動したことになる
なおここでヌル的ななにかをReturnしてるので pthread_join() で値を拾えるということはない。

Kota UENISHIKota UENISHI

io_uring

以上みてきたことからわかるように、 tokio::fs の実装はファイルIOに関しては非同期化していないことになる。ファイルIOを非同期化する手段として今は io_uring(7) があるので、そちらを調べる(といってもリンクを列挙するだけだけど

なぜ新しいシステムコールがLinuxに必要なのか?については作者の解説PDFが詳しい
https://kernel.dk/io_uring.pdf

CloudFlare Blog

内部実装に少し触れている(わかりやすい絵があるCloudFlareの記事)
https://blog.cloudflare.com/ja-jp/missing-manuals-io_uring-worker-pool/
↑Cloudflareのブログにリンクされている
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=2e480058ddc21ec53a10e8b41623e245e908bdbc

すごくざっくり要約するとファイルIOはbounded time で終わると期待されるのでワーカープールには行かない(マジかよ)。ネットワークIOは unbounded なのでワーカープールに行くということのようだ

(C)CloudFlare
(C) CloudFlare blog

その他リンク

https://mattermost.com/blog/iouring-and-go/

Kota UENISHIKota UENISHI

io_uring の仕組みを↑↑のCloudFlareのブログをベースに調べる

We can observe io_uring when it calls vfs_poll, the machinery behind non-blocking I/O, to monitor the sockets. If that happens, we will be hitting the io_uring:io_uring_poll_arm tracepoint. Meanwhile, the wake-ups that follow, if the polled file becomes ready for I/O, can be recorded with the io_uring:io_uring_poll_wake tracepoint embedded in io_async_wake() wake-up call.

といってるので、 vfs_poll を調べることにする

https://docs.kernel.org/filesystems/vfs.html

公式が言ってるのはこう

called by the VFS when a process wants to check if there is activity on this file and (optionally) go to sleep until there is activity. Called by the select(2) and poll(2) system calls

まあそうだよね。ということでここでは ext4 を見てみることにする

カーネルの pull が一生終わらないので適当にググる

https://events.static.linuxfound.org/sites/events/files/slides/lemoal-nvme-polling-vault-2017-final_0.pdf

うんうんそうだね・・・・という絵

日本語記事もあった

https://moriyoshi.hatenablog.com/entry/20090502/1241271126

なお(1)からリンクしているサンプルコードのGistはない(筆者に問い合わせ中)

pull できたので vfs_poll(..)

https://github.com/torvalds/linux/blob/v6.8/include/linux/poll.h#L84-L89

なおext4 の poll() は虚無だった

https://github.com/torvalds/linux/blob/v6.8/fs/ext4/file.c#L931-L949

DEFAULT_POLLMASK#define DEFAULT_POLLMASK (EPOLLIN | EPOLLOUT | EPOLLRDNORM | EPOLLWRNORM) なのでつまり全部入りということになる

Kota UENISHIKota UENISHI

io_uring code reading?

エントリーポイントからかな
Ubuntu 24.04でつかわれている 6.8 をベースに読んでみる

このスクラップは27日前にクローズされました