🎋

Rust の Future に入門した

2021/08/04に公開

最近、Rust の Future と async/await の使い方を調べたのでメモしておきます。

最初の例

まず、HTTP サーバーにリクエストを送ってレスポンスを出力するプログラムを書いてみます。

// [dependencies]
// tokio = { version = "1.9", features = ["full"] }
// reqwest = "0.11"

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[tokio::main]
async fn main() -> Result<()> {
    let resp = reqwest::get("https://httpbin.org/get").await?;
    let body = resp.text().await?;
    println!("body = {}", body);
    Ok(())
}

reqwest::get() が、指定された URL に HTTP リクエストを送る関数です。戻り値は Future に包まれて返ってきます。

Future に対して .await を作用させると、Future の完了を待って、その結果を取得することができます。この .await は文法的にメソッドっぽく見えますが、メソッドではなく専用の言語機能です。

main 関数が通常の fn ではなく async fn になっていたり、#[tokio::main] というアノテーションがついていたりするので注意してください。

async 関数

次に、最初の例から関数を切り出してみましょう。すると次のようになります。

// [dependencies]
// tokio = { version = "1.9", features = ["full"] }
// reqwest = "0.11"

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

async fn http_get(url: &str) -> Result<String> {
    let resp = reqwest::get(url).await?;
    let body = resp.text().await?;
    Ok(body)
}

#[tokio::main]
async fn main() -> Result<()> {
    let body = http_get("https://httpbin.org/get").await?;
    println!("body = {}", body);
    Ok(())
}

注目すべき点は http_get を宣言するときに async fn という構文が使われていることです。関数を async fn として宣言すると、その中で .await 構文を使うことができます。

http_get は async 関数なので、通常の値ではなく Future に包まれた値を返します。なので main 関数では http_get の戻り値を取得するために .await しています。

並列実行(その1) 間違った例

せっかく非同期実行を行うので、並列に HTTP リクエストを送ってみましょう。

まず間違った例を紹介します。

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// 前の例と同じなので省略
async fn http_get(url: &str) -> Result<String> {
    ...
}

#[tokio::main]
async fn main() -> Result<()> {
    // まず Future をふたつ作る
    let fut1 = http_get("https://httpbin.org/delay/5");
    let fut2 = http_get("https://httpbin.org/delay/5");

    // そのあと await で待つ
    let body1 = fut1.await?;
    let body2 = fut2.await?;

    println!("body1 = {}", body1);
    println!("body2 = {}", body2);
    Ok(())
}

この例で使われている https://httpbin.org/delay/5 というエンドポイントは5秒だけ待ってからレスポンスを返すエンドポイントです。仮に正しく並列実行できていれば、このプログラムは5秒程度で終わるはずです。しかし、このプログラムは10秒かかってしまいます。

Rust の Future は作成しただけでは実行されません。つまり、http_get() を呼び出しても Future が作成されるだけで実行されない、ということです。これは、他の言語の Future や Promise に慣れた人がハマりやすい罠なので注意してください。Rust の Future は await したタイミングではじめて実行されます。

Rust の Future がこのような設計になっているのは効率のためです。興味のある人は Designing futures for Rust を読むとよいでしょう。

並列実行(その2) 正しい例

並列実行したい場合は futures cratejoin 関数を使います。

// [dependencies]
// tokio = { version = "1.9", features = ["full"] }
// reqwest = "0.11"
// futures = "0.3"  # 追加!

use futures::future; // 追加!

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// 前の例と同じなので省略
async fn http_get(url: &str) -> Result<String> {
    ...
}

#[tokio::main]
async fn main() -> Result<()> {
    let fut1 = http_get("https://httpbin.org/delay/5");
    let fut2 = http_get("https://httpbin.org/delay/5");

    // join を使ってふたつの Future を同時に待つ
    let (result_body1, result_body2) = future::join(fut1, fut2).await;

    // http_get の戻り値は Result に包まれているので、Result を外す
    let (body1, body2) = (result_body1?, result_body2?);

    println!("body1 = {}", body1);
    println!("body2 = {}", body2);
    Ok(())
}

この例は並列実行されているので、ちゃんと5秒程度で終わります。この例に出てくる join 関数は、ふたつの Future を受け取って、それらを並列に実行するような新しい Future を返す関数です。

join と Result を外すのを連続して行うことはよくあることなので、try_join という関数が用意されています。これを使うと上の例の場合は以下のように書けます。

let (body1, body2) = future::try_join(fut1, fut2).await?;

また、join! というマクロも用意されています。join! は可変長引数になっていて、Future が何個あっても一撃で join できます。

let (ret1, ret2, ret3, ret4) = futures::join!(fut1, fut2, fut3, fut4);

join!join と違って await まで行うので注意してください。また、try_join に対応するマクロとして try_join! が用意されています。

大量の Future を並列実行する

2個の Future を合成する方法はわかりました。ではもっとたくさんの Future を合成したいときはどうしたらいいでしょうか?

こういう場合、join_all 関数が使えます。以下の例は 1000 個の Future を作って並列実行する例です。

// [dependencies]
// tokio = { version = "1.9", features = ["full"] }
// futures = "0.3"

use futures::future;
use std::time::Duration;

// 5秒待って引数をそのまま返す非同期関数
async fn some_heavy_work(id: i64) -> i64 {
    tokio::time::sleep(Duration::from_secs(5)).await;
    id
}

#[tokio::main]
async fn main() {
    // 1000個の Future を作る (このタイミングでは実行されていない)
    let works: Vec<_> = (0..1000).map(|i| some_heavy_work(i)).collect();
    // 1000個の Future を並列実行する
    let ret = future::join_all(works).await;

    println!("ret = {:?}", ret);
}

try_join に対応する try_join_all という関数もあります。

さらに複雑な制御が必要な場合は FuturesOrderedFuturesUnordered を使えばよいそうです。

Discussion