原文: https://tokio.rs/tokio/tutorial/select

ここまで、システムに並行性を追加したいときには、新しいタスクを spawn してきました。この章では、Tokio を使って非同期コードを並行に実行するための他のやり方について解説していきます。

tokio::select!

tokio::select! を使うと、複数の非同期計算の完了を待ち、そのうちどれか1つ の計算が完了したときに return する、ということが可能になります。

以下の例を見てください:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

ここでは2つの oneshot チャネルが使われています。どちらか一方のチャネルが先に完了します。select! 文は、2つのチャネルを await し、タスクから return されてくる値を val へと束縛します。tx1tx2 のどちらかが完了したら、それに対応するブロックが実行されます。

完了 しなかった ほうの分岐はドロップされます。上記の例では、それぞれのチャネルに対して、oneshot::Receiver、 つまり rx1rx2 を待ち受けています。完了しなかったほうのチャネルに対応する oneshot::Receiver はドロップされます。つまり、先に rx1 が完了したとすると、未完了である rx2 はドロップされる、ということです。

キャンセル

非同期 Rust においては、 "future" をドロップすることによってキャンセルが実現されます。"Async をさらに掘り下げる" で、Rust の非同期処理は "future" により実装されていて、"future" は lazy である、と説明したことを覚えているでしょうか。処理が進行するのは、"future" がポーリングされたときのみです。もし "future" がドロップされたら、それに紐付いている全てのステートも同時にドロップされるので、それ以上処理を継続することは不可能です。

非同期処理がバックグラウンドタスクを spawn したり、バックグラウンドで実行される別の処理を介ししたりすることもあります。例えば、下の例では、メッセージを送るためにタスクが spawn されています。一般的に、タスクは値を生成するために何らかの計算を行います。

"future" や他の型は、バックグラウンドリソースを片付けるために Drop トレイトを実装していることがあります。Tokio の oneshot::ReceiverDrop を実装していて、ドロップ時に Sender 側へとクローズ通知を送信します。送信側はこの通知を受け取って、進行中の計算をドロップして中断します。

use tokio::sync::oneshot;

async fn some_operation() -> String {
    // ここで値を計算する
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        // `some_operation` と oneshot の `closed()` 通知の
        // どちらかが完了するのを待ち受ける
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                // `some_operation` はキャンセルされる。
                // タスクは完了して `tx1` はドロップする
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

Future の実装

select! がどのように動作するのかをもっと理解するために、Future 実装がどのようになっているかの仮説を見てみましょう。これは説明のために簡単化したものです。実際の select! は、どの分岐を最初にポーリングするかをランダムで決定するか、といった追加の機能を有しています。

use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
    rx1: oneshot::Receiver<&'static str>,
    rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
            println!("rx1 completed first with {:?}", val);
            return Poll::Ready(());
        }

        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
            println!("rx2 completed first with {:?}", val);
            return Poll::Ready(());
        }

        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    // tx1 と tx2 を使う

    MySelect {
        rx1,
        rx2,
    }.await;
}

MySelect はそれ自体が Future で、それぞれの分岐に対応する "future" を内包しています。MySelect がポーリングされると、1つ目の分岐がポーリングされます。その時点で処理が完了していたら、その結果が使われ、MySelect も完了したことになります。.await が "future" から結果を受け取ったあと、その "future" はドロップされます。つまり、rx1 が完了したとすると、MySelect はドロップされ、このとき同時に rx2 がドロップされることになります。完了していない方の分岐がドロップされ、その処理は実質キャンセルされた、ということになるのです。

前の章で以下のように書いたことを思い出してください:

"future" が Poll::Pending を返すときは、どこかで "waker" に対し 確実に 合図を送らなければなりません。これを忘れると、タスクが永遠に完了しないという結果につながってしまいます。

MySelect の実装において、引数の Context を明示的には利用していません。代わりに、cx を内部の "future" へと渡すことによって、"waker" の要件が満たされています。内部の "future" もまた、さらに内部の "future" から Poll::Pending が返ってきた場合にのみ Poll::Pending を返すことによって "waker" 要件を満たさなければならないため、結果的に MySelect も "waker" 要件を満たしていることになるのです。

構文

select! マクロは2つより多くの分岐を扱うことができます。現在のところ、64個までの分岐に対応しています。それぞれの分岐の構造は以下のようです:

<pattern> = <async expression> => <handler>,

select マクロが評価されるとき、すべての <async expression> が収集され、並行に実行されます。どれか1つの式の実行が完了したら、その結果が <pattern> とマッチするかが確認されます。パターンにマッチした場合、<handler> が実行され、他のすべての async 式はドロップされます。<handler> 式の中では <pattern> 内で確立された任意の束縛にアクセスすることができます。

