🐙

C# の非同期プログラミング async/await についての考察

に公開1

はじめに

筆者はこれまでRust、Go、Javaなどの平行プログラミングを学んできましたが、最近、ASP.NET C#に触れる機会がありましたので、C#の非同期プログラミングについても調べて見ました。

C#の非同期プログラミング

非同期プログラミングとは、WebサーバのようにI/Oバウンドな性質を持つアプリケーションにおいて、CPUリソースを効率良く使用することでアプリ全体のスループットを向上させる並行処理プログラミングです。

非同期プログラミングでは、I/Oバウンドであること、譲り合いの精神cooperativeな記述をすることがとても重要です。つまり、I/O待ちのタイミングでawaitし、CPUを別のタスクに譲る処理をプログラマーが常に意識し、明示的に記述する必要があります。この点で、Go goroutineはランタイムが暗黙的にCPUをスケジューリングするため楽ではあります。

async/await

C#の非同期プログラミングは、async/awaitを使用して実装します。TaskをPromiseに読み替えれば、JavaScriptとほぼ同じなので難しくはないでしょう。

非同期プログラミング
static async Task Main() {
  Task<string> task = MyAsync("hoge");
  string content = await task
}

static async Task<string> MyAsync(string moji)
{
    return moji
}

上記のコードは、無名関数を使用して簡略化することもできます。

簡略化
static async Task Main() {
    string content = await (async (string moji) => moji)("hoge");
}

awaitをつけない場合、 Fire and Forgetになります。awaitつけると、それ以降の行は、Taskの値が確定するまで進みません。(その間に別の箇所のコードを処理する)

I/Oバウンド

先ほど示したサンプルは、実際には非同期で記述する必要性はなく、初心者を混乱させるダメなサンプルです。ごめんなさい。

基本的には、ネットワーク呼び出しのようなI/O処理で、CPUが遊んでしまう場合に、非同期で記述するのが鉄則です。

次のコードは、JavaScriptのAjaxのように、外部のAPIを叩くサンプルになります。await client.GetStringAsync()がJSのfetch()にあたります。

GetStringAsync
static async Task Main() {
  Task<string> ioTask = DownloadContentAsync("https://postman-echo.com/delay/3");
  // この行で、なんか別のタスク(contentが不要)をした方がCPUを効率的に使える。
  string content = await ioTask
  // これ以降の行で、contentの値が必要な処理を記述する。
}

static async Task<string> DownloadContentAsync(string url)
{
    using HttpClient client = new HttpClient();
    string content = await client.GetStringAsync(url); // Await releases the thread
    return content;
}

client.GetStringAsync()の呼び出しで、DownloadContentAsync関数を一旦抜けます。裏ではネットワーク処理をOSが引き継ぎます。

コンパイラーがawaitの前後で関数をぶった斬りますので、DownloadContentAsync_before()DownloadContentAsync_after()の2つの関数になるイメージです。

DownloadContentAsync_before()の戻り値がioTaskにセットされます。Promiseのような不確定な状態がセットされます。

await ioTaskにて、それ以降の行の処理をioTaskの結果が確定する(APIレスポンスが返る)まで停止します。

async/awaitの必要性

初心者にとっての非同期プログラミングの難しさは、なぜawaitで関数を一旦抜ける必要性があるのか でしょう。関数の「色付け」(coloring) と言いますが、同期関数と非同期関数では処理の仕方が違うことを理解する必要があります。

次の2つの処理パターンの違いを考える上で、例えばネットワーク通信が爆速で、CPUが激おそな並行世界があったとしたらどうでしょうか?

ネットワーク呼び出しが1ナノ秒、CPUの処理速度が1クロック1秒というわけです。

パターン1
static async Task Main()
  Task<string> ioTask1 = DownloadContentAsync("https://postman-echo.com/delay/3");
  Task<string> ioTask2 = DownloadContentAsync("https://postman-echo.com/delay/3");

  string content1 = await ioTask1
  string content2 = await ioTask2
}
パターン2
static async Task Main()
  Task<string> ioTask1 = DownloadContentAsync("https://postman-echo.com/delay/3");
  string content1 = await ioTask1

  Task<string> ioTask2 = DownloadContentAsync("https://postman-echo.com/delay/3");
  string content2 = await ioTask2
}

そうしますと、実はどちらのパターンも違いはなくなります。ioTaskはナノ秒で確定しますから、どちらのコードもトータルの実行時間は同じになります。

