⚙️

rustで非同期ランタイム実装してみた

に公開

はじめに

この記事はQiita 3-shake Advent Calendar 2025 シリーズ10日目の記事です。

以前Rustのイベントに参加した時に非同期周りの話がでて、少し興味が湧いたので実装してみたというお話になります。
リポジトリはこちらです
https://github.com/sraku2159/async_runtime

はじめにRustにおける非同期処理の特徴を概説します。

Rustの非同期処理の特徴

Rustはいわゆる協調的マルチタスクと呼ばれる機構によって非同期処理を実現しています。
つまり、シグナルなどによってプリエンプトされるのではなく、async関数が自らリソースを明け渡します。また、Rustの非同期処理はJSなどのasync関数とは異なり、awaitすることでFutureの実行が進みます。
加えてRustのasync/awaitは糖衣構文になっていて、コンパイル時に全く別の関数にデシュガーされます。
具体的には以下のようなasync関数があったとします。

/// Sum two D10 rolls plus a modifier.
async fn two_d10(modifier: u32) -> u32 {
    let first_roll = roll_d10().await;
    let second_roll = roll_d10().await;
    first_roll + second_roll + modifier
}

これは以下のようにコンパイル時に変換されます。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Sum two D10 rolls plus a modifier.
fn two_d10(modifier: u32) -> TwoD10 {
    TwoD10::Init { modifier }
}

enum TwoD10 {
    // Function has not begun yet.
    Init { modifier: u32 },
    // Waiting for first `.await` to complete.
    FirstRoll { modifier: u32, fut: RollD10Future },
    // Waiting for second `.await` to complete.
    SecondRoll { modifier: u32, first_roll: u32, fut: RollD10Future },
}

