🐶

FastAPI と Next.js による非同期ジョブ化とポーリング

に公開

はじめに

Web API が大量データ処理や外部サービス呼び出しを伴うと、リクエスト/レスポンス待ち時間が長くなり、Cloudflare のタイムアウト(Freeプランでは約100秒)やブラウザ側の UX に悪影響を及ぼします。
本記事では、FastAPI を使った Python バックエンドで 非同期ジョブ化 を行い、Next.js(React)フロントエンドで ポーリング による結果取得を行うパターンをご紹介します。
今回は特に、Azure OCR → OpenAI 画像解析のように 100秒を超える可能性がある重い処理を扱うケースを想定しています。
※あくまで現在の環境下で検討した実装になります。至らない点も多いと思いますので、ご指導ご鞭撻いただけると幸いです。

背景と課題

  • 同期処理のタイムアウト問題
    • 画像の OCR 処理や機械学習モデル呼び出しでは、1 枚あたり数秒〜数十秒、枚数や内容次第では 100秒を超えることも。
    • Cloudflare Freeプランではタイムアウト延長が不可のため、100秒を超えると HTTP 524 が返り、処理結果を得られない。
  • UX の低下
    • ユーザーはブラウザ上で長時間ローディング表示に固まり、挙動が止まったと誤認しやすい。
  • コスト制約
    • Enterprise プランへのアップグレードでタイムアウト延長は可能だが、今回はサーバー設計の変更のみで解決を目指す。

1. バックエンド:FastAPI での非同期ジョブ化

FastAPI公式ドキュメントでも「時間のかかるファイル処理はHTTP 202 Acceptedで即座に応答し、ファイル処理自体はバックグラウンドで行う」ことが推奨されています
https://fastapi.tiangolo.com/tutorial/background-tasks/

1.1 ジョブ管理クラス設計

from enum import Enum
import threading, time, uuid
from typing import Any, Optional, Dict
import asyncio

# ジョブステータス
class JobStatus(str, Enum):
    QUEUED    = "queued"
    RUNNING   = "running"
    COMPLETED = "completed"
    FAILED    = "failed"

# ジョブ情報保持
class JobInfo:
    def __init__(self, job_id: str, theme: str):
        self.job_id       = job_id
        self.theme        = theme
        self.status       = JobStatus.QUEUED
        self.result       = None
        self.error        = None
        self.created_at   = time.time()
        self.started_at   = None
        self.completed_at = None

    def to_dict(self) -> Dict[str, Any]:
        elapsed = ((self.completed_at or time.time()) - self.created_at) \
                  if self.started_at else None
        return {
            "job_id":       self.job_id,
            "theme":        self.theme,
            "status":       self.status,
            "created_at":   self.created_at,
            "started_at":   self.started_at,
            "completed_at": self.completed_at,
            "elapsed_time": elapsed,
        }

1.2 ジョブキューマネージャ

class JobQueueManager:
    _instance = None
    _lock     = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance.jobs = {}
                cls._instance.cleanup_after = 4 * 60 * 60  # 4時間
                threading.Thread(
                    target=cls._instance._cleanup_old_jobs, daemon=True
                ).start()
            return cls._instance

    def create_job(self, theme: str) -> str:
        job_id = str(uuid.uuid4())
        self.jobs[job_id] = JobInfo(job_id, theme)
        return job_id

    def get_job(self, job_id: str) -> Optional[JobInfo]:
        return self.jobs.get(job_id)

    def update_job_status(
        self,
        job_id: str,
        status: JobStatus,
        result: Any = None,
        error: Optional[str] = None
    ) -> None:
        job = self.get_job(job_id)
        if not job:
            return
        job.status = status
        if status == JobStatus.RUNNING and job.started_at is None:
            job.started_at = time.time()
        if status in (JobStatus.COMPLETED, JobStatus.FAILED):
            job.completed_at = time.time()
        if result is not None:
            job.result = result
        if error is not None:
            job.error = error

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        job = self.get_job(job_id)
        if not job:
            return {"error": "Job not found"}
        info = job.to_dict()
        if job.status == JobStatus.COMPLETED:
            info["result"] = job.result
        if job.status == JobStatus.FAILED:
            info["error"]  = job.error
        return info

    def _cleanup_old_jobs(self) -> None:
        while True:
            time.sleep(3600)
            now = time.time()
            for jid, job in list(self.jobs.items()):
                if now - job.created_at > self.cleanup_after:
                    self.jobs.pop(jid, None)

1.3 バックグラウンド実行ユーティリティ

async def run_job_async(
    job_id: str,
    func: Callable[..., Any],
    *args, **kwargs
) -> None:
    mgr = JobQueueManager()
    mgr.update_job_status(job_id, JobStatus.RUNNING)
    try:
        if asyncio.iscoroutinefunction(func):
            result = await func(*args, **kwargs)
        else:
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(None, func, *args, **kwargs)
        mgr.update_job_status(job_id, JobStatus.COMPLETED, result=result)
    except Exception as e:
        mgr.update_job_status(job_id, JobStatus.FAILED, error=str(e))

1.4 FastAPI エンドポイント例

from fastapi import FastAPI, BackgroundTasks, File, UploadFile

app = FastAPI()
mgr = JobQueueManager()

