🪷

DjangoのCeleryで非同期でタスクを実行して、モニタリングする環境をDockerで構築する

2023/09/23に公開

Pythonで非同期処理をやる場合はCeleryを使うのが定番です。

今回はceleryを試しに動作させるための環境をdocker-composeを使って簡単に作る方法をご紹介し、Django上で動作するように環境構築します。

以前、昔にnoteに書いた以下の記事の応用や焼き直しのような内容です。(随分昔に書いた懐かしい記事)

https://note.com/shimakaze_soft/n/n4a2b63d320ed

基本的に使用するツール類

今回は以下の4つを使用します。
それぞれ別なコンテナで動作させるため、本番の運用などでは、別々なサーバーで動作させることを想定しています。

  • redis (キューを動作させるためのBroker)
  • celery (Pythonのタスクキューサービス、別なプロセスで動作させる)
  • django (PythonのWEBフレーワーク、ここからceleryに対しタスクを投げることになる)
  • flower (celery内にあるタスクを監視するためのツール、webで動作)

非同期処理とは

非同期処理とは、一つのタスクが完了するのを待たずに次のタスクを開始することを意味します。
非同期処理の主な利点は以下の通りです。

  1. 効率的なリソースの利用: 同時に複数のタスクを進行させることができるため、リソース(CPUやメモリ)を最大限に活用することができる。

  2. ユーザーエクスペリエンスの向上: 長時間かかるタスクがバックグラウンドで実行されている間も、ユーザーは他の操作を継続できる。

  3. タイムアウトの回避: サーバーへのリクエストがタイムアウトになる前に、重いタスクをバックグラウンドで開始することができる。

もし、CPUに負荷がかかってしまうような計算処理や、RDBへの永続処理などの一つ一つが重いような処理を行う必要性がある場合は、Queueに処理であるタスクを貯めていきます。

後はキューであるRedisにタスクが貯まるのをワーカーであるCeleryが検知し、自動で負荷状況などを調整して、タスクを徐々に実行してくれます。

またこの、Queueである部分のことは、Broker(ブローカー) とも呼びます。

# 簡単な図

(Producer) => (Queue Redis : Broker) => (Celery: Consumer(Worker))

Celeryとは

Celeryは、Pythonで非同期タスクを実行するための強力なツールです。
特に、長時間実行する必要があるタスクや定期的に実行するタスクをバックグラウンドで処理するのに便利です。

Celeryの主な特徴:

  • ブローカを介したメッセージキュー: Celeryは、タスクのメッセージをRabbitMQRedisなどのブローカを介して処理する。
  • 複数のワーカー: いくつものワーカープロセスやワーカーノードを使用してタスクを並行して処理することができる。
  • 拡張性: 独自のワーカーやタスク、スケジュールなどを容易に追加することができる。

一番初めのDjango環境の整備

それではまず最初にDjangoの環境をインストールするところから始めます。以下のようにappというディレクトリを作成して、そこにDjangoの環境を構築します。

$ mkdir app
$ cd app
$ touch requirements.txt

パッケージ管理をするrequirements.txtを作成し、以下の3つのパッケージを記述します。

  • django
  • celery
  • redis
app/requirements.txt
Django==4.2.4
celery==5.3.1
redis==5.0.0
flower==2.0.1

3つのパッケージをインストールして、django-adminコマンドでDjangoのプロジェクトを作成します。

$ cd app
$ pip install -r requirements.txt
$ django-admin startproject config .

$ tree
.
├── config
│   ├── __init__.py
│   ├── asgi.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── manage.py

Redisの設定

Djangoの設定ファイルであるconfig/settings.pyの中にあるCACHESにRedisの設定をします。

DjangoのキャッシュはReidsを使用することを設定しており、後々記述するCELERY_CACHE_BACKENDを設定するためにも必要です。

config/settings.py
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://redis:6379'
    }
}

タスクの作成

まずはDjango上にアプリケーションを作成し、今回は投票関連のアプリを作ろうかと思うので、pollsというものを作成します。

$ python manage.py startapp polls

ディレクトリ構成が以下のようになりました。

$ tree
.
├── config
│   ├── __init__.py
│   ├── asgi.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── db.sqlite3
├── manage.py
├── polls
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tests.py
│   └── views.py
└── requirements.txt

pollsというアプリケーションを作成したので、Djangoの設定ファイルであるconfig/settings.pyの中にあるINSTALLED_APPSpollsを追加します。

config/settings.py
# Application definition

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",

    "polls"
]

