😸

async stream を理解する [基本編]

2021/03/11に公開

C#8 で async stream (IAsyncEnumerable) がサポートされました.
会社で async stream を紹介した資料をもとに,複数回に分けて記事を投稿していこうと思います.
今回は, async stream の基本的なところを紹介していきます.

async stream の基本

async steram とは,簡単に言えば IEnumerable<T>foreach の非同期版であると言えます.
同期 IEnumerable<T> との比較を次の表に示します.

同期 非同期(async stream)
IEnumerable<T> IAsyncEnumerable<T>
IEnumerator<T> IAsyncEnumerator<T>
foreach await foreach
System.Linq System.Linq.Async

このように,IEnumerable<T> にあったような基本的な要素に対して,非同期版の機能が提供されています.

では,実際にコードベースで async stream を見ていきましょう.

IAsyncEnumerable<T>, IAsyncEnumerator<T> の定義

まずインターフェース定義を見てみます.
下記のコードは dotnet/runtime (GitHub) から,コメントなどを省き転載したものです.

IEnumerable<T>, IEnumerator<T>

// https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Collections/Generic/IEnumerable.cs
public interface IEnumerable<out T> : IEnumerable
{
    new IEnumerator<T> GetEnumerator();
}

// https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Collections/Generic/IEnumerator.cs
public interface IEnumerator<out T> : IDisposable, IEnumerator
{
    new T Current
    {
        get;
    }
}

IAsyncEnumerable<T>, IAsyncEnumerator<T>

// https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Generic/IAsyncEnumerable.cs
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}

// https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Generic/IAsyncEnumerator.cs
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    ValueTask<bool> MoveNextAsync();

    T Current { get; }
}

ここで,IEnumerator<T>IAsyncEnumerator<T> にはひとつ決定的な違いがあります.IEnumerator<T>IAsyncEnumerator<T> は,どちらも共通して T Current{ get; } を持っていることがわかります.
一方,
bool IEnumerator<T>.MoveNext()

ValueTask<bool> IAsyncEnumerator<T>.MoveNextAsync()
では決定的な違いがあります.MoveNext() は同期的に次の要素へ移動するのに対し, MoveNextAsync() はその操作を非同期的に行うことができます.これによって,ひとつひとつの要素の非同期的な列挙が可能になっています.

async stream の生成と消費

ここからは, async stream の具体的な使い方を見ていきます.
async stream を生成するための最も簡単な方法は,IAsyncEnumerable<T> を戻り値とした非同期メソッド内で yield return することです.

static async IAsyncEnumerable<int> YieldReturnSample()
{
    foreach (var x in Enumerable.Range(1, 10))
    {
        // 列挙の途中で非同期な操作をしたいものに最適!
        await Task.Delay(x * 100); 
        yield return x;
    }
}

これで,非同期でディレイをかけながら 1, 2, ..., 10 を列挙できます.

次に,この IAsyncEnumerable<T> を消費する方法を見てみます.簡単な方法としては await foreach があります.

IAsyncEnumerable<int> asyncStream = YieldReturnSample();

// foreach でなく await foreach で消費
await foreach (int x in asyncStream)
{
    Console.WriteLine(x);
}

このようにすると,IEnumerable<T> に対する foreach と同様の記法で,async stream の要素を一つずつ順番に処理できます.
ここで,async stream の生成時に要素を yield return するたびにディレイをかけているため,Console.WriteLine(x) も1要素ずつディレイがかかりながら実行されることになります.

このように,見た目上は簡単に async stream の生成と消費を書けます.しかし,非同期の処理ですから,実際に実行されるコードは複雑になります.SharpLab で C# to C# の変換をかけたものを,gistにアップロードしていますので,興味のある方はご確認ください.

Discussion