Chapter 09

Async をさらに掘り下げる

magurotuna
magurotuna
2021.02.24に更新

原文: https://tokio.rs/tokio/tutorial/async

現時点で、非同期 Rust と Tokio についてのまずまず包括的なツアーは完了しました。ここからは、Rust の非同期ランタイムモデルについてさらに深堀りしていきます。このチュートリアルの最初の方で、Rust は非同期に関して独特なアプローチをとっていることをほのめかしました。それがどのようなことを意味するのか、ここで説明していきます。

Futures

ちょっとした復習として、とても基本的な非同期関数を取り上げてみましょう。これはここまでのチュートリアルで学んできたものとそう変わりはないです。

use tokio::net::TcpStream;

async fn my_async_fn() {
    println!("hello from async");
    let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
    println!("async TCP operation complete");
}

この関数を呼び出すとなんらかの値が返ってきます。その値に .await をつけるのでした。

#[tokio::main]
async fn main() {
    let what_is_this = my_async_fn();
    // ここではまだ何も出力されない

    what_is_this.await;
    // `.await` のタイミングで文字列の出力、ソケットの接続確立とクローズが行われる
}

my_async_fn が返す値は "future" です。"future" は標準ライブラリの std::future::Future トレイトを実装する値のことで、進行途中の非同期的な計算を含んでいます。

std::future::Future トレイトの定義は以下のようになっています:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Self::Output>;
}

関連型 として登場している Output は、 "future" が完了したときに生成される値の型を表します。Pin 型は、Rust がいかにして async 関数の中での借用をサポートすることが可能なのか、ということに関わっています。詳細は 標準ライブラリのドキュメント を参照してください。

他の言語における "future" の実装のされ方とは異なり、Rust の "future" はバックグラウンドで実行されている計算のことを表してはいません。そうではなく、Rust の "future" は、計算そのもの なのです。"future" の所有者は、"future" に対してポーリングを行うことによって、計算を進める責任を負っています。これは Future::poll を呼び出すことによって行われます。

Future を実装する

シンプルな "future" を実装してみましょう。この "future" は、以下のように動作します:

  1. ある時間、待機する
  2. 標準出力にあるテキストを出力する
  3. 文字列を返す
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // 今はこの行は無視してください
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

Future としての非同期関数

main 関数の中で、"future" インスタンスを作り、それに対して .await を呼んでいます。async 関数の中では Future トレイトを実装している任意の値に対して .await を呼ぶことができます。結果的に、async 関数を呼び出すと、Future を実装した匿名型が返ってくるということになります。async fn main() の場合は、生成される "future" はおよそ以下の通りです:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
    // 初期化状態。まだポーリングされていない
    State0,
    // `Dalay`, つまり `future.await` の行で待っている状態
    State1(Delay),
    // "future" が完了した状態
    Terminated,
}

impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>
    {
        use MainFuture::*;

        loop {
            match *self {
                State0 => {
                    let when = Instant::now() +
                        Duration::from_millis(10);
                    let future = Delay { when };
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

Rust の "future" は ステートマシン です。ここで、MainFuture は "future" がとりうる状態の enum で表されています。この "future" は State0 という状態から開始します。poll が呼び出されると、"future" は内部状態を可能な限り進めようとします。"future" が完了可能になったら、非同期計算の結果を含んだ Poll::Ready が返されます。

もし "future" がまだ完了することが できない のであればどうなるかを考えてみます。このようなケースが発生するのは、待ち受けているリソースがまだ ready になっていない、というのが一般的です。このときは Poll::Pending が返されます。Poll::Pending が返ってきたら、呼び出し側にとってそれは 「"future" があとで完了すること」「poll をあとでまた呼び出さなければならないということ」を意味します。

また、"future" は別の "future" と合成される、ということも確認できます。外側の "future" に対して poll を呼び出すと、結果として内側の "future" の poll を呼び出すことになります。

Executors

Rust の非同期関数は "future" を返します。"future" の内部状態を進めるためには poll が呼び出される必要があります。"future" は別の "future" を合成されます。では、もっとも外側の "future" の poll は、誰が呼び出すのでしょうか?

非同期関数を実行するためには、tokio::spawn に渡すか、main 関数に #[tokio::main] と付与する必要があったことを覚えているでしょうか。これらがなぜ必要だったのかというと、もっとも外側の "future" を Tokio の実行器 (Executor) に渡すためだったのです。実行器はもっとも外側の "future" に対して Future::poll を呼び出す役割を担っていて、非同期計算を完了に向けて推進させるのです。

Mini Tokio

これらがどのように噛み合うのかを深く理解するため、自分たち自身で最小の Tokio を実装してみましょう!コードの全体は こちら で見ることができます。

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;

fn main() {
    let mut mini_tokio = MiniTokio::new();

    mini_tokio.spawn(async {
        let when = Instant::now() + Duration::from_millis(10);
        let future = Delay { when };

        let out = future.await;
        assert_eq!(out, "done");
    });

    mini_tokio.run();
}

struct MiniTokio {
    tasks: VecDeque<Task>,
}

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

impl MiniTokio {
    fn new() -> MiniTokio {
        MiniTokio {
            tasks: VecDeque::new(),
        }
    }
    
    /// mini-tokio のインスタンスに "future" を渡す
    fn spawn<F>(&mut self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.tasks.push_back(Box::pin(future));
    }
    
    fn run(&mut self) {
        let waker = task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        
        while let Some(mut task) = self.tasks.pop_front() {
            if task.as_mut().poll(&mut cx).is_pending() {
                self.tasks.push_back(task);
            }
        }
    }
}

これは非同期ブロックを実行します。指定した待機時間で Delay インスタンスが生成され、待ち受けられます。しかし、上記の実装は大きな 欠陥 を抱えています。我々の実行器は、休むことがないのです。実行器は、絶えず すべての spawn された "future" をループで回し、ポーリングしています。ほとんどの場合、"future" はまだ次の状態へと進むための準備ができていないために Poll::Pending が返ってくるでしょう。プロセスは CPU を食い尽くしますし、一般的にあまり効率的なやり方とは言えません。

理想的には、"future" が次の状態に進めるときにのみ、ポーリングをしてもらいたいです。次の状態に進めるというのは、タスクをブロックしているリソースが、リクエストされた操作を実行するための準備が整った状況のことです。例えば、タスクが TCP ソケットからデータを読み取りたいとすると、TCP ソケットがデータを受信するまではポーリングをしないでもらいたいです。我々のケースでは、Instant 時間が経過するまでタスクはブロックされています。mini-tokio は、指定した時間が経過するまではポーリングを行わない、という挙動を示すのが理想的と言えるでしょう。

これを実現するため、リソースがポーリングされて、まだそのリソースの準備が整っていなかったら、リソース側から「準備完了になりました」という通知を後ほど送る、という仕組みを導入します。

Wakers

欠けていたピースは、"waker" です。これは、リソースからそれを待ち受けているタスクに対して、リソースの準備が整って、続きの処理を行うことができるようになった、ということを通知することが可能になるシステムです。

Future::poll の定義をもう一度確認してみましょう:

fn poll(self: Pin<&mut Self>, cx: &mut Context)
    -> Poll<Self::Output>;

Contextwaker() というメソッドをもっています。このメソッドは、現在のタスクに紐づく Waker を返却します。この Wakerwake() というメソッドをもっています。このメソッドを呼ぶと、続きの処理を行うため紐付いているタスクを実行予約するよう、実行器へと合図が送られます。ポーリングによって処理を次に進めることができる準備が整った、ということを実行器へと通知できるようになったときに、リソースは wake() を呼び出します。

Delay を更新する

Delay を更新して、"waker" を利用するように書き換えましょう:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // 現在のタスクに紐づく "waker" ハンドルを取得
            let waker = cx.waker().clone();
            let when = self.when;

            // タイマースレッドを spawn
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                waker.wake();
            });

            Poll::Pending
        }
    }
}

こうすると、指定した時間が経過したら、タスクへと通知が行き、実行器はそのタスクを確実に実行予約することができます。次のステップは mini-tokio を改良して wake 通知を待ち受けるようにすることです。

Delay の実装にはまだ少し問題が残っています。後ほど直します。

