👣
TPL Dataflow: TransformBlockの結果を配列で取得する
背景
TPL (Task Parallel Library) Dataflowについての説明は以下ご参照ください。
- https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library
- https://azyobuzin.hatenablog.com/entry/2019/05/26/164155
巷にあるサンプルコードでは最後にActionBlock
を据えているものばかりなのですが、ゆるくTransformBlock
でMap処理を並列にしたいくらいの用途の時に結果を配列で得る方法が意外と見つからなかったので、メモしておきます。間違っているかもしれません。
原型のコード (.NET 5, C# 9):
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class Program
{
private static async Task<string> HelloAsync(string s)
{
await Task.Delay(1000);
return $"Hello {s}";
}
public static async Task Main()
{
var transformBlock = new TransformBlock<string, string>(
async s => await HelloAsync(s),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = true
});
transformBlock.Post("Alice");
transformBlock.Post("Bob");
transformBlock.Post("Charlie");
transformBlock.Complete();
await transformBlock.Completion;
// どうする?
//string[] items = ...
Console.WriteLine(string.Join("\n", items));
// Hello Alice
// Hello Bob
// Hello Charlie
// が1秒後に一気に出るはず
}
}
方法1. BufferBlock
BufferBlock
にリンクして、そこから TryReceiveAll
で取り出します。
var transformBlock = new TransformBlock<string, string>(
async s => await HelloAsync(s),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = true
});
var bufferBlock = new BufferBlock<string>();
transformBlock.LinkTo(bufferBlock,
new DataflowLinkOptions { PropagateCompletion = true });
transformBlock.Post("Alice");
transformBlock.Post("Bob");
transformBlock.Post("Charlie");
transformBlock.Complete();
await transformBlock.Completion;
if (!bufferBlock.TryReceiveAll(out var items))
throw new Exception("Failed TryReceiveAll");
Console.WriteLine(string.Join("\n", items));
方法2. OutputAvailableAsync & TryReceive
以下Stack Overflowにあった方法です:
予めこんな拡張メソッドを用意します。
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;;
using System.Threading;
using System.Threading.Tasks.Dataflow;
public static class ReceivableSourceBlockExtensions
{
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
this IReceivableSourceBlock<T> block,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
{
while (block.TryReceive(out var item))
{
yield return item;
}
}
await block.Completion.ConfigureAwait(false);
}
}
これを使うと以下のようになります。
var transformBlock = new TransformBlock<string, string>(
async s => await HelloAsync(s),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = true
});
transformBlock.Post("Alice");
transformBlock.Post("Bob");
transformBlock.Post("Charlie");
transformBlock.Complete();
var items = await transformBlock.ToAsyncEnumerable().ToArrayAsync();
Console.WriteLine(string.Join("\n", items));
ToArrayAsync
を使うには、System.Linq.Async
パッケージを参照してください。
よくわからないですが使い方によっては方法2だとawaitのところで固まってしまうことがあり、方法1のほうが良さそうな気がしています。
Discussion