👣

TPL Dataflow: TransformBlockの結果を配列で取得する

4 min read

背景

TPL (Task Parallel Library) Dataflowについての説明は以下ご参照ください。

巷にあるサンプルコードでは最後にActionBlockを据えているものばかりなのですが、ゆるくTransformBlockでMap処理を並列にしたいくらいの用途の時に結果を配列で得る方法が意外と見つからなかったので、メモしておきます。間違っているかもしれません。

原型のコード (.NET 5, C# 9):

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Program
{
    private static async Task<string> HelloAsync(string s)
    {
        await Task.Delay(1000);
        return $"Hello {s}";
    }

    public static async Task Main()
    {
        var transformBlock = new TransformBlock<string, string>(
            async s => await HelloAsync(s),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = true
            });

        transformBlock.Post("Alice");
        transformBlock.Post("Bob");
        transformBlock.Post("Charlie");
        transformBlock.Complete();
	
        await transformBlock.Completion;
	
	// どうする?
	//string[] items = ...
	
	Console.WriteLine(string.Join("\n", items));
	// Hello Alice
	// Hello Bob
	// Hello Charlie
	// が1秒後に一気に出るはず
    }
}

方法1. BufferBlock

https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.bufferblock-1?view=net-5.0

BufferBlock にリンクして、そこから TryReceiveAll で取り出します。

var transformBlock = new TransformBlock<string, string>(
    async s => await HelloAsync(s),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        EnsureOrdered = true
    });

var bufferBlock = new BufferBlock<string>();
transformBlock.LinkTo(bufferBlock, 
    new DataflowLinkOptions { PropagateCompletion = true });

transformBlock.Post("Alice");
transformBlock.Post("Bob");
transformBlock.Post("Charlie");
transformBlock.Complete();

await transformBlock.Completion;

if (!bufferBlock.TryReceiveAll(out var items))
    throw new Exception("Failed TryReceiveAll");

Console.WriteLine(string.Join("\n", items));

方法2. OutputAvailableAsync & TryReceive

以下Stack Overflowにあった方法です:

https://stackoverflow.com/questions/49389273/for-a-tpl-dataflow-how-do-i-get-my-hands-on-all-the-output-produced-by-a-transf

予めこんな拡張メソッドを用意します。

using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class ReceivableSourceBlockExtensions
{
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this IReceivableSourceBlock<T> block,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (block.TryReceive(out var item))
            {
                yield return item;
            }
        }
        await block.Completion.ConfigureAwait(false);
    }
}

これを使うと以下のようになります。

var transformBlock = new TransformBlock<string, string>(
    async s => await HelloAsync(s),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        EnsureOrdered = true
    });
            
transformBlock.Post("Alice");
transformBlock.Post("Bob");
transformBlock.Post("Charlie");
transformBlock.Complete();

var items = await transformBlock.ToAsyncEnumerable().ToArrayAsync();

Console.WriteLine(string.Join("\n", items));

ToArrayAsyncを使うには、System.Linq.Async パッケージを参照してください。

https://www.nuget.org/packages/System.Linq.Async

よくわからないですが使い方によっては方法2だとawaitのところで固まってしまうことがあり、方法1のほうが良さそうな気がしています。

Discussion

ログインするとコメントできます