Zenn
🦀

Rust非同期処理の仕組み:join! vs select!で実現する並行性

2025/03/24に公開

表紙

Future を 1 つだけ実行する場合、非同期関数 async fn または非同期コードブロック async {} の中では .await を直接使用することができます。しかし、複数の Future を並行して実行したい場合、単に .await を使うと特定の Future が完了するまで他の並行タスクがブロックされ(直列実行)、思うように並行実行されません。
futures クレートには、Future を並行に実行できる便利なツールが多数用意されており、その中には join! マクロや select! マクロがあります。

注:futures::future モジュールには、マクロよりもさらに多機能な Future 操作用関数群が提供されています。詳しくは以下を参照:

join! マクロ

join! マクロは、複数の異なる Future を同時に待ち受けることができ、これらの Future を並行に実行することが可能です。

まず、.await を使った誤った使い方を 2 つ見てみましょう:

struct Book;
struct Music;

async fn enjoy_book() -> Book { /* ... */ Book }
async fn enjoy_music() -> Music { /* ... */ Music }

// 誤りパターン1: 非同期関数内で逐次(直列)実行されてしまい、同時に走らない
async fn enjoy1_book_and_music() -> (Book, Music) {
    // 実際には直列実行されている
    let book = enjoy_book().await; // await によりブロッキング実行
    let music = enjoy_music().await; // await によりブロッキング実行
    (book, music)
}

// 誤りパターン2: こちらも同様に直列実行される
async fn enjoy2_book_and_music() -> (Book, Music) {
    // async 関数は遅延実行で、呼び出した時点ではまだ走っていない
    let book_future = enjoy_book();
    let music_future = enjoy_music();
    (book_future.await, music_future.await)
}

上記の 2 つの例はいずれも一見すると非同期に動いているように見えますが、実際には「まず読書を終えてから音楽を聴く」という順番で逐次的に処理されており、並行実行にはなっていません。

これは Rust の Future が「遅延評価」型であることに起因します。.await を呼び出すまでは実行が開始されず、await の順序によって処理が直列化されてしまいます。

2 つの Future を正しく並行実行するには、futures::join! マクロを使います:

use futures::join;

// `join!` は、すべての Future が完了した後に、その結果をタプルで返します。
async fn enjoy_book_and_music() -> (Book, Music) {
    let book_fut = enjoy_book();
    let music_fut = enjoy_music();
    // join! は管理しているすべての Future の完了を待ってから完了する
    join!(book_fut, music_fut)
}

fn main() {
    futures::executor::block_on(enjoy_book_and_music());
}

もし配列内の複数の非同期タスクを同時に実行したい場合は、futures::future::join_all メソッドを使用すると良いでしょう。

try_join! マクロ

join! マクロは、管理下のすべての Future が完了するまで待機しますが、もしどれか 1 つの Future がエラーを返した時点で、すべての Future の実行を即座に中止したい場合は、try_join! を使うと便利です。特に、Future が Result を返す場合に有効です。

注:try_join! に渡すすべての Future は、同じエラー型を持っている必要があります。エラー型が異なる場合は、futures::future::TryFutureExt モジュールの map_errerr_info メソッドでエラーの型を変換することが可能です。

use futures::{
    future::TryFutureExt,
    try_join,
};

struct Book;
struct Music;

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
/**
 * try_join! に渡すすべての Future は同じエラー型を持っていなければならない。
 * 異なる場合は、TryFutureExt の map_err や err_info で型変換する。
 */
async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    // どちらか一方がエラーになると、即座にすべての Future の実行が中断される
    try_join!(book_fut, music_fut)
}

async fn get_into_book_and_music() -> (Book, Music) {
    get_book_and_music().await.unwrap()
}

fn main() {
    futures::executor::block_on(get_into_book_and_music());
}

select! マクロ

join! マクロでは、すべての Future が終了するまで結果の処理ができませんが、select! マクロを使えば、複数の Future のうちどれか 1 つが完了すればすぐに処理を行うことができます。

use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

/**
 * レース実行モード:t1 と t2 を同時並行で実行し、どちらかが先に完了したら処理して即終了する
 */
async fn race_tasks() {
    // `.fuse()` によって Future に FusedFuture トレイトを実装させる
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    // `pin_mut!` によって Future に Unpin トレイトを付与する
    pin_mut!(t1, t2);

    // 複数の Future を同時に待ち、いずれかが完了すればすぐに処理される
    select! {
        () = t1 => println!("タスク1が先に完了"),
        () = t2 => println!("タスク2が先に完了"),
    }
}

このコードでは t1t2 が同時に実行され、どちらかが先に完了すればその分岐が実行され、関数は終了します。残りのタスクの完了を待つことはありません。

注意:select マクロを使うには、Future に FusedFuture + Unpin の 2 つの制約が必要です。それぞれ fuse メソッドと pin_mut! マクロで満たすことができます。

.fuse() により Future は FusedFuture トレイトを実装し、pin_mut! によって Unpin 化されます。

