👷

Producer-Consumerパターンで解決する asyncio.create_task の課題

に公開

はじめに

こんにちは、KENCOPA で SRE をしている moko-poi です。

KENCOPA では、建設業向けの AI プロダクト「工程AIエージェント」を開発しています。設計図書の解析や工程表の自動生成など、長時間かかる AI 処理を提供しています。

これまで FastAPI の asyncio.create_task を使って非同期処理を実装していましたが、以下のような課題に直面していました。

  • API サーバーが再起動すると処理中のタスクが失われる
  • サーバーリソースが長時間占有され、スケーリングが困難
  • エラー時のリトライ機構を自前で実装する必要がある

これらの課題を解決するため、非同期処理基盤を AWS SQS ベースのアーキテクチャに移行することを決定しました。

本記事では、移行を決定した背景、アーキテクチャ設計、そして実装における判断基準について共有します。

工程表生成の具体的な流れ

非同期処理の課題に入る前に、工程AIエージェントのシステム全体像と、実際にどのように工程表が生成されるのかを説明します。

システムアーキテクチャ全体像

工程AIエージェントは、以下のコンポーネントで構成されています。

工程AIエージェントのシステムアーキテクチャ図

工程AIエージェントは、フロントエンドでユーザーインターフェースと WebSocket 接続を提供し、FastAPI による REST API サーバーで非同期処理を実行しています。データストアとしては、Aurora PostgreSQL でマスターデータや結果データを永続化し、DynamoDB でリアルタイム進捗管理を行い、S3 でファイルを保存しています。

進捗管理に DynamoDB を使用しているのは、数秒ごとの高頻度な更新に最適化されており、低レイテンシでの読み取りが可能なためです。また、TTL 機能により処理完了後 24 時間で自動的にデータがクリーンアップされるため、運用負荷を削減できます。

Next.js サーバー

Next.js サーバーは、初回アクセス時の HTML 配信(Server Side Rendering)、クライアントサイド JavaScript のバンドル配信、ルーティングとページ遷移の管理を担当します。

ブラウザ(クライアントサイド)

ブラウザ上で動作するフロントエンドアプリケーションは、ユーザーインターフェースの提供と、presigned URL を使った S3 への直接ファイルアップロードを行います。また、API サーバーへの HTTP リクエスト送信と WebSocket 接続による進捗受信を通じて、処理状況をリアルタイムに表示します。

API サーバー (FastAPI)

バックエンド API サーバーとして、REST API エンドポイントの提供と非同期処理の起動(現状は asyncio.create_task)を行います。進捗通知用の WebSocket サーバーを提供し、RDB へのデータ保存、DynamoDB への進捗状態管理、S3 からのファイル取得・保存を担当します。

Aurora PostgreSQL

メインデータベースとして、マスターデータ(サイト情報、プロジェクト情報)、AI 処理の結果データ(生成された工程表など)、認証・認可情報(ユーザー情報)を永続化します。

DynamoDB (Progress Table)

リアルタイム進捗管理専用のデータストアとして使用しています。テーブル構造は、id(Hash Key)と createdAt(Range Key)を主キーとし、request_id と progress_key を Global Secondary Index(RequestProgressIndex)として持ちます。progress_data には進捗情報(message や progress を含む JSON)を格納し、TTL により処理完了後 24 時間で自動削除されます。

S3

ファイルストレージとして、アップロードされた PDF ファイル(設計図書)、AI 処理の中間ファイル(抽出されたテキストなど)、生成された Excel ファイル(工程表の出力)を保存します。

工程表生成の実際のフロー

ユーザーが設計図書をアップロードしてから工程表が生成されるまでの流れを説明します。

まず、ユーザーがブラウザで PDF を選択すると、presigned URL を使って S3 に直接アップロードされます。その後、API サーバーに POST リクエストが送信され、202 Accepted が即座に返却されます。

API サーバーは asyncio.create_task で非同期処理を起動し、DynamoDB に進捗レコードを作成します(progress: 0%)。同時に、ブラウザは WebSocket 接続を確立し、API サーバーが 1 秒間隔で DynamoDB をポーリングして、新しい進捗データを WebSocket 経由でブラウザに送信します。WebSocket を使用することで、HTTP ポーリングよりも効率的でリアルタイムな進捗更新が可能になります。