基本的なケースは、<pattern> が変数名である場合で、async 式の結果はその変数名に束縛され、<handler> 内でその変数を使うことができます。最初の例で、<pattern> として val が使われ、そして <handler>val を利用することができたのは、これが理由です。

もし <pattern> が async 計算の結果に マッチしなかったら 残りの async 式の処理は引き続き並行に実行され、次の完了を待ちます。また別の <async expression> が完了したら、上記と同様のロジックがその結果に適用されます。

select! はいかなる async 式を受け付けるので、どれを "select" すべきなのかがもっと複雑になっている計算を定義することも可能です。

次の例は、oneshot チャネルの結果と、TCP コネクションの確立を "select" しています。

use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // oneshot を通してメッセージを送信するためのタスクを spawn する
    tokio::spawn(async move {
        tx.send("done").unwrap();
    });

    tokio::select! {
        socket = TcpStream::connect("localhost:3465") => {
            println!("Socket connected {:?}", socket);
        }
        msg = rx => {
            println!("received message first {:?}", msg);
        }
    }
}

また、次の例では、oneshot と、TcpListener のソケットの accept を "select" しています。

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send(()).unwrap();
    });

    let mut listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        _ = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Rust の型推論器にヒントを与える
            Ok::<_, io::Error>(())
        } => {}
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

エラーが発生するか、あるいは rx が値を受け取るまでの間、accept ループが回り続けます。_ パターンを使うことで、非同期計算の結果の値には興味がないということを示唆しています。

返り値

tokio::select! マクロは、<handler> 式の評価結果を返します。

async fn computation1() -> String {
    // .. 計算
}

async fn computation2() -> String {
    // .. 計算
}

#[tokio::main]
async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("Got = {}", out);
}

したがって、すべての分岐の <handler> は同じ型に評価されなければなりません。select! 式の結果が必要ない場合は、() 型を返すようにするのがベストプラクティスです。

エラー

? 演算子を用いることで、式のエラーを伝播することができます。<async expression> 部で使われるか <handler> 部で使われるかによって ? の挙動が変わります。<async expression> で使われた場合、その <async expression> からエラーが伝播されます。これにより <async expression> の結果は Result になります。一方、<handler>? を使うと、select! 式からのエラーを即座に伝播します。accept ループの例をもう一度見てみましょう:

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // [oneshot チャネル `rx` の準備をする]

    let listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        res = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Rust の型推論器にヒントを与える
            Ok::<_, io::Error>(())
        } => {
            res?;
        }
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

listener.accept().await? に注目してください。ここで ? 演算子はこの式からエラーを伝播し、res に束縛します。エラーのときは resErr(_) になる、ということです。それから、<handler> 部でもう一度 ? 演算子が使われています。res? はエラーを main 関数の外へと伝播します。

パターンマッチ

select! マクロの分岐の構文が以下のように定義されていることを思い出してください:

<pattern> = <async expression> => <handler>,

これまで、<pattern> として変数束縛のみを使ってきました。しかし、Rust の任意のパターンが利用可能です。例えば、複数の mpsc チャネルから受信して、何かをしたいとしましょう:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel(128);
    let (mut tx2, mut rx2) = mpsc::channel(128);

    tokio::spawn(async move {
        // `tx1` と tx2` を使って何かやる
    });

    tokio::select! {
        Some(v) = rx1.recv() => {
            println!("Got {:?} from rx1", v);
        }
        Some(v) = rx2.recv() => {
            println!("Got {:?} from rx2", v);
        }
        else => {
            println!("Both channels closed");
        }
    }
}

この例で、select! 式は rx1rx2 から値を受け取るのを待ち受けます。チャネルがクローズされた場合には recv()None を返します。これはパターンにマッチ しない ので、その分岐は無効となります。select! 式は残りの分岐の待ち受けを続けます。

ここで、select! 式が else をもっていることに注目してください。select! 式は必ずなにかの値に評価される必要が有ります。パターンマッチを利用すると、どの分岐もパターンにマッチしない、というような状況が発生する可能性があります。このような場合に else が評価されます。

借用

タスクを spawn する際、spawn される async 式はそのデータをすべて所有していなければならないのでした。select! マクロにはこのような制約はありません。それぞれの分岐の <async expression> はデータを借用してもよいですし、並行に操作をすることも構いません。Rust の借用ルールに従うと、複数の <async expression> があるデータを 不変 借用する、もしくは ただ1つの <async expression> だけがあるデータを 可変 借用する、ということが許されます。