もう一つのポイントは、ネットワークの処理はカーネルレベルで同時並行に処理が可能ということです。

まとめると以下の2点が非同期処理を理解する上で重要なポイントになります。

  1. CPUの処理は爆速である一方、ネットワーク呼び出しは劇おそである
  2. ネットワーク呼び出しはカーネルレベルで同時並行処理される
  3. ネットワーク呼び出しはスケールする

CPUを使用すると、非同期プログラミングは破綻する

初学者がよくする間違いに、マルチスレッドと混同し、CPUヘビィな実装を非同期プログラミングでしてしまうことが見受けられます。I/O呼び出しと比較した相対的なCPUの速さを活用するわけですが、計算時間が長くなればメリットは少なくなります。

例えば、フィボナッチ数列の計算はCPUサイクルを多く消費します。当然、プログラムはCPU上で逐次的に処理されるため、

s
Task<long> cpuTask = ComputeFibonacciSync(FibonacciNumber);
Task<string> ioTask = DownloadContentAsync("https://postman-echo.com/delay/3");
await Task.WhenAll(ioTask, cpuTask);

static Task<long> ComputeFibonacciSync(int n)
{
    Console.WriteLine("C: Starting CPU-bound computation on the main thread...");
    long result = Fibonacci(n); // Heavy computation
    Console.WriteLine("C: CPU-bound computation completed.");
    return Task.FromResult(result); // Wrap the result in a Task
}

実行すると、逐次的に処理されるのが分かります。ComputeFibonacciSync()は途中で抜けないため、完全に終了までDownloadContentAsync()は開始しません。

➜  ConsoleApp1 git:(main) ✗ dotnet run
C: Starting CPU-bound computation on the main thread...
C: CPU-bound computation completed.
I: Starting I/O-bound download...
I: I/O-bound download completed.

I/O-Bound Result: Content length = 13
CPU-Bound Result: Fibonacci42 = 267914296
Total time: 5802 ms

I/O呼び出しと違い、Fibonacci()はCPUに張り付く処理です。そのため途中で関数を抜けるタイミングがないのです。

Fibonacci
long result = Fibonacci(n); // Heavy computation

非同期処理は、CPUが遊んでいるときに別のタスクをさせる、というのが大前提です。

スレッドを生成して、オフロードする

非同期プログラミングで、CPUヘビィな処理をする場合は、別のOSスレッドを生成して、メインのOSスレッドを止めないようにするのが定石です。

C#では、Task.Run()でメイン・スレッドとは別のスレッドを生成することができます。先ほどのフィボナッチ関数を別スレッドで実行します。つまりマルチスレッド・プログラミングと非同期プログラミングのハイブリッドになるわけです。

s
static Task<long> ComputeFibonacciAsync(int n)
{
    return Task.Run(() =>
    {
        Console.WriteLine("C: Starting CPU-bound computation on thread pool...");
        long result = Fibonacci(n); // Heavy computation
        Console.WriteLine("C: CPU-bound computation completed.");
        return result;
    });
}

これにより、ComputeFibonacciAsync()はフィボナッチの完了を待たずにすぐにリターンすることができます。

キャンセル処理

非同期呼び出しでは同時並行に多くのI/Oリクエストが可能であるため、接続先のデータベースやAPIサーバに負荷がかかりやすいです。したがって、必ずキャンセル処理を入れ長時間リソースを使用しないようにする必要があります。
これを忘れると、データベースの負荷が急上昇したり、アプリケーションのメモリー・リークが発生したり、GCが多発します。
しかしながら、非同期処理のキャンセルは結構難しいです。筆者の経験上、きちんとキャンセル処理を入れているプロジェクトは少ないです。ただし、多くの場合、サーバレスやクライアント側のJSのため、ライフタイムが短く、早期に強制リセットされるため問題にはなりません。

C#では、CancellationTokenSourceを使用してキャンセルします。GetStringAsync()はキャンセル・トークンを引数に取ります。

HTTPクライアントは、内部ではTCP/IP接続、つまりソケットを利用します。コネクション・プールが枯渇したり、サーバ側のリソースを消費したり、502エラーの発生などトラブルの原因になります。

s
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeoutMilliseconds); // Timeout after specified ms
Task<string> ioTask = DownloadContentAsync("https://postman-echo.com/delay/3", cts.Token);

