🌵

Django上でCeleryの環境を構築するための細かい設定項目

2023/09/24に公開

Django上でCeleryの環境を構築する

Django上でCeleryの環境の構築方法は以下の記事でまとめています。

https://zenn.dev/shimakaze_soft/articles/bbd859803c63a6

Workerのプリフェッチ

CeleryのWorkerがタスクを実行する際、効率的なタスク処理のためにプリフェッチという機能を活用します。
プリフェッチとは、Workerが一度に複数のタスクをキューから事前に先読みして、タスクを保持しておく機能のことを指します。
これにより、前のタスクが終了した瞬間に直ちに次のタスクを開始することができ、Workerの待機時間が短縮され、処理の効率が向上します。

しかし、プリフェッチには注意点もあります。
Workerが先読みしたタスクは他のWorkerには渡せないため、多くのタスクを先読みしすぎると、他のWorkerがアイドル状態になる可能性があります。逆に、先読みの数が少なすぎると、タスクの取得に伴うオーバーヘッドが増加し、効率が低下するリスクがあります。

そのため、Celeryを使用する際には、Workerの数やシステムの負荷に応じて、プリフェッチの設定を適切に調整することが推奨されます。

worker_prefetch_multiplierの設定

Celeryにはworker_prefetch_multiplierという設定があり、Workerがプリフェッチするタスクの数を設定するためのオプションです。
具体的には、この値が設定されている場合、Workerは設定されたCONCURRENCY数 x worker_prefetch_multiplierの数だけタスクを先読みします。
デフォルトの値は4であり、コンカレンシ数が1の場合、Workerはデフォルトで一度に4つのタスクをプリフェッチします。

Djangoでは以下のようにsettings.pyCELERY_WORKER_PREFETCH_MULTIPLIERという項目があり、それで設定を行います。

config/settings.py
CELERY_WORKER_PREFETCH_MULTIPLIER = 4

長い実行時間を持つタスクに対する考慮

特に実行時間が長いタスクを定期的に実行している場合、worker_prefetch_multiplierの設定には注意が必要です。
もし高い値が設定されていると、Workerは多数の長時間実行タスクを先読みしてしまい、その結果として他の短時間タスクの実行が遅れる可能性があります。

例えば、CONCURRENCY数が1でworker_prefetch_multiplierが4に設定されている場合、Workerは4つの長時間実行タスクを先読みします。
もしこれらのタスクがそれぞれ1時間かかるとすると、他の短時間のタスクは最大4時間待たされることになります。

最適な設定値の選択

実行時間が長いタスクを多く持つ場合、worker_prefetch_multiplierを1に設定することを検討する価値があります。
これにより、Workerは1つのタスクを先読みして実行し、その後すぐに次のタスクを取得します。
この設定により、長時間実行タスクと短時間実行タスクの間で適切なバランスを維持することができます。

一方で、全てのタスクの実行時間が比較的短い場合は、デフォルトの4やそれ以上の値を設定することで、タスクの取得に関連するオーバーヘッドを削減し、処理効率を向上させることが可能です。

Workerプロセス

Celeryには4種類のワーカープロセスがあります。celery起動時の-Pオプションでワーカープロセスを設定します。Djangoの設定ファイル上で設定はできないので、docker上のcommandで-Pオプションをつけて設定をする必要があります。

$ celery -A config worker -P {worker_process}

prefork (デフォルト)

これはマルチプロセスモードで、タスクを複数のプロセスで並列実行します。デフォルトだとこの設定になっています。CPUの複数のコアを効果的に利用でき、それぞれのタスクが独立したプロセスで実行されるため、メモリの隔離が確保されます。

しかし、プロセス間のコミュニケーションにはオーバーヘッドが発生するため、大量のプロセスを作成すると、メモリやリソースの消費が増加します。

$ celery -A config worker -P prefork

gevent

軽量なグリーンスレッドを使用して非同期I/Oを効率的に処理します。I/Oバウンドなタスクに非常に効率的です。
当然、CPUバウンドなタスクでは向いていません。また、特定のライブラリやネイティブモジュールとの互換性に問題が発生する場合があります。

事前にgeventをインストールする必要があります。

$ pip install gevent
$ celery -A config worker -P gevent

eventlet

geventと似たような働きをしますが、異なるライブラリを使用しています。geventと同様にI/Oバウンドなタスクに効率的であり、geventと同様のデメリットがあります。

