C# の非同期プログラミング async/await についての考察
はじめに
筆者はこれまでRust、Go、Java
などの平行プログラミングを学んできましたが、最近、ASP.NET C#
に触れる機会がありましたので、C#の非同期プログラミングについても調べて見ました。
C#の非同期プログラミング
非同期プログラミングとは、WebサーバのようにI/Oバウンドな性質を持つアプリケーションにおいて、CPUリソースを効率良く使用することでアプリ全体のスループットを向上させる並行処理プログラミングです。
非同期プログラミングでは、I/Oバウンドであること、譲り合いの精神cooperativeな記述をすることがとても重要です。つまり、I/O待ちのタイミングでawait
し、CPUを別のタスクに譲る処理をプログラマーが常に意識し、明示的に記述する必要があります。この点で、Go goroutine
はランタイムが暗黙的にCPUをスケジューリングするため楽ではあります。
async/await
C#の非同期プログラミングは、async/await
を使用して実装します。TaskをPromiseに読み替えれば、JavaScriptとほぼ同じなので難しくはないでしょう。
static async Task Main() {
Task<string> task = MyAsync("hoge");
string content = await task
}
static async Task<string> MyAsync(string moji)
{
return moji
}
上記のコードは、無名関数を使用して簡略化することもできます。
static async Task Main() {
string content = await (async (string moji) => moji)("hoge");
}
await
をつけない場合、 Fire and Forget
になります。awaitつけると、それ以降の行は、Taskの値が確定するまで進みません。(その間に別の箇所のコードを処理する)
I/Oバウンド
先ほど示したサンプルは、実際には非同期で記述する必要性はなく、初心者を混乱させるダメなサンプルです。ごめんなさい。
基本的には、ネットワーク呼び出しのようなI/O処理で、CPUが遊んでしまう場合に、非同期で記述するのが鉄則です。
次のコードは、JavaScriptのAjaxのように、外部のAPIを叩くサンプルになります。await client.GetStringAsync()
がJSのfetch()
にあたります。
static async Task Main() {
Task<string> ioTask = DownloadContentAsync("https://postman-echo.com/delay/3");
// この行で、なんか別のタスク(contentが不要)をした方がCPUを効率的に使える。
string content = await ioTask
// これ以降の行で、contentの値が必要な処理を記述する。
}
static async Task<string> DownloadContentAsync(string url)
{
using HttpClient client = new HttpClient();
string content = await client.GetStringAsync(url); // Await releases the thread
return content;
}
client.GetStringAsync()
の呼び出しで、DownloadContentAsync
関数を一旦抜けます。裏ではネットワーク処理をOSが引き継ぎます。
コンパイラーがawait
の前後で関数をぶった斬りますので、DownloadContentAsync_before()
とDownloadContentAsync_after()
の2つの関数になるイメージです。
DownloadContentAsync_before()の戻り値がioTask
にセットされます。Promise
のような不確定な状態がセットされます。
await ioTask
にて、それ以降の行の処理をioTask
の結果が確定する(APIレスポンスが返る)まで停止します。
async/awaitの必要性
初心者にとっての非同期プログラミングの難しさは、なぜawaitで関数を一旦抜ける必要性があるのか でしょう。関数の「色付け」(coloring) と言いますが、同期関数と非同期関数では処理の仕方が違うことを理解する必要があります。
次の2つの処理パターンの違いを考える上で、例えばネットワーク通信が爆速で、CPUが激おそな並行世界があったとしたらどうでしょうか?
ネットワーク呼び出しが1ナノ秒、CPUの処理速度が1クロック1秒というわけです。
static async Task Main()
Task<string> ioTask1 = DownloadContentAsync("https://postman-echo.com/delay/3");
Task<string> ioTask2 = DownloadContentAsync("https://postman-echo.com/delay/3");
string content1 = await ioTask1
string content2 = await ioTask2
}
static async Task Main()
Task<string> ioTask1 = DownloadContentAsync("https://postman-echo.com/delay/3");
string content1 = await ioTask1
Task<string> ioTask2 = DownloadContentAsync("https://postman-echo.com/delay/3");
string content2 = await ioTask2
}
そうしますと、実はどちらのパターンも違いはなくなります。ioTaskはナノ秒で確定しますから、どちらのコードもトータルの実行時間は同じになります。
もう一つのポイントは、ネットワークの処理はカーネルレベルで同時並行に処理が可能ということです。
まとめると以下の2点が非同期処理を理解する上で重要なポイントになります。
- CPUの処理は爆速である一方、ネットワーク呼び出しは劇おそである
- ネットワーク呼び出しはカーネルレベルで同時並行処理される
- ネットワーク呼び出しはスケールする
CPUを使用すると、非同期プログラミングは破綻する
初学者がよくする間違いに、マルチスレッドと混同し、CPUヘビィな実装を非同期プログラミングでしてしまうことが見受けられます。I/O呼び出しと比較した相対的なCPUの速さを活用するわけですが、計算時間が長くなればメリットは少なくなります。
例えば、フィボナッチ数列の計算はCPUサイクルを多く消費します。当然、プログラムはCPU上で逐次的に処理されるため、
Task<long> cpuTask = ComputeFibonacciSync(FibonacciNumber);
Task<string> ioTask = DownloadContentAsync("https://postman-echo.com/delay/3");
await Task.WhenAll(ioTask, cpuTask);
static Task<long> ComputeFibonacciSync(int n)
{
Console.WriteLine("C: Starting CPU-bound computation on the main thread...");
long result = Fibonacci(n); // Heavy computation
Console.WriteLine("C: CPU-bound computation completed.");
return Task.FromResult(result); // Wrap the result in a Task
}
実行すると、逐次的に処理されるのが分かります。ComputeFibonacciSync()
は途中で抜けないため、完全に終了までDownloadContentAsync()
は開始しません。
➜ ConsoleApp1 git:(main) ✗ dotnet run
C: Starting CPU-bound computation on the main thread...
C: CPU-bound computation completed.
I: Starting I/O-bound download...
I: I/O-bound download completed.
I/O-Bound Result: Content length = 13
CPU-Bound Result: Fibonacci42 = 267914296
Total time: 5802 ms
I/O呼び出しと違い、Fibonacci()はCPUに張り付く処理です。そのため途中で関数を抜けるタイミングがないのです。
long result = Fibonacci(n); // Heavy computation
非同期処理は、CPUが遊んでいるときに別のタスクをさせる、というのが大前提です。
スレッドを生成して、オフロードする
非同期プログラミングで、CPUヘビィな処理をする場合は、別のOSスレッドを生成して、メインのOSスレッドを止めないようにするのが定石です。
C#では、Task.Run()
でメイン・スレッドとは別のスレッドを生成することができます。先ほどのフィボナッチ関数を別スレッドで実行します。つまりマルチスレッド・プログラミングと非同期プログラミングのハイブリッドになるわけです。
static Task<long> ComputeFibonacciAsync(int n)
{
return Task.Run(() =>
{
Console.WriteLine("C: Starting CPU-bound computation on thread pool...");
long result = Fibonacci(n); // Heavy computation
Console.WriteLine("C: CPU-bound computation completed.");
return result;
});
}
これにより、ComputeFibonacciAsync()
はフィボナッチの完了を待たずにすぐにリターンすることができます。
キャンセル処理
非同期呼び出しでは同時並行に多くのI/Oリクエストが可能であるため、接続先のデータベースやAPIサーバに負荷がかかりやすいです。したがって、必ずキャンセル処理を入れ長時間リソースを使用しないようにする必要があります。
これを忘れると、データベースの負荷が急上昇したり、アプリケーションのメモリー・リークが発生したり、GCが多発します。
しかしながら、非同期処理のキャンセルは結構難しいです。筆者の経験上、きちんとキャンセル処理を入れているプロジェクトは少ないです。ただし、多くの場合、サーバレスやクライアント側のJSのため、ライフタイムが短く、早期に強制リセットされるため問題にはなりません。
C#では、CancellationTokenSource
を使用してキャンセルします。GetStringAsync()はキャンセル・トークンを引数に取ります。
HTTPクライアントは、内部ではTCP/IP接続、つまりソケットを利用します。コネクション・プールが枯渇したり、サーバ側のリソースを消費したり、502エラーの発生などトラブルの原因になります。
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeoutMilliseconds); // Timeout after specified ms
Task<string> ioTask = DownloadContentAsync("https://postman-echo.com/delay/3", cts.Token);
static async Task<string> DownloadContentAsync(string url, CancellationToken cancellationToken = default)
{
using HttpClient client = new HttpClient();
string content = await client.GetStringAsync(url, cancellationToken); // Await releases the thread
return content;
}
このパターンは、Golangのキャンセル・コンテキストに似ています。呼び出し元で、キャンセル・トークンを作成し、非同期関数を呼び出す際に一緒に渡すことで、キャンセルを伝搬させることができるわけです。
C#では、HTTPリクエストだけでなく、データベースの呼び出しでもキャンセル・トークンを渡せるようになっています。
async Task<List<Credit>> GetCreditsAsync(CancellationToken cancellationToken)
{
return await db.Credits
.Where(c => c.Value >= 0)
.OrderByDescending(c => c.Value)
.ToListAsync(cancellationToken); // Pass the token here
}
CPUのフィボナッチの場合は自分でキャンセルを実装する必要があります。
static long Fibonacci(int n, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested(); // Check at each call
if (n <= 1) return n;
return Fibonacci(n - 1, cancellationToken) + Fibonacci(n - 2, cancellationToken);
}
RAGエージェントなどの場合も途中で処理を中断する必要がある場合に、このキャンセルの仕組みを使用することで、ゾンビ・スレッドを無くすことができます。
ネットワーク呼び出しでエラーが発生したら、フィボナッチを明示的にキャンセルしたい場合は次のように記述します。
ioTask.ContinueWith(t =>
{
if (t.IsCanceled)
{
cpuCts.Cancel();
Console.WriteLine("Canceling CPU task because I/O was canceled.");
}
}, TaskContinuationOptions.OnlyOnCanceled);
task.ContinueWith()
はタスクの終了時に追加で実行したいロジックをアドオンできます。
ここで、ioTask
がキャンセルで終了した場合に、フィボナッチにキャンセルを送信するようにしています。
チャネル
チャネルとはスレッドセーフなキューになります。GolangやRustでもお馴染みですが、
マルチスレッドではスレッド間通信に共有変数が使用できますが、複数のスレッドが同時にメモリに書き込みを行う可能性があるため、排他制御の仕組みが必要になります。
チャネルは、内部で暗黙的に排他制御をしているため、プログラマー自身で排他制御する必要はありません。
以下はよくあるproducer-consumer
のサンプルです。例えば、100万行のテキスト・ファイルから、特定のキーワードを抽出したいとします。その場合、producer側
でファイルを1000行ごとに分割して、チャネルに送り込み、consumer側
でチャンクごとにキーワード抽出をするわけです。このように、膨大なデータを並行して分割処理したい場合によく使用されます。
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var options = new BoundedChannelOptions(3) // 容量3の制限付きチャネル
{
FullMode = BoundedChannelFullMode.Wait // writer waits when full
};
var channel = Channel.CreateBounded<int>(options);
var writer = channel.Writer;
var reader = channel.Reader;
// プロデューサータスク
Task producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await writer.WriteAsync(i);
Console.WriteLine($"Produced: {i}");
await Task.Delay(50);
}
writer.Complete();
});
// コンシューマータスク
Task consumer = Task.Run(async () =>
{
await foreach (var item in reader.ReadAllAsync())
{
Console.WriteLine($"Consumed: {item}");
await Task.Delay(200); // コンシューマーを遅くして満杯を発生させる
}
});
await Task.WhenAll(producer, consumer);
}
}
このサンプル・コードではチャネルの容量に制限をかけています。
例えて言えば、コイン・パーキングに似ています。駐車場(チャネル)が満車状態になると、誰かが出庫(コンシューム)するまで、入庫(プロデューサー)は待ちになります。バック・プレッシャーと言いますが、後ろ側、つまりコンシューマ側が流量の調整をすることになります。
実際に実行してみます。このように、Consumed
でデータが消費され、チャネルに空きが出来るまで、Produced
は進みません。つまり、コンシューマーの処理能力がシステム全体のスループットになります。スロットリングのように、データベースなどへの負荷をコントロールしたい場合に有効な手法です。
Producer trying to write: 0
Produced: 0
Consumed: 0
Producer trying to write: 1
Produced: 1
Producer trying to write: 2
Produced: 2
Producer trying to write: 3
Produced: 3
Producer trying to write: 4
Produced: 4
Consumed: 1
Producer trying to write: 5
Consumed: 2 <- バックプレッシャー。コンシューマがメッセージを消費するまで書き込めない。
Produced: 5
Producer trying to write: 6
Consumed: 3
Produced: 6
もし、駐車場が満車の場合は、入庫待ちを諦めたい場合、BoundedChannelFullMode.DropWrite
を指定します。当然、その場合、コンシューマーはそれらのデータを受け取ることはありません。
Produced: 0
Consumed: 0
Producer trying to write: 1
Produced: 1
Producer trying to write: 2
Produced: 2
Producer trying to write: 3
Produced: 3
Producer trying to write: 4
Produced: 4
Producer trying to write: 5
Produced: 5
Producer trying to write: 6
Produced: 6
Producer trying to write: 7
Produced: 7
Producer trying to write: 8
Produced: 8
Producer trying to write: 9
Produced: 9
Consumed: 1
Consumed: 2
Consumed: 3
ただここで、writer.WriteAsync()
は成功or失敗を返しません。もし、結果を知りたい場合、タイムアウト設定をし、例外で検出する方法を取ります。
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
await ch.Writer.WriteAsync(item, cts.Token);
セマフォ
セマフォとは、指定した数以上のスレッドが侵入しないようにするカウンターです。例えれば、人気ショップなどで、店内が混み合わないように入場制限をすることがあります。
先ほどのバックプレッシャー同様に、データベースなどのリソースへの負荷をコントロールするスロットリングとして使用できます。チャネルの場合はスレッド間通信が主な目的でしたが、セマフォの場合は純粋なスロットリング機能のみになります。
SemaphoreSlim(スレッド数)
でセマフォを作成します。
await semaphore.WaitAsync()
でセマフォの取得待ちをします。
タスクが完了したら、忘れずにsemaphore.Release()
で解放します。
Enumerable.Range().Select().ToArray()
で、非同期で同時並行にセマフォの取得を待ちます。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
// allow 2 tasks in the "critical section" at once
using var semaphore = new SemaphoreSlim(initialCount: 2, maxCount: 2);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); // optional
var tasks = Enumerable.Range(0, 5)
.Select(i => DoWorkAsync(i, semaphore, cts.Token))
.ToArray();
// Propagates exceptions (AggregateException if multiple)
await Task.WhenAll(tasks);
Console.WriteLine("All done.");
}
static async Task DoWorkAsync(int taskId, SemaphoreSlim semaphore, CancellationToken ct)
{
// If you want a timeout instead of waiting forever:
// if (!await semaphore.WaitAsync(TimeSpan.FromSeconds(5), ct)) { /* handle timeout */ return; }
await semaphore.WaitAsync(ct);
try
{
Console.WriteLine($"Task {taskId} entered.");
await Task.Delay(3_000, ct); // Simulate heavy async work (I/O)
}
finally
{
semaphore.Release();
Console.WriteLine($"Task {taskId} released.");
}
}
}
実行しましょう。このように同時に2つのタスクしか飛び込めないことが分かります。
Task 0 entered.
Task 1 entered.
Task 0 released.
Task 2 entered.
Task 3 entered.
Task 1 released.
Task 2 released.
Task 3 released.
Task 4 entered.
Task 4 released.
All done.
以下の書き方では逐次的な処理になってしまいます。Node.jsでもやりがちな間違いです。Select()はC#のLINQと呼ばれるもので、多言語のmap()のようなものです。LINQ自体はコンカレンシーとは無関係です。(別にPLINQもある)
// ❌ Sequential: each task is created and awaited before the next is created
foreach (var id in Enumerable.Range(0, 5))
{
await DoWorkAsync(id, sem, cts.Token); // next iteration waits for previous to finish
}
最後に
いかがでしたでしょうか?こうした並行プログラミングのテクニックは、C#だけでなく、RustやGoなどあらゆる言語で共通のテクニックになります。ランタイムの実装によって、微妙な違いはありますが、横展開できる共通の知識です。
最近は生成AIで簡単に他の言語に移植することが可能になりました。異なる言語で実装を比較することで、より深く理解することができます。皆さんも色々と試してみてください。
Discussion
typo
平行 → 並行