💥

IAsyncEnumerable<T> を DTO のプロパティに置くと EF Core が並列実行エラーになる仕組みと解決方法

に公開

ASP.NET Core では IAsyncEnumerable<T> を返すと JSON ストリーミングができます。
では、DTO のプロパティに IAsyncEnumerable<T> を置いた場合はどうなるでしょうか?

一見動くように見えますが、EF Core の AsAsyncEnumerable() と組み合わせると
複数プロパティで並列実行エラーが発生する
ことがあります。

この現象は「DisposeAsync が呼ばれない」わけではありません。
DisposeAsync が返す ValueTask を “その場で await しない”
という JsonSerializer の挙動が原因です。

この記事では、実際のログを使ってこの挙動を可視化し、
なぜ EF Core が並列実行エラーになるのか、その仕組みを整理します。


1. 最小再現コード

AB の 2 プロパティを持つ DTO を
JsonSerializer.SerializeAsync でシリアライズします。

WithLoggingWithTimeSpan を使って
タイミングの設定とログを出しています。

using System.Text.Json;
using System.IO;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

await JsonSerializer.SerializeAsync(Stream.Null, 
    new { 
        A = GetAsyncEnumerable([1])
            .WithLogging("A"),
        B = GetAsyncEnumerable([1, 2])
            .WithTimeSpan(TimeSpan.FromMilliseconds(10))
            .WithLogging("B"),
    });

static async IAsyncEnumerable<T?> GetAsyncEnumerable<T>(params T?[] args)
{
    foreach(var item in args)
        yield return item;
}

public static class AsyncEnumerableHelpers
{
    public static IAsyncEnumerable<T?> WithTimeSpan<T>(this IAsyncEnumerable<T> source, TimeSpan? timeSpan = null) {
        if (timeSpan is null or {Ticks: <= 0}) return source;
        return Iterable();
        async IAsyncEnumerable<T?> Iterable() {
            await foreach (var item in source)
            {
                await Task.Delay(timeSpan.Value!);
                yield return item;
            }
        }
    }

    public static IAsyncEnumerable<T> WithLogging<T>(this IAsyncEnumerable<T> source, string id, TimeSpan? disposeTime = null) =>
        new LoggingAsyncEnumerable<T>(id, source, disposeTime);

    private sealed class LoggingAsyncEnumerable<T>(string id, IAsyncEnumerable<T> inner, TimeSpan? disposeTime) : IAsyncEnumerable<T>
    {
        public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
            => new Enumerator(id, inner.GetAsyncEnumerator(cancellationToken), disposeTime);

        private sealed class Enumerator(string id, IAsyncEnumerator<T> inner, TimeSpan? disposeTime = null) : IAsyncEnumerator<T>
        {
            public T Current => inner.Current;

            public async ValueTask DisposeAsync()
            {
                await inner.DisposeAsync();
                if (disposeTime is not null and {Ticks: > 0})
                    await Task.Delay(disposeTime.Value);
                Console.WriteLine($"{id}: disposed");
            }

            public async ValueTask<bool> MoveNextAsync()
            {
                bool haveNext = await inner.MoveNextAsync();
                Console.WriteLine(haveNext
                    ? $"{id}: yield return {inner.Current}"
                    : $"{id}: iteration complete");
                return haveNext;
            }
        }
    }
}

