CeleryによるPythonベース非同期タスク処理
はじめに
例えば機械学習モデルを運用する際に、WebAPI形式で予測を提供するのは一般的な方法と言えます。ここで推論処理に時間がかかる場合には、リクエストに対するレスポンスだけ先に返しておき、処理の本体はバックグラウンドで非同期的に実行するという選択肢が存在します。
本記事では、Pythonベースの分散タスクキューツールであるCeleryを用いて、WebAPIへのリクエストに対応したタスクを非同期実行し、結果を確認するまでを紹介します。サンプルコードは以下に配置しました。
Celeryとは
公式ドキュメントの冒頭には以下のように書かれています。
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
It’s a task queue with focus on real-time processing, while also supporting task scheduling.
端的に言えば、発行されるメッセージをキューによって管理し、タスクとして実行するためのツールと言えます。Celeryはキューを介したメッセージのやりとりを抽象化しており、これによって簡単にアプリケーションに非同期的な振る舞いを組み込むことができます。
BrokerとBackend
メッセージキューを実装するためには、メッセージを格納、引き出しできるデータストアが必要となります。これを担当する要素をメッセージブローカーと呼び、CeleryではRedisやRabbitMQなどを利用できます。タスクを実行する際にはキューからメッセージを引き出し、内容に従って処理を実行します。
バックエンドは処理結果の保持を担当します。ブローカーと同様にRedisなどを用いることもできるほか、PostgreSQLやMySQLなどを採用して永続性を強化することもできます。
今回のサンプルでは、ブローカーとバックエンドの両方にRedisを用いました。運用レベルではセキュリティなど検討すべき点が多いですが、検証レベルの観点では非常に簡単にCeleryと組み合わせることができたと感じます。
Flowerによるモニタリング
FlowerはCeleryの処理状況をモニタリングするためのライブラリです。ワーカーの稼働状況やタスクの処理状況をダッシュボードとしてブラウザから確認できるほか、ワーカーの再起動やオートスケーリングの設定などを行うこともできます。今回はタスクの処理状況を確認するためのダッシュボードとして利用します。
FastAPIのBackgroundTasks
今回はリクエストを受け付けるAPIを、Pythonベースの軽量WebフレームワークであるFastAPIで実装しています。FastAPIにはBackgroundTasks
という機能があり、非常に簡単に非同期(バックグラウンド)処理を実現することができます。
FastAPIのドキュメントではBackgroundTasks
の留意点に触れており、その中で関連するツールとしてCeleryを挙げた上で、以下のように比較しています。
-
BackgroundTasks
:FastAPIアプリとメモリや変数を共有する場合や、比較的軽量の処理を行う場合に有用 - Celery(など):メッセージキューを細かく管理したい場合や、比較的重い処理を行う場合に有用
実際に利用する際には、これらの特徴を検討した上で適した方法を選ぶ必要があると言えます。
サンプルコード
今回はサンプルとして、身長と体重をもとにBMIを計算するアプリ(というほどでもありませんが)を構築します。技術的な構成としては以下のようになります。
- Celery: メッセージの管理とタスクの実行
- Flower: タスクの実行状況を確認するためのダッシュボードツール
- FastAPI: WebAPIの実装
- Redis: メッセージと処理結果を保持するデータストア
ディレクトリ構成は以下のようになっています。メインとなるPythonスクリプトはserver.py, tasks.py
の二つであり、他はDockerなど環境関連のファイルです。
.
├── .env
├── README.md
├── celery-app
│ ├── .dockerignore
│ ├── poetry.lock
│ ├── pyproject.toml
│ ├── server.dockerfile
│ ├── server.py
│ ├── tasks.py
│ └── worker.dockerfile
└── docker-compose.yaml
タスクの定義
tasks.py
の中身は以下のようになります。
Celery(__name__)
によって作成したインスタンスには、メッセージをやり取りするための情報としてブローカーとバックエンドのURLをbroker_url, result_backend
に設定しています。他にタイムゾーンやタスクのレートリミットなども設定できます。
calc_bmi
はBMIを計算する関数です。非同期処理の振る舞いを確認するために、10秒のスリープを挟んでいます。これを@celery.task
によってデコレートすることで、celery
インスタンスが管理するタスクとして登録しています。
import os
import time
from celery import Celery
celery = Celery(__name__)
celery.conf.broker_url = os.environ.get('CELERY_BROKER_URL',
'redis://localhost:6379')
celery.conf.result_backend = os.environ.get('CELERY_BACKEND_URL',
'redis://localhost:6379')
@celery.task(name='tasks.calc_bmi')
def calc_bmi(weight: float, height: float) -> float:
time.sleep(10)
bmi = weight / height**2
return bmi
APIの実装
server.py
では、FastAPIによってBMIの計算リクエストとタスクの処理状況を確認するAPIを実装しています。Body, TasksStatus
はFastAPIにおいてリクエストやレスポンス形式を定義するためのクラスであり、説明は割愛します。
calculate_bmi
関数では、calc_bmi.delay
という形でBMIの計算を呼び出しています。@celery.task
でデコレートしたことで.delay
という呼び出しが可能になっており、ブローカーにメッセージが発行された上で非同期処理が行われます。
check_status
関数では、celery.AsyncResult
を通じてタスクの処理状況を確認しています。
from typing import Optional, Any
from fastapi import FastAPI
from pydantic import BaseModel
from tasks import celery, calc_bmi
app = FastAPI()
class Body(BaseModel):
weight: float
height: float
class TaskStatus(BaseModel):
id: str
status: Optional[str]
result: Optional[Any]
@app.post('/bmi', response_model=TaskStatus, response_model_exclude_unset=True)
def calculate_bmi(body: Body):
task = calc_bmi.delay(weight=body.weight, height=body.height)
return TaskStatus(id=task.id)
@app.get('/bmi/{task_id}', response_model=TaskStatus)
def check_status(task_id: str):
result = celery.AsyncResult(task_id)
status = TaskStatus(id=task_id, status=result.status, result=result.result)
return status
コンテナの構成
以下の4つのコンテナによってアプリを構成します。
-
server
: FastAPIによるAPIサーバー。Gunicornによってウェブサーバーを起動する。 -
worker
: Celeryワーカー。celery --app=tasks.celery worker
として起動する。 -
dashboard
: Flowerによるダッシュボード。celery --app=tasks.celery flower
として起動する。 -
redis
: ブローカー兼バックエンドとして処理中・処理済みのタスク情報を保持する。
docker-compose.yaml
version: '3.8'
services:
server:
build:
context: ./celery-app
dockerfile: server.dockerfile
depends_on:
- worker
- dashboard
ports:
- 8080:80
env_file:
- .env
environment:
# See details at https://github.com/tiangolo/uvicorn-gunicorn-docker#advanced-usage
APP_MODULE: server:app
PORT: 80
worker:
build:
context: ./celery-app
dockerfile: worker.dockerfile
depends_on:
- redis
command:
- celery
- --app=tasks.celery
- worker
- --loglevel=info
env_file:
- .env
dashboard:
build:
context: ./celery-app
dockerfile: worker.dockerfile
depends_on:
- redis
command:
- celery
- --app=tasks.celery
- flower
- --port=5555
ports:
- 5556:5555
env_file:
- .env
redis:
image: redis:latest
volumes:
- celery_app:/data
volumes:
celery_app:
driver: local
Redisに関する設定は各コンテナで共通のため、環境変数ファイルから各コンテナで読み込んでいます。ブローカーとして処理待ちのメッセージを保持するデータベースと、バックエンドとして処理済みのタスク情報を保持するデータベースは別にしました(同じにもできます)。
CELERY_BROKER_URL=redis://redis:6379/0
CELERY_BACKEND_URL=redis://redis:6379/1
補足として、Redisはインメモリデータベースのため、サーバープロセスの終了と同時にストアした内容も消えてしまいます。これに対して、Redisはデフォルトでいくつかの永続化設定が定義されています。同時にコンテナボリュームの作成による永続化も行っています(そもそもデータベースをコンテナで管理するのかという議論はありますが、今回は割愛します)。
実行結果
docker compose build/up/down
によって、各イメージのビルド、起動、停止ができます。FastAPIのサーバーが稼働しているため、BMIの計算をリクエストすることができます。適当な体重と身長をJSON形式でポストすると、BMIではなくタスクidが返されます。
curl -X 'POST' \
'http://127.0.0.1:8080/bmi' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{"weight": 65, "height": 1.8}'
# >> {"id":<task_id>}
このときworkerコンテナでは以下のようなログが出力され、tasks.calc_bmi
タスクが受理されたことが確認できます。
worker_1 | [2021-10-02 15:19:27,065: INFO/MainProcess] Task tasks.calc_bmi[<task_id>] received
タスクの処理中、完了後にbmi/{task_id}
にアクセスすると、server.py
のcelery.AsyncResult(task_id)
からCeleryを介してタスクの処理状況を確認でき、以下のような結果が返ります。
curl 'http://127.0.0.1:8080/bmi/<task_id>'
# >> {"id":<task_id>,"status":"PENDING","result": null} # タスク処理中
# >> {"id":<task_id>,"status":"SUCCESS","result": 20.061728395061728
} # タスク完了後
またworkerコンテナでは以下のようなログが出力され、タスクが完了したことがわかります。
worker_1 | [2021-10-02 15:19:37,041: INFO/ForkPoolWorker-4] Task tasks.calc_bmi[<task_id>] succeeded in 10.007367299993348s: 20.061728395061728
ブラウザから127.0.0.1:5556
にアクセスすると、Flowerのダッシュボードを確認できます。各タスクの処理状況のほかに、稼働中のワーカーの設定なども確認できます。
まとめ
Celeryを用いることで、Pythonベースで簡単に非同期タスク処理が実現できました。私の今回の検証のきっかけは機械学習モデルの非同期推論でしたが、非同期タスク実行という仕組みは広い応用性があると思います。
最近はAWSやGCPなどのクラウドベースのメッセージキュー、ジョブ実行サービスも提供されており、こちらはインフラを柔軟に設定・利用できるなどのメリットがあります。一方でCeleryなどOSSのメリットとしては、手元での検証やデバッグが容易であるという点が挙げられます。実用時にはそれぞれの要点を認識した上で使い分けるのが良いでしょう。
参考資料
Discussion