Zenn
🌊

Rust非同期処理の仕組み:Streamトレイトと非同期イテレータ

2025/03/23に公開

表紙

Stream トレイトは Future トレイトと似ています。Future は 1 つのアイテムの状態の変化を表しますが、Stream は標準ライブラリの Iterator トレイトに似ており、終了するまでに複数の値を生成することができます。あるいは、Stream は一連の Future で構成されていると簡単に理解してもよく、Stream から各 Future の結果を読み取っていき、Stream が終了するまで続けることができます。

Stream の定義

Future は非同期開発における最も基本的な概念です。Future が一度限りの非同期値を表すとすれば、Stream は一連の非同期値を表します。Future が「1」なら、Stream は「0、1、または N」です。Stream のシグネチャは以下の通りです:

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

Stream は同期プリミティブの Iterator の概念に対応しています。思い出してみてください、シグネチャもほとんどそっくりではありませんか?

pub trait Iterator {
    type Item;

    fn next(&mut self) -> Option<Self::Item>;
}

Stream は、絶えず流れてくるデータソースを抽象化するために使われます(もちろん、None を poll したときには終了もします)。

たとえば、Stream のよくある例の一つに、メッセージチャネル(futures クレートにある)のコンシューマー Receiver があります。Send 側からメッセージが送られるたびに、Receiver 側では Some(val)という値を受け取ることができます。Send 側がクローズ(drop)され、チャネルにメッセージが残っていなければ、Receiver は None を受け取ります。