バックグラウンドでは AI 処理(ファイル取得 → データ抽出・統合 → 保存)が実行され、各ステップで DynamoDB の進捗が更新されます。ブラウザは WebSocket 経由でリアルタイムに受信し、UI に進捗バーを表示します。

処理が完了すると、生成された結果データが Aurora PostgreSQL に保存され、DynamoDB の進捗が完了状態(progress: 100%)に更新されます。最後に、WebSocket 経由でブラウザに完了通知が送信されます。

現状の課題:再起動時にタスクが消失

このフローにおいて、API サーバーが再起動すると処理が失われる問題があります。

問題のシナリオ

  1. AI 処理が 40% まで進行
  2. API サーバーがデプロイで再起動
  3. asyncio.create_task で起動したタスクが消失
  4. DynamoDB の進捗は 40% のまま更新されない
  5. ユーザーから見ると「40% で止まった」状態

この課題を解決するために、SQS ベースの非同期処理への移行を決定しました。

なぜ最初は asyncio.create_task で良かったのか

初期フェーズでは、asyncio.create_task を使った非同期処理は最適な選択でした。

素早い価値提供を優先

シンプルな実装: FastAPI の標準機能だけで非同期処理を実現でき、追加のインフラやライブラリが不要でした。外部キューサーバーの構築・運用が不要で、開発スピードを最大化できました。

MVP検証に十分: 初期ユーザー数が限定的(数十件/日)だったため、API サーバーのリソースで処理が完結しました。デプロイ頻度も低く(月1-2回)、再起動によるタスク喪失のリスクも限定的でした。

学習コストの低さ: チームメンバー全員が Python/FastAPI に慣れており、新しい技術スタックを学ぶ必要がありませんでした。

このアプローチにより、プロダクトの価値検証を最速で行うことができ、ユーザーフィードバックを得ながら機能を磨き込めました。

本番運用で顕在化した課題

しかし、実運用フェーズに入り以下の変化が起きました

  • ユーザー数の増加: 数十件/日 → 数百件/日
  • デプロイ頻度の増加: 月1-2回 → 週1-2回
  • 処理の長時間化: 初期は1-3分 → 5-15分
  • 信頼性への期待値上昇: 「試験的」から「業務で使う」へ

これらの変化により、初期の実装では限界が見え始めました。

なぜ asyncio.create_task では足りなくなったのか

既存の実装方式

工程AIエージェントでは、以下のような長時間処理を提供しています。

  • 設計図書処理: PDF から構造化データを抽出(処理時間: 5〜10分)
  • AI 工程表調整: 工程表を AI で最適化(処理時間: 10〜15分)
  • 工程表設定: 工程表の初期設定を生成(処理時間: 5〜10分)

これらの処理は、以下のような実装で非同期実行していました。

@router.post("", status_code=status.HTTP_202_ACCEPTED)
async def process_document(
    body: ProcessRequestSchema,
    progress_manager: ProgressManagerDep,
) -> ProcessResponseSchema:
    # バックグラウンドタスクとして実行
    asyncio.create_task(
        _execute_process(
            body=body,
            progress_manager=progress_manager,
        )
    )
    
    # 即座に 202 Accepted を返却
    return ProcessResponseSchema(
        request_id=body.request_id
    )

このアプローチの利点は実装がシンプルであることですが、以下のような課題がありました。

課題1: デプロイのたびにユーザーの処理が中断される

asyncio.create_task で起動したタスクは、API サーバーのプロセス内で動作します。そのため、以下のような状況でタスクが失われてしまいます。

  • API サーバーの再起動(デプロイ時など)
  • コンテナのクラッシュやスケールイン
  • 意図しないプロセス終了

ユーザーから見ると「処理中だったのに突然止まった」という状態になり、エラーハンドリングも難しい状況でした。

