🐙

FastAPIとCeleryを使って非同期処理を実装する

2025/03/13に公開

はじめに

Webアプリケーションでは、メール送信や画像処理などの時間のかかるタスクを非同期で実行したい場面がよくあります。FastAPIとCeleryを組み合わせることで、これを簡単に実装できます。本記事では、FastAPIとCeleryを使った非同期タスクの実装方法について、細かく解説します。


1. FastAPIとCeleryとは?

FastAPIとは

FastAPIは、Python製の高速なWebフレームワークで、以下の特徴を持っています。

  • 非同期処理に対応(async/awaitを活用)
  • 型ヒントによる自動バリデーション
  • Swagger UIを自動生成
  • 高速(StarletteとPydanticを使用)

Celeryとは

Celeryは、タスクキューを管理するためのPythonライブラリです。主な特徴は以下のとおりです。

  • 非同期タスクの管理
  • 分散処理が可能
  • タスクの再試行機能を備えている

FastAPIとCeleryを組み合わせることで、リクエストの処理とは別に時間のかかるタスクをバックグラウンドで実行できます。


2. 環境構築

必要なライブラリをインストール

まず、FastAPIとCeleryをインストールします。

pip install fastapi uvicorn celery redis

また、CeleryのメッセージブローカーとしてRedisを使用するため、RedisをインストールまたはDockerで起動します。

RedisをDockerで起動

Dockerを使用する場合、以下のコマンドでRedisを起動できます。

docker run -d -p 6379:6379 redis

3. Celeryの設定とタスク定義

Celeryを使用するには、ワーカーを設定し、実行するタスクを定義する必要があります。

Celeryの設定(tasks.py

以下のコードでは、Redisをメッセージブローカーとして設定し、時間のかかるタスクを作成します。

from celery import Celery
import time

# Celeryの設定
celery_app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@celery_app.task
def long_running_task(x, y):
    """ 時間のかかる処理(3秒スリープ後に加算) """
    time.sleep(3)
    return x + y

このコードでは、long_running_task という関数を定義し、Celeryの @task デコレーターを付けています。この関数は3秒間スリープした後、xとyを足して結果を返します。


4. FastAPIアプリの作成

次に、FastAPIを使ってタスクを実行するエンドポイントを作成します。

FastAPIの実装(main.py

from fastapi import FastAPI
from tasks import long_running_task

app = FastAPI()

@app.get("/")
def read_root():
    return {"message": "Hello, FastAPI + Celery!"}

@app.post("/task/")
def create_task(x: int, y: int):
    """ タスクをCeleryに送信 """
    task = long_running_task.apply_async(args=[x, y])
    return {"task_id": task.id, "status": "Task submitted!"}

@app.get("/task/{task_id}")
def get_task_status(task_id: str):
    """ タスクの状態を取得 """
    task_result = long_running_task.AsyncResult(task_id)
    return {"task_id": task_id, "status": task_result.status, "result": task_result.result}

このFastAPIアプリでは、以下のエンドポイントを用意しています。

  • / : APIの動作確認
  • /task/ : 非同期タスクをキューに送信
  • /task/{task_id} : タスクの状態を取得

5. アプリケーションの起動とテスト

1. Celeryワーカーを起動

まず、Celeryワーカーを起動します。

celery -A tasks worker --loglevel=INFO

2. FastAPIアプリを起動

次に、FastAPIアプリを起動します。

uvicorn main:app --reload

6. APIの動作確認

1. タスクを送信

タスクを作成するには、以下のAPIを実行します。

curl -X POST "http://127.0.0.1:8000/task/?x=3&y=5"

レスポンス:

{"task_id": "some-task-id", "status": "Task submitted!"}

2. タスクの状態を確認

タスクの状態を取得するには、以下のAPIを実行します。

curl "http://127.0.0.1:8000/task/some-task-id"

結果(処理中の場合):

{"task_id": "some-task-id", "status": "PENDING", "result": null}

結果(処理完了後):

{"task_id": "some-task-id", "status": "SUCCESS", "result": 8}

7. まとめ

本記事では、FastAPIとCeleryを組み合わせて非同期タスクを処理する方法を紹介しました。

学んだこと

✅ FastAPIの基本的な構造
✅ Celeryの設定方法
✅ 非同期タスクの作成と実行
✅ Redisを使用したメッセージキューの構築

この構成を使えば、バックグラウンドで処理が必要なWebアプリケーションを簡単に作成できます!🚀

Discussion