"future" が Poll::Pending を返すときは、どこかで "waker" に対し 確実に 合図を送らなければなりません。これを忘れると、タスクが永遠に完了しないという結果につながってしまいます。

Poll::Pending を返したあとにタスクを wake し忘れるというのは、バグのよくある原因です。

Delay の最初の実装を思い出してください。Future を実装している箇所はこのようになっていました:

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // 今はこの行は無視してください
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

Poll::Pending を返す前に cx.waker().wake_by_ref() を呼んでいます。これは、"future" の契約を満たすためです。Poll::Pending を返すことによって、"waker" に合図を送ることに責任を負うことになります。この時点ではまだタイマースレッドを実装していなかったので、インラインで "waker" へと合図を送りました。こうすると、"future" が即座に実行予約され、実行されますが、おそらくまだ完了状態には至っていないでしょう。

"waker" に対して必要以上に合図を送ることが可能である、ということに注意してください。上記のケースでいうと、まだ処理を続ける準備が整っていないにも関わらず、"waker" へと合図を送っています。これは、CPU の命令サイクルを無駄に消費することを除けば、問題はありません。しかし、上記の実装例では、ビジーループにつながります。

Mini Tokio を更新する

次に、mini-tokio が "waker" の通知を受け取ることができるようにアップデートします。タスクが "wake" されたときに限って実行器がタスクを実行するようにしたいです。そのため、mini-tokio は独自の "waker" を提供します。"waker" が呼び起こされると、関連するタスクが実行キューに詰められます。mini-redis は、"future" をポーリングするときに、この "waker" を渡します。

アップデート後の mini-tokio は予約されたタスクを蓄えるためにチャネルを使います。チャネルを使うことで、任意のスレッドからタスクを実行キューに詰めることができます。"waker" は Send かつ Sync でなければなりません。標準ライブラリのチャネルは Sync を実装していないので、代わりに crossbeam クレートのチャネルを利用します。

SendSync トレイトは Rust が提供する並行性に関係する「マーカートレイト」です。別のスレッドへと 送る ことができる型は Send です。ほとんどの型は Send ですが、Rc のような例外もあります。不変参照を通して 並行に アクセスされても問題ない型は Sync です。Send だが Sync ではない、という型も存在し得ます。例えば Cell は良い例で、この型は不変参照を通して値が変更されうるので、並行にアクセスされることは安全ではありません。

詳細は、The Book の関連するチャプター を参照してください。

チャネルを利用するため、Cargo.toml に以下の依存を追加します。

crossbeam = "0.8"

それから、MiniTokio 構造体を以下のように書き換えます。

use crossbeam::channel;
use std::sync::Arc;

struct MiniTokio {
    scheduled: channel::Receiver<Arc<Task>>,
    sender: channel::Sender<Arc<Task>>,
}

struct Task {
    // あとで埋める
}

"waker" は Sync であり、かつクローン可能です。wake が呼び出されたら、そのタスクを確実に実行予約しなければなりません。これを実装するためにチャネルがあります。"waker" の wake() が呼ばれたら、そのタスクをチャネルの送信側にプッシュします。Task 構造体が wake に関するロジックを実装します。そのために、spawn された "future" と、チャネルの送信側をもつ必要があります。

use std::sync::{Arc, Mutex};

struct Task {
    // `Task` が `Sync` であるようにするため、`Mutex` を利用します。
    // 任意のタイミングで、`future` にアクセスするスレッドがただ1つであることが保証されます。
    // `Mutex` は正しい実装のために必須であるわけではありません。
    // 実際、本物の Tokio はここで mutex を利用せず、より多くの行数を費やして処理しています
    // (チュートリアルには収まらない量です)
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    executor: channel::Sender<Arc<Task>>,
}

impl Task {
    fn schedule(self: &Arc<Self>) {
        self.executor.send(self.clone());
    }
}