https://lab.razor.fyi/#pVXNbttGEEbRAml5aoP00tNU7YEEnIWkxkFkWTIcpUkcJGkBEQ1Qw0DX5JhaeLkr7C7lOALfo4-RVyl676HvkHux_BNJUYKB7mkxOz_ffPOzzr8PHOdXJSNFYxLo-38_SDQTEcxvtcGY-PjekFdairHTkJ_90hS0nmeScwwMk0KTFyhQsaCl4S8U0pCJaJec-FRf67Hj0BvKDFgMc1SMcvYBFamup_pWBO7cKKQxeZtwfgAOAIDAG1jnV3tOYQIv0GTaP4skRkUvObrngwuv0rGHvGNm8VpGEROR2zvteQfV89NdLg5g2OXFZzHOl1S45YU8VzJ-wzhnGgMpQu0O-t6-8E_L8Kk3dpwflopGMYUbqoQlLGTaAoDZfDAaPQFHG2pYANTig7MWzGP_ZNqB_tifukuqaKzBPzm_AKoi7W1FUqiNVGUkZ51hupIKabBwV1QBMxgDE4V5mcwtQx6CQpMokamMndRxlsklZwEUaANOtYYWqpfIl6h0Eahp0JlYnW6bklkw3aE5BS0TFeABlMonYIobTEAknHuQB7WHXYFbPTOdvYNUsPZZcK2P4HgC_dQrE8xdjyvrQnxmij7xNk97SrRRrwHJbLIpKEiHBut55GYjNY03DuxQkWfI6W2VGvmN8gS_r-HbU8D6c-o0b-mdypVXq-jxOxVLG2W7kIX1uoVML6VGK6hKN5lWiOz4FzE6Wt66Kr3XHNkpy1JQbEUNgkbKMSx6dLe3Gr6uNJgQqHZA9-Coy8ZplrCgtKFopLLOWyNtpHJnVATIObXb15fXKCDYkkwgxCuacLPpmmbHFEW3NNZc2xSzdEhH3K0oXpvcTbtsUuuiuuZ0F7dF-vu4rdriqMt0R94F1T7MEqVQGJgUQUghGHep5wOdzZEdMHiWo8j_prsNZh6kabg9kXYl1VO0W0mafDNREVaraWo305Z15xqoucs3QUfYmRRaciTvFDP4mgl0f-ytWZgelXyHvZZVrcS7eTq-lJJP4Y1c4Vt8b-5Gl7WBBc1NYNKgr-Wpm7_Sdpuf7WD7k2_sx3WjTdI2Ic2FWR7kGv83DGb_DDt1EMh4ydFgd_AtUYG8JGT_drenKKNcoVIsxHIz-3KeXVy7giGHBecXvdxh6qT3v3lEBgPSf_gTGR4ePh6R4fC7b0ekT_oPlwpXDG_I8PGgf0gGvXtPSJ88Goxefa4S8ftXX_7z6c-_PkZff_HHZ_8B


2. 実行結果

A: yield return 1
A: iteration complete
B: yield return 1
A: disposed
B: yield return 2
B: iteration complete
B: disposed
  • A の DisposeAsync が B の 2 回目の MoveNextAsync に「間に合っている」
  • これは B の MoveNextAsync に 10ms の遅延を入れたため
  • → DisposeAsync の完了タイミングが 後続の処理速度に依存

つまり:

DisposeAsync は呼ばれているが、その場で await されていない。


3. JsonSerializer の内部挙動

3.1 プロパティの IAsyncEnumerable<T> は「1つの JSON 値」

内部的には flush が走るが、
DisposeAsync の await タイミングは遅延する。

3.2 DisposeAsync は呼ばれるが、その場で await されない

  • DisposeAsync の ValueTask は 即時 await されない
  • JSON 全体の書き込み完了後にまとめて await される
  • → 完了タイミングが非決定的になる

4. EF Core が並列実行エラーを起こす理由

EF Core の AsAsyncEnumerable() は:

  • 列挙中は接続を保持
  • DisposeAsync 完了まで接続を解放しない

JsonSerializer が DisposeAsync を待たないため:

  1. A の列挙が終わる
  2. DisposeAsync が呼ばれるが完了を待たない
  3. A の接続がまだ開いている
  4. B の列挙が始まる
  5. 同一接続の並列利用として EF Core が例外を投げる

5. 対策:DisposeAsync を即時 await するラッパー

public static class QueryableToAsyncEnumerableExtensions {
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this IQueryable<T> self,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in self.AsAsyncEnumerable().WithCancellation(cancellationToken))
            yield return item;
    }
}

これで:

  • DisposeAsync が列挙直後に await される
  • 接続が即時解放される
  • JsonSerializer の遅延に依存しない

6. まとめ

  • JsonSerializer は DisposeAsync を呼ぶが、その場で await しない
  • そのため DisposeAsync の完了タイミングが非決定的
  • EF Core は同一接続の並列利用を許容しない
  • → 並列実行エラーが発生する
  • await foreach ラッパーで安全に扱える

7. 蛇足

EF Core の設定によっては
「次の接続前に前の接続を閉じる」動作になる場合もあり、
必ず再現するとは限りません。


8. 参考

Discussion