DjangoのCeleryで非同期でタスクを実行して、モニタリングする環境をDockerで構築する
Pythonで非同期処理をやる場合はCelery
を使うのが定番です。
今回はceleryを試しに動作させるための環境をdocker-compose
を使って簡単に作る方法をご紹介し、Django上で動作するように環境構築します。
以前、昔にnoteに書いた以下の記事の応用や焼き直しのような内容です。(随分昔に書いた懐かしい記事)
基本的に使用するツール類
今回は以下の4つを使用します。
それぞれ別なコンテナで動作させるため、本番の運用などでは、別々なサーバーで動作させることを想定しています。
- redis (キューを動作させるためのBroker)
- celery (Pythonのタスクキューサービス、別なプロセスで動作させる)
- django (PythonのWEBフレーワーク、ここからceleryに対しタスクを投げることになる)
- flower (celery内にあるタスクを監視するためのツール、webで動作)
非同期処理とは
非同期処理とは、一つのタスクが完了するのを待たずに次のタスクを開始することを意味します。
非同期処理の主な利点は以下の通りです。
-
効率的なリソースの利用: 同時に複数のタスクを進行させることができるため、リソース(CPUやメモリ)を最大限に活用することができる。
-
ユーザーエクスペリエンスの向上: 長時間かかるタスクがバックグラウンドで実行されている間も、ユーザーは他の操作を継続できる。
-
タイムアウトの回避: サーバーへのリクエストがタイムアウトになる前に、重いタスクをバックグラウンドで開始することができる。
もし、CPUに負荷がかかってしまうような計算処理や、RDBへの永続処理などの一つ一つが重いような処理を行う必要性がある場合は、Queueに処理であるタスクを貯めていきます。
後はキューであるRedisにタスクが貯まるのをワーカーであるCeleryが検知し、自動で負荷状況などを調整して、タスクを徐々に実行してくれます。
またこの、Queueである部分のことは、Broker(ブローカー) とも呼びます。
# 簡単な図
(Producer) => (Queue Redis : Broker) => (Celery: Consumer(Worker))
Celeryとは
Celeryは、Pythonで非同期タスクを実行するための強力なツールです。
特に、長時間実行する必要があるタスクや定期的に実行するタスクをバックグラウンドで処理するのに便利です。
Celeryの主な特徴:
-
ブローカを介したメッセージキュー: Celeryは、タスクのメッセージを
RabbitMQ
やRedis
などのブローカを介して処理する。 - 複数のワーカー: いくつものワーカープロセスやワーカーノードを使用してタスクを並行して処理することができる。
- 拡張性: 独自のワーカーやタスク、スケジュールなどを容易に追加することができる。
一番初めのDjango環境の整備
それではまず最初にDjangoの環境をインストールするところから始めます。以下のようにapp
というディレクトリを作成して、そこにDjangoの環境を構築します。
$ mkdir app
$ cd app
$ touch requirements.txt
パッケージ管理をするrequirements.txt
を作成し、以下の3つのパッケージを記述します。
- django
- celery
- redis
Django==4.2.4
celery==5.3.1
redis==5.0.0
flower==2.0.1
3つのパッケージをインストールして、django-admin
コマンドでDjangoのプロジェクトを作成します。
$ cd app
$ pip install -r requirements.txt
$ django-admin startproject config .
$ tree
.
├── config
│ ├── __init__.py
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── manage.py
Redisの設定
Djangoの設定ファイルであるconfig/settings.py
の中にあるCACHES
にRedisの設定をします。
DjangoのキャッシュはReidsを使用することを設定しており、後々記述するCELERY_CACHE_BACKEND
を設定するためにも必要です。
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://redis:6379'
}
}
タスクの作成
まずはDjango上にアプリケーションを作成し、今回は投票関連のアプリを作ろうかと思うので、polls
というものを作成します。
$ python manage.py startapp polls
ディレクトリ構成が以下のようになりました。
$ tree
.
├── config
│ ├── __init__.py
│ ├── asgi.py
│ ├── celery.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── db.sqlite3
├── manage.py
├── polls
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── migrations
│ │ └── __init__.py
│ ├── models.py
│ ├── tests.py
│ └── views.py
└── requirements.txt
polls
というアプリケーションを作成したので、Djangoの設定ファイルであるconfig/settings.py
の中にあるINSTALLED_APPS
にpolls
を追加します。
# Application definition
INSTALLED_APPS = [
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"polls"
]
polls
のディレクトリの中に、tasks.py
というファイルを作成し、以下の簡単なhello_world
という関数を作成します。これがCeleryというWorkerで動作する関数です。
hello_world
メソッドの前に @shared_task
というのがあります。
このメソッドはceleryタスクである
というのを明示的に示しています。
from celery import shared_task # type: ignore
@shared_task()
def hello_world():
print("start hello_world")
print("hello")
print("-----" * 200)
print("end hello_world")
@shared_task()
def calc(a: int, b: int) -> int:
result: int = a + b
return result
Djangoの設定ファイルにCeleryの設定をする
config
内でcelery.py
というファイルを作成して以下のように設定します。
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app: Celery = Celery("app")
app.config_from_object("django.conf:settings", namespace="CELERY") # type: ignore
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) # type: ignore
config
の__init__.py
を以下のように記述することで、
同ディレクトリであるconfig/celery.py
からapp
がインポートされることで、Django起動時にapp
が自動的にロードされます。
from .celery import app as celery_app
__all__ = ("celery_app",)
config/settings.py
に以下の記述をします。これがDjangoからCeleryを扱うための設定を読み取るためのファイルです。
# Celery configurations
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZZER = 'json'
# 'amqp://guest:guest@localhost//'
# celeryを動かすための設定ファイル
CELERY_BROKER_URL = "redis://redis:6379"
CELERY_CACHE_BACKEND = "django-cache"
CELERY_RESULT_EXTENDED = True
CELERY_BROKER_URL
はBrokerであるRedisの参照先を意味している。
CELERY_CACHE_BACKEND
でDjangoのキャッシュ先であるRedisをキャッシュとして指定しています。
今回は実装しないが、タスクのステートや戻り値をRedis
に保存したい場合は、次の設定を追加する。
CELERY_RESULT_BACKEND = 'redis://redis:6379/0'
ここでは設定しなくても良いですが、以下のような設定項目がありますが、コマンドのオプションでも指定できます。
# CELERYD_CONCURRENCY=1なので、1こずつキューを捌いていく
# ここはCPU数に合わせていくのがよい
CELERYD_CONCURRENCY = 1
CELERYD_LOG_FILE = "./celeryd.log"
# CELERYD_LOG_LEVELをINFOにしておくと、
# タスクの標準出力もログ(celeryd.log)に書かれる
CELERYD_LOG_LEVEL = "INFO"
Celeryにタスクを投げるためのカスタムコマンドを作成する
polls
ディレクトリの中に/management/commands
というディレクトリを作成します。
Djangoのカスタムコマンドを機能を使用して、Celery
にタスクを投げるための処理を作成してみます。以下のようにCelery
に非同期処理のキューを積むカスタムコマンドを作ります。
ここではhello_world_queue.py
というファイル名にします。これがコマンド名となり、python manage.py hello_world_queue
で実行できるようになります。
from django.core.management.base import BaseCommand
from polls.tasks import hello_world
class Command(BaseCommand):
def handle(self, *args, **options): # type: ignore
print("====== START =================")
hello_world.apply_async(args=()) # type: ignore
print("====== END =================")
Docker環境の構築
お次にDockerの環境を構築しています。docker-compose.yml
はcelery_app
の1つ上の階層においてください。
ここではdocker-compose
を以下のように記述してください。
version: "3"
services:
app:
restart: always
build:
context: ./celery_app
volumes:
- ./celery_app:/usr/src/app
depends_on:
- db
- redis
command: |
bash -c "python manage.py migrate && python manage.py runserver 0.0.0.0:8000"
ports:
- 8000:8000
celery:
container_name: celery
tty: true
build:
context: ./celery_app
volumes:
- ./celery_app:/usr/src/app
command: celery -A config worker -l info
depends_on:
- app
- redis
monitor:
container_name: monitor
tty: true
build:
context: ./celery_app
volumes:
- ./celery_app:/usr/src/app
ports:
- 5555:5555
command: celery -A config flower --port=5555
depends_on:
- app
- redis
db:
platform: linux/amd64
image: mysql:5.7
environment:
MYSQL_USER: docker
MYSQL_PASSWORD: docker
MYSQL_ROOT_PASSWORD: local_root_password
MYSQL_DATABASE: db
ports:
- 3306:3306
command: --port 3306
redis:
image: redis:latest
restart: always
tty: true
ports:
- 6379:6379
ここでは以下4つのコンテナを構築しています。
app
Djangoを動作させるWEBアプリケーションを動作させるコンテナ。ここでは8000番ポートでで動作させています。
ここからQueueであるRedisに対してタスクを投げます。
celery
BrokerであるRedisのキューにタスクが入ったのを検知するワーカープロセスを動かすWorkerコンテナ。
キューに貯まったタスクの実行も行う。
monitor
celeryのタスク監視を行うflowerを動作させるコンテナ
db
RDBを動作させるMySQLのコンテナです。ここでは使いませんが、後々使用します。
redis
KVSでもありQueueを動作させるためのRedisを動作させるBrokerコンテナ。
Redis以外にもRabbitMQなどを使用してもかまいません。
以下のようなDockerfile
をcelery_app
の直下におきます。
FROM python:3.11-slim
ENV PYTHONPATH /usr/src/app
WORKDIR /usr/src/app
RUN apt update \
&& apt-get -y install gcc libmariadb-dev \
&& apt install -y default-mysql-client \
&& apt-get install -y default-libmysqlclient-dev \
&& apt install --no-install-recommends -y tzdata \
&& apt-get install -y git \
&& apt clean
COPY . /usr/src/app
RUN pip install --upgrade pip
RUN pip install -r requirements.txt --no-cache-dir
EXPOSE 8000
最終的な構成としては以下のようになっているはずです。
$ tree
.
├── celery_app
│ ├── Dockerfile
│ ├── config
│ │ ├── __init__.py
│ │ ├── asgi.py
│ │ ├── celery.py
│ │ ├── settings.py
│ │ ├── urls.py
│ │ └── wsgi.py
│ ├── db.sqlite3
│ ├── manage.py
│ ├── polls
│ │ ├── __init__.py
│ │ ├── admin.py
│ │ ├── apps.py
│ │ ├── management
│ │ │ └── commands
│ │ │ └── hello_world_queue.py
│ │ ├── migrations
│ │ │ └── __init__.py
│ │ ├── models.py
│ │ ├── tasks.py
│ │ ├── tests.py
│ │ └── views.py
│ └── requirements.txt
└── docker-compose.yaml # <- docker-compose.ymlを一番上に置く
それでは実際に動作のために立ち上げてみましょう。
$ docker-compose up -d
ログなどをみてみて、エラーなどが起きていなければ成功です。
$ docker-compose logs -f
カスタムコマンドを実行してCeleryの動作を試す
それでは先ほど作成したhello_world
関数を動かしてみましょう。
$ docker-compose exec app bash
# appコンテナのシェルの中に入る
$ python manage.py hello_world_queue
====== START =================
====== END =================
すると、celery
コンテナのログは以下のように記述されており、hello_world
関数のprint文の実行結果が書き込まれているはずです。
$ docker-compose logs -f celery
celery | Please specify a different user using the --uid option.
celery |
celery | User information: uid=0 euid=0 gid=0 egid=0
celery |
celery | warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(
celery |
celery | -------------- celery@96f60d65d01d v5.3.1 (emerald-rush)
celery | --- ***** -----
celery | -- ******* ---- Linux-5.15.49-linuxkit-pr-aarch64-with-glibc2.36 2023-08-19 01:20:11
celery | - *** --- * ---
celery | - ** ---------- [config]
celery | - ** ---------- .> app: app:0xffffaf9285d0
celery | - ** ---------- .> transport: redis://redis:6379//
celery | - ** ---------- .> results: disabled://
celery | - *** --- * --- .> concurrency: 4 (prefork)
celery | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
celery | --- ***** -----
celery | -------------- [queues]
celery | .> celery exchange=celery(direct) key=celery
celery |
celery |
celery | [tasks]
celery | . polls.tasks.hello_world
celery | [2023-08-19 01:20:12,576: INFO/MainProcess] mingle: searching for neighbors
celery | [2023-08-19 01:20:13,581: INFO/MainProcess] mingle: all alone
celery | [2023-08-19 01:20:13,593: INFO/MainProcess] celery@96f60d65d01d ready.
celery | [2023-08-19 01:20:15,996: INFO/MainProcess] Task polls.tasks.hello_world[b67ecae4-e311-46b0-847d-1af346bfbee4] received
celery | [2023-08-19 01:20:15,998: WARNING/ForkPoolWorker-4] start hello_world
celery | [2023-08-19 01:20:15,998: WARNING/ForkPoolWorker-4] hello
celery | [2023-08-19 01:20:15,998: WARNING/ForkPoolWorker-4] ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
celery | [2023-08-19 01:20:15,999: WARNING/ForkPoolWorker-4] end hello_world
celery | [2023-08-19 01:20:15,999: INFO/ForkPoolWorker-4] Task polls.tasks.hello_world[b67ecae4-e311-46b0-847d-1af346bfbee4] succeeded in 0.001324790995568037s: None
Redisに積まれたQueueがworkerであるceleryによって拾い上げられ、処理されることを確認できます。
Celeryコマンドの詳細
Celery
コンテナのコマンドについて解説します。
services:
...
celery:
...
volumes:
- ./celery_app:/usr/src/app
command: celery -A config worker -l info
...
docker-compose.yml
のファイルのceleryのcommandには以下のようなコマンドが記述されています。これは/celery_app
ディレクトリ下で実行しており、プロジェクト名の部分であるconfig
配下には__all__ = ("celery_app",)
が記述されているconfig/__init__.py
を読み込んでいます。
$ celery -A config worker -l info`
$ celery -A {プロジェクト名} worker -l {ログレベル}
-A
プロジェクト名を入れる。上記で言うなら、config
を指定している。
-Q
-Q
, --queues
でも指定可能。
Workerが取得するキューを指定する。ここで指定はしていません。
キューの名前をカンマ区切りで指定します。例えば、queue1とqueue2にリッスンするようにワーカーを指定する場合は以下のようにします。
このオプションを使用すると、特定のキューのみを処理する専用のワーカーセットを持つことができる。
これは、異なる優先順位やリソース要件を持つタスクを効果的に管理するために役立つ。
$ celery -A proj worker -Q queue1,queue2
-c
-c
, --concurrency
でも指定可能。
concurrencyの略であり、最大並列数を指定する。
デフォルトの concurrency数
はマシンのCPU (Core も含む) 数
です。
ここに1を入れると並列数は1、つまり直列処理になる。
10を入れると最大10個のスレッドで並列処理が実行される。
-l
ログレベルのオプション。
- DEBUG
- INFO
- WARNING
- ERROR
- CRITICAL
ログレベルは、これらのいずれかを選択します。
-E
-E
, --events
でも指定可能。
Workerで発生したアクションの監視メッセージ(イベント)
をCelery
に送信させるオプション。
このオプションを使用すると、Workerが task events
を送信する。
Workerがタスクを開始、完了、失敗させるたびに、特定のイベントをブローカーに送信することを意味する。
これにより、Workerのタスクに関する詳細な情報をリアルタイムで監視することができる。
例えば、後々紹介するFlower
といったCeleryの監視ツールは、これらのイベントを使用してワーカーやタスクの状態をリアルタイムで表示する。
$ celery -A proj worker -E
$ celery -A config worker -l info` -c 2
--logfile
を指定することで、実行結果がログファイルに書き込まれます。
services:
...
celery:
...
volumes:
- ./celery_app:/usr/src/app
- ./celery_app/logs:/usr/src/app/logs
command: celery -A config worker -l info --logfile=logs/celery.log
depends_on:
- app
- redis
Celeryのモニタリング環境を見てみる
flowerが動作する127.0.0.1:5555
にアクセスして見ると、以下のような画面が表示されます。
celeryが現在処理しているタスクなどのキューの様子がリアルタイムで見れます。
以下の4つの項目があります。
Active - 動作中のタスク
Processed - 処理し終わったタスク
Failed - 失敗したタスク
Succeeded - 成功したタスク
上記の画面を見ながら、新たなタスクを実行させてみましょう。以下のコマンドでappコンテナの中に入ったら、先ほどのカスタムコマンドを実行してみます。
$ docker-compose exec app bash
$ python manage.py hello_world_queue
====== START =================
====== END =================
flowerの画面に戻ってみると、Active
とProcessed
がそれぞれ1が付いてタスクが動作中なのがわかり、しばらくすると、Succeeded
とProcessed
がそれぞれカウントされて、タスクが処理されて成功したことがわかります。
まとめ
サンプルコードは以下にまとめました。
続編としてCeleryの実行結果などを自動で保存してくれるライブラリなどもあるので、そちらの使い方などを紹介していきます。
[続編]
参考資料
Discussion