タスクをスケジューリングするため、Arc がクローンされてチャネルを通して送られます。ここで、std::task::Waker を使って我々の schedule 関数をフックする必要があります。これを行うための手動の vtable 構築を使った低レベル API が標準ライブラリによって提供されています。この戦略は実装者に最大限の柔軟性をもたらしますが、大量の unsafe なボイラープレートコードが必要になります。RawWakerVTable を直接使うのではなく、futures クレートが提供している ArcWake ユーティリティを利用しましょう。これを使うと、Task 構造体を "waker" として公開するためにシンプルなトレイトを実装することができるようになります。

futures クレートを利用するため、以下の依存を Cargo.toml に追加してください。

futures = "0.3"

そして、futures::task::ArcWake を実装します。

use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.schedule();
    }
}

タイマースレッドが waker.wake() を呼ぶと、タスクがチャネルへとプッシュされます。次に、MiniTokio::run() 関数の中で、タスクを受け取って実行するという実装を行いましょう。

impl MiniTokio {
    fn run(&self) {
        while let Ok(task) = self.scheduled.recv() {
            task.poll();
        }
    }

    /// mini-tokio インスタンスを初期化する
    fn new() -> MiniTokio {
        let (sender, scheduled) = channel::unbounded();

        MiniTokio { scheduled, sender }
    }

    /// mini-tokio のインスタンスに "future" を渡す
    ///
    /// 与えられる "future" は `Task` によってラップされ、`スケジュール` キューにプッシュされる。
    /// `run` が呼び出されたときに "future" が実行される
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        Task::spawn(future, &self.sender);
    }
}

impl Task {
    fn poll(self: Arc<Self>) {
        // `Task` インスタンスから "waker" を生成する
        // 上で実装した `ArcWake` を利用する
        let waker = task::waker(self.clone());
        let mut cx = Context::from_waker(&waker);

        // 別のスレッドは "future" のロックを取ろうとしていない
        let mut future = self.future.try_lock().unwrap();

        // "future" をポーリングする
        let _ = future.as_mut().poll(&mut cx);
    }

    // 与えられた "future" に関する新しいタスクを spawn する
    //
    // "future" を含むタスクを新しく作り、`sender` にプッシュする
    // チャネルの受信側はタスクを取得して実行する
    fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
            executor: sender.clone(),
        });

        let _ = sender.send(task);
    }

}

ここでは複数のことが起こっています。まず、MiniTokio::run() が実装されています。この関数はループを回してチャネルからスケジュールされたタスクを受け取ります。タスクがチャネルにプッシュされるのは "wake" されたときなので、それらのタスクが実行されるときは何かしらの進捗を生むことができる状態になっているはずです。

加えて、MiniTokio::new()MiniTokio::spawn()VecDeque ではなくチャネルを使うように調整されています。これらの関数は、新しいタスクが spawn されたら、チャネルの送信側をクローンしたものを受け取ります。タスクは、これを利用することで、ランタイムに自身をスケジューリングすることができます。

Task::poll() 関数は futures クレートの ArcWake ユーティリティを利用して "waker" を生成します。"waker" は task::Context を生成するために使われ、task::Contextpoll に渡されます。

まとめ

Rust の非同期がどのように動いているのかについての例を、端から端まで見てきました。Rust の async/await 機能はトレイトに裏付けられています。これによって、Tokio のようなサードパーティクレートであっても、実行の詳細を提供することが可能になっているのです。

  • Rust の非同期処理は "lazy" であり、呼び出し側がポーリングを行う必要がある
  • "future" と、その "future" を呼び出すタスクを紐付けるために "waker" が "future" に渡される
  • リソースが処理を完了させる準備が まだ 整っていない場合は、Poll::Pending が返され、タスクの "waker" が記録される
  • リソースの準備が整ったら、タスクの "waker" に通知が行く
  • 実行器は通知を受け取り、タスクの実行をスケジュールする
  • タスクが再びポーリングされる。今度はリソースは準備が整っているので、タスクは次に進むことができる

いくつかの書き残し事項

Delay を実装しているときに「まだ少し問題が残っています。後ほど直します。」と言ったことを覚えていますか? Rust の非同期モデルでは、単一の "future" が実行されている間にタスクをまたいで移動することができるようになっています。以下を見てください:

use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let mut delay = Some(Delay { when });

    poll_fn(move |cx| {
        let mut delay = delay.take().unwrap();
        let res = Pin::new(&mut delay).poll(cx);
        assert!(res.is_pending());
        tokio::spawn(async move {
            delay.await;
        });

        Poll::Ready(())
    }).await;
}

