Azure Functions の自動スケールが引き起こす Rate Limit Error とその対策
1. はじめに
RAGシステムの検索用データを生成するために、大量のPDFをGPTモデルでマークダウンに変換するパイプラインを構築した。しかし、Azure Functions の自動スケールによりGPTへのリクエストが集中し、Rate Limit Error が頻発した。
本記事では、Durable Functions を使って処理の並列度を制御し、Rate Limit Error を回避した方法を紹介する。
2. 構築したデータパイプライン
以下のパイプラインでPDFからマークダウンを生成していた。
処理の流れ
- Blob Storage にPDFファイルをアップロード
- Event Grid がアップロードを検知し、Azure Functions をトリガー
- Azure Functions 上で GPT モデルを呼び出し、PDFをマークダウンに変換
3. 発生した問題
複数のPDFを同時にアップロードすると、Rate Limit Error(HTTP 429) が頻発した。
原因
Azure Functions(Flex Consumption)は負荷に応じて自動スケールする。そのため、複数ファイルをアップロードすると以下のような状態になる。
さらに、PDFをそのまま1ファイルとしてGPTに渡すと、ページ数が多い場合にマークダウンの変換精度が落ちた。そのためPDFをチャンク(数ページ単位)に分割してからGPTに送る必要があり、PDFの同時処理に加えて、分割したチャンクの同時リクエスト数も制御する必要が出てきた。
解決に必要なこと
Rate Limit を回避するには、1分あたりのリクエスト数(RPM)とトークン数(TPM) を制御する必要がある。つまり、処理の並列度を適切に制限しなければならない。
4. 解決策
今回のシステムは検索用データの生成が目的であり、リアルタイム性は求められなかった。そのため、処理速度よりも確実にデータを生成することを優先した。
4.1 Durable Functions でPDFの同時処理を防ぐ
通常の Azure Functions では、複数のトリガーが同時に処理されるため、並列度の制御が難しい。
Durable Functions では、オーケストレーター関数が全体の処理フローを一元管理でき、処理の粒度ごとに並列度を変えられる。
Durable Functions を導入した後
オーケストレーターをシングルトンにすることで、複数のPDFが同時にアップロードされても、PDFを1つずつ順次処理できる。これにより、複数PDFの同時処理による Rate Limit Error を防いだ。
シングルトンの実装方法
シングルトンパターンの実装には、インスタンスIDの固定に加えて、Durable Entity をキューとして使う。PDF情報をEntityに追加してからオーケストレーターを起動し、オーケストレーターはEntityからPDFを1つずつ取り出して処理する。
スターター関数(Event Grid トリガー)
instance_id = "pdf-processor-singleton"
async def event_grid_trigger(client, doc_info: dict):
# Entity にドキュメント情報を追加(キューイング)
entity_id = df.EntityId("pdf_queue_entity", "singleton")
await client.signal_entity(entity_id, "add", doc_info)
# オーケストレーターが既に動いていればキューに追加済みなので何もしない
existing = await client.get_status(instance_id)
if existing and existing.runtime_status in [
df.OrchestrationRuntimeStatus.Running,
df.OrchestrationRuntimeStatus.Pending,
]:
return
# 動いていなければオーケストレーターを起動
await client.start_new("orchestrator_main", instance_id, {})
オーケストレーター(Entity からキューを取得)
def orchestrator_main(context):
entity_id = df.EntityId("pdf_queue_entity", "singleton")
while True:
# Entity からキューの次のPDFを取得
pdf_info = yield context.call_entity(entity_id, "get_next")
if pdf_info is None:
break # キューが空なら終了
# PDF を処理
yield context.call_activity("process_pdf", pdf_info)
4.2 ファンアウト/ファンインでチャンクの同時リクエストを制限する
1つのPDFを複数チャンクに分割した後、GPTへのリクエストを一度に全て送るのではなく、バッチサイズ(同時数件)を指定して少しずつ処理するようにした。
# オーケストレーター内でファンアウト/ファンイン
for i in range(0, len(chunks), BATCH_SIZE):
batch = chunks[i : i + BATCH_SIZE]
tasks = [context.call_activity("convert_chunk", chunk) for chunk in batch]
results = yield context.task_all(tasks) # バッチ分だけ並列実行して待機
5. 検証結果
Durable Functions 導入後、Rate Limit Error は解消された。
| 項目 | Before | After |
|---|---|---|
| PDF処理 | 複数同時 | シングルトンで1つずつ |
| チャンク処理 | 全件同時 | ファンアウト/ファンインでバッチサイズ(数件)ずつ並列処理 |
| Rate Limit Error | 頻発 | 発生なし |
6. まとめ
Azure Functions で GPT API を呼び出すと、自動スケールにより同時リクエストが増えすぎて Rate Limit Error が発生した。以下の2点で解決した。
- Durable Functions のシングルトン: PDFを1つずつ順次処理
- Durable Functions のファンアウト/ファンイン: チャンクをバッチサイズ(数件)ずつ並列処理
Discussion