impl Future for TwoD10 {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        loop {
            match *self {
                TwoD10::Init { modifier } => {
                    // Create future for first dice roll.
                    let fut = roll_d10();
                    *self = TwoD10::FirstRoll { modifier, fut };
                }
                TwoD10::FirstRoll { modifier, ref mut fut } => {
                    // Poll sub-future for first dice roll.
                    if let Poll::Ready(first_roll) = fut.poll(ctx) {
                        // Create future for second roll.
                        let fut = roll_d10();
                        *self = TwoD10::SecondRoll { modifier, first_roll, fut };
                    } else {
                        return Poll::Pending;
                    }
                }
                TwoD10::SecondRoll { modifier, first_roll, ref mut fut } => {
                    // Poll sub-future for second dice roll.
                    if let Poll::Ready(second_roll) = fut.poll(ctx) {
                        return Poll::Ready(first_roll + second_roll + modifier);
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}

https://google.github.io/comprehensive-rust/concurrency/async/state-machine.html

Rustはasync/awaitがあると後述するFutureトレイトを実装したステートマシンに変換されます。ここで、ステートの個数はasync関数内で使用したawaitの数に依存します。この状態をpollの実行ごとに保存しておいて、実行できる状態になったらそこから再開するというのを繰り返し行います。

Each await point—that is, every place where the code uses the await keyword—represents a place where control is handed back to the runtime. To make that work, Rust needs to keep track of the state involved in the async block so that the runtime can kick off some other work and then come back when it’s ready to try advancing the first one again. This is an invisible state machine, as if you’d written an enum like this to save the current state at each await point

https://doc.rust-lang.org/book/ch17-01-futures-and-syntax.html

次にRustの非同期処理を行う上で出てくる特徴的な構造体やトレイトについて説明します。

非同期周りで出てくる構造体・トレイト

Rustで非同期処理を書くうえで出てくる特徴的なものは以下になります。

  • Futureトレイト
  • Pin構造体
  • Context構造体
  • Waker構造体

ここではRustの非同期処理の概要を説明した後に個々の詳細についてお話しします。
今回は、マルチスレッド環境での非同期処理の実行を前提とします。

  1. ワーカがFutureを再実行するために必要なWakerを初期化する
  2. ワーカがWakerの参照をもつContextを初期化する
  3. ワーカがデシュガーされたFuture.pollを実行
  4. (Future.pollの中でデシュガーされたFuture.pollを呼び出す)
  5. 末端のFuture.pollPoll::PendingorPoll::Ready<T>を返す
    6. Poll::Pendingの場合、Wakerを登録or呼び出して、再スケジューリングを行うようにする

といった具合です。

これらの詳細についてお話ししようと思います。

Futureとは

先述しましたが、async/awaitはこのトレイトを実装したステートマシンに変換されます。
Futureトレイトは以下の様な実装になっています。

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

これがRustの非同期処理の実態です。
このメソッドを実行ごとに、Poll::PendingまたはPoll::Ready<T>を返します。
Poll::Pendingになっているときは、何かしらの事情(例えば、I/O関連のシステムコールを呼び出し、I/O処理待ちの状態など)で処理を中断するときに返ります。
一方、Poll::Ready<T>は非同期処理が完了し、値を返すときに使用します。

ちなみに、Poll::Ready<T>になった後にpollを呼び出すのは未定義動作となっています。

Pinとは

Rustの非同期処理はステートマシンに変更される際にその時の状態を保存しておくために、自己参照を持つ可能性があります。しかし、ステートマシンの実行は一度で終わるとも、同じスレッドに閉じているとも限りません。なので、moveが起きてしまうと、参照していた値は無効なものになってしまいます。また、Rustではこのような構造体内の参照というのは、借用チェッカーでは管理できないそうです。

The compiler only knows about references from outside an object into the object (such as the above example, or a reference to a field of an object). A reference entirely within an object would be invisible to the compiler.

https://rust-lang.github.io/async-book/part-reference/pinning.html

そのため、非同期処理実行中はその状態をもつ構造体はmoveされない保証を作る必要がありました。
このような背景からこの構造体が設けられました。これは実行時には存在せず、コンパイラにmoveしないように伝えるためのものです。

Contextとは

ドキュメントには以下の様に書かれています。

The context of an asynchronous task.
Currently, Context only serves to provide access to a &Waker which can be used to wake the current task.

非同期タスクのコンテキストである。現在、Contextは現在のタスクを呼び起こすために使用される&Wakerへのアクセスを提供するだけである。

https://doc.rust-lang.org/std/task/struct.Context.html

Contextの定義は以下のようになっています。

struct Context<'a> {
    waker: &'a Waker,
    local_waker: &'a LocalWaker,
    ext: AssertUnwindSafe<ExtData<'a>>,
    // Ensure we future-proof against variance changes by forcing
    // the lifetime to be invariant (argument-position lifetimes
    // are contravariant while return-position lifetimes are
    // covariant).
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
    // Ensure `Context` is `!Send` and `!Sync` in order to allow
    // for future `!Send` and / or `!Sync` fields.
    _marker2: PhantomData<*mut ()>,
}

Contextは後述するWakerの参照を持っており、Future::pollはそこからWakerを受け取ってタスクを再度呼び起こすために使用します。
ただ、pub const fn ext(&mut self) -> &mut (dyn Any + 'static)こんなAPIがあることからも分かる通り、今後はContextを介してタスクに様々なデータが渡されるようになるとかないとか?

Wakerとは

こちらの責務は、Poll::PendingになったFutureを再実行するための準備をすることです。

構造体の中身としては以下のようになっています。

pub struct Waker {
    waker: RawWaker,
}

pub struct RawWaker {
    /// A data pointer, which can be used to store arbitrary data as required
    /// by the executor. This could be e.g. a type-erased pointer to an `Arc`
    /// that is associated with the task.
    /// The value of this field gets passed to all functions that are part of
    /// the vtable as the first parameter.
    data: *const (),
    /// Virtual function pointer table that customizes the behavior of this waker.
    vtable: &'static RawWakerVTable,
}

pub struct RawWakerVTable {
    clone: unsafe fn(*const ()) -> RawWaker,
    wake: unsafe fn(*const ()),
    wake_by_ref: unsafe fn(*const ()),
    drop: unsafe fn(*const ()),
}

Wakerの実態としては、RawWakerというものであり、さらにそのRawWakerというのは内部で仮想テーブルを持っています。
非同期ランタイムを作成する場合はこのインターフェースに合わせて関数を実装し、RawWakerVTableに実装した関数を渡すか、後述するWakeトレイトを実装することによって、Wakerの挙動を制御することができます。

Wakeトレイト

先ほどWakerの挙動の制御の別の方法としてwakeトレイトを実装するということを書きました。
Wakeトレイトの中身以下のようになっています。

pub trait Wake {
    /// Wake this task.
    #[stable(feature = "wake_trait", since = "1.51.0")]
    fn wake(self: Arc<Self>);

    /// Wake this task without consuming the waker.
    ///
    /// If an executor supports a cheaper way to wake without consuming the
    /// waker, it should override this method. By default, it clones the
    /// [`Arc`] and calls [`wake`] on the clone.
    ///
    /// [`wake`]: Wake::wake
    #[stable(feature = "wake_trait", since = "1.51.0")]
    fn wake_by_ref(self: &Arc<Self>) {
        self.clone().wake();
    }
}

これは

impl<W> From<Arc<W>> for Waker
where
    W: Wake + Send + Sync + 'static,

を呼び出すことで、先述したWakerを作成することができunsafeを避けて実装することができます。しかし、これはメモリのアロケーションが必要になり、組み込み等のメモリアロケータが存在しないような環境では動きません。

RawWakers are unsafe to use. Implementing the Wake trait is a safe alternative that requires memory allocation.

https://doc.rust-lang.org/stable/core/task/struct.RawWaker.html

また、こちらのトレイトはレシーバがArc<Self>になっています。これは同じWakerを持ったタスクがマルチスレッドで実行される場合があるためです。

実装方針

ここからは自分がどのように実装したかになりますので、興味のある方はお付き合いいただけると幸いです!!
構成としては大きく分けて4つあります。

  • Task
  • Scheduler
  • Waker
  • Worker

今回は非同期I/Oは実装していません。
実装するとなると専用のスレッドを用意して、epollやらkselectやらでカーネルからのI/Oイベントを受け取り、登録されたWakerを呼び出して再スケジューリングするのかなとか思っています。

大きな処理の流れとしては、

  1. 非同期処理を実行するためのリソースの作成
    • 実行用のWorkerの起動
    • Schedulerの起動
  2. SchedulerFutureまたはIntoFutureを受け取り、必要な情報を付け加え保存
  3. SchedulerTaskを取り出してWorkerに渡す
  4. WorkerTaskを実行し、pollの結果を待つ
  5. Taskpollの結果がPoll::Pendingであれば、準備が出来次第Wakerを介してTaskSchedulerに登録する

Task

はじめにTaskという構造体についてお話しします。
この構造体の目的は、受け取ったFutureにランタイム側で必要な付加的な情報をくっつけることにあります。

続いて、このTask構造体の内部で使用されているSenderReceiverという構造体についてお話しします。

Receiver

実態としては、std::sync::mpsc::Senderによって送信された値をstd::sync::mpsc::Receiverから受け取るためのFutureです。
tokioではJoinHandleという名前でこのような方針が採用されているそうです。

Sender

こちらの責務としては後述するReceiverに値を送信し、Receiverwakeするということです。

Scheduler

こちらは以下のようなトレイトになっています。
https://github.com/sraku2159/async_runtime/blob/main/src/engine/schedule.rs#L8-L48
実際に実装したスケジューリング方式としては

  • FIFO
  • デッドライン方式
    の2つです。
    Rustは協調的マルチタスクなので、特にリソースの割り当てや管理などは必要なく、ただどのタスクをどのワーカに渡すかだけを考えれば良かったので、スケジューラの実装はかなり容易でした。

実際にTaskを取り出す操作としてはnotifyの部分で行っており、これは後述するWokerと強調して動作することを前提としています。後述しますが、ここではWorkerが返信用のSenderを送信するという工夫をしています。

tokioでは、Work-Stealingという方式で、各々のワーカにローカルなキューを持ち、原則そこからタスクをとって実行するが、ワーカがアイドル状態になったら他のキューからタスクを奪って実行するというようなスケジューリング方式をとっているようでした。
https://tokio.rs/blog/2019-10-scheduler

Waker

次にWakerです。前述した通り、Wakerには実装方法が2つありますが、今回はWakeトレイトを実装する方針を取りました。
構造体の中身はこのようになっております。
https://github.com/sraku2159/async_runtime/blob/main/src/engine/waker.rs#L7-L13
Arc<Mutex<T>>になっている理由は複数のArc<Waker>からのアクセスがあるためです。

Wakeトレイトの実装部分に関しては以下のようになっており、Task の状態を見てScheduler に渡すかどうかを決めています。これは先述した未定義動作を考慮してこのようになりました。
https://github.com/sraku2159/async_runtime/blob/main/src/engine/waker.rs#L27-L43

Worker

最後にWorkerです。
この構造体の責務はFuture.pollを実行することだけとしています。
また、これはスケジューラと協調して動作します。
具体的には以下のように実装しました。
https://github.com/sraku2159/async_runtime/blob/main/src/engine/worker.rs#L41-L46
ここでは、Schedulerに自信の情報と返信用の封筒を渡し、Taskをもらう準備をします。Scheduler側では準備が出来次第Task渡します。
https://qiita.com/namn1125/items/46f03de967d0de46737e
WorkerTaskが渡されるまでは、リソースを占有したくないので、一度thread::parkを実行し、アイドル状態にしています。
Schedulerは準備ができると、Taskを渡してt.unparkを実行します。
ここでthread::Thead:::unparkが先に呼び出される可能性があるのですが、Rustではthread::park, thread::Thread::unparkと言うAPIが存在しており、これはトークンベースで実装されています。つまり先にunparkが呼び出されても、(期待していないthread::parkが呼び出されていない限り)問題なく動作します。

最後に

Rustは協調的マルチタスクという方針を取っていたり、非同期に関するAPIやら構造体やらトレイトやらを用意しているので、少ない記述量で実装ができました。

やっぱり車輪の再発明は楽しいですね!色々な変化が目まぐるしく起きる時代ですが、何にもとらわれず好きなことをやるというのは充実した時間を過ごせるなあとふと思ったり。

最後まで見ていただいてありがとうございました!

参考

https://rust-lang.github.io/async-book/
https://doc.rust-lang.org/std/task/index.html
https://doc.rust-lang.org/std/future/trait.Future.html
https://doc.rust-lang.org/std/pin/struct.Pin.html

Discussion