🔃

DjangoのCeleryで非同期タスクの実行結果をDBに保存する

2023/09/23に公開

Django上でCelery環境の構築方法は以下の記事で紹介しました。

https://zenn.dev/shimakaze_soft/articles/bbd859803c63a6

今回はそのCeleryの実行結果をDBに自動で保存してくれたり、Django-ORMを介して実行結果を取得したりできるdjango-celery-resultsというライブラリがあり、そちらの使い方について紹介します。

https://github.com/celery/django-celery-results

インストール

まずは上記の記事を最初に読んで、Docker上でDjangoとCeleryの環境を構築してください。この記事はその続きとなるため、既にCeleryの環境は構築されていることを前提とします。

$ pip install django-celery-results

requirements.txtにも書き込んでおけば、後々のdockerコンテナの立ち上げ時に自動的にインストールもされます。

django-celery-resultsの追加

Djangoの設定ファイルであるsettings.pyに以下の設定を加えます。前回からCELERY_BROKER_URLの設定はそのままにして、今回は以下の2つの設定を加えます。

  • CELERY_RESULT_BACKEND
  • CELERY_TASK_TRACK_STARTED
config/settings.py
INSTALLED_APPS = [
    ...
    'django_celery_results'
]

# 前回同様にブローカーにredisを指定
CELERY_BROKER_URL = "redis://redis:6379"

# 結果はdjango指定のDBに保存
CELERY_RESULT_BACKEND = "django-db"

# taskが開始状態になったことを確認できるための設定
CELERY_TASK_TRACK_STARTED = True

マイグレーション

以下のコマンドでdjango_celery_results用のテーブルをマイグレーションで作成します。

$ python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, django_celery_results, sessions
Running migrations:
  Applying django_celery_results.0001_initial... OK
  Applying django_celery_results.0002_add_task_name_args_kwargs... OK
  Applying django_celery_results.0003_auto_20181106_1101... OK
  Applying django_celery_results.0004_auto_20190516_0412... OK
  Applying django_celery_results.0005_taskresult_worker... OK
  Applying django_celery_results.0006_taskresult_date_created... OK
  Applying django_celery_results.0007_remove_taskresult_hidden... OK
  Applying django_celery_results.0008_chordcounter... OK
  Applying django_celery_results.0009_groupresult... OK
  Applying django_celery_results.0010_remove_duplicate_indices... OK
  Applying django_celery_results.0011_taskresult_periodic_task_name... OK

新たなタスクの追加

今回は新たにtime_sleep_funcという以下のタスクを追加します。
(新しいアプリケーション上においてもpolls/tasks.pyに追記してもかまいません。)

以下のtime_sleep_funcは10秒待ってから、引数のproject_idを受け取ってから"Hello - {project_id}"の実行結果を返しています。

import time
from celery import shared_task  # type: ignore


@shared_task
def time_sleep_func(project_id: str) -> str:
    print(f"project_id - {project_id}")

    time.sleep(10)
    message: str = f"Hello - {project_id}"
    print(f"message - {message}")

    return message

タスクの状態を確認できるようにする

カスタムコマンドの追加

time_sleep_funcの非同期タスクの関数を呼び出せるように、time_sleepというカスタムコマンドを新たに追加します。

後々にタスクの内容を確認するためにも、タスクに割り振られるIDを取得する必要があります。

time_sleep_func.delay()の返り値として、celery.result.AsyncResultのタスクオブジェクトが取得でき、そこからタスクに割り振られるIDが取得できます。

タスクIDがあれば、別なプロセスなどからもタスクの状態を確認することが可能です。

polls/management/commands/time_sleep_cmd.py
from django.core.management.base import BaseCommand

from celery.result import AsyncResult

from polls.tasks import time_sleep_func


class Command(BaseCommand):
    def handle(self, *args, **options):  # type: ignore
        project_id: str = "0001"
        task: AsyncResult = time_sleep_func.delay(project_id)  # type: ignore
        print("task", task.id)

python manage.py time_sleepのカスタムコマンドを実行することで、以下のようにタスクIDが確認できます。