デプロイのたびにユーザーの処理が中断され、再実行を強いられる状況が発生していました。特に、処理時間が長い(5〜15分)ため、デプロイと処理のタイミングが重なる確率が高く、週1-2回のデプロイのたびに複数のユーザーに影響が出ていました。

ECS デプロイ時の挙動

ECS でコンテナをデプロイする際、以下の流れで既存タスクが終了します。

  1. 新しいコンテナを起動
  2. ヘルスチェックが成功
  3. 古いコンテナに SIGTERM シグナルを送信(グレースフルシャットダウンの開始)
  4. 30秒間の停止猶予期間(stopTimeout
  5. 30秒経過しても終了しない場合、SIGKILL で強制終了

AI 処理が 5〜15分かかるのに対し、ECS の停止猶予期間はデフォルトで 30秒しかありません。つまり、デプロイ時には処理中のタスクが必ず強制終了されるという状況でした。

この問題を根本的に解決するには、処理を API サーバーのプロセスから切り離し、デプロイの影響を受けないようにする必要がありました。

課題2: リソース効率とスケーリングの限界

長時間処理が API サーバー上で動作するため、以下の問題がありました。

リソースの長時間占有: API サーバーのリソースが非同期処理(5〜15分)で占有され、同期的な API リクエストの処理に影響が出ていました。例えば、3つの長時間処理が同時実行されると CPU・メモリが 100% 使用中となり、新しい HTTP リクエストが遅延する状況が発生していました。

スケーリングの非効率性: 同期処理と非同期処理で必要なリソース量が異なるため、適切にスケーリングすることが困難でした。スケールアウトしても既存の長時間タスクは新インスタンスに移行できず、ピーク時には処理待ちが発生していました。

課題3: エラーハンドリングとリトライの複雑さ

一時的なエラー(外部 API の Rate Limit など)が発生した場合、リトライ機構を自前で実装する必要がありました。

リトライ機構の実装負荷: リトライロジック(指数バックオフ、最大試行回数など)を各処理に自前で実装する必要があり、コードの複雑性が増していました。

エラー状態の管理: エラー発生時、処理を最初からやり直す必要があり、途中から再開できる仕組みがありませんでした。エラー時の状態管理やチェックポイント機構を自前で実装する必要があり、開発・保守コストが高くなっていました。

障害の可視性不足: API サーバーの標準出力に複数処理のログが混在し、特定のリクエストのエラーを追跡するのが困難でした。リトライ回数や失敗パターンの統計も取れませんでした。

課題のまとめ

課題 影響
デプロイのたびに処理が中断 ユーザーの再実行負荷、サービス体験の悪化
リソース効率とスケーリング 同期 API のレスポンス遅延、ピーク時の処理待ち
エラーハンドリングの複雑さ 開発・保守コストの増加、障害対応の困難

これらの課題を総合的に解決するため、非同期処理基盤を SQS ベースのアーキテクチャに移行することを決定しました。

なぜ SQS を選んだのか

SQS を選んだ理由

AWS SQS を選択した理由は、まずフルマネージドサービスであるため運用負荷が低いことが挙げられます。キューサーバーのスケーリング、データの永続化・バックアップ、可用性の担保が不要であり、インフラ担当が少ない中で運用負荷を最小化できることは大きな利点でした。

また、SQS にはリトライと Dead Letter Queue (DLQ) がビルトインされています。VisibilityTimeout によりメッセージ処理中は他の Consumer に配信されず、MaxReceiveCount で指定回数処理に失敗すると DLQ に移動し、失敗したメッセージを別キューで保管できます。これらを自前で実装する必要がなく、信頼性の高い非同期処理基盤を構築できます。

既に AWS 上で動作しており、DynamoDB(進捗管理)、S3(ファイルストレージ)、CloudWatch(ログ・監視)、ECS(コンテナ実行環境)と連携しています。SQS を追加することで、統一された AWS エコシステムの中で開発・運用できます。

さらに、後述する設計により、既存の処理ロジックやビジネスロジックをほぼそのまま再利用できるため、移行リスクを最小化できました。

Standard SQS vs FIFO SQS の選択

SQS には Standard Queue と FIFO Queue の2種類がありますが、Standard SQS を採用しました。

FIFO SQS は厳密な順序保証と重複排除機能を提供しますが、以下の理由で不要と判断しました。まず、順序制御は既にドメイン層で実装済みであり、先行タスクの完了チェックで順序を制御しています。先行タスクが未完了の場合は VisibilityTimeout で待機します。また、冪等性は DynamoDB で担保されており、アトミックな状態管理で重複実行を防止しています。

FIFO SQS には TPS 制限(最大 3,000 TPS、Standard SQS は無制限)、コスト(Standard SQS の約1.5倍)、並列処理の制限などの制約があります。AI ワークフローは処理時間が長く(5〜15分)、現時点で TPS 制限に達するリスクは低いものの、将来的な拡張性とコスト効率を考慮し Standard SQS を採用しました。

SNS Fan-out Pattern を採用しなかった理由

SNS を SQS の前段に配置する Fan-out Pattern も検討しましたが、現時点では見送りました。現時点では用途が一つ(ワーカーによる処理)のみであり、SNS は必要になったタイミングで SQS の前段に追加することが容易です。初期構成は SQS のみの最小構成とし、システムの複雑性を抑えることにしました。

将来的に複数のコンシューマーや異なる処理パターンが必要になった際に、SNS Fan-out Pattern への移行を検討します。

アーキテクチャ設計

移行前後の比較

Before: asyncio.create_task による実装

移行前の工程表生成の流れ

  1. フロントエンドから API サーバーへ POST リクエスト送信
  2. API サーバーがリクエストを受け取り処理を開始
  3. API サーバーから 202 Accepted レスポンスを返却
  4. API サーバーが DynamoDB へ進捗データを更新(同一プロセス内)
  5. フロントエンドが DynamoDB から進捗データをポーリング取得
  6. WebSocket 経由でクライアントへ進捗データを送信

問題点として、API サーバーの再起動でタスクが失われることが挙げられます。

After: SQS による実装

移行後の工程表生成の流れ

  1. フロントエンドから API サーバーへ POST リクエスト送信
  2. API サーバーがジョブキューへメッセージを Enqueue
  3. API サーバーから 202 Accepted レスポンスを返却
  4. ワーカーがジョブキューからメッセージを Dequeue
  5. ワーカーが DynamoDB へ進捗データを更新(別プロセス)
  6. フロントエンドが DynamoDB から進捗データをポーリング取得
  7. WebSocket 経由でクライアントへ進捗データを送信

ワーカーが別プロセスのため、API サーバーの再起動の影響を受けなくなりました。

Producer側の責務

Producer(API サーバー)は、HTTP リクエストを受け取り、ジョブメッセージを SQS に送信(Enqueue)し、202 Accepted レスポンスを即座に返却します。

@router.post("", status_code=status.HTTP_202_ACCEPTED)
async def process_document(
    body: ProcessRequestSchema,
    sqs_producer: SQSProducerDep,  # ← ProgressManager から変更
) -> ProcessResponseSchema:
    # ジョブメッセージを作成
    message = JobMessage(
        job_type=JobType.PROCESS_DOCUMENT,
        request_id=body.request_id,
        body_data=body.model_dump(),
    )
    
    # SQS に Enqueue
    await sqs_producer.enqueue_job(message)
    
    # 即座に 202 を返却
    return ProcessResponseSchema(
        request_id=body.request_id
    )

Producer 側では実際の処理を行わず、メッセージ送信のみを行います。

Consumer側の責務

Consumer(ワーカープロセス)は、SQS からジョブメッセージを取得(Dequeue)し、ジョブタイプに応じて適切な処理関数を実行し、処理完了後にメッセージを削除します。

class JobExecutor:
    async def execute(self, job_message: JobMessage) -> None:
        """ジョブタイプに応じて処理を実行"""
        if job_message.job_type == JobType.PROCESS_DOCUMENT:
            await self._execute_process_document(job_message)
        elif job_message.job_type == JobType.OPTIMIZE_SCHEDULE:
            await self._execute_optimize_schedule(job_message)
        # ...
    
    async def _execute_process_document(
        self, job_message: JobMessage
    ) -> None:
        """既存の処理ロジックを呼び出し"""
        # body_data から Pydantic オブジェクトを復元
        body = ProcessRequestSchema(**job_message.body_data)
        
        # 既存の処理関数を呼び出し(変更なし)
        await execute_process(
            body=body,
            progress_manager=self._progress_manager,
        )

既存の処理ロジックをほぼそのまま再利用しています。

メッセージ設計と実装の原則

既存コードの再利用を最優先: メッセージの body_data には元の HTTP リクエストボディをそのまま格納し、Consumer 側で復元することで、既存の処理ロジックを一切変更せずに移行できました。

SQS の主要設定

メインキュー: VisibilityTimeout を1時間(最長処理時間15分の余裕を考慮)、Long Polling で効率化、3回失敗で DLQ に移動。

Dead Letter Queue: 重大なエラー(Consumer クラッシュ、DB障害など)で書き込みすら失敗したメッセージを保管し、手動調査・復旧を可能にします。

エラーハンドリング設計

エラーの種類に応じた処理方針:

通常の処理失敗: ビジネスロジックのエラー(AI API エラー、データ検証失敗など)は、DynamoDB にエラーステータスを記録してメッセージを削除。ユーザーには WebSocket 経由で通知。

一時的なエラー: API Rate Limit やネットワークエラーは例外を再 raise し、SQS の自動リトライ機構(VisibilityTimeout 後に再配信、最大3回)に任せます。

重大なエラー: Consumer クラッシュや DB 障害でエラー記録すらできない場合、3回の失敗後に DLQ へ移動し、運用チームが手動で調査・復旧します。

進捗通知: 既存の WebSocket + DynamoDB ポーリングの仕組みは変更なしで動作します。

まとめ

この記事では、速く作って価値検証した AI 処理を、本番運用に耐える形に進化させるプロセスを共有しました。

段階的なアーキテクチャ進化

初期フェーズ: asyncio.create_task で素早く実装 → MVP 検証に成功し、ユーザーフィードバックを獲得

課題の顕在化: 本番運用で、デプロイ時の処理中断・リソース効率の悪化・エラーハンドリングの複雑化が問題に

アーキテクチャの進化: SQS ベースの Producer-Consumer パターンへ移行し、タスクの永続性・独立スケーリング・自動リトライを実現

設計のポイント: 既存コードの再利用、API 仕様の維持、マネージドサービスの活用で、移行リスクを最小化しながら信頼性を向上

得られた効果

信頼性の向上として、デプロイ時にユーザーの処理が中断されなくなり、自動リトライにより一時的なエラーからの回復が可能になり、DLQ により重大なエラーの調査・復旧が容易になりました。

スケーラビリティの向上として、Producer(API サーバー)と Consumer(ワーカー)を独立してスケール可能にし、同期 API と非同期処理で異なるリソース配分が可能になり、ピーク時の処理待ちが解消されました。

運用負荷の削減として、SQS のマネージド機能によりキューサーバーの運用が不要になり、リトライロジックを自前で実装する必要がなくなり、CloudWatch でメトリクスを監視可能になりました。

今後の展開

この SQS ベースの非同期処理基盤を、他のワークフローや機能にも適用していく予定です。同様に長時間かかる AI 処理や、バッチ処理的な機能への展開を検討しています。

将来の拡張として、複数のコンシューマーが必要になった際の SNS Fan-out Pattern への移行、メトリクスやアラートの整備による運用の自動化、処理時間の最適化とコスト削減などを検討しています。

おわりに

初期フェーズでは「速く作る」ことを優先し、成長フェーズでは「壊れない仕組み」へと進化させる――このアーキテクチャの段階的な進化が、AI プロダクトの信頼性とスケーラビリティを大きく向上させました。

最初から完璧なアーキテクチャを目指すのではなく、ビジネスの成長に合わせて適切なタイミングで進化させることが重要だと実感しています。同様の課題を抱えている方の参考になれば幸いです。

参考文献

株式会社KENCOPA テックブログ

Discussion