原文: https://tokio.rs/tokio/tutorial/streams

"stream" は非同期な値の連なりです。Rust の std::iter::Iterator の非同期バージョンに相当するもので、Stream トレイトによって表現されます。"stream" は async 関数の中でイテレートすることができます。また、アダプタを利用して変形させることも可能です。Tokio は StreamExt トレイトを通して多くの共通アダプタを提供しています。

Tokio の "stream" サポートは別のクレート tokio-stream によって提供されています。

tokio-stream = "0.1"

現在、Tokio の "stream" ユーティリティは tokio-stream クレート内にあります。Rust の標準ライブラリの Stream トレイトが安定化したら、Tokio の "stream" ユーティリティは tokio 本体のクレートへと移動される予定です。

イテレーション

現在、Rust は非同期な for ループをサポートしていません。代わりに、StreamExt::next()while let を組み合わせて使うことで "stream" に対するイテレーションを行えます。

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);

    while let Some(v) = stream.next().await {
        println!("GOT = {:?}", v);
    }
}

イテレータと同様、next() メソッドは Option<T> を返します。ここで、T は "stream" の値の型です。None が返ってきたら、"stream" のイテレーションが終了したということを意味します。

Mini-Redis のブロードキャスト

Mini-Redis クライアントを使ったもう少し複雑な例を見てみましょう。

コードの全体は こちら で確認できます。

use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    // いくつかのデータを発行する
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream();

    tokio::pin!(messages);

    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }

    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });

    subscribe().await?;

    println!("DONE");

    Ok(())
}

"numbers" チャネルを通して Mini-Redis サーバーへとメッセージを発行するためにタスクを spawn しています。それから、メインタスク上で "numbers" チャネルを購読し、受け取ったメッセージを表示します。

購読したあと、into_stream() メソッドを実行します。これをすると、Subscriber を消費し、メッセージが届くたびにそれを送出するような "stream" を生成します。メッセージをイテレートし始める前に、tokio::pin! を使って "stream" を pin していることに注意してください。"stream" に next() を呼ぶためにはその "stream" が pin されている必要が有ります。into_stream() 関数が返す "stream" は pin されていない ので、イテレートをするためには、明示的に pin する必要があるのです。

Rust の値がメモリ上でそれ以上ムーブされることがない場合、その値は pin されている、と表現します。pin されている値の重要な性質は、pin されたデータに対するポインタをとることができて、呼び出し側はそのポインタが有効であることを確信することができる、ということです。async/await .await をまたいだデータの借用をサポートするためにこの特徴が使われています。

"stream" を pin するのを忘れると、以下のようなエラーが出てきます:

error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
  --> streams/src/main.rs:29:36
   |
29 |     while let Some(msg) = messages.next().await {
   |                                    ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
   |
   = note: required because it appears within the type `impl Future`
   = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
   = note: required because it appears within the type `impl Stream`
   = note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`

このようなエラーに出会ったら、値を pin してみてください!

さて、これを実行する前に、Mini-Redis サーバーを起動してください:

$ mini-redis-server

そして、コードを実行してみてください。標準出力に以下のような出力が出てくるのが確認できます。

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

購読と発行の間で競合が発生し、いくつかの初期メッセージがドロップされる可能性があります。このプログラムはいつまでも終了しません。サーバーが立ち上がっている限り、Mini-Redis チャネルに対する購読は生き続けるためです。

このプログラムを拡張するため、"stream" をどのように扱えばよいかを見ていきましょう。

アダプタ

Stream を受け取って別の Stream を返す関数のことをしばしば「ストリームアダプタ」と呼びます。「アダプタパターン」[1]の形をしているからです。一般的なストリームアダプタには maptakefilter などがあります。

プログラムがきちんと終了するように上記の Mini-Redis プログラムを書き換えましょう。メッセージを3つ受け取ったあと、メッセージのイテレートを中断するようにします。これは take を使うことで実現できます。このアダプタは、最大で n 個のメッセージを送出するように "stream" を制限します。

let messages = subscriber
    .into_stream()
    .take(3);

プログラムを再実行すると、以下が得られます:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

今度はプログラムは終了しました。

次は、1桁の数字のみとなるような制限をかけてみましょう。メッセージの長さをチェックするという処理を行います。filter アダプタを使って、条件に当てはまらないメッセージをドロップさせます。

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);

プログラムを再実行すると、以下が得られます:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

アダプタを適用する順番が重要であるということに注意してください。filter を呼んでから take するのと、take してから filter するのとは異なります。

最後に、出力の Ok(Message { ... }) の部分を取り除いて、出力をきれいにしてみましょう。map を使います。filterあとmap を適用するため、map をする時点ではそのメッセージが Ok であることは明らかです。したがって安全に unwrap() を使うことができます。

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);

出力は以下のようになります:

got = b"1"
got = b"3"
got = b"6"

filtermap を組み合わせて、filter_map 1つにまとめてしまう、という書き方をすることもできます。

他にももっと多くのアダプタが利用可能です。こちら から一覧をチェックしてください。

Stream を実装する

Stream トレイトは Future トレイトと非常によく似ています。

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

Stream::poll_next() 関数は、"stream" から多くの値を受け取るために複数回呼び出すことができることを除けば Future::poll に非常によく似ています。Async をさらに掘り下げる で見てきたように、"stream" がまだ値を返す準備が整っていなかったら、Poll::Pending が返され、タスクの "waker" が登録されます。"stream" が再ポーリングされてよい状態になったら、"waker" が通知を受けます。

size_hint() メソッドは イテレータ のそれと同じように使われます。

一般的に、Stream を自分で実装するには、"future" と別の "stream" を合成することで実現されることが多いです。例として、Async をさらに掘り下げる で実装した Delay を材料にして、10秒間隔で3回 () を返すような "stream" を作ってみましょう。

use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
    rem: usize,
    delay: Delay,
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<()>>
    {
        if self.rem == 0 {
            // これ以上 delay しない
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

async-stream

Stream を手作業で実装するのは退屈になりがちです。あいにく Rust は "stream" を定義するための async/await 構文をまだサポートしていません。この作業は進行中ではありますが、まだ準備段階です。

一時的な解決策として async-stream クレートを利用することができます。このクレートは async_stream! マクロを提供していて、入力を "stream" へと変換してくれます。このクレートを利用すれば、上で示した Interval の実装は、以下のように書き換えることができます:

use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
    let mut when = Instant::now();
    for _ in 0..3 {
        let delay = Delay { when };
        delay.await;
        yield ();
        when += Duration::from_millis(10);
    }
}
脚注
  1. 訳注: Adapter パターン (日本語版 WikiPedia) ↩︎