📬

CeleryによるPythonベース非同期タスク処理

2021/10/05に公開

はじめに

例えば機械学習モデルを運用する際に、WebAPI形式で予測を提供するのは一般的な方法と言えます。ここで推論処理に時間がかかる場合には、リクエストに対するレスポンスだけ先に返しておき、処理の本体はバックグラウンドで非同期的に実行するという選択肢が存在します。

本記事では、Pythonベースの分散タスクキューツールであるCeleryを用いて、WebAPIへのリクエストに対応したタスクを非同期実行し、結果を確認するまでを紹介します。サンプルコードは以下に配置しました。

https://github.com/daigo0927/blog/tree/master/celery-taskqueue

Celeryとは

公式ドキュメントの冒頭には以下のように書かれています。

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
It’s a task queue with focus on real-time processing, while also supporting task scheduling.

端的に言えば、発行されるメッセージをキューによって管理し、タスクとして実行するためのツールと言えます。Celeryはキューを介したメッセージのやりとりを抽象化しており、これによって簡単にアプリケーションに非同期的な振る舞いを組み込むことができます。

https://docs.celeryproject.org/en/stable/index.html#

https://aws.amazon.com/jp/message-queue/

BrokerとBackend

メッセージキューを実装するためには、メッセージを格納、引き出しできるデータストアが必要となります。これを担当する要素をメッセージブローカーと呼び、CeleryではRedisRabbitMQなどを利用できます。タスクを実行する際にはキューからメッセージを引き出し、内容に従って処理を実行します。

バックエンドは処理結果の保持を担当します。ブローカーと同様にRedisなどを用いることもできるほか、PostgreSQLやMySQLなどを採用して永続性を強化することもできます。

今回のサンプルでは、ブローカーとバックエンドの両方にRedisを用いました。運用レベルではセキュリティなど検討すべき点が多いですが、検証レベルの観点では非常に簡単にCeleryと組み合わせることができたと感じます。

https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/index.html

Flowerによるモニタリング

FlowerはCeleryの処理状況をモニタリングするためのライブラリです。ワーカーの稼働状況やタスクの処理状況をダッシュボードとしてブラウザから確認できるほか、ワーカーの再起動やオートスケーリングの設定などを行うこともできます。今回はタスクの処理状況を確認するためのダッシュボードとして利用します。

https://flower.readthedocs.io/en/latest/

FastAPIのBackgroundTasks

今回はリクエストを受け付けるAPIを、Pythonベースの軽量WebフレームワークであるFastAPIで実装しています。FastAPIにはBackgroundTasksという機能があり、非常に簡単に非同期(バックグラウンド)処理を実現することができます。

FastAPIのドキュメントではBackgroundTasksの留意点に触れており、その中で関連するツールとしてCeleryを挙げた上で、以下のように比較しています。

  • BackgroundTasks:FastAPIアプリとメモリや変数を共有する場合や、比較的軽量の処理を行う場合に有用
  • Celery(など):メッセージキューを細かく管理したい場合や、比較的重い処理を行う場合に有用

実際に利用する際には、これらの特徴を検討した上で適した方法を選ぶ必要があると言えます。

https://fastapi.tiangolo.com/tutorial/background-tasks/#caveat

サンプルコード

今回はサンプルとして、身長と体重をもとにBMIを計算するアプリ(というほどでもありませんが)を構築します。技術的な構成としては以下のようになります。

  • Celery: メッセージの管理とタスクの実行
  • Flower: タスクの実行状況を確認するためのダッシュボードツール
  • FastAPI: WebAPIの実装
  • Redis: メッセージと処理結果を保持するデータストア

ディレクトリ構成は以下のようになっています。メインとなるPythonスクリプトはserver.py, tasks.pyの二つであり、他はDockerなど環境関連のファイルです。

.
├── .env
├── README.md
├── celery-app
│   ├── .dockerignore
│   ├── poetry.lock
│   ├── pyproject.toml
│   ├── server.dockerfile
│   ├── server.py
│   ├── tasks.py
│   └── worker.dockerfile
└── docker-compose.yaml

タスクの定義

tasks.pyの中身は以下のようになります。

Celery(__name__)によって作成したインスタンスには、メッセージをやり取りするための情報としてブローカーとバックエンドのURLをbroker_url, result_backendに設定しています。他にタイムゾーンやタスクのレートリミットなども設定できます。

calc_bmiはBMIを計算する関数です。非同期処理の振る舞いを確認するために、10秒のスリープを挟んでいます。これを@celery.taskによってデコレートすることで、celeryインスタンスが管理するタスクとして登録しています。

celery-app/tasks.py
import os
import time
from celery import Celery

celery = Celery(__name__)
celery.conf.broker_url = os.environ.get('CELERY_BROKER_URL',
                                        'redis://localhost:6379')
celery.conf.result_backend = os.environ.get('CELERY_BACKEND_URL',
                                            'redis://localhost:6379')


@celery.task(name='tasks.calc_bmi')
def calc_bmi(weight: float, height: float) -> float:
    time.sleep(10)
    bmi = weight / height**2
    return bmi

https://docs.celeryproject.org/en/stable/getting-started/first-steps-with-celery.html#configuration

APIの実装

server.pyでは、FastAPIによってBMIの計算リクエストとタスクの処理状況を確認するAPIを実装しています。Body, TasksStatusはFastAPIにおいてリクエストやレスポンス形式を定義するためのクラスであり、説明は割愛します。

calculate_bmi関数では、calc_bmi.delayという形でBMIの計算を呼び出しています。@celery.taskでデコレートしたことで.delayという呼び出しが可能になっており、ブローカーにメッセージが発行された上で非同期処理が行われます。