いくつか例を見てみましょう。まず以下の例では、2つの異なる TCP 送信先に対して、同時に同一のデータを送信しています。

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
    data: &[u8],
    addr1: SocketAddr,
    addr2: SocketAddr
) -> io::Result<()> {
    tokio::select! {
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr1).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr2).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        else => {}
    };

    Ok(())
}

変数 data が両方の <async expression> から不変借用されています。どちらか一方の処理が完了し、それが成功であったならば、他方はドロップされます。Ok(_) を使ってパターンマッチしているので、一方の <async expression> が失敗した場合には、他方は実行され続けます。

どこかの <handler> に到達した場合、実行される <handler> はそのただ1つだけである、ということを select! は保証しています。このおかげで、それぞれの <handler> は同じデータに対する可変借用をとることが許されます。

以下の例では、out という変数が2つの <handler> 内部で書き換えられていますが、このような書き方が許されている、ということです。

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let mut out = String::new();

    tokio::spawn(async move {
        // `tx1` と tx2` に値を送る
    });

    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}

ループ

select! マクロはしばしばループ内で利用されます。このセクションでは、いくつかの例を見ながら、select! マクロをループ内で利用する一般的な方法を紹介していきます。まず複数のチャネルを select する例から始めます:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);
    let (tx3, mut rx3) = mpsc::channel(128);

    loop {
        let msg = tokio::select! {
            Some(msg) = rx1.recv() => msg,
            Some(msg) = rx2.recv() => msg,
            Some(msg) = rx3.recv() => msg,
            else => { break }
        };

        println!("Got {}", msg);
    }

    println!("All channels have been closed.");
}

この例では3つのチャネルの受信側に対し select をしています。いずれかのチャネルがメッセージを受け取ると、それが標準出力に出力されます。チャネルがクローズされた場合は、recv()None を返しますが、パターンマッチで Some(msg) としているので、select! マクロは残りのチャネルで待ち受けを続けます。すべてのチャネルがクローズされたら else が評価され、ループが終了します。

select! マクロは、どの分岐を最初にチェックするかをランダムに選択します。複数のチャネルがメッセージを蓄えている場合、どのチャネルから値を取るかはランダムに選ばれるということです。なぜランダムに選ぶのかというと、もしランダム性がなく、先頭の分岐から順にチェックをしていくとすると、loop が回るのよりも早くチャネルに次のメッセージが追加されていく場合に、先頭の分岐以外で処理されるチャネルがどんどん埋まっていってしまうからです。上の例で言うと、ランダム性がない場合、rx1 の分岐が常に最初にチェックされることになります。rx1 がいつも新しいメッセージをもっているとすると、ループが回るたびに常に rx1 からの値だけが読み取られることになります。そうして、他のチャネルの値はずっとチェックされることなく、際限なくキャパシティが埋まることになってしまうのです。

select! が評価されるときに複数のチャネルがメッセージを蓄えているならば、そのうちのどれか1つだけの値が pop されます。他のすべてのチャネルはまったく触れられず、メッセージは次のループが回ってくるまでチャネルの中に留まり続けます。メッセージが失われるということはありません。

非同期処理を再開する

複数回の select! 呼び出しをまたいで非同期処理を実行するやり方を紹介します。下の例では、i32 型を扱う mpsc チャネルと非同期関数を用意します。非同期関数が完了する、あるいはチャネルを通して偶数が送られてくるまでの間、ループを回し続けたいとします。

async fn action() {
    // 何らかの非同期ロジック
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);    

    let operation = action();
    tokio::pin!(operation);

    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

select! マクロの中で action() を呼ぶのではなく、ループの で呼んでいることに注目してください。action() の返り値が operation に代入されています。 .await はこの時点では付けられていません。 それから、operation に対して tokio::pin! を呼んでいます。

select! ループの中で、operation を渡すのではなく、&mut operation を渡しています。変数 operation は実行中の非同期処理をトラッキングしています。毎回のループのたびに action() を新しく呼ぶのではなく、同じ operation を使いまわしています。

select! の別の分岐はチャネルからメッセージを受信します。メッセージが偶数であれば、ループを終了します。偶数でなければ、select! を再度実行します。

tokio::pin! が登場するのはこれが初めてです。pin の詳細には立ち入りませんが、ここで押さえておくべきことは、参照に対して .await をするためには、参照される値が pin されているか、Unpin を実装している必要がある、ということです。

tokio::pin! を取り除いてコンパイルしてみると、以下のエラーが出力されます:

error[E0599]: no method named `poll` found for struct
     `std::pin::Pin<&mut &mut impl std::future::Future>`
     in the current scope
  --> src/main.rs:16:9
   |
