FastAPI と Next.js による非同期ジョブ化とポーリング
はじめに
Web API が大量データ処理や外部サービス呼び出しを伴うと、リクエスト/レスポンス待ち時間が長くなり、Cloudflare のタイムアウト(Freeプランでは約100秒)やブラウザ側の UX に悪影響を及ぼします。
本記事では、FastAPI を使った Python バックエンドで 非同期ジョブ化 を行い、Next.js(React)フロントエンドで ポーリング による結果取得を行うパターンをご紹介します。
今回は特に、Azure OCR → OpenAI 画像解析のように 100秒を超える可能性がある重い処理を扱うケースを想定しています。
※あくまで現在の環境下で検討した実装になります。至らない点も多いと思いますので、ご指導ご鞭撻いただけると幸いです。
背景と課題
-
同期処理のタイムアウト問題
- 画像の OCR 処理や機械学習モデル呼び出しでは、1 枚あたり数秒〜数十秒、枚数や内容次第では 100秒を超えることも。
- Cloudflare Freeプランではタイムアウト延長が不可のため、100秒を超えると HTTP 524 が返り、処理結果を得られない。
-
UX の低下
- ユーザーはブラウザ上で長時間ローディング表示に固まり、挙動が止まったと誤認しやすい。
-
コスト制約
- Enterprise プランへのアップグレードでタイムアウト延長は可能だが、今回はサーバー設計の変更のみで解決を目指す。
1. バックエンド:FastAPI での非同期ジョブ化
FastAPI公式ドキュメントでも「時間のかかるファイル処理はHTTP 202 Acceptedで即座に応答し、ファイル処理自体はバックグラウンドで行う」ことが推奨されています
1.1 ジョブ管理クラス設計
from enum import Enum
import threading, time, uuid
from typing import Any, Optional, Dict
import asyncio
# ジョブステータス
class JobStatus(str, Enum):
QUEUED = "queued"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
# ジョブ情報保持
class JobInfo:
def __init__(self, job_id: str, theme: str):
self.job_id = job_id
self.theme = theme
self.status = JobStatus.QUEUED
self.result = None
self.error = None
self.created_at = time.time()
self.started_at = None
self.completed_at = None
def to_dict(self) -> Dict[str, Any]:
elapsed = ((self.completed_at or time.time()) - self.created_at) \
if self.started_at else None
return {
"job_id": self.job_id,
"theme": self.theme,
"status": self.status,
"created_at": self.created_at,
"started_at": self.started_at,
"completed_at": self.completed_at,
"elapsed_time": elapsed,
}
1.2 ジョブキューマネージャ
class JobQueueManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.jobs = {}
cls._instance.cleanup_after = 4 * 60 * 60 # 4時間
threading.Thread(
target=cls._instance._cleanup_old_jobs, daemon=True
).start()
return cls._instance
def create_job(self, theme: str) -> str:
job_id = str(uuid.uuid4())
self.jobs[job_id] = JobInfo(job_id, theme)
return job_id
def get_job(self, job_id: str) -> Optional[JobInfo]:
return self.jobs.get(job_id)
def update_job_status(
self,
job_id: str,
status: JobStatus,
result: Any = None,
error: Optional[str] = None
) -> None:
job = self.get_job(job_id)
if not job:
return
job.status = status
if status == JobStatus.RUNNING and job.started_at is None:
job.started_at = time.time()
if status in (JobStatus.COMPLETED, JobStatus.FAILED):
job.completed_at = time.time()
if result is not None:
job.result = result
if error is not None:
job.error = error
def get_job_status(self, job_id: str) -> Dict[str, Any]:
job = self.get_job(job_id)
if not job:
return {"error": "Job not found"}
info = job.to_dict()
if job.status == JobStatus.COMPLETED:
info["result"] = job.result
if job.status == JobStatus.FAILED:
info["error"] = job.error
return info
def _cleanup_old_jobs(self) -> None:
while True:
time.sleep(3600)
now = time.time()
for jid, job in list(self.jobs.items()):
if now - job.created_at > self.cleanup_after:
self.jobs.pop(jid, None)
1.3 バックグラウンド実行ユーティリティ
async def run_job_async(
job_id: str,
func: Callable[..., Any],
*args, **kwargs
) -> None:
mgr = JobQueueManager()
mgr.update_job_status(job_id, JobStatus.RUNNING)
try:
if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, func, *args, **kwargs)
mgr.update_job_status(job_id, JobStatus.COMPLETED, result=result)
except Exception as e:
mgr.update_job_status(job_id, JobStatus.FAILED, error=str(e))
1.4 FastAPI エンドポイント例
from fastapi import FastAPI, BackgroundTasks, File, UploadFile
app = FastAPI()
mgr = JobQueueManager()
@app.post("/api/them1", status_code=202)
async def enqueue_theme1(
background: BackgroundTasks,
file: UploadFile = File(...),
theme: str = ""
):
# 1) ジョブ登録
job_id = mgr.create_job(theme)
# 2) バックグラウンドで OCR+OpenAI 解析を実行
background.add_task(
run_job_async,
job_id,
process_image, # 画像解析用コルーチン
job_id,
await file.read(),
theme
)
return {"job_id": job_id}
@app.get("/api/jobs/{job_id}")
def get_status(job_id: str):
return mgr.get_job_status(job_id)
2. フロントエンド:Next.js でのポーリング
useJobPolling
2.1 カスタムフック // hooks/useJobPolling.ts
import { useState, useEffect, useRef } from 'react';
export enum JobStatus {
QUEUED = 'queued',
RUNNING = 'running',
COMPLETED = 'completed',
FAILED = 'failed',
}
export interface JobResponse {
job_id: string;
status: JobStatus;
result?: unknown;
error?: string;
}
const DEFAULT_INTERVAL = 5000; // 5秒
const DEFAULT_TIMEOUT = 600000; // 10分
export function useJobPolling(
jobId: string | null,
{
interval = DEFAULT_INTERVAL,
timeout = DEFAULT_TIMEOUT,
onProgress,
}: {
interval?: number;
timeout?: number;
onProgress?: (status: JobResponse) => void;
} = {}
) {
const [status, setStatus] = useState<JobResponse | null>(null);
const [error, setError] = useState<string | null>(null);
const startRef = useRef<number>(Date.now());
useEffect(() => {
if (!jobId) return;
let cancelled = false;
const poll = async () => {
if (Date.now() - startRef.current > timeout) {
setError(`ポーリングがタイムアウトしました(${timeout}ms)`);
return;
}
try {
const res = await fetch(
`${process.env.NEXT_PUBLIC_MACRO_API_URL}/ai/jobs/${jobId}`
);
if (!res.ok) {
const errJson = await res.json().catch(() => ({}));
throw new Error(
`Error ${res.status}: ${res.statusText}` +
(errJson.error ? ` - ${errJson.error}` : '')
);
}
const data: JobResponse = await res.json();
if (cancelled) return;
onProgress?.(data);
setStatus(data);
if (data.status === JobStatus.COMPLETED) return;
if (data.status === JobStatus.FAILED) {
throw new Error(data.error || 'ジョブが失敗しました');
}
setTimeout(poll, interval);
} catch (e: any) {
if (!cancelled) setError(e.message);
}
};
poll();
return () => { cancelled = true };
}, [jobId, interval, timeout, onProgress]);
return { status, error };
}
2.2 ページコンポーネント例
// pages/index.tsx
import { useState } from 'react';
import { useJobPolling, JobStatus } from '@/hooks/useJobPolling';
export default function Home() {
const [jobId, setJobId] = useState<string | null>(null);
const [theme, setTheme] = useState('default-theme');
const { status, error } = useJobPolling(jobId, {
onProgress: s => console.log('進捗:', s.status),
});
const startJob = async () => {
setJobId(null);
const res = await fetch(
`${process.env.NEXT_PUBLIC_MACRO_API_URL}/aipj/theme1`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ theme }),
}
);
if (res.status !== 202) {
console.error('ジョブ開始失敗:', await res.text());
return;
}
const { job_id } = await res.json();
setJobId(job_id);
};
return (
<div className="max-w-md mx-auto p-4">...
);
}
3. ポイント
-
即時応答+非同期実行
バックエンドは「ジョブ投入 → 202 Accepted」をできるだけ速く返し、処理は完全にバックグラウンド化。 -
ジョブステータス管理
シングルトンでメモリ上に保持し、定期クリーンアップでリソースを制御。 -
汎用ポーリングフック
間隔・タイムアウト・進捗コールバックをパラメータ化し、再利用性を高める。
4. ジョブ処理フロー図
FastAPIバックエンドでのジョブ投入(キューイング)から、バックグラウンドタスク実行、Next.jsフロントエンドでのポーリングによるステータス取得とUI更新までの一連の流れを示したジョブ処理フロー図になります。
図1 FastAPI × Next.js によるジョブ処理フロー図
-
クライアントリクエスト
Next.jsからFastAPIの /theme1 エンドポイントにジョブ投入リクエストを送信。 -
ジョブキューイング
FastAPIでは JobQueueManager を使ってジョブIDを生成し、メモリ上のジョブストア(マップ)に登録。HTTPステータス 202 Accepted で即時応答。 -
バックグラウンド実行
BackgroundTasks 経由で run_job_async が呼び出され、OCRやOpenAI解析などの重い処理を非同期に実行。 -
ジョブステータス更新
処理開始時にステータスを running に、完了時に completed(またはエラー発生時に failed)に更新。結果やエラー情報もジョブストアに格納。 -
ポーリングフック
Next.js側ではカスタムフック useJobPolling が5秒間隔などで /api/jobs/{job_id} を定期的にフェッチし、返却されたステータスをもとにUIを更新。
Discussion