DjangoのCeleryで非同期タスクを定期実行するBeatの環境を構築する
Django上でCeleryの実行結果をDBに保存する構築方法は以下の記事で紹介しました。
今回はCeleryを定期実行する機能であるBeatの設定を適用して、Django上でバッチなどの定期実行の機能を設定していきます。
今回はDjango上でCeleryのBeatを使いやすくするためのライブラリとしてのdjango-celery-beat
の使い方についても紹介していきます。
インストール
まずは上記の記事を最初に読んで、Docker上でDjangoとCeleryの環境構築の説明はこの記事では省きます。
この記事は上記の続きとなるため、既にCeleryの環境は構築されていることを前提とします。
$ pip install django-celery-beat
requirements.txtにも書き込んでおけば、後々のdockerコンテナの立ち上げ時に自動的にインストールもされます。
django-celery-beatの追加
Djangoの設定ファイルであるsettings.py
に以下の設定を加えます。
INSTALLED_APPS
にdjango_celery_beat
を追加し、CELERY_BEAT_SCHEDULER
をdjango_celery_beat.schedulers:DatabaseScheduler
に設定します。
INSTALLED_APPS = [
...
'django_celery_beat'
]
# Celery Beat settings
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
django_celery_beat.schedulers:DatabaseScheduler
はcelery
のbeat
のプロセスを立ち上げる際に--scheduler
でも指定できます。
$ celery -A {project} beat --scheduler django_celery_beat.schedulers:DatabaseScheduler`
マイグレーション
以下のコマンドでdjango_celery_beat
用のテーブルをマイグレーションで作成します。
$ python manage.py migrate
docker-composeにCelery Beatのコンテナを追加
docker-compose.yml
にbeat
コンテナを新たに追加します。
以下のコマンドでもわかるようにcelery -A config beat -l info
でcelery beat
のプロセスを立ち上げています。
services:
...
beat:
container_name: beat
tty: true
build:
context: ./celery_app
volumes:
- ./celery_app:/usr/src/app
command: celery -A config beat -l info
depends_on:
- app
- db
- redis
docker-compose logs -f beat
でcelery beat
のプロセスのログを見てみると以下のようになっているはずです。
$ docker-compose logs -f beat
beat | celery beat v5.3.1 (emerald-rush) is starting.
beat | __ - ... __ - _
beat | LocalTime -> 2023-09-22 10:54:14
beat | Configuration ->
beat | . broker -> redis://redis:6379//
beat | . loader -> celery.loaders.app.AppLoader
beat | . scheduler -> django_celery_beat.schedulers.DatabaseScheduler
beat |
beat | . logfile -> [stderr]@%INFO
beat | . maxinterval -> 5.00 seconds (5s)
beat | [2023-09-22 10:54:14,058: INFO/MainProcess] beat: Starting...
簡単なスケジューラの作成
以下は前回のこちらの記事で作成したtime_sleep_func
を定期実行するスケジューラの設定です。
-
task
で定期実行したい非同期タスクの関数を設定します。 -
schedule
で定期実行する間隔を指定します。crontab()
で60秒に以下の頻度で定期実行します。 -
kwargs
で非同期タスクの関数の引数を指定します。
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'add-every-60-seconds': {
"task": "polls.tasks.time_sleep_func",
"schedule": crontab(),
"kwargs": {"project_id": "0001"}
},
}
crontab(minute="*/5")
のように指定することで、5分毎に実行するように設定できます。秒間毎に設定したい場合はtimedelta
を使用して以下ののように指定します。
from datetime import timedelta
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'add-every-60-seconds': {
"task": "polls.tasks.time_sleep_func",
"schedule": crontab(minute="*/5"),
"kwargs": {"project_id": "0001"}
},
'add-every-30-seconds': {
"task": "polls.tasks.time_sleep_func",
"schedule": timedelta(seconds=30),
"kwargs": {"project_id": "0001"}
},
}
beat
コンテナを再起動します。すると、ログにScheduler: Sending due task 〜
と上記で登録したスケジューラの動作も確認できます。
$ docker-compose restart beat
$ docker-compose logs -f beat
beat | celery beat v5.3.1 (emerald-rush) is starting.
beat | __ - ... __ - _
beat | LocalTime -> 2023-09-22 10:54:14
beat | Configuration ->
beat | . broker -> redis://redis:6379//
beat | . loader -> celery.loaders.app.AppLoader
beat | . scheduler -> django_celery_beat.schedulers.DatabaseScheduler
beat |
beat | . logfile -> [stderr]@%INFO
beat | . maxinterval -> 5.00 seconds (5s)
beat | [2023-09-22 10:54:14,058: INFO/MainProcess] beat: Starting...
beat | [2023-09-22 10:54:14,258: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (polls.tasks.time_sleep_func)
docker-compose logs -f celery
でログを見てみるとcelery側のログを見てみると30秒に一回の頻度で定期実行の非同期タスクが動いていることが確認できます。
celery | [2023-09-22 20:31:35,974: INFO/MainProcess] Task polls.tasks.hello_world[6dda3b1d-4088-4d2c-9304-01677b296618] received
celery | [2023-09-22 20:31:35,997: WARNING/ForkPoolWorker-1] start hello_world
celery | [2023-09-22 20:31:40,998: WARNING/ForkPoolWorker-1] hello
celery | [2023-09-22 20:31:40,998: WARNING/ForkPoolWorker
celery | [2023-09-22 20:31:40,998: WARNING/ForkPoolWorker-1] end hello_world
celery | [2023-09-22 20:31:41,023: INFO/ForkPoolWorker-1] Task polls.tasks.hello_world[6dda3b1d-4088-4d2c-9304-01677b296618] succeeded in 5.047379961004481s: None
celery | [2023-09-22 20:32:05,978: INFO/MainProcess] Task polls.tasks.hello_world[fa23472e-93ed-4ce3-9c9d-4c63bbff403f] received
celery | [2023-09-22 20:32:06,001: WARNING/ForkPoolWorker-1] start hello_world
celery | [2023-09-22 20:32:11,002: WARNING/ForkPoolWorker-1] hello
celery | [2023-09-22 20:32:11,002: WARNING/ForkPoolWorker
celery | [2023-09-22 20:32:11,002: WARNING/ForkPoolWorker-1] end hello_world
celery | [2023-09-22 20:32:11,018: INFO/ForkPoolWorker-1] Task polls.tasks.hello_world[fa23472e-93ed-4ce3-9c9d-4c63bbff403f] succeeded in 5.037791001996084s: None
celery | [2023-09-22 20:32:35,982: INFO/MainProcess] Task polls.tasks.hello_world[45ce6c76-950f-4b07-a8fe-84aef99ce821] received
celery | [2023-09-22 20:32:36,003: WARNING/ForkPoolWorker-1] start hello_world
celery | [2023-09-22 20:32:41,006: WARNING/ForkPoolWorker-1] hello
celery | [2023-09-22 20:32:41,006: WARNING/ForkPoolWorker
celery | [2023-09-22 20:32:41,006: WARNING/ForkPoolWorker-1] end hello_world
celery | [2023-09-22 20:32:41,021: INFO/ForkPoolWorker-1] Task polls.tasks.hello_world[45ce6c76-950f-4b07-a8fe-84aef99ce821] succeeded in 5.037372376995336s: None
定期実行のタイミングの指定方法
schedule
に指定するcrontab
の指定方法で定期実行をするタイミングを任意の時間にすることができます。
指定方法 | 内容 |
---|---|
crontab() | 1分ごとに実行する |
crontab(minute=0, hour=0) | 毎日午前0時に実行する |
crontab(minute=0, hour='*/3') | 午前0時、午前3時、午前6時...と3時間ごとに実行する |
crontab(minute='*/15') | 15分ごとに実行し、0分、15分、30分、45分に実行される |
crontab(day_of_week='sunday') | 日曜日は1分ごとに実行する |
crontab(minute='', hour='', day_of_week='sun') | 上記と同様 |
crontab(0, 0, day_of_month='2') | 毎月2日に実行 |
定期実行を動的に作成する
今までのバッチ処理のように特定のタイミングでのみ定期実行がされていましたが、それを任意のタイミング
で非同期タスクを走らせることができます。
以下のように現在時刻から5分後に非同期タスクを実行するなんてこともできます。
from datetime import datetime
from django.utils import timezone
from datetime import timedelta
from django_celery_beat.models import PeriodicTask, ClockedSchedule
target_time: datetime = timezone.now() + timedelta(minutes=5)
clocked_schedule: ClockedSchedule = ClockedSchedule.objects.create(clocked_time=target_time)
PeriodicTask.objects.create(
clocked=clocked_schedule,
name="hello_world-task",
description="hello_world-description",
one_off=True,
task="polls.tasks.hello_world",
args=[]
)
one_off
がTrue
の場合はスケジュールは同じ名前のタスクを1回だけ実行します。繰り返し非同期タスクを実行したい場合はone_off
をFalse
にします。
まとめ
参考資料
Discussion