$ pip install eventlet
$ celery -A config worker -P eventlet

solo

タスクを単一のプロセスで順次実行します。デバッグ用に推奨されるモードです。本番環境での使用は推奨されません。

$ pip install eventlet
$ celery -A config worker -P solo

複数のCelery Workを管理・制御

Celeryにはcelery multiというコマンドがあり、複数のCelery worker管理・制御するためのサブコマンドがあります。
大規模なデプロイや複数のワーカーを異なる設定やキューで実行する場合に特に役立ちます。

# 複数のワーカーを起動する場合
# worker1, worker2, worker3の複数のワーカーを立ち上げ
$ celery multi start worker1 worker2 worker3 -A {project}

# 複数のワーカーを停止する場合:
$ celery multi stop worker1 worker2 worker3

以下のように、Worker毎にはコンカレンシーを指定することができます。
Worker1は4つの並行タスクを、worker2は10の並行タスクを処理するように設定されています。

# コンカレンシーを指定してワーカーを起動する場合:
$ celery multi start worker1 -c 4 worker2 -c 10 -A {project}

# 3 workers, with 3 processes each
$ celery multi start 3 -c 3 -A {project}
celery worker -n celery1@myhost -c 3
celery worker -n celery2@myhost -c 3
celery worker -n celery3@myhost -c 3

Workerが異常終了した時にタスクを再スケジューリングする

Celery Workerにはタスクが異常終了した場合に設定次第ではリトライ処理などを行なってくれます。

以下のようにCELERY_ACKS_LATE = Trueに設定しても、再試行の動作はCELERY_TASK_REJECT_ON_WORKER_LOSTCELERY_TASK_REJECT_ON_WORKER_LOSTといった他の設定にも依存するため、必要な動作を得るためにこれらの設定も適切に検討することが重要です。

config/settings.py
# タスクが完了してからAckする
CELERY_ACKS_LATE = True

# ワーカーの異常終了で、Ackされていないタスクを再スケジューリングする
CELERY_TASK_REJECT_ON_WORKER_LOST = True

CELERY_ACKS_LATE

CELERY_ACKS_LATEは、タスクの確認応答(acknowledgment = Ack)のタイミングを指定します。
このオプションは、タスクが正常に完了した後に確認応答を送信するか、それともタスクがWorkerに取得された直後に確認応答を送信するかを制御します。

  • CELERY_ACKS_LATE = False (デフォルト)の場合は、タスクがWorkerによって取得されるとすぐに確認応答(Ack)がブローカーに送信されます。これにより、Workerがタスクの処理中にクラッシュした場合、そのタスクは失われる可能性があります。
  • CELERY_ACKS_LATE = Trueの場合は、タスクが正常に完了して、結果が返された後に確認応答(Ack)がブローカーに送信されます。この設定を使用すると、Workerがタスクの処理中にクラッシュしても、タスクが再試行される可能性が高くなります。

CELERY_TASK_REJECT_ON_WORKER_LOST

CELERY_TASK_REJECT_ON_WORKER_LOSTはCeleryの設定オプションで、Workerが予期せず異常終了した場合(例: クラッシュや終了)などにタスクの挙動をどのようにするかを指定します。

  • True: Workerがタスクを処理中にクラッシュや予期しない終了をした場合、該当のタスクはブローカーに「拒否」され、タスクキューに再度追加されます。この結果、そのタスクは別のワーカーによって取り上げられて再試行されることになります。この設定は、CELERY_ACKS_LATEがTrueに設定されている場合に特に役立ちます。

  • False (デフォルト): ワーカーがタスクを処理中に失われた場合でも、タスクはブローカーに拒否されず、そのタスクは失われる可能性があります。

CELERY_ACKS_ON_FAILURE_OR_TIMEOUT

タスクが失敗した場合やタイムアウトした場合のACK(acknowledgement)の送信の挙動を制御します。

  • True: タスクが失敗またはタイムアウトしたときにACKを送信する。これにより、そのタスクはキューから削除され、再試行されない。
  • False(デフォルト): タスクが失敗またはタイムアウトした場合、ACKを送信しない。これにより、そのタスクはキューに留まり、再試行される可能性があリます。

参考資料

https://qiita.com/hankehly/items/c3e0496eb04327a53ac4

Discussion