poll_fn 関数はクロージャを使って Future インスタンスを生成します。上のコード内では、Delay インスタンスを生成し、1回ポーリングを行い、それから Delay インスタンスを新しいタスクへと送り、そこで await されています。この例において、Delay::poll別々の Waker インスタンスから複数回呼び出されています。我々のここまでの実装はこのようなケースには対応していませんでした。誤ったタスクが通知されるため、spawn されたタスクは永遠にスリープ状態となってしまうでしょう。

"future" を実装する際は、poll を呼び出すたびに別々の Waker インスタンスが得られる可能性があるということを念頭に置くことが重要です。poll 関数は前回記録された "waker" を新しいもので更新しなければなりません。

以下のようにして、以前の実装を修正します:

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
    waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // まず、これが "future" の初めての呼び出しであるならば、タイマースレッドを spawn する
        // もしすでにタイマースレッドが実行されているなら、保存されている `Waker` が
        // 現在のタスクの "waker" と一致することを確認する
        if let Some(waker) = &self.waker {
            let mut waker = waker.lock().unwrap();

            // 保存されている "waker" が現在のタスクの "waker" と一致するか確認する
            // この確認が必要となるのは、`Delay` インスタンスが複数回の `poll` 呼び出しで異なるタスクへとムーブする可能性があるためである
            // ムーブが発生している場合、与えられた `Context` に含まれる "waker" は別物になるため、
            // その変更を反映するように保存されている "waker" を更新しなければならない
            if !waker.will_wake(cx.waker()) {
                *waker = cx.waker().clone();
            }
        } else {
            let when = self.when;
            let waker = Arc::new(Mutex::new(cx.waker().clone()));
            self.waker = Some(waker.clone());

            // これは `poll` の初回呼び出しである
            // タイマースレッドを spawn する
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                // 指定した時間が経過した。
                // "waker" を呼び出すことで呼び出し側へと通知する
                let waker = waker.lock().unwrap();
                waker.wake_by_ref();
            });
        }

        // "waker" が保存され、タイマースレッドがスタートしたら、delay が完了したかどうかをチェックする。
        // そのためには、現在の instant を確認すればよい。もし指定時間が経過しているなら、
        // "future" は完了しているので、`Poll::Ready` を返す
        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            // 指定時間が経過していなかった場合、"future" は未完了のため、`Poll::Pending` を返す。
            //
            // `Future` トレイトによる契約によって、`Pending` が返されるときには、
            // "future" が再度ポーリングされるべき状況になったときに "waker" へと確実に合図を送らなければならない。
            // 我々のケースでは、ここで `Pending` を返すことによって、指定された時間が経過したタイミングで `Context` 引数がもっている "waker" を呼び起こす、ということを約束していることになる。
            // 上で spawn したタイマースレッドによって、このことが保証されている。
            //
            // もし "waker" を呼び起こすのを忘れたら、タスクは永遠に完了しない。
            Poll::Pending
        }
    }
}

これは少し複雑ですが、要点は、毎回の poll 呼び出しのたびに、"future" は以前記録された "waker" と今与えられている "waker" が一致するのかをチェックしている、ということです。これらが一致しているのならば、他にすべきことはありません。一致していないのならば、記録された "waker" を更新しなければなりません。

Notify ユーティリティ

どのようにすれば "waker" を使って自作の Delay を実装することができるのか、ということを実演してきました。"waker" は非同期 Rust が動作する仕組みの基礎にあたります。一般的には、このレイヤーまで落として考える必要はありません。例えば、Delay の場合、tokio::sync::Notify ユーティリティを利用することによって、完全に async/await を使った実装を行うことが可能です。このユーティリティは、基本的なタスク通知の仕組みを提供しています。記録された "waker" が現在のタスクのものと一致するかどうかを確認する、といった "waker" に関する詳細をよしなに処理してくれます。

Notify を利用すれば、async/await バージョンの delay 関数を以下のように実装できます:

use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify2.notify_one();
    });


    notify.notified().await;
}