注:select! マクロで必要な 2 つのトレイト制約:

  • Unpin:select は所有権を奪うのではなく可変参照で Future を使うため、select 実行後も Future の所有権が残り再利用できる。
  • FusedFuture:一度完了した Future に対して、select は再度 poll しません。fuse(ヒューズ)は「熔断」の意味で、完了後は poll が Poll::Pending を返すようになります。

これにより select! マクロは loop と組み合わせて使えるようになります。FusedFuture を実装していないと、完了済みの Future も select により何度も poll されてしまいます。

Stream は少し異なり、使用するトレイトは FusedStream になります。.fuse()(または手動実装)によって FusedStream を実装した Stream は、.next() または .try_next() を使うことで FusedFuture を実装した Future を取得できます:

use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams() -> u8 {
    // mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    // mut s2: impl Stream<Item = u8> + FusedStream + Unpin,

    // .fuse() により、Stream に FusedStream を実装
    let s1 = futures::stream::once(async { 10 }).fuse();
    let s2 = futures::stream::once(async { 20 }).fuse();

    // pin_mut! マクロで Unpin トレイトを適用
    pin_mut!(s1, s2);

    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
            default => panic!(), // この分岐は常に走らない。Future が先に動き、その後 complete 分岐が選ばれる。
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }
    println!("add_two_streams,total = {total}");
    total
}
fn main() {
    executor::block_on(add_two_streams());
}

注:select! マクロは defaultcomplete 分岐もサポートしています:

  • complete 分岐:すべての Future および Stream が完了したときに実行されます。たいてい loop と組み合わせて使われます。
  • default 分岐:どの Future や Stream も Ready 状態でないときに即時実行されます。

select! マクロを使う際に便利な関数・型を 2 つ紹介します:

  • Fuse::terminated():select ループ内で空の Future(すでに FusedFuture を実装)を作成でき、後から新しい Future を動的に追加可能。
  • FuturesUnordered 型:複数の Future をコピーして同時に実行できる並列処理のための構造体。
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn future_in_select() {
    // 空の Future を作成(FusedFuture 実装済み)
    let fut = Fuse::terminated();
    // 複数の Future を非同期実行できるコレクション
    let mut async_tasks: FuturesUnordered<Pin<Box<dyn Future<Output = i32>>>> = FuturesUnordered::new();
    async_tasks.push(Box::pin(async { 1 }));

    pin_mut!(fut);

    let mut total = 0;
    loop {
        select! {
            // `select_next_some` は Stream の Some(_) のみを選択し、None を無視する
            num = async_tasks.select_next_some() => {
                println!("first num is {num} and total is {total}");
                total += num;
                println!("total is {total}");
                if total >= 10 { break; }
                // fut が終了していれば新しい Future をセット
                if fut.is_terminated() {
                    fut.set(async { 1 }.fuse());
                }
            },
            num = fut => {
                println!("second num is {num} and total is {total}");
                total += num;
                println!("now total is {total}");
                async_tasks.push(Box::pin(async { 1 }));
            },
            complete => break,
            default => panic!(),
        };
    }

    println!("total finally is {total}");
}

fn main() {
    executor::block_on(future_in_select());
}

まとめ

futures クレートには、Future を並行実行するための多くの便利なツールが提供されています。たとえば:

  • join! マクロ:複数の異なる Future を並行に実行し、すべての Future が完了してはじめて終了とみなされる。これは「すべてのタスクを完了する」並行実行パターン。
  • try_join! マクロ:複数の異なる Future を並行に実行し、どれか 1 つでもエラーを返した時点で全体の処理を即座に停止する。「失敗があれば即終了」するタイプの並行実行パターン。
  • select! マクロ:複数の異なる Future を並行に実行し、どれか 1 つが完了した時点で即時に処理できる。「タスク間のレース」実行パターン。
  • select! マクロの使用には FusedFuture + Unpin の条件が必要:これは .fuse() メソッドと pin_mut! マクロを使って満たす。

私たちはLeapcell、Rustプロジェクトのホスティングの最適解です。

Leapcell

Leapcellは、Webホスティング、非同期タスク、Redis向けの次世代サーバーレスプラットフォームです:

複数言語サポート

  • Node.js、Python、Go、Rustで開発できます。

無制限のプロジェクトデプロイ

  • 使用量に応じて料金を支払い、リクエストがなければ料金は発生しません。

比類のないコスト効率

  • 使用量に応じた支払い、アイドル時間は課金されません。
  • 例: $25で6.94Mリクエスト、平均応答時間60ms。

洗練された開発者体験

  • 直感的なUIで簡単に設定できます。
  • 完全自動化されたCI/CDパイプラインとGitOps統合。
  • 実行可能なインサイトのためのリアルタイムのメトリクスとログ。

簡単なスケーラビリティと高パフォーマンス

  • 高い同時実行性を容易に処理するためのオートスケーリング。
  • ゼロ運用オーバーヘッド — 構築に集中できます。

ドキュメントで詳細を確認!

Try Leapcell

Xでフォローする:@LeapcellHQ


ブログでこの記事を読む

Discussion

ログインするとコメントできます