@app.post("/api/them1", status_code=202)
async def enqueue_theme1(
    background: BackgroundTasks,
    file: UploadFile = File(...),
    theme: str = ""
):
    # 1) ジョブ登録
    job_id = mgr.create_job(theme)
    # 2) バックグラウンドで OCR+OpenAI 解析を実行
    background.add_task(
        run_job_async,
        job_id,
        process_image,  # 画像解析用コルーチン
        job_id,
        await file.read(),
        theme
    )
    return {"job_id": job_id}

@app.get("/api/jobs/{job_id}")
def get_status(job_id: str):
    return mgr.get_job_status(job_id)

2. フロントエンド:Next.js でのポーリング

2.1 カスタムフック useJobPolling

// hooks/useJobPolling.ts
import { useState, useEffect, useRef } from 'react';

export enum JobStatus {
  QUEUED    = 'queued',
  RUNNING   = 'running',
  COMPLETED = 'completed',
  FAILED    = 'failed',
}

export interface JobResponse {
  job_id: string;
  status: JobStatus;
  result?: unknown;
  error?: string;
}

const DEFAULT_INTERVAL = 5000;    // 5秒
const DEFAULT_TIMEOUT  = 600000;  // 10分

export function useJobPolling(
  jobId: string | null,
  {
    interval = DEFAULT_INTERVAL,
    timeout  = DEFAULT_TIMEOUT,
    onProgress,
  }: {
    interval?: number;
    timeout?: number;
    onProgress?: (status: JobResponse) => void;
  } = {}
) {
  const [status, setStatus] = useState<JobResponse | null>(null);
  const [error,  setError]  = useState<string | null>(null);
  const startRef = useRef<number>(Date.now());

  useEffect(() => {
    if (!jobId) return;
    let cancelled = false;

    const poll = async () => {
      if (Date.now() - startRef.current > timeout) {
        setError(`ポーリングがタイムアウトしました(${timeout}ms)`);
        return;
      }
      try {
        const res = await fetch(
          `${process.env.NEXT_PUBLIC_MACRO_API_URL}/ai/jobs/${jobId}`
        );
        if (!res.ok) {
          const errJson = await res.json().catch(() => ({}));
          throw new Error(
            `Error ${res.status}: ${res.statusText}` +
            (errJson.error ? ` - ${errJson.error}` : '')
          );
        }
        const data: JobResponse = await res.json();
        if (cancelled) return;

        onProgress?.(data);
        setStatus(data);

        if (data.status === JobStatus.COMPLETED) return;
        if (data.status === JobStatus.FAILED) {
          throw new Error(data.error || 'ジョブが失敗しました');
        }
        setTimeout(poll, interval);
      } catch (e: any) {
        if (!cancelled) setError(e.message);
      }
    };

    poll();
    return () => { cancelled = true };
  }, [jobId, interval, timeout, onProgress]);

  return { status, error };
}

2.2 ページコンポーネント例

// pages/index.tsx
import { useState } from 'react';
import { useJobPolling, JobStatus } from '@/hooks/useJobPolling';

export default function Home() {
  const [jobId, setJobId] = useState<string | null>(null);
  const [theme, setTheme] = useState('default-theme');
  const { status, error } = useJobPolling(jobId, {
    onProgress: s => console.log('進捗:', s.status),
  });

  const startJob = async () => {
    setJobId(null);
    const res = await fetch(
      `${process.env.NEXT_PUBLIC_MACRO_API_URL}/aipj/theme1`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ theme }),
      }
    );
    if (res.status !== 202) {
      console.error('ジョブ開始失敗:', await res.text());
      return;
    }
    const { job_id } = await res.json();
    setJobId(job_id);
  };

  return (
    <div className="max-w-md mx-auto p-4">...
    );
}

3. ポイント

  1. 即時応答+非同期実行
    バックエンドは「ジョブ投入 → 202 Accepted」をできるだけ速く返し、処理は完全にバックグラウンド化。

  2. ジョブステータス管理
    シングルトンでメモリ上に保持し、定期クリーンアップでリソースを制御。

  3. 汎用ポーリングフック
    間隔・タイムアウト・進捗コールバックをパラメータ化し、再利用性を高める。

4. ジョブ処理フロー図

FastAPIバックエンドでのジョブ投入(キューイング)から、バックグラウンドタスク実行、Next.jsフロントエンドでのポーリングによるステータス取得とUI更新までの一連の流れを示したジョブ処理フロー図になります。

図1 FastAPI × Next.js によるジョブ処理フロー図

  1. クライアントリクエスト
    Next.jsからFastAPIの /theme1 エンドポイントにジョブ投入リクエストを送信。

  2. ジョブキューイング
    FastAPIでは JobQueueManager を使ってジョブIDを生成し、メモリ上のジョブストア(マップ)に登録。HTTPステータス 202 Accepted で即時応答。

  3. バックグラウンド実行
    BackgroundTasks 経由で run_job_async が呼び出され、OCRやOpenAI解析などの重い処理を非同期に実行。

  4. ジョブステータス更新
    処理開始時にステータスを running に、完了時に completed(またはエラー発生時に failed)に更新。結果やエラー情報もジョブストアに格納。

  5. ポーリングフック
    Next.js側ではカスタムフック useJobPolling が5秒間隔などで /api/jobs/{job_id} を定期的にフェッチし、返却されたステータスをもとにUIを更新。

参考文献

Discussion