static async Task<string> DownloadContentAsync(string url, CancellationToken cancellationToken = default)
{
    using HttpClient client = new HttpClient();
    string content = await client.GetStringAsync(url, cancellationToken); // Await releases the thread
    return content;
}

このパターンは、Golangのキャンセル・コンテキストに似ています。呼び出し元で、キャンセル・トークンを作成し、非同期関数を呼び出す際に一緒に渡すことで、キャンセルを伝搬させることができるわけです。

C#では、HTTPリクエストだけでなく、データベースの呼び出しでもキャンセル・トークンを渡せるようになっています。

EntityFrameworkでのキャンセル処理
async Task<List<Credit>> GetCreditsAsync(CancellationToken cancellationToken)
{
    return await db.Credits
        .Where(c => c.Value >= 0)
        .OrderByDescending(c => c.Value)
        .ToListAsync(cancellationToken); // Pass the token here
}

CPUのフィボナッチの場合は自分でキャンセルを実装する必要があります。

フィボナッチでのキャンセル処理
static long Fibonacci(int n, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested(); // Check at each call
    if (n <= 1) return n;
    return Fibonacci(n - 1, cancellationToken) + Fibonacci(n - 2, cancellationToken);
}

RAGエージェントなどの場合も途中で処理を中断する必要がある場合に、このキャンセルの仕組みを使用することで、ゾンビ・スレッドを無くすことができます。

ネットワーク呼び出しでエラーが発生したら、フィボナッチを明示的にキャンセルしたい場合は次のように記述します。

明示的なキャンセル
ioTask.ContinueWith(t =>
{
    if (t.IsCanceled)
    {
        cpuCts.Cancel();
        Console.WriteLine("Canceling CPU task because I/O was canceled.");
    }
}, TaskContinuationOptions.OnlyOnCanceled);

task.ContinueWith()はタスクの終了時に追加で実行したいロジックをアドオンできます。
ここで、ioTaskがキャンセルで終了した場合に、フィボナッチにキャンセルを送信するようにしています。

チャネル

チャネルとはスレッドセーフなキューになります。GolangやRustでもお馴染みですが、
マルチスレッドではスレッド間通信に共有変数が使用できますが、複数のスレッドが同時にメモリに書き込みを行う可能性があるため、排他制御の仕組みが必要になります。

チャネルは、内部で暗黙的に排他制御をしているため、プログラマー自身で排他制御する必要はありません。

以下はよくあるproducer-consumerのサンプルです。例えば、100万行のテキスト・ファイルから、特定のキーワードを抽出したいとします。その場合、producer側でファイルを1000行ごとに分割して、チャネルに送り込み、consumer側でチャンクごとにキーワード抽出をするわけです。このように、膨大なデータを並行して分割処理したい場合によく使用されます。

チャネル
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var options = new BoundedChannelOptions(3)  // 容量3の制限付きチャネル
        {
            FullMode = BoundedChannelFullMode.Wait // writer waits when full
        };
        var channel = Channel.CreateBounded<int>(options);
        var writer = channel.Writer;
        var reader = channel.Reader;

        // プロデューサータスク
        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                await writer.WriteAsync(i);
                Console.WriteLine($"Produced: {i}");
                await Task.Delay(50);
            }
            writer.Complete();
        });

        // コンシューマータスク
        Task consumer = Task.Run(async () =>
        {
            await foreach (var item in reader.ReadAllAsync())
            {
                Console.WriteLine($"Consumed: {item}");
                await Task.Delay(200);  // コンシューマーを遅くして満杯を発生させる
            }
        });

        await Task.WhenAll(producer, consumer);
    }
}

このサンプル・コードではチャネルの容量に制限をかけています。

例えて言えば、コイン・パーキングに似ています。駐車場(チャネル)が満車状態になると、誰かが出庫(コンシューム)するまで、入庫(プロデューサー)は待ちになります。バック・プレッシャーと言いますが、後ろ側、つまりコンシューマ側が流量の調整をすることになります。

実際に実行してみます。このように、Consumedでデータが消費され、チャネルに空きが出来るまで、Producedは進みません。つまり、コンシューマーの処理能力がシステム全体のスループットになります。スロットリングのように、データベースなどへの負荷をコントロールしたい場合に有効な手法です。

実行結果
Producer trying to write: 0
Produced: 0
Consumed: 0
Producer trying to write: 1
Produced: 1
Producer trying to write: 2
Produced: 2
Producer trying to write: 3
Produced: 3
Producer trying to write: 4
Produced: 4
Consumed: 1
Producer trying to write: 5
Consumed: 2 <- バックプレッシャー。コンシューマがメッセージを消費するまで書き込めない。
Produced: 5
Producer trying to write: 6
Consumed: 3
Produced: 6