$ python manage.py time_sleep_cmd
task aa912875-8433-45d9-b335-682dbab7785e

タスクの状態を確認する

上記のカスタムコマンドを改修して以下のようにします。

取得したタスクIDを使って、celery.result.AsyncResultでタスクの状態と結果を確認することができます。

タスクのステータスは以下の4種類です。

  • PENDING : 実行待ちの状態
  • STARTED : 実行開始の状態
  • SUCCESS : 正常終了した状態
  • FAILED : 異常終了した状態
polls/management/commands/time_sleep_cmd.py
import time
from django.core.management.base import BaseCommand

from celery.result import AsyncResult

from polls.tasks import time_sleep_func


class Command(BaseCommand):
    def handle(self, *args, **options):  # type: ignore
        project_id: str = "0001"
        task: AsyncResult = time_sleep_func.delay(project_id)  # type: ignore
        print("task.status", task.status)  # type: ignore

        task_id: str = task.id  # type: ignore
        print("task", task.id)  # type: ignore

        task_1: AsyncResult = AsyncResult(task_id)  # type: ignore
        while not task.ready():
            time.sleep(1)
            print(task_1.status)  # type: ignore
            task_1 = AsyncResult(task_id)

task.ready()で非同期処理が終了したかどうかを判定しています。(task.ready()がTrueの場合は処理が終了)
上記のコマンドでタスクのステータスがSTARTEDPENDINGの状態は1秒待機してステータスを取得し続けます。

$ python manage.py time_sleep_cmd
task.status PENDING
task 289e49c7-aa3e-4490-b071-7d7b105802fd
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
SUCCESS

タスクの状態をDBから取得

settings.pyにて、CELERY_RESULT_BACKENDdjango-dbを指定したことで、非同期タスクの状態と結果はDBに自動で保存されます。

非同期タスクの結果と状態はTaskResultモデルから取得できます。TaskResultはDjangoのORMとして扱うことができます。

settings.pyCELERY_TASK_TRACK_STARTEDTrueにしておくことで、タスクの実行結果が出なくてもDBにタスクの状態が保存されます。Falseだと実行結果が出るまで(statusがPENDING)はDBにタスクの内容が保存されません。

import time
from django.core.management.base import BaseCommand
from django_celery_results.models import TaskResult
from django.db.models import QuerySet

from celery.result import AsyncResult

from polls.tasks import time_sleep_func


class Command(BaseCommand):
    def handle(self, *args, **options):  # type: ignore
        project_id: str = "0001"
        task: AsyncResult = time_sleep_func.delay(project_id)  # type: ignore
        print("task.status", task.status)  # type: ignore

        task_id: str = task.id  # type: ignore
        print("task", task.id)  # type: ignore

        time.sleep(5)

        task_models: QuerySet[TaskResult] = TaskResult.objects.filter(task_id=task_id)
        for task_model in task_models:
            print("task_model.result", task_model.result)
            print("task_model.date_created", task_model.date_created)
            print("task_model.date_done", task_model.date_done)

        time.sleep(10)

        print("----" * 20)

        task_models = TaskResult.objects.filter(task_id=task_id)
        for task_model in task_models:
            print("task_model.result", task_model.result)
            print("task_model.date_created", task_model.date_created)
            print("task_model.date_done", task_model.date_done)

上記の実行結果は以下のようになります。
time.sleep(10)で10秒間待機すると、task_model.resulttime_sleep_funcの戻り値の実行結果が格納されていることがわかります。

$ python manage.py time_sleep_cmd
task.status PENDING
task 2bfa1f69-3ec0-4f8c-8442-7ed2e3d8c6f1
task_model.result {"pid": 11, "hostname": "celery@53df2349231a"}
task_model.date_created 2023-09-22 10:20:14.346980+00:00
task_model.date_done 2023-09-22 10:20:14.346997+00:00
--------------------------------------------------------------------------------
task_model.result "hello - 0001"
task_model.date_created 2023-09-22 10:20:14.346980+00:00
task_model.date_done 2023-09-22 10:20:24.357911+00:00

参考資料

https://qiita.com/ridai/items/5060e806573b553a33fc

Discussion