FastAPIとCeleryを使って非同期処理を実装する
はじめに
Webアプリケーションでは、メール送信や画像処理などの時間のかかるタスクを非同期で実行したい場面がよくあります。FastAPIとCeleryを組み合わせることで、これを簡単に実装できます。本記事では、FastAPIとCeleryを使った非同期タスクの実装方法について、細かく解説します。
1. FastAPIとCeleryとは?
FastAPIとは
FastAPIは、Python製の高速なWebフレームワークで、以下の特徴を持っています。
- 非同期処理に対応(async/awaitを活用)
- 型ヒントによる自動バリデーション
- Swagger UIを自動生成
- 高速(StarletteとPydanticを使用)
Celeryとは
Celeryは、タスクキューを管理するためのPythonライブラリです。主な特徴は以下のとおりです。
- 非同期タスクの管理
- 分散処理が可能
- タスクの再試行機能を備えている
FastAPIとCeleryを組み合わせることで、リクエストの処理とは別に時間のかかるタスクをバックグラウンドで実行できます。
2. 環境構築
必要なライブラリをインストール
まず、FastAPIとCeleryをインストールします。
pip install fastapi uvicorn celery redis
また、CeleryのメッセージブローカーとしてRedisを使用するため、RedisをインストールまたはDockerで起動します。
RedisをDockerで起動
Dockerを使用する場合、以下のコマンドでRedisを起動できます。
docker run -d -p 6379:6379 redis
3. Celeryの設定とタスク定義
Celeryを使用するには、ワーカーを設定し、実行するタスクを定義する必要があります。
tasks.py
)
Celeryの設定(以下のコードでは、Redisをメッセージブローカーとして設定し、時間のかかるタスクを作成します。
from celery import Celery
import time
# Celeryの設定
celery_app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
@celery_app.task
def long_running_task(x, y):
""" 時間のかかる処理(3秒スリープ後に加算) """
time.sleep(3)
return x + y
このコードでは、long_running_task
という関数を定義し、Celeryの @task
デコレーターを付けています。この関数は3秒間スリープした後、xとyを足して結果を返します。
4. FastAPIアプリの作成
次に、FastAPIを使ってタスクを実行するエンドポイントを作成します。
main.py
)
FastAPIの実装(from fastapi import FastAPI
from tasks import long_running_task
app = FastAPI()
@app.get("/")
def read_root():
return {"message": "Hello, FastAPI + Celery!"}
@app.post("/task/")
def create_task(x: int, y: int):
""" タスクをCeleryに送信 """
task = long_running_task.apply_async(args=[x, y])
return {"task_id": task.id, "status": "Task submitted!"}
@app.get("/task/{task_id}")
def get_task_status(task_id: str):
""" タスクの状態を取得 """
task_result = long_running_task.AsyncResult(task_id)
return {"task_id": task_id, "status": task_result.status, "result": task_result.result}
このFastAPIアプリでは、以下のエンドポイントを用意しています。
-
/
: APIの動作確認 -
/task/
: 非同期タスクをキューに送信 -
/task/{task_id}
: タスクの状態を取得
5. アプリケーションの起動とテスト
1. Celeryワーカーを起動
まず、Celeryワーカーを起動します。
celery -A tasks worker --loglevel=INFO
2. FastAPIアプリを起動
次に、FastAPIアプリを起動します。
uvicorn main:app --reload
6. APIの動作確認
1. タスクを送信
タスクを作成するには、以下のAPIを実行します。
curl -X POST "http://127.0.0.1:8000/task/?x=3&y=5"
レスポンス:
{"task_id": "some-task-id", "status": "Task submitted!"}
2. タスクの状態を確認
タスクの状態を取得するには、以下のAPIを実行します。
curl "http://127.0.0.1:8000/task/some-task-id"
結果(処理中の場合):
{"task_id": "some-task-id", "status": "PENDING", "result": null}
結果(処理完了後):
{"task_id": "some-task-id", "status": "SUCCESS", "result": 8}
7. まとめ
本記事では、FastAPIとCeleryを組み合わせて非同期タスクを処理する方法を紹介しました。
学んだこと
✅ FastAPIの基本的な構造
✅ Celeryの設定方法
✅ 非同期タスクの作成と実行
✅ Redisを使用したメッセージキューの構築
この構成を使えば、バックグラウンドで処理が必要なWebアプリケーションを簡単に作成できます!🚀
Discussion