Rust非同期処理の仕組み:Streamトレイトと非同期イテレータ
Stream トレイトは Future トレイトと似ています。Future は 1 つのアイテムの状態の変化を表しますが、Stream は標準ライブラリの Iterator トレイトに似ており、終了するまでに複数の値を生成することができます。あるいは、Stream は一連の Future で構成されていると簡単に理解してもよく、Stream から各 Future の結果を読み取っていき、Stream が終了するまで続けることができます。
Stream の定義
Future は非同期開発における最も基本的な概念です。Future が一度限りの非同期値を表すとすれば、Stream は一連の非同期値を表します。Future が「1」なら、Stream は「0、1、または N」です。Stream のシグネチャは以下の通りです:
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
Stream は同期プリミティブの Iterator の概念に対応しています。思い出してみてください、シグネチャもほとんどそっくりではありませんか?
pub trait Iterator {
type Item;
fn next(&mut self) -> Option<Self::Item>;
}
Stream は、絶えず流れてくるデータソースを抽象化するために使われます(もちろん、None を poll したときには終了もします)。
たとえば、Stream のよくある例の一つに、メッセージチャネル(futures クレートにある)のコンシューマー Receiver があります。Send 側からメッセージが送られるたびに、Receiver 側では Some(val)という値を受け取ることができます。Send 側がクローズ(drop)され、チャネルにメッセージが残っていなければ、Receiver は None を受け取ります。
use futures::channel::mpsc;
use futures::{executor::block_on, SinkExt, StreamExt};
async fn send_recv() {
const BUFFER_SIZE: usize = 10;
let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);
println!("tx: Send 1, 2");
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
drop(tx);
// `StreamExt::next` は `Iterator::next` に似ていますが、
// 前者は値ではなく `Future<Output = Option<T>>` を返すため、
// `.await` を使って値を取得する必要があります
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
fn main() {
block_on(send_recv());
}
Iterator と Stream の違い:
-
Iterator は
next()
メソッドを繰り返し呼び出すことで、新しい値を取得し続けることができます。値がなくなるとNone
を返します。Iterator はブロッキング式でデータを返すため、next()
を呼び出すたびに CPU を占有し、結果が得られるまで待機します。一方、非同期の Stream はノンブロッキングであり、待機中は CPU を他の作業に開放します。 -
Stream の
poll_next()
メソッドは、Future のpoll()
メソッドに似ており、Iterator のnext()
と同様の役割を持ちます。ただし、poll_next()
の呼び出しは扱いにくく、Poll
の状態を自分で処理する必要があるため、あまり使いやすくありません。そこで Rust はStreamExt
を提供しています。これは Stream の拡張で、next()
メソッドを提供し、Future
トレイトを実装したNext
構造体を返します。これにより、stream.next().await
のように直接値を反復取得できます。
注:
StreamExt
はStreamExtension
の略称です。Rust では、最小限の定義(例えばStream
)を 1 つのファイルに置き、拡張関連の API(例えばStreamExt
)は別ファイルに配置するのが一般的な方法です。
注:
Stream
トレイトはまだFuture
のように Rust のコアライブラリ(std::core
)には含まれておらず、futures_util
クレートにあります。また、StreamExtension
も標準ライブラリには含まれていません。つまり、ライブラリによって異なるインポートが提供されるため、競合が発生する可能性があります。例えば、tokio
はfutures_util
とは異なるStreamExt
を提供しています。可能であれば、非同期/await で最もよく使われるfutures_util
を使用するのが望ましいです。
StreamExt
のnext()
メソッドおよびNext
構造体の実装:
pub trait StreamExt: Stream {
fn next(&mut self) -> Next<'_, Self> where Self: Unpin {
assert_future::<Option<Self::Item>, _>(Next::new(self))
}
}
// next は Next 構造体を返す
pub struct Next<'a, St: ?Sized> {
stream: &'a mut St,
}
// Stream が Unpin の場合、Next も Unpin
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}
impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
Self { stream }
}
}
// Next は Future を実装しており、poll() は実際には stream の poll_next() を呼び出す
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
type Output = Option<St::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.stream.poll_next_unpin(cx)
}
}
Stream の作成
futures
ライブラリは、いくつかの基本的な Stream を作成するための便利なメソッドを提供しています。例えば:
-
empty()
:空の Stream を生成する -
once()
:1 つの値だけを含む Stream を生成する -
pending()
:値を一切含まず、常にPoll::Pending
を返す Stream を生成する -
repeat()
:常に同じ値を返し続ける Stream を生成する -
repeat_with()
:クロージャ関数によって無限にデータを返す Stream -
poll_fn()
:Poll
を返すクロージャによって Stream を生成する -
unfold()
:初期値と Future を返すクロージャから Stream を生成する
use futures::prelude::*;
#[tokio::main]
async fn main() {
let mut st = stream::iter(1..10)
.filter(|x| future::ready(x % 2 == 0))
.map(|x| x * x);
// イテレート
while let Some(x) = st.next().await {
println!("Got item: {}", x);
}
}
上記のコードでは、stream::iter
で Stream を生成し、それにfilter
およびmap
操作を適用しています。最終的には、その Stream 全体を反復して取得したデータを出力しています。
非同期/await の仕組みには興味がなく、ただ Stream としての挙動をテストしたいだけであれば、Stream::iter
は非常に便利です。もう一つ興味深いのはrepeat_with
で、ここではクロージャを使って必要に応じて遅延的に値を生成する Stream を作成できます。例えば:
use futures::stream::{self, StreamExt};
// 2の0乗から3乗まで:
async fn stream_repeat_with(){
let mut curr = 1;
let mut pow2 = futures::stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp });
assert_eq!(Some(1), pow2.next().await);
assert_eq!(Some(2), pow2.next().await);
assert_eq!(Some(4), pow2.next().await);
assert_eq!(Some(8), pow2.next().await);
}
Stream の実装
独自の Stream を作成するには、以下の 2 つのステップが必要です:
- Stream の状態を保持する構造体(
struct
)を定義する - その構造体に対して
Stream
トレイトを実装する
ここでは、1 から 5 までカウントするCounter
という名前の Stream を作成してみましょう:
#![feature(async_stream)]
// まずは構造体の定義:
/// 1から5までのカウントを行うStream
struct Counter {
count: usize,
}
// カウントを1から開始したいので、補助的なnew()メソッドを追加します。
// これは必須ではありませんが、便利です。
// 注意:ここでは`count`を0から開始します。なぜなら、次の`poll_next()`実装で1加算してから判定するからです。
impl Counter {
fn new() -> Counter {
Counter { count: 0 }
}
}
// 次に、Counterに対して`Stream`トレイトを実装します:
impl Stream for Counter {
// カウントにはusizeを使います
type Item = usize;
// poll_next()が唯一必要なメソッドです
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// カウントを1つ増やす。これがcountを0から始めた理由です。
self.count += 1;
// カウントが5以下なら値を返す
if self.count < 6 {
Poll::Ready(Some(self.count))
} else {
// カウントが終了したらNoneを返す
Poll::Ready(None)
}
}
}
Stream トレイトの種類
Rust における Stream に関連するトレイトはいくつか存在します。たとえば、Stream
、TryStream
、FusedStream
などがあります。
-
Stream:これは同期の
Iterator
に非常に似ています。ただし、None
を返した時点でストリームが終了したことを意味し、それ以降にストリームをポーリングすべきではありません。もしそれを行うと、未定義の動作に陥り、不具合が発生する可能性があります。 -
TryStream:これは
Result<value, error>
型を返すストリームに特化したトレイトです。TryStream
は、内部のResult
を簡単にマッチング・変換できる関数を提供しています。これは、Result
型のアイテムを生成するストリーム向けの API と考えることができ、より扱いやすくなっています。 -
FusedStream:
Stream
と非常によく似ていますが、ストリームがNone
を返した後に本当に終了したかどうかを確認できるようにしたものです。例えば、リングバッファによってサポートされたストリームを作成したい場合、最初の反復ではNone
が返されるかもしれませんが、その後再びポーリングして新たな反復を再開するのは安全です。FusedStream
はそのような挙動を可能にします。
イテレーションと並行処理
Iterator
と同様に、Stream
もイテレーションが可能です。たとえば、map
、filter
、fold
、for_each
、skip
などのメソッドが利用できます。また、エラーが発生した時点で処理を中断するバージョン、たとえば:try_map
、try_filter
、try_fold
、try_for_each
などもあります。
しかし、Iterator
と異なる点として、for
ループではStream
を直接反復できません。その代わりに、命令的スタイルのループ、たとえばwhile let
やloop
を使って、明示的にnext
やtry_next
メソッドを呼び出す必要があります。以下のような方法でストリームから値を読み取ることができます:
// イテレーション方法 1
while let Some(value) = s.next().await {}
// イテレーション方法 2
loop {
match s.next().await {
Some(value) => {}
None => break;
}
}
次に、ストリームの値を合計する例です:
use futures_util::{pin_mut, Stream, stream, StreamExt};
async fn sum(stream: impl Stream<Item=usize>) -> usize {
// イテレーション前に stream を pin することを忘れずに
pin_mut!(stream);
let mut sum: usize = 0;
// stream をイテレート
while let Some(item) = stream.next().await {
sum = sum + item;
}
sum
}
一度に 1 つの値だけを処理すると、並行性が失われてしまい、非同期プログラミングの利点がなくなります。複数の値を並行して処理するには、for_each_concurrent
やtry_for_each_concurrent
メソッドを使うことができます:
use std::{pin::Pin, io};
use futures_util::{Stream, TryStreamExt};
async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> {
// `try_for_each_concurrent` を使用
stream.try_for_each_concurrent(100, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
async fn jump_n_times(num: i32)-> Result<(), io::Error> {
println!("jump_n_times :{}", num+1);
Ok(())
}
async fn report_n_jumps(num: i32)-> Result<(), io::Error>{
println!("report_n_jumps : {}", num);
Ok(())
}
まとめ
Stream
はFuture
とよく似ていますが、Future
が 1 つのアイテムの状態変化を扱うのに対し、Stream
はIterator
のように、終了するまでに複数の値を返すことができます。簡単に言えば、Stream
は一連のFuture
で構成されており、Stream
から各Future
の結果を読み取り、Stream
が終了するまでそれを続ける、非同期のイテレーターであると考えられます。
Stream
のpoll_next
関数は、以下の 3 つのいずれかの値を返す可能性があります:
-
Poll::Pending
:次の値がまだ準備できていない、待機が必要であることを意味します -
Poll::Ready(Some(val))
:値が準備できており、成功したことを示します。次の値を取得するには再びpoll_next
を呼び出します -
Poll::Ready(None)
:Stream
が終了したことを示します。これ以上poll_next
を呼び出すべきではありません
私たちはLeapcell、Rustプロジェクトのホスティングの最適解です。
Leapcellは、Webホスティング、非同期タスク、Redis向けの次世代サーバーレスプラットフォームです:
複数言語サポート
- Node.js、Python、Go、Rustで開発できます。
無制限のプロジェクトデプロイ
- 使用量に応じて料金を支払い、リクエストがなければ料金は発生しません。
比類のないコスト効率
- 使用量に応じた支払い、アイドル時間は課金されません。
- 例: $25で6.94Mリクエスト、平均応答時間60ms。
洗練された開発者体験
- 直感的なUIで簡単に設定できます。
- 完全自動化されたCI/CDパイプラインとGitOps統合。
- 実行可能なインサイトのためのリアルタイムのメトリクスとログ。
簡単なスケーラビリティと高パフォーマンス
- 高い同時実行性を容易に処理するためのオートスケーリング。
- ゼロ運用オーバーヘッド — 構築に集中できます。
Xでフォローする:@LeapcellHQ
Discussion