⚠️

Azure Functions の自動スケールが引き起こす Rate Limit Error とその対策

に公開

1. はじめに

RAGシステムの検索用データを生成するために、大量のPDFをGPTモデルでマークダウンに変換するパイプラインを構築した。しかし、Azure Functions の自動スケールによりGPTへのリクエストが集中し、Rate Limit Error が頻発した。

本記事では、Durable Functions を使って処理の並列度を制御し、Rate Limit Error を回避した方法を紹介する。

2. 構築したデータパイプライン

以下のパイプラインでPDFからマークダウンを生成していた。

処理の流れ

  1. Blob Storage にPDFファイルをアップロード
  2. Event Grid がアップロードを検知し、Azure Functions をトリガー
  3. Azure Functions 上で GPT モデルを呼び出し、PDFをマークダウンに変換

3. 発生した問題

複数のPDFを同時にアップロードすると、Rate Limit Error(HTTP 429) が頻発した。

原因

Azure Functions(Flex Consumption)は負荷に応じて自動スケールする。そのため、複数ファイルをアップロードすると以下のような状態になる。

さらに、PDFをそのまま1ファイルとしてGPTに渡すと、ページ数が多い場合にマークダウンの変換精度が落ちた。そのためPDFをチャンク(数ページ単位)に分割してからGPTに送る必要があり、PDFの同時処理に加えて、分割したチャンクの同時リクエスト数も制御する必要が出てきた。

解決に必要なこと

Rate Limit を回避するには、1分あたりのリクエスト数(RPM)とトークン数(TPM) を制御する必要がある。つまり、処理の並列度を適切に制限しなければならない。

4. 解決策

今回のシステムは検索用データの生成が目的であり、リアルタイム性は求められなかった。そのため、処理速度よりも確実にデータを生成することを優先した。

4.1 Durable Functions でPDFの同時処理を防ぐ

通常の Azure Functions では、複数のトリガーが同時に処理されるため、並列度の制御が難しい。

Durable Functions では、オーケストレーター関数が全体の処理フローを一元管理でき、処理の粒度ごとに並列度を変えられる。

Durable Functions を導入した後

オーケストレーターをシングルトンにすることで、複数のPDFが同時にアップロードされても、PDFを1つずつ順次処理できる。これにより、複数PDFの同時処理による Rate Limit Error を防いだ。

シングルトンの実装方法

シングルトンパターンの実装には、インスタンスIDの固定に加えて、Durable Entity をキューとして使う。PDF情報をEntityに追加してからオーケストレーターを起動し、オーケストレーターはEntityからPDFを1つずつ取り出して処理する。

スターター関数(Event Grid トリガー)

instance_id = "pdf-processor-singleton"

async def event_grid_trigger(client, doc_info: dict):
    # Entity にドキュメント情報を追加(キューイング)
    entity_id = df.EntityId("pdf_queue_entity", "singleton")
    await client.signal_entity(entity_id, "add", doc_info)

    # オーケストレーターが既に動いていればキューに追加済みなので何もしない
    existing = await client.get_status(instance_id)
    if existing and existing.runtime_status in [
        df.OrchestrationRuntimeStatus.Running,
        df.OrchestrationRuntimeStatus.Pending,
    ]:
        return

    # 動いていなければオーケストレーターを起動
    await client.start_new("orchestrator_main", instance_id, {})

オーケストレーター(Entity からキューを取得)

def orchestrator_main(context):
    entity_id = df.EntityId("pdf_queue_entity", "singleton")

    while True:
        # Entity からキューの次のPDFを取得
        pdf_info = yield context.call_entity(entity_id, "get_next")

        if pdf_info is None:
            break  # キューが空なら終了

        # PDF を処理
        yield context.call_activity("process_pdf", pdf_info)

4.2 ファンアウト/ファンインでチャンクの同時リクエストを制限する

1つのPDFを複数チャンクに分割した後、GPTへのリクエストを一度に全て送るのではなく、バッチサイズ(同時数件)を指定して少しずつ処理するようにした。

# オーケストレーター内でファンアウト/ファンイン
for i in range(0, len(chunks), BATCH_SIZE):
    batch = chunks[i : i + BATCH_SIZE]
    tasks = [context.call_activity("convert_chunk", chunk) for chunk in batch]
    results = yield context.task_all(tasks)  # バッチ分だけ並列実行して待機

5. 検証結果

Durable Functions 導入後、Rate Limit Error は解消された。

項目 Before After
PDF処理 複数同時 シングルトンで1つずつ
チャンク処理 全件同時 ファンアウト/ファンインでバッチサイズ(数件)ずつ並列処理
Rate Limit Error 頻発 発生なし

6. まとめ

Azure Functions で GPT API を呼び出すと、自動スケールにより同時リクエストが増えすぎて Rate Limit Error が発生した。以下の2点で解決した。

  1. Durable Functions のシングルトン: PDFを1つずつ順次処理
  2. Durable Functions のファンアウト/ファンイン: チャンクをバッチサイズ(数件)ずつ並列処理
ヘッドウォータース

Discussion