check_status関数では、celery.AsyncResultを通じてタスクの処理状況を確認しています。

celery-app/server.py
from typing import Optional, Any
from fastapi import FastAPI
from pydantic import BaseModel
from tasks import celery, calc_bmi

app = FastAPI()


class Body(BaseModel):
    weight: float
    height: float


class TaskStatus(BaseModel):
    id: str
    status: Optional[str]
    result: Optional[Any]


@app.post('/bmi', response_model=TaskStatus, response_model_exclude_unset=True)
def calculate_bmi(body: Body):
    task = calc_bmi.delay(weight=body.weight, height=body.height)
    return TaskStatus(id=task.id)


@app.get('/bmi/{task_id}', response_model=TaskStatus)
def check_status(task_id: str):
    result = celery.AsyncResult(task_id)
    status = TaskStatus(id=task_id, status=result.status, result=result.result)
    return status

コンテナの構成

以下の4つのコンテナによってアプリを構成します。

  • server: FastAPIによるAPIサーバー。Gunicornによってウェブサーバーを起動する。
  • worker: Celeryワーカー。celery --app=tasks.celery workerとして起動する。
  • dashboard: Flowerによるダッシュボード。celery --app=tasks.celery flowerとして起動する。
  • redis: ブローカー兼バックエンドとして処理中・処理済みのタスク情報を保持する。
docker-compose.yaml
docker-compose.yaml
version: '3.8'
services:
  server:
    build:
      context: ./celery-app
      dockerfile: server.dockerfile
    depends_on:
      - worker
      - dashboard
    ports:
      - 8080:80
    env_file:
      - .env      
    environment:
      # See details at https://github.com/tiangolo/uvicorn-gunicorn-docker#advanced-usage
      APP_MODULE: server:app
      PORT: 80
  worker:
    build:
      context: ./celery-app
      dockerfile: worker.dockerfile
    depends_on:
      - redis
    command:
      - celery
      - --app=tasks.celery
      - worker
      - --loglevel=info
    env_file:
      - .env
  dashboard:
    build:
      context: ./celery-app
      dockerfile: worker.dockerfile
    depends_on:
      - redis
    command:
      - celery
      - --app=tasks.celery
      - flower
      - --port=5555
    ports:
      - 5556:5555
    env_file:
      - .env
  redis:
    image: redis:latest
    volumes:
      - celery_app:/data        

volumes:
  celery_app:
    driver: local

Redisに関する設定は各コンテナで共通のため、環境変数ファイルから各コンテナで読み込んでいます。ブローカーとして処理待ちのメッセージを保持するデータベースと、バックエンドとして処理済みのタスク情報を保持するデータベースは別にしました(同じにもできます)。

.env
CELERY_BROKER_URL=redis://redis:6379/0
CELERY_BACKEND_URL=redis://redis:6379/1

補足として、Redisはインメモリデータベースのため、サーバープロセスの終了と同時にストアした内容も消えてしまいます。これに対して、Redisはデフォルトでいくつかの永続化設定が定義されています。同時にコンテナボリュームの作成による永続化も行っています(そもそもデータベースをコンテナで管理するのかという議論はありますが、今回は割愛します)。

https://redis-documentasion-japanese.readthedocs.io/ja/latest/topics/persistence.html

実行結果

docker compose build/up/downによって、各イメージのビルド、起動、停止ができます。FastAPIのサーバーが稼働しているため、BMIの計算をリクエストすることができます。適当な体重と身長をJSON形式でポストすると、BMIではなくタスクidが返されます。

curl -X 'POST' \
  'http://127.0.0.1:8080/bmi' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{"weight": 65, "height": 1.8}'
# >> {"id":<task_id>}

このときworkerコンテナでは以下のようなログが出力され、tasks.calc_bmiタスクが受理されたことが確認できます。

worker_1     | [2021-10-02 15:19:27,065: INFO/MainProcess] Task tasks.calc_bmi[<task_id>] received

タスクの処理中、完了後にbmi/{task_id}にアクセスすると、server.pycelery.AsyncResult(task_id)からCeleryを介してタスクの処理状況を確認でき、以下のような結果が返ります。

curl 'http://127.0.0.1:8080/bmi/<task_id>'
# >> {"id":<task_id>,"status":"PENDING","result": null} # タスク処理中
# >> {"id":<task_id>,"status":"SUCCESS","result": 20.061728395061728
} # タスク完了後

またworkerコンテナでは以下のようなログが出力され、タスクが完了したことがわかります。

worker_1     | [2021-10-02 15:19:37,041: INFO/ForkPoolWorker-4] Task tasks.calc_bmi[<task_id>] succeeded in 10.007367299993348s: 20.061728395061728

ブラウザから127.0.0.1:5556にアクセスすると、Flowerのダッシュボードを確認できます。各タスクの処理状況のほかに、稼働中のワーカーの設定なども確認できます。

まとめ

Celeryを用いることで、Pythonベースで簡単に非同期タスク処理が実現できました。私の今回の検証のきっかけは機械学習モデルの非同期推論でしたが、非同期タスク実行という仕組みは広い応用性があると思います。

最近はAWSやGCPなどのクラウドベースのメッセージキュー、ジョブ実行サービスも提供されており、こちらはインフラを柔軟に設定・利用できるなどのメリットがあります。一方でCeleryなどOSSのメリットとしては、手元での検証やデバッグが容易であるという点が挙げられます。実用時にはそれぞれの要点を認識した上で使い分けるのが良いでしょう。

参考資料

https://docs.celeryproject.org/en/stable/index.html

https://github.com/testdrivenio/fastapi-celery

https://docs.aws.amazon.com/ja_jp/batch/latest/userguide/what-is-batch.html

https://cloud.google.com/tasks

Discussion