DjangoのCeleryで非同期タスクの実行結果をDBに保存する
Django上でCelery環境の構築方法は以下の記事で紹介しました。
今回はそのCeleryの実行結果をDBに自動で保存してくれたり、Django-ORMを介して実行結果を取得したりできるdjango-celery-results
というライブラリがあり、そちらの使い方について紹介します。
インストール
まずは上記の記事を最初に読んで、Docker上でDjangoとCeleryの環境を構築してください。この記事はその続きとなるため、既にCeleryの環境は構築されていることを前提とします。
$ pip install django-celery-results
requirements.txt
にも書き込んでおけば、後々のdockerコンテナの立ち上げ時に自動的にインストールもされます。
django-celery-resultsの追加
Djangoの設定ファイルであるsettings.py
に以下の設定を加えます。前回からCELERY_BROKER_URL
の設定はそのままにして、今回は以下の2つの設定を加えます。
- CELERY_RESULT_BACKEND
- CELERY_TASK_TRACK_STARTED
INSTALLED_APPS = [
...
'django_celery_results'
]
# 前回同様にブローカーにredisを指定
CELERY_BROKER_URL = "redis://redis:6379"
# 結果はdjango指定のDBに保存
CELERY_RESULT_BACKEND = "django-db"
# taskが開始状態になったことを確認できるための設定
CELERY_TASK_TRACK_STARTED = True
マイグレーション
以下のコマンドでdjango_celery_results
用のテーブルをマイグレーションで作成します。
$ python manage.py migrate
Operations to perform:
Apply all migrations: admin, auth, contenttypes, django_celery_results, sessions
Running migrations:
Applying django_celery_results.0001_initial... OK
Applying django_celery_results.0002_add_task_name_args_kwargs... OK
Applying django_celery_results.0003_auto_20181106_1101... OK
Applying django_celery_results.0004_auto_20190516_0412... OK
Applying django_celery_results.0005_taskresult_worker... OK
Applying django_celery_results.0006_taskresult_date_created... OK
Applying django_celery_results.0007_remove_taskresult_hidden... OK
Applying django_celery_results.0008_chordcounter... OK
Applying django_celery_results.0009_groupresult... OK
Applying django_celery_results.0010_remove_duplicate_indices... OK
Applying django_celery_results.0011_taskresult_periodic_task_name... OK
新たなタスクの追加
今回は新たにtime_sleep_func
という以下のタスクを追加します。
(新しいアプリケーション上においてもpolls/tasks.py
に追記してもかまいません。)
以下のtime_sleep_func
は10秒待ってから、引数のproject_id
を受け取ってから"Hello - {project_id}"の実行結果を返しています。
import time
from celery import shared_task # type: ignore
@shared_task
def time_sleep_func(project_id: str) -> str:
print(f"project_id - {project_id}")
time.sleep(10)
message: str = f"Hello - {project_id}"
print(f"message - {message}")
return message
タスクの状態を確認できるようにする
カスタムコマンドの追加
time_sleep_func
の非同期タスクの関数を呼び出せるように、time_sleep
というカスタムコマンドを新たに追加します。
後々にタスクの内容を確認するためにも、タスクに割り振られるIDを取得する必要があります。
time_sleep_func.delay()
の返り値として、celery.result.AsyncResult
のタスクオブジェクトが取得でき、そこからタスクに割り振られるIDが取得できます。
タスクIDがあれば、別なプロセスなどからもタスクの状態を確認することが可能です。
from django.core.management.base import BaseCommand
from celery.result import AsyncResult
from polls.tasks import time_sleep_func
class Command(BaseCommand):
def handle(self, *args, **options): # type: ignore
project_id: str = "0001"
task: AsyncResult = time_sleep_func.delay(project_id) # type: ignore
print("task", task.id)
python manage.py time_sleep
のカスタムコマンドを実行することで、以下のようにタスクIDが確認できます。
$ python manage.py time_sleep_cmd
task aa912875-8433-45d9-b335-682dbab7785e
タスクの状態を確認する
上記のカスタムコマンドを改修して以下のようにします。
取得したタスクIDを使って、celery.result.AsyncResult
でタスクの状態と結果を確認することができます。
タスクのステータスは以下の4種類です。
- PENDING : 実行待ちの状態
- STARTED : 実行開始の状態
- SUCCESS : 正常終了した状態
- FAILED : 異常終了した状態
import time
from django.core.management.base import BaseCommand
from celery.result import AsyncResult
from polls.tasks import time_sleep_func
class Command(BaseCommand):
def handle(self, *args, **options): # type: ignore
project_id: str = "0001"
task: AsyncResult = time_sleep_func.delay(project_id) # type: ignore
print("task.status", task.status) # type: ignore
task_id: str = task.id # type: ignore
print("task", task.id) # type: ignore
task_1: AsyncResult = AsyncResult(task_id) # type: ignore
while not task.ready():
time.sleep(1)
print(task_1.status) # type: ignore
task_1 = AsyncResult(task_id)
task.ready()
で非同期処理が終了したかどうかを判定しています。(task.ready()がTrueの場合は処理が終了)
上記のコマンドでタスクのステータスがSTARTED
かPENDING
の状態は1秒待機してステータスを取得し続けます。
$ python manage.py time_sleep_cmd
task.status PENDING
task 289e49c7-aa3e-4490-b071-7d7b105802fd
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
SUCCESS
タスクの状態をDBから取得
settings.py
にて、CELERY_RESULT_BACKEND
にdjango-db
を指定したことで、非同期タスクの状態と結果はDBに自動で保存されます。
非同期タスクの結果と状態はTaskResult
モデルから取得できます。TaskResult
はDjangoのORMとして扱うことができます。
settings.py
のCELERY_TASK_TRACK_STARTED
をTrue
にしておくことで、タスクの実行結果が出なくてもDBにタスクの状態が保存されます。False
だと実行結果が出るまで(statusがPENDING)はDBにタスクの内容が保存されません。
import time
from django.core.management.base import BaseCommand
from django_celery_results.models import TaskResult
from django.db.models import QuerySet
from celery.result import AsyncResult
from polls.tasks import time_sleep_func
class Command(BaseCommand):
def handle(self, *args, **options): # type: ignore
project_id: str = "0001"
task: AsyncResult = time_sleep_func.delay(project_id) # type: ignore
print("task.status", task.status) # type: ignore
task_id: str = task.id # type: ignore
print("task", task.id) # type: ignore
time.sleep(5)
task_models: QuerySet[TaskResult] = TaskResult.objects.filter(task_id=task_id)
for task_model in task_models:
print("task_model.result", task_model.result)
print("task_model.date_created", task_model.date_created)
print("task_model.date_done", task_model.date_done)
time.sleep(10)
print("----" * 20)
task_models = TaskResult.objects.filter(task_id=task_id)
for task_model in task_models:
print("task_model.result", task_model.result)
print("task_model.date_created", task_model.date_created)
print("task_model.date_done", task_model.date_done)
上記の実行結果は以下のようになります。
time.sleep(10)
で10秒間待機すると、task_model.result
にtime_sleep_func
の戻り値の実行結果が格納されていることがわかります。
$ python manage.py time_sleep_cmd
task.status PENDING
task 2bfa1f69-3ec0-4f8c-8442-7ed2e3d8c6f1
task_model.result {"pid": 11, "hostname": "celery@53df2349231a"}
task_model.date_created 2023-09-22 10:20:14.346980+00:00
task_model.date_done 2023-09-22 10:20:14.346997+00:00
--------------------------------------------------------------------------------
task_model.result "hello - 0001"
task_model.date_created 2023-09-22 10:20:14.346980+00:00
task_model.date_done 2023-09-22 10:20:24.357911+00:00
参考資料
Discussion