もし、駐車場が満車の場合は、入庫待ちを諦めたい場合、BoundedChannelFullMode.DropWriteを指定します。当然、その場合、コンシューマーはそれらのデータを受け取ることはありません。

BoundedChannelFullMode.DropWrite
Produced: 0
Consumed: 0
Producer trying to write: 1
Produced: 1
Producer trying to write: 2
Produced: 2
Producer trying to write: 3
Produced: 3
Producer trying to write: 4
Produced: 4
Producer trying to write: 5
Produced: 5
Producer trying to write: 6
Produced: 6
Producer trying to write: 7
Produced: 7
Producer trying to write: 8
Produced: 8
Producer trying to write: 9
Produced: 9
Consumed: 1
Consumed: 2
Consumed: 3

ただここで、writer.WriteAsync()は成功or失敗を返しません。もし、結果を知りたい場合、タイムアウト設定をし、例外で検出する方法を取ります。

タイムアウト
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
await ch.Writer.WriteAsync(item, cts.Token);

セマフォ

セマフォとは、指定した数以上のスレッドが侵入しないようにするカウンターです。例えれば、人気ショップなどで、店内が混み合わないように入場制限をすることがあります。

先ほどのバックプレッシャー同様に、データベースなどのリソースへの負荷をコントロールするスロットリングとして使用できます。チャネルの場合はスレッド間通信が主な目的でしたが、セマフォの場合は純粋なスロットリング機能のみになります。

SemaphoreSlim(スレッド数)でセマフォを作成します。

await semaphore.WaitAsync()でセマフォの取得待ちをします。

タスクが完了したら、忘れずにsemaphore.Release()で解放します。

Enumerable.Range().Select().ToArray()で、非同期で同時並行にセマフォの取得を待ちます。

セマフォ
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // allow 2 tasks in the "critical section" at once
        using var semaphore = new SemaphoreSlim(initialCount: 2, maxCount: 2);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); // optional

        var tasks = Enumerable.Range(0, 5)
            .Select(i => DoWorkAsync(i, semaphore, cts.Token))
            .ToArray();

        // Propagates exceptions (AggregateException if multiple)
        await Task.WhenAll(tasks);
        Console.WriteLine("All done.");
    }

    static async Task DoWorkAsync(int taskId, SemaphoreSlim semaphore, CancellationToken ct)
    {
        // If you want a timeout instead of waiting forever:
        // if (!await semaphore.WaitAsync(TimeSpan.FromSeconds(5), ct)) { /* handle timeout */ return; }

        await semaphore.WaitAsync(ct);
        try
        {
            Console.WriteLine($"Task {taskId} entered.");
            await Task.Delay(3_000, ct); // Simulate heavy async work (I/O)
        }
        finally
        {
            semaphore.Release();
            Console.WriteLine($"Task {taskId} released.");
        }
    }
}

実行しましょう。このように同時に2つのタスクしか飛び込めないことが分かります。

実行結果
Task 0 entered.
Task 1 entered.
Task 0 released.
Task 2 entered.
Task 3 entered.
Task 1 released.
Task 2 released.
Task 3 released.
Task 4 entered.
Task 4 released.
All done.

以下の書き方では逐次的な処理になってしまいます。Node.jsでもやりがちな間違いです。Select()はC#のLINQと呼ばれるもので、多言語のmap()のようなものです。LINQ自体はコンカレンシーとは無関係です。(別にPLINQもある)

NGなやり方
// ❌ Sequential: each task is created and awaited before the next is created
foreach (var id in Enumerable.Range(0, 5))
{
    await DoWorkAsync(id, sem, cts.Token); // next iteration waits for previous to finish
}

最後に

いかがでしたでしょうか?こうした並行プログラミングのテクニックは、C#だけでなく、RustやGoなどあらゆる言語で共通のテクニックになります。ランタイムの実装によって、微妙な違いはありますが、横展開できる共通の知識です。

最近は生成AIで簡単に他の言語に移植することが可能になりました。異なる言語で実装を比較することで、より深く理解することができます。皆さんも色々と試してみてください。

Discussion

kanaruskanarus

typo

はじめに

筆者はこれまでRust、Go、Javaなどの平行プログラミングを

平行 → 並行