😊

async stream を理解する [使いどころ編]

2021/03/11に公開

前回の記事 では, async stream の基本的なところを説明しました.
今回は,async stream の使いどころや注意点について考察します.

async stream の生成 3 パターン

async stream を生成するには,非同期メソッドで yield return すれば良いことを前回の記事で扱いました.このとき, yield returnawait の位置によって大きく 3 パターン考えることができます.

1. 列挙中に非同期な操作を挟みたい

これが async stream の特性とも最もマッチする使い方です.たとえば,下記のような実装の場合は,要素の生成のために必要な非同期処理を列挙ごとに挟み込めるため, MoveNextAsync() が非同期であるという特性を十分に活かすことができます.

ここで,Task.Delay() は実際には非同期な I/O (データベースアクセス,HTTP API リクエスト等) と読み替えてください.

static async IAsyncEnumerable<int> Sample() {
    // 要素を返す前に毎回 await 
    foreach (var x in Enumerable.Range(1, 100))
    {
        await Task.Delay(100);
        yield return x;
    }
}
static async IAsyncEnumerable<(int x, int y)> Sample()
{
    // 毎回じゃないけど何回かに 1 回 await
    foreach (var x in Enumerable.Range(1, 100))
    {
        await Task.Delay(100);

        foreach (var y in Enumerable.Range(1, 10))
        {
            yield return (x, y);
        }
    }
}

2. 列挙前に非同期な操作が必要な場合

次に,列挙中に非同期な操作が必要なく,最初に非同期な操作が必要な場合が考えられます.

static async IAsyncEnumerable<int> Sample()
{
    // 最初だけ非同期
    await Task.Delay(100);
    
    // 残りの列挙はすべて同期
    foreach (var x in Enumerable.Range(1, 10))
    {
        yield return x * 2;
    }
}

この場合,async stream を導入することで生成側の実装は楽になります.ただし,列挙には非同期操作が一切必要ないので,MoveNextAsync() が非同期であるという特性は活かしきれていません.これなら,従来のように Task<IEnumerable<T>> を返すメソッドでも十分です.

たとえば,上記のメソッドは LINQ で下記のように書き直すことができます.

static async Task<IEnumerable<int>> Sample()
{
    // 最初だけ非同期
    await Task.Delay(100);
    
    // 残りの列挙はすべて同期
    return Enumerable.Range(1, 10).Select(x => x * 2);
}

このようなパターンで async stream を利用しなければならない状況というのは,どうしても yield return を使いたい場面のみに限定されるでしょう.そのような場面でも,ローカル関数に列挙部分を抜き出すなど,async stream を避ける方法はあるということは頭に置いておくべきでしょう.
このような状況においては,利用側として Task<IEnumerable<T>>IAsyncEnumerable<T> のどちらが扱いやすいかが判断基準のひとつになると思います.
特別な理由がなければ Task<IEnumerable<T>> を1度だけ await して列挙は同期的に扱えるほうが, IAsyncEnumerable<T> で(同期的にできるはずの)列挙を非同期的に扱うよりも扱いやすいのではないでしょうか.

3. 列挙後に非同期な処理をしたいとき

これはなにかよほど特別な事情が無い限りは NG です.具体的には下記のような実装です.

static async IAsyncEnumerable<int> AwaitAfterEnumerate()
{
    // 列挙が終わったあとに非同期な操作をしたい・・・?
    foreach (var x in Enumerable.Range(1, 5))
    {
        yield return x * 2;
    }

    await Task.Delay(10);
    Console.WriteLine("完了");
}

同期的に列挙が済んでいるのに,その後で非同期的な処理をしようというものです.
この実装はどのように動作するでしょうか.そして,それは期待したものでしょうか.実際には,このメソッドの動作は 呼び出し側の消費の仕方によって 大きく変わることになります.

呼び出し側が全部列挙してくれる場合

これが期待された挙動でしょう.呼び出し側が下記のような実装をしている場合,列挙が終わった後にきちんと後続の処理が行われます.

// 最後まで foreach で列挙すれば想定通り
// 出力: 2, 4, 6, 8, 10, 完了
await foreach (var x in AwaitAfterEnumerate()) Console.Write($"{x}, ");

// 自前で列挙しても,最後まで列挙すれば大丈夫
// 出力: 2, 4, 6, 8, 10, 完了
{
    var asyncEnumerable = AwaitAfterEnumerate();
    await using var asyncEnumerator = asyncEnumerable.GetAsyncEnumerator();

    while (await asyncEnumerator.MoveNextAsync())
    {
        var x = asyncEnumerator.Current;
        Console.Write($"{x}, ");
    }
}

しかしながら,常に最後まで全要素が列挙されるとは限らないのが,IEnumerable<T>IAsyncEnumerable<T> の特性でもあります.

途中で列挙が終わった場合

途中で列挙が終わった場合はどうでしょうか.たとえば,最初の何個かの要素だけが欲しい場合などは,すべての要素を列挙する必要はありません.
下記の実装での Take メソッドは, IAsyncEnumerable<T> に対して最初の要素から指定された要素数だけ列挙する新しい IAsyncEnumerable<T> を返します.

// Take(後述) すると 
// 出力: 2, 4,
await foreach (var x in AwaitAfterEnumerate().Take(2)) Console.Write($"{x}, ");

// 出力: 2, 4, 6,
{
    var asyncEnumerable = AwaitAfterEnumerate();
    await using var asyncEnumerator = asyncEnumerable.GetAsyncEnumerator();

    int count = 0;

    while (await asyncEnumerator.MoveNextAsync())
    {
        if (++count > 3) break;
        var x = asyncEnumerator.Current;
        Console.Write($"{x}, ");
    }
}

このような実装では,最後の処理は当然行われません.したがって,このような実装パターンは避けたほうが無難です.そもそも,列挙後になにか非同期的な処理を行いたい,というのは列挙とは関係ない副作用を引き起こしたい場合ではないでしょうか.

おわりに

一般的には, async stream の生成時に副作用を引き起こすようなパターンは避けるべきと考えます.呼び出し側は,IAsyncEnumerable<T> というインターフェースからは,「非同期的に列挙できるもの」という情報しか得られません.したがって,そもそも呼び出し側はすべて列挙したときでないと実行されない処理があるとか,複数回列挙するとその分だけ副作用が引き起こされるとか,そういったことは一切知りえないことです.async stream が副作用を引き起こすというのは,インターフェースを見ただけでは想定できないような特別な配慮を利用者側に暗黙的に要求することになってしまいます.
したがって,特別なケースを除いては,IAsyncEnumerable<T> は純粋にデータが流れる stream であるべきで,stream に流すデータを生成する以外の副作用は引き起こすべきではありません.

副作用を引き起こさないほうが良い,というのは IEnumerable<T> にもいえることですが, IAsyncEnumerable<T> では非同期な操作を書けるため,より副作用を引き起こしたい誘惑にかられやすくなるため,注意が必要です.

Discussion