use futures::channel::mpsc;
use futures::{executor::block_on, SinkExt, StreamExt};

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    println!("tx: Send 1, 2");
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` は `Iterator::next` に似ていますが、
    // 前者は値ではなく `Future<Output = Option<T>>` を返すため、
    // `.await` を使って値を取得する必要があります
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

fn main() {
    block_on(send_recv());
}

Iterator と Stream の違い:

  • Iterator はnext()メソッドを繰り返し呼び出すことで、新しい値を取得し続けることができます。値がなくなるとNoneを返します。Iterator はブロッキング式でデータを返すため、next()を呼び出すたびに CPU を占有し、結果が得られるまで待機します。一方、非同期の Stream はノンブロッキングであり、待機中は CPU を他の作業に開放します。

  • Stream のpoll_next()メソッドは、Future のpoll()メソッドに似ており、Iterator のnext()と同様の役割を持ちます。ただし、poll_next()の呼び出しは扱いにくく、Pollの状態を自分で処理する必要があるため、あまり使いやすくありません。そこで Rust はStreamExtを提供しています。これは Stream の拡張で、next()メソッドを提供し、Futureトレイトを実装したNext構造体を返します。これにより、stream.next().awaitのように直接値を反復取得できます。

注:StreamExtStreamExtensionの略称です。Rust では、最小限の定義(例えばStream)を 1 つのファイルに置き、拡張関連の API(例えばStreamExt)は別ファイルに配置するのが一般的な方法です。

注:StreamトレイトはまだFutureのように Rust のコアライブラリ(std::core)には含まれておらず、futures_utilクレートにあります。また、StreamExtensionも標準ライブラリには含まれていません。つまり、ライブラリによって異なるインポートが提供されるため、競合が発生する可能性があります。例えば、tokiofutures_utilとは異なるStreamExtを提供しています。可能であれば、非同期/await で最もよく使われるfutures_utilを使用するのが望ましいです。

StreamExtnext()メソッドおよびNext構造体の実装:

pub trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin {
        assert_future::<Option<Self::Item>, _>(Next::new(self))
    }
}

// next は Next 構造体を返す
pub struct Next<'a, St: ?Sized> {
    stream: &'a mut St,
}

// Stream が Unpin の場合、Next も Unpin
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}

impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
    pub(super) fn new(stream: &'a mut St) -> Self {
        Self { stream }
    }
}

// Next は Future を実装しており、poll() は実際には stream の poll_next() を呼び出す
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
    type Output = Option<St::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_next_unpin(cx)
    }
}

Stream の作成

futuresライブラリは、いくつかの基本的な Stream を作成するための便利なメソッドを提供しています。例えば:

  • empty():空の Stream を生成する
  • once():1 つの値だけを含む Stream を生成する
  • pending():値を一切含まず、常にPoll::Pendingを返す Stream を生成する
  • repeat():常に同じ値を返し続ける Stream を生成する
  • repeat_with():クロージャ関数によって無限にデータを返す Stream
  • poll_fn()Pollを返すクロージャによって Stream を生成する
  • unfold():初期値と Future を返すクロージャから Stream を生成する
use futures::prelude::*;

#[tokio::main]
async fn main() {
    let mut st = stream::iter(1..10)
        .filter(|x| future::ready(x % 2 == 0))
        .map(|x| x * x);

    // イテレート
    while let Some(x) = st.next().await {
        println!("Got item: {}", x);
    }
}

上記のコードでは、stream::iterで Stream を生成し、それにfilterおよびmap操作を適用しています。最終的には、その Stream 全体を反復して取得したデータを出力しています。

非同期/await の仕組みには興味がなく、ただ Stream としての挙動をテストしたいだけであれば、Stream::iterは非常に便利です。もう一つ興味深いのはrepeat_withで、ここではクロージャを使って必要に応じて遅延的に値を生成する Stream を作成できます。例えば:

use futures::stream::{self, StreamExt};

// 2の0乗から3乗まで:
async fn stream_repeat_with(){
    let mut curr = 1;
    let mut pow2 = futures::stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp });

    assert_eq!(Some(1), pow2.next().await);
    assert_eq!(Some(2), pow2.next().await);
    assert_eq!(Some(4), pow2.next().await);
    assert_eq!(Some(8), pow2.next().await);
}

Stream の実装

独自の Stream を作成するには、以下の 2 つのステップが必要です:

  1. Stream の状態を保持する構造体(struct)を定義する
  2. その構造体に対してStreamトレイトを実装する

ここでは、1 から 5 までカウントするCounterという名前の Stream を作成してみましょう:

#![feature(async_stream)]

// まずは構造体の定義:
/// 1から5までのカウントを行うStream
struct Counter {
    count: usize,
}

// カウントを1から開始したいので、補助的なnew()メソッドを追加します。
// これは必須ではありませんが、便利です。
// 注意:ここでは`count`を0から開始します。なぜなら、次の`poll_next()`実装で1加算してから判定するからです。
impl Counter {
    fn new() -> Counter {
        Counter { count: 0 }
    }
}

// 次に、Counterに対して`Stream`トレイトを実装します:
impl Stream for Counter {
    // カウントにはusizeを使います
    type Item = usize;

    // poll_next()が唯一必要なメソッドです
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // カウントを1つ増やす。これがcountを0から始めた理由です。
        self.count += 1;

        // カウントが5以下なら値を返す
        if self.count < 6 {
            Poll::Ready(Some(self.count))
        } else {
            // カウントが終了したらNoneを返す
            Poll::Ready(None)
        }
    }
}

Stream トレイトの種類

Rust における Stream に関連するトレイトはいくつか存在します。たとえば、StreamTryStreamFusedStreamなどがあります。

  • Stream:これは同期のIteratorに非常に似ています。ただし、Noneを返した時点でストリームが終了したことを意味し、それ以降にストリームをポーリングすべきではありません。もしそれを行うと、未定義の動作に陥り、不具合が発生する可能性があります。

  • TryStream:これはResult<value, error>型を返すストリームに特化したトレイトです。TryStreamは、内部のResultを簡単にマッチング・変換できる関数を提供しています。これは、Result型のアイテムを生成するストリーム向けの API と考えることができ、より扱いやすくなっています。

  • FusedStreamStreamと非常によく似ていますが、ストリームがNoneを返した後に本当に終了したかどうかを確認できるようにしたものです。例えば、リングバッファによってサポートされたストリームを作成したい場合、最初の反復ではNoneが返されるかもしれませんが、その後再びポーリングして新たな反復を再開するのは安全です。FusedStreamはそのような挙動を可能にします。

イテレーションと並行処理

Iteratorと同様に、Streamもイテレーションが可能です。たとえば、mapfilterfoldfor_eachskipなどのメソッドが利用できます。また、エラーが発生した時点で処理を中断するバージョン、たとえば:try_maptry_filtertry_foldtry_for_eachなどもあります。

しかし、Iteratorと異なる点として、forループではStreamを直接反復できません。その代わりに、命令的スタイルのループ、たとえばwhile letloopを使って、明示的にnexttry_nextメソッドを呼び出す必要があります。以下のような方法でストリームから値を読み取ることができます:

// イテレーション方法 1
while let Some(value) = s.next().await {}

// イテレーション方法 2
loop {
  match s.next().await {
    Some(value) => {}
    None => break;
  }
}

次に、ストリームの値を合計する例です:

use futures_util::{pin_mut, Stream, stream, StreamExt};

async fn sum(stream: impl Stream<Item=usize>) -> usize {
    // イテレーション前に stream を pin することを忘れずに
    pin_mut!(stream);
    let mut sum: usize = 0;
    // stream をイテレート
    while let Some(item) = stream.next().await {
        sum = sum + item;
    }
    sum
}

一度に 1 つの値だけを処理すると、並行性が失われてしまい、非同期プログラミングの利点がなくなります。複数の値を並行して処理するには、for_each_concurrenttry_for_each_concurrentメソッドを使うことができます:

use std::{pin::Pin, io};
use futures_util::{Stream, TryStreamExt};

async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> {
    // `try_for_each_concurrent` を使用
    stream.try_for_each_concurrent(100, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

async fn jump_n_times(num: i32)-> Result<(), io::Error> {
    println!("jump_n_times :{}", num+1);
    Ok(())
}
async fn report_n_jumps(num: i32)-> Result<(), io::Error>{
    println!("report_n_jumps : {}", num);
    Ok(())
}

まとめ

StreamFutureとよく似ていますが、Futureが 1 つのアイテムの状態変化を扱うのに対し、StreamIteratorのように、終了するまでに複数の値を返すことができます。簡単に言えば、Streamは一連のFutureで構成されており、Streamから各Futureの結果を読み取り、Streamが終了するまでそれを続ける、非同期のイテレーターであると考えられます。

Streampoll_next関数は、以下の 3 つのいずれかの値を返す可能性があります:

  • Poll::Pending:次の値がまだ準備できていない、待機が必要であることを意味します
  • Poll::Ready(Some(val)):値が準備できており、成功したことを示します。次の値を取得するには再びpoll_nextを呼び出します
  • Poll::Ready(None)Streamが終了したことを示します。これ以上poll_nextを呼び出すべきではありません

私たちはLeapcell、Rustプロジェクトのホスティングの最適解です。

Leapcell

Leapcellは、Webホスティング、非同期タスク、Redis向けの次世代サーバーレスプラットフォームです:

複数言語サポート

  • Node.js、Python、Go、Rustで開発できます。

無制限のプロジェクトデプロイ

  • 使用量に応じて料金を支払い、リクエストがなければ料金は発生しません。

比類のないコスト効率

  • 使用量に応じた支払い、アイドル時間は課金されません。
  • 例: $25で6.94Mリクエスト、平均応答時間60ms。

洗練された開発者体験

  • 直感的なUIで簡単に設定できます。
  • 完全自動化されたCI/CDパイプラインとGitOps統合。
  • 実行可能なインサイトのためのリアルタイムのメトリクスとログ。

簡単なスケーラビリティと高パフォーマンス

  • 高い同時実行性を容易に処理するためのオートスケーリング。
  • ゼロ運用オーバーヘッド — 構築に集中できます。

ドキュメントで詳細を確認!

Try Leapcell

Xでフォローする:@LeapcellHQ


ブログでこの記事を読む

Discussion

ログインするとコメントできます