16 | /         tokio::select! {
17 | |             _ = &mut operation => break,
18 | |             Some(v) = rx.recv() => {
19 | |                 if v % 2 == 0 {
...  |
22 | |             }
23 | |         }
   | |_________^ method not found in
   |             `std::pin::Pin<&mut &mut impl std::future::Future>`
   |
   = note: the method `poll` exists but the following trait bounds
            were not satisfied:
           `impl std::future::Future: std::marker::Unpin`
           which is required by
           `&mut impl std::future::Future: std::future::Future`

Future については [前の章][async_in_depth] で解説しましたが、このエラーメッセージはあまり分かりやすいものではありません。参照 に対して .await を呼ぼうとしたときに Future が実装されていないという旨のエラーが出てきたら、おそらくその "future" を pin する必要がある、と考えてください。

Pin についての詳細は 標準ライブラリのドキュメント を参照してください。

分岐を修正する

もう少し複雑なループについて見てみましょう。以下の2つを用意します:

  1. i32 を扱うチャネル
  2. i32 を扱う非同期処理

実装するロジックは以下の通りです:

  1. チャネルから 偶数 の数字がくるのを待つ
  2. その偶数を入力として渡して、非同期処理を開始する
  3. 非同期処理の完了を待つ。ただし、それと同時に、チャネルに別の偶数が送られてくるのを待つ
  4. 非同期処理が完了する前に新しい偶数を受信したら、非同期処理を中断して、新しい偶数を使って非同期処理を開始する
async fn action(input: Option<i32>) -> Option<String> {
    // input が `None` なら `None` を返す
    // `let i = input?;` とも書ける
    let i = match input {
        Some(input) => input,
        None => return None,
    };
    // 非同期ロジックをここに書く
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

    let mut done = false;
    let operation = action(None);
    tokio::pin!(operation);

    tokio::spawn(async move {
        let _ = tx.send(1).await;
        let _ = tx.send(3).await;
        let _ = tx.send(2).await;
    });

    loop {
        tokio::select! {
            res = &mut operation, if !done => {
                done = true;

                if let Some(v) = res {
                    println!("GOT = {}", v);
                    return;
                }
            }
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    // `.set` は `Pin` のメソッド
                    operation.set(action(Some(v)));
                    done = false;
                }
            }
        }
    }
}

先ほどの例と同様の戦略を用いています。非同期関数をループの外側で呼び出して operation に代入し、operation を pin します。ループ内で、operation とチャネルの受信側を select します。

actionOption<i32> を引数としてとっていることに注目してください。最初の偶数を受け取るより前に、operation を何かしらの値で初期化する必要があります。actionOption を受け取り Option を返すようにして、None が渡されたら None を返すようにします。ループの1回目では、operation は即座に完了し、None が返ってきます。

この例ではいくつかの新しい構文が使われています。最初の分岐に , if !done というのがあります。これは分岐の前提条件です。これがどのように動くのかを説明する前に、前提条件を取り除いたら何が起こるのかを見てみましょう。, if !done を省いてコードを実行すると、以下のエラーが得られます:

thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

このエラーは、operation が完了した あとoperation を使おうとするときに発生します。一般的に、.await を使うときは、.await されている値は消費されます。この例では、参照に対して .await をしています。これは、operation が完了したあとも operation はまだ存在している、ということを意味します。

このパニックを回避するため、operation が完了した場合には第一の分岐を無効とするように注意する必要が有ります。変数 done を使って operation が完了したか否かを追跡するようにしています。select! のぶん機は 前提条件 をもつことができ、この前提条件は select! がその分岐を await するよりも 前に チェックされます。前提条件の評価結果が false なら、その分岐は無効となり、実行されません。変数 done は最初 false で初期化され、operation が完了したときに true がセットされます。その次のループでは、donetrue になっていることから、operation の分岐は無効になります。チャネルから偶数のメッセージが受信されたら、operation はリセットされ、done には false がセットされます。

タスク単位の並行性

tokio::spawnselect! はどちらも非同期処理を並行に実行することを可能にしてくれます。しかし、並行処理を行うために使われる戦略が異なっています。tokio::spawn 関数は非同期処理を受け取り、それを実行するための新しいタスクを spawn します。タスクは Tokio のランタイムが実行予約をする対象となるオブジェクトです。2つの異なるタスクは Tokio によって別々にスケジューリングされ、別々の OS スレッド上で同時に実行されるかもしれません。そのため、spawn されるタスクは spawn されるスレッドと同様の制約を抱えています――借用ができないということです。

select! マクロは 同一のタスク上で すべての分岐を並行に実行します。すべての分岐が同一タスク上で実行されるため、それらが 同時に 実行されるということは決してありません。select! マクロは非同期処理を1つのタスク上で多重化 ("multiplex") しているのです。