rustで非同期ランタイム実装してみた
はじめに
この記事はQiita 3-shake Advent Calendar 2025 シリーズ10日目の記事です。
以前Rustのイベントに参加した時に非同期周りの話がでて、少し興味が湧いたので実装してみたというお話になります。
リポジトリはこちらです
はじめにRustにおける非同期処理の特徴を概説します。
Rustの非同期処理の特徴
Rustはいわゆる協調的マルチタスクと呼ばれる機構によって非同期処理を実現しています。
つまり、シグナルなどによってプリエンプトされるのではなく、async関数が自らリソースを明け渡します。また、Rustの非同期処理はJSなどのasync関数とは異なり、awaitすることでFutureの実行が進みます。
加えてRustのasync/awaitは糖衣構文になっていて、コンパイル時に全く別の関数にデシュガーされます。
具体的には以下のようなasync関数があったとします。
/// Sum two D10 rolls plus a modifier.
async fn two_d10(modifier: u32) -> u32 {
let first_roll = roll_d10().await;
let second_roll = roll_d10().await;
first_roll + second_roll + modifier
}
これは以下のようにコンパイル時に変換されます。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Sum two D10 rolls plus a modifier.
fn two_d10(modifier: u32) -> TwoD10 {
TwoD10::Init { modifier }
}
enum TwoD10 {
// Function has not begun yet.
Init { modifier: u32 },
// Waiting for first `.await` to complete.
FirstRoll { modifier: u32, fut: RollD10Future },
// Waiting for second `.await` to complete.
SecondRoll { modifier: u32, first_roll: u32, fut: RollD10Future },
}
impl Future for TwoD10 {
type Output = u32;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
loop {
match *self {
TwoD10::Init { modifier } => {
// Create future for first dice roll.
let fut = roll_d10();
*self = TwoD10::FirstRoll { modifier, fut };
}
TwoD10::FirstRoll { modifier, ref mut fut } => {
// Poll sub-future for first dice roll.
if let Poll::Ready(first_roll) = fut.poll(ctx) {
// Create future for second roll.
let fut = roll_d10();
*self = TwoD10::SecondRoll { modifier, first_roll, fut };
} else {
return Poll::Pending;
}
}
TwoD10::SecondRoll { modifier, first_roll, ref mut fut } => {
// Poll sub-future for second dice roll.
if let Poll::Ready(second_roll) = fut.poll(ctx) {
return Poll::Ready(first_roll + second_roll + modifier);
} else {
return Poll::Pending;
}
}
}
}
}
}
Rustはasync/awaitがあると後述するFutureトレイトを実装したステートマシンに変換されます。ここで、ステートの個数はasync関数内で使用したawaitの数に依存します。この状態をpollの実行ごとに保存しておいて、実行できる状態になったらそこから再開するというのを繰り返し行います。
Each await point—that is, every place where the code uses the await keyword—represents a place where control is handed back to the runtime. To make that work, Rust needs to keep track of the state involved in the async block so that the runtime can kick off some other work and then come back when it’s ready to try advancing the first one again. This is an invisible state machine, as if you’d written an enum like this to save the current state at each await point
次にRustの非同期処理を行う上で出てくる特徴的な構造体やトレイトについて説明します。
非同期周りで出てくる構造体・トレイト
Rustで非同期処理を書くうえで出てくる特徴的なものは以下になります。
-
Futureトレイト -
Pin構造体 -
Context構造体 -
Waker構造体
ここではRustの非同期処理の概要を説明した後に個々の詳細についてお話しします。
今回は、マルチスレッド環境での非同期処理の実行を前提とします。
- ワーカが
Futureを再実行するために必要なWakerを初期化する - ワーカが
Wakerの参照をもつContextを初期化する - ワーカがデシュガーされた
Future.pollを実行 - (
Future.pollの中でデシュガーされたFuture.pollを呼び出す) - 末端の
Future.pollがPoll::PendingorPoll::Ready<T>を返す
6.Poll::Pendingの場合、Wakerを登録or呼び出して、再スケジューリングを行うようにする
といった具合です。
これらの詳細についてお話ししようと思います。
Futureとは
先述しましたが、async/awaitはこのトレイトを実装したステートマシンに変換されます。
Futureトレイトは以下の様な実装になっています。
pub trait Future {
type Output;
// Required method
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
これがRustの非同期処理の実態です。
このメソッドを実行ごとに、Poll::PendingまたはPoll::Ready<T>を返します。
Poll::Pendingになっているときは、何かしらの事情(例えば、I/O関連のシステムコールを呼び出し、I/O処理待ちの状態など)で処理を中断するときに返ります。
一方、Poll::Ready<T>は非同期処理が完了し、値を返すときに使用します。
ちなみに、Poll::Ready<T>になった後にpollを呼び出すのは未定義動作となっています。
Pinとは
Rustの非同期処理はステートマシンに変更される際にその時の状態を保存しておくために、自己参照を持つ可能性があります。しかし、ステートマシンの実行は一度で終わるとも、同じスレッドに閉じているとも限りません。なので、moveが起きてしまうと、参照していた値は無効なものになってしまいます。また、Rustではこのような構造体内の参照というのは、借用チェッカーでは管理できないそうです。
The compiler only knows about references from outside an object into the object (such as the above example, or a reference to a field of an object). A reference entirely within an object would be invisible to the compiler.
そのため、非同期処理実行中はその状態をもつ構造体はmoveされない保証を作る必要がありました。
このような背景からこの構造体が設けられました。これは実行時には存在せず、コンパイラにmoveしないように伝えるためのものです。
Contextとは
ドキュメントには以下の様に書かれています。
The context of an asynchronous task.
Currently, Context only serves to provide access to a &Waker which can be used to wake the current task.
非同期タスクのコンテキストである。現在、Contextは現在のタスクを呼び起こすために使用される&Wakerへのアクセスを提供するだけである。
Contextの定義は以下のようになっています。
struct Context<'a> {
waker: &'a Waker,
local_waker: &'a LocalWaker,
ext: AssertUnwindSafe<ExtData<'a>>,
// Ensure we future-proof against variance changes by forcing
// the lifetime to be invariant (argument-position lifetimes
// are contravariant while return-position lifetimes are
// covariant).
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
// Ensure `Context` is `!Send` and `!Sync` in order to allow
// for future `!Send` and / or `!Sync` fields.
_marker2: PhantomData<*mut ()>,
}
Contextは後述するWakerの参照を持っており、Future::pollはそこからWakerを受け取ってタスクを再度呼び起こすために使用します。
ただ、pub const fn ext(&mut self) -> &mut (dyn Any + 'static)こんなAPIがあることからも分かる通り、今後はContextを介してタスクに様々なデータが渡されるようになるとかないとか?
Wakerとは
こちらの責務は、Poll::PendingになったFutureを再実行するための準備をすることです。
構造体の中身としては以下のようになっています。
pub struct Waker {
waker: RawWaker,
}
pub struct RawWaker {
/// A data pointer, which can be used to store arbitrary data as required
/// by the executor. This could be e.g. a type-erased pointer to an `Arc`
/// that is associated with the task.
/// The value of this field gets passed to all functions that are part of
/// the vtable as the first parameter.
data: *const (),
/// Virtual function pointer table that customizes the behavior of this waker.
vtable: &'static RawWakerVTable,
}
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}
Wakerの実態としては、RawWakerというものであり、さらにそのRawWakerというのは内部で仮想テーブルを持っています。
非同期ランタイムを作成する場合はこのインターフェースに合わせて関数を実装し、RawWakerVTableに実装した関数を渡すか、後述するWakeトレイトを実装することによって、Wakerの挙動を制御することができます。
Wakeトレイト
先ほどWakerの挙動の制御の別の方法としてwakeトレイトを実装するということを書きました。
Wakeトレイトの中身以下のようになっています。
pub trait Wake {
/// Wake this task.
#[stable(feature = "wake_trait", since = "1.51.0")]
fn wake(self: Arc<Self>);
/// Wake this task without consuming the waker.
///
/// If an executor supports a cheaper way to wake without consuming the
/// waker, it should override this method. By default, it clones the
/// [`Arc`] and calls [`wake`] on the clone.
///
/// [`wake`]: Wake::wake
#[stable(feature = "wake_trait", since = "1.51.0")]
fn wake_by_ref(self: &Arc<Self>) {
self.clone().wake();
}
}
これは
impl<W> From<Arc<W>> for Waker
where
W: Wake + Send + Sync + 'static,
を呼び出すことで、先述したWakerを作成することができunsafeを避けて実装することができます。しかし、これはメモリのアロケーションが必要になり、組み込み等のメモリアロケータが存在しないような環境では動きません。
RawWakers are unsafe to use. Implementing the Wake trait is a safe alternative that requires memory allocation.
また、こちらのトレイトはレシーバがArc<Self>になっています。これは同じWakerを持ったタスクがマルチスレッドで実行される場合があるためです。
実装方針
ここからは自分がどのように実装したかになりますので、興味のある方はお付き合いいただけると幸いです!!
構成としては大きく分けて4つあります。
TaskSchedulerWakerWorker
今回は非同期I/Oは実装していません。
実装するとなると専用のスレッドを用意して、epollやらkselectやらでカーネルからのI/Oイベントを受け取り、登録されたWakerを呼び出して再スケジューリングするのかなとか思っています。
大きな処理の流れとしては、
- 非同期処理を実行するためのリソースの作成
- 実行用の
Workerの起動 -
Schedulerの起動
- 実行用の
-
SchedulerがFutureまたはIntoFutureを受け取り、必要な情報を付け加え保存 -
SchedulerがTaskを取り出してWorkerに渡す -
WorkerはTaskを実行し、pollの結果を待つ -
Taskはpollの結果がPoll::Pendingであれば、準備が出来次第Wakerを介してTaskをSchedulerに登録する
Task
はじめにTaskという構造体についてお話しします。
この構造体の目的は、受け取ったFutureにランタイム側で必要な付加的な情報をくっつけることにあります。
続いて、このTask構造体の内部で使用されているSenderとReceiverという構造体についてお話しします。
Receiver
実態としては、std::sync::mpsc::Senderによって送信された値をstd::sync::mpsc::Receiverから受け取るためのFutureです。
tokioではJoinHandleという名前でこのような方針が採用されているそうです。
Sender
こちらの責務としては後述するReceiverに値を送信し、Receiverをwakeするということです。
Scheduler
こちらは以下のようなトレイトになっています。 実際に実装したスケジューリング方式としては
- FIFO
- デッドライン方式
の2つです。
Rustは協調的マルチタスクなので、特にリソースの割り当てや管理などは必要なく、ただどのタスクをどのワーカに渡すかだけを考えれば良かったので、スケジューラの実装はかなり容易でした。
実際にTaskを取り出す操作としてはnotifyの部分で行っており、これは後述するWokerと強調して動作することを前提としています。後述しますが、ここではWorkerが返信用のSenderを送信するという工夫をしています。
tokioでは、Work-Stealingという方式で、各々のワーカにローカルなキューを持ち、原則そこからタスクをとって実行するが、ワーカがアイドル状態になったら他のキューからタスクを奪って実行するというようなスケジューリング方式をとっているようでした。
Waker
次にWakerです。前述した通り、Wakerには実装方法が2つありますが、今回はWakeトレイトを実装する方針を取りました。
構造体の中身はこのようになっております。
Arc<Mutex<T>>になっている理由は複数のArc<Waker>からのアクセスがあるためです。
Wakeトレイトの実装部分に関しては以下のようになっており、Task の状態を見てScheduler に渡すかどうかを決めています。これは先述した未定義動作を考慮してこのようになりました。
Worker
最後にWorkerです。
この構造体の責務はFuture.pollを実行することだけとしています。
また、これはスケジューラと協調して動作します。
具体的には以下のように実装しました。
ここでは、Schedulerに自信の情報と返信用の封筒を渡し、Taskをもらう準備をします。Scheduler側では準備が出来次第Task渡します。
WorkerはTaskが渡されるまでは、リソースを占有したくないので、一度thread::parkを実行し、アイドル状態にしています。
Schedulerは準備ができると、Taskを渡してt.unparkを実行します。
ここでthread::Thead:::unparkが先に呼び出される可能性があるのですが、Rustではthread::park, thread::Thread::unparkと言うAPIが存在しており、これはトークンベースで実装されています。つまり先にunparkが呼び出されても、(期待していないthread::parkが呼び出されていない限り)問題なく動作します。
最後に
Rustは協調的マルチタスクという方針を取っていたり、非同期に関するAPIやら構造体やらトレイトやらを用意しているので、少ない記述量で実装ができました。
やっぱり車輪の再発明は楽しいですね!色々な変化が目まぐるしく起きる時代ですが、何にもとらわれず好きなことをやるというのは充実した時間を過ごせるなあとふと思ったり。
最後まで見ていただいてありがとうございました!
参考
Discussion