pollsのディレクトリの中に、tasks.pyというファイルを作成し、以下の簡単なhello_worldという関数を作成します。これがCeleryというWorkerで動作する関数です。

hello_worldメソッドの前に @shared_taskというのがあります。
このメソッドはceleryタスクであるというのを明示的に示しています。

polls/tasks.py
from celery import shared_task  # type: ignore


@shared_task()
def hello_world():
    print("start hello_world")
    print("hello")
    print("-----" * 200)
    print("end hello_world")


@shared_task()
def calc(a: int, b: int) -> int:
    result: int = a + b
    return result

Djangoの設定ファイルにCeleryの設定をする

config内でcelery.pyというファイルを作成して以下のように設定します。

config/celery.py
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

app: Celery = Celery("app")

app.config_from_object("django.conf:settings", namespace="CELERY")  # type: ignore
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # type: ignore

config__init__.pyを以下のように記述することで、

同ディレクトリであるconfig/celery.pyからappがインポートされることで、Django起動時にappが自動的にロードされます。

config/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)

config/settings.pyに以下の記述をします。これがDjangoからCeleryを扱うための設定を読み取るためのファイルです。

config/settings.py
# Celery configurations
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZZER = 'json'

# 'amqp://guest:guest@localhost//'
# celeryを動かすための設定ファイル
CELERY_BROKER_URL = "redis://redis:6379"
CELERY_CACHE_BACKEND = "django-cache"
CELERY_RESULT_EXTENDED = True

CELERY_BROKER_URLはBrokerであるRedisの参照先を意味している。

CELERY_CACHE_BACKENDでDjangoのキャッシュ先であるRedisをキャッシュとして指定しています。

今回は実装しないが、タスクのステートや戻り値をRedisに保存したい場合は、次の設定を追加する。

config/settings.py
CELERY_RESULT_BACKEND = 'redis://redis:6379/0'

ここでは設定しなくても良いですが、以下のような設定項目がありますが、コマンドのオプションでも指定できます。

config/settings.py
# CELERYD_CONCURRENCY=1なので、1こずつキューを捌いていく
# ここはCPU数に合わせていくのがよい
CELERYD_CONCURRENCY = 1

CELERYD_LOG_FILE = "./celeryd.log"

# CELERYD_LOG_LEVELをINFOにしておくと、
# タスクの標準出力もログ(celeryd.log)に書かれる
CELERYD_LOG_LEVEL = "INFO"

Celeryにタスクを投げるためのカスタムコマンドを作成する

pollsディレクトリの中に/management/commandsというディレクトリを作成します。

Djangoのカスタムコマンドを機能を使用して、Celeryにタスクを投げるための処理を作成してみます。以下のようにCeleryに非同期処理のキューを積むカスタムコマンドを作ります。

ここではhello_world_queue.pyというファイル名にします。これがコマンド名となり、python manage.py hello_world_queueで実行できるようになります。

polls/management/commands/hello_world_queue.py
from django.core.management.base import BaseCommand

from polls.tasks import hello_world

class Command(BaseCommand):
    def handle(self, *args, **options):  # type: ignore
        print("====== START =================")
        hello_world.apply_async(args=())  # type: ignore

        print("====== END   =================")

Docker環境の構築

お次にDockerの環境を構築しています。docker-compose.ymlcelery_appの1つ上の階層においてください。

ここではdocker-composeを以下のように記述してください。

docker-compose.yml
version: "3"
services:
  app:
    restart: always
    build:
      context: ./celery_app
    volumes:
      - ./celery_app:/usr/src/app
    depends_on:
      - db
      - redis
    command: |
      bash -c "python manage.py migrate && python manage.py runserver 0.0.0.0:8000"
    ports:
      - 8000:8000

  celery:
    container_name: celery
    tty: true
    build:
      context: ./celery_app
    volumes:
      - ./celery_app:/usr/src/app
    command: celery -A config worker -l info
    depends_on:
      - app
      - redis

  monitor:
    container_name: monitor
    tty: true
    build:
      context: ./celery_app
    volumes:
      - ./celery_app:/usr/src/app
    ports:
      - 5555:5555
    command: celery -A config flower --port=5555
    depends_on:
      - app
      - redis

  db:
    platform: linux/amd64
    image: mysql:5.7
    environment:
      MYSQL_USER: docker
      MYSQL_PASSWORD: docker
      MYSQL_ROOT_PASSWORD: local_root_password
      MYSQL_DATABASE: db
    ports:
      - 3306:3306
    command: --port 3306

  redis:
    image: redis:latest
    restart: always
    tty: true
    ports:
      - 6379:6379

ここでは以下4つのコンテナを構築しています。

app

Djangoを動作させるWEBアプリケーションを動作させるコンテナ。ここでは8000番ポートでで動作させています。
ここからQueueであるRedisに対してタスクを投げます。

celery

BrokerであるRedisのキューにタスクが入ったのを検知するワーカープロセスを動かすWorkerコンテナ。
キューに貯まったタスクの実行も行う。

monitor

celeryのタスク監視を行うflowerを動作させるコンテナ

db

RDBを動作させるMySQLのコンテナです。ここでは使いませんが、後々使用します。

redis

KVSでもありQueueを動作させるためのRedisを動作させるBrokerコンテナ。
Redis以外にもRabbitMQなどを使用してもかまいません。


以下のようなDockerfilecelery_appの直下におきます。

Dockerfile
FROM python:3.11-slim

ENV PYTHONPATH /usr/src/app

WORKDIR /usr/src/app

RUN apt update \
    && apt-get -y install gcc libmariadb-dev \
    && apt install -y default-mysql-client \
    && apt-get install -y default-libmysqlclient-dev \
    && apt install --no-install-recommends -y tzdata \
    && apt-get install -y git \
    && apt clean

COPY . /usr/src/app
RUN pip install --upgrade pip
RUN pip install -r requirements.txt --no-cache-dir

EXPOSE 8000

最終的な構成としては以下のようになっているはずです。

$ tree
.
├── celery_app
│   ├── Dockerfile
│   ├── config
│   │   ├── __init__.py
│   │   ├── asgi.py
│   │   ├── celery.py
│   │   ├── settings.py
│   │   ├── urls.py
│   │   └── wsgi.py
│   ├── db.sqlite3
│   ├── manage.py
│   ├── polls
│   │   ├── __init__.py
│   │   ├── admin.py
│   │   ├── apps.py
│   │   ├── management
│   │   │   └── commands
│   │   │       └── hello_world_queue.py
│   │   ├── migrations
│   │   │   └── __init__.py
│   │   ├── models.py
│   │   ├── tasks.py
│   │   ├── tests.py
│   │   └── views.py
│   └── requirements.txt
└── docker-compose.yaml # <- docker-compose.ymlを一番上に置く

それでは実際に動作のために立ち上げてみましょう。

$ docker-compose up -d

ログなどをみてみて、エラーなどが起きていなければ成功です。

$ docker-compose logs -f

カスタムコマンドを実行してCeleryの動作を試す

それでは先ほど作成したhello_world関数を動かしてみましょう。

$ docker-compose exec app bash

# appコンテナのシェルの中に入る
$ python manage.py hello_world_queue
====== START =================
====== END   =================

すると、celeryコンテナのログは以下のように記述されており、hello_world関数のprint文の実行結果が書き込まれているはずです。

$ docker-compose logs -f celery

celery  | Please specify a different user using the --uid option.
celery  |
celery  | User information: uid=0 euid=0 gid=0 egid=0
celery  |
celery  |   warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(
celery  |
celery  |  -------------- celery@96f60d65d01d v5.3.1 (emerald-rush)
celery  | --- ***** -----
celery  | -- ******* ---- Linux-5.15.49-linuxkit-pr-aarch64-with-glibc2.36 2023-08-19 01:20:11
celery  | - *** --- * ---
celery  | - ** ---------- [config]
celery  | - ** ---------- .> app:         app:0xffffaf9285d0
celery  | - ** ---------- .> transport:   redis://redis:6379//
celery  | - ** ---------- .> results:     disabled://
celery  | - *** --- * --- .> concurrency: 4 (prefork)
celery  | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
celery  | --- ***** -----
celery  |  -------------- [queues]
celery  |                 .> celery           exchange=celery(direct) key=celery
celery  |
celery  |
celery  | [tasks]
celery  |   . polls.tasks.hello_world
celery  | [2023-08-19 01:20:12,576: INFO/MainProcess] mingle: searching for neighbors
celery  | [2023-08-19 01:20:13,581: INFO/MainProcess] mingle: all alone
celery  | [2023-08-19 01:20:13,593: INFO/MainProcess] celery@96f60d65d01d ready.
celery  | [2023-08-19 01:20:15,996: INFO/MainProcess] Task polls.tasks.hello_world[b67ecae4-e311-46b0-847d-1af346bfbee4] received
celery  | [2023-08-19 01:20:15,998: WARNING/ForkPoolWorker-4] start hello_world
celery  | [2023-08-19 01:20:15,998: WARNING/ForkPoolWorker-4] hello
celery  | [2023-08-19 01:20:15,998: WARNING/ForkPoolWorker-4] ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
celery  | [2023-08-19 01:20:15,999: WARNING/ForkPoolWorker-4] end hello_world
celery  | [2023-08-19 01:20:15,999: INFO/ForkPoolWorker-4] Task polls.tasks.hello_world[b67ecae4-e311-46b0-847d-1af346bfbee4] succeeded in 0.001324790995568037s: None

Redisに積まれたQueueがworkerであるceleryによって拾い上げられ、処理されることを確認できます。

Celeryコマンドの詳細

Celeryコンテナのコマンドについて解説します。

services:
  ...

  celery:
    ...
    volumes:
      - ./celery_app:/usr/src/app
    command: celery -A config worker -l info
    ...

docker-compose.ymlのファイルのceleryのcommandには以下のようなコマンドが記述されています。これは/celery_appディレクトリ下で実行しており、プロジェクト名の部分であるconfig配下には__all__ = ("celery_app",)が記述されているconfig/__init__.pyを読み込んでいます。

$ celery -A config worker -l info`

$ celery -A {プロジェクト名} worker -l {ログレベル}

-A

プロジェクト名を入れる。上記で言うなら、configを指定している。

-Q

-Q, --queuesでも指定可能。

Workerが取得するキューを指定する。ここで指定はしていません。

キューの名前をカンマ区切りで指定します。例えば、queue1とqueue2にリッスンするようにワーカーを指定する場合は以下のようにします。

このオプションを使用すると、特定のキューのみを処理する専用のワーカーセットを持つことができる。
これは、異なる優先順位やリソース要件を持つタスクを効果的に管理するために役立つ。

$ celery -A proj worker -Q queue1,queue2

-c

-c, --concurrencyでも指定可能。

concurrencyの略であり、最大並列数を指定する。
デフォルトの concurrency数はマシンのCPU (Core も含む) 数です。

ここに1を入れると並列数は1、つまり直列処理になる。
10を入れると最大10個のスレッドで並列処理が実行される。

-l

ログレベルのオプション。

  • DEBUG
  • INFO
  • WARNING
  • ERROR
  • CRITICAL

ログレベルは、これらのいずれかを選択します。

-E

-E, --eventsでも指定可能。

Workerで発生したアクションの監視メッセージ(イベント)Celeryに送信させるオプション。
このオプションを使用すると、Workerが task eventsを送信する。
Workerがタスクを開始、完了、失敗させるたびに、特定のイベントをブローカーに送信することを意味する。

これにより、Workerのタスクに関する詳細な情報をリアルタイムで監視することができる。
例えば、後々紹介するFlowerといったCeleryの監視ツールは、これらのイベントを使用してワーカーやタスクの状態をリアルタイムで表示する。

$ celery -A proj worker -E

$ celery -A config worker -l info` -c 2

--logfileを指定することで、実行結果がログファイルに書き込まれます。

services:
  ...
  celery:
    ...
    volumes:
      - ./celery_app:/usr/src/app
      - ./celery_app/logs:/usr/src/app/logs
    command: celery -A config worker -l info --logfile=logs/celery.log
    depends_on:
      - app
      - redis

Celeryのモニタリング環境を見てみる

flowerが動作する127.0.0.1:5555にアクセスして見ると、以下のような画面が表示されます。
celeryが現在処理しているタスクなどのキューの様子がリアルタイムで見れます。

以下の4つの項目があります。

Active - 動作中のタスク
Processed - 処理し終わったタスク
Failed - 失敗したタスク
Succeeded - 成功したタスク

上記の画面を見ながら、新たなタスクを実行させてみましょう。以下のコマンドでappコンテナの中に入ったら、先ほどのカスタムコマンドを実行してみます。

$ docker-compose exec app bash

$ python manage.py hello_world_queue
====== START =================
====== END   =================

flowerの画面に戻ってみると、ActiveProcessedがそれぞれ1が付いてタスクが動作中なのがわかり、しばらくすると、SucceededProcessedがそれぞれカウントされて、タスクが処理されて成功したことがわかります。

まとめ

サンプルコードは以下にまとめました。

https://github.com/shimakaze-git/django-async-sample/tree/master

続編としてCeleryの実行結果などを自動で保存してくれるライブラリなどもあるので、そちらの使い方などを紹介していきます。

[続編]

https://zenn.dev/shimakaze_soft/articles/b925594a2235b0#タスクの状態を確認できるようにする

参考資料

https://blog.symdon.info/posts/1617189961/

Discussion