Closed6

Celeryでタスクキューことはじめ

kun432kun432

https://github.com/celery/celery

https://docs.celeryq.dev/en/stable/index.html

Celeryは、膨大な量のメッセージを処理するためのシンプルで柔軟かつ信頼性の高い分散システムであり、そのようなシステムを維持するために必要なツールをオペレーションに提供する。

リアルタイム処理に重点を置いたタスク・キューであり、タスク・スケジューリングもサポートしている。

kun432kun432

First Steps with Celery

まずは公式の「First Steps with Celery」を進めてみる。

https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html#first-steps

  • Celeryの初歩

    • Celeryは多機能なタスクキューシステム。
    • 使用が簡単で、複雑な部分を学ばずともすぐに始められる。
    • ベストプラクティスを基に設計されており、他言語との統合やスケールが可能。
    • 本番環境での運用に必要なツールとサポートが含まれている。
  • このチュートリアルで学ぶこと

    • メッセージトランスポート(ブローカー)の選択とインストール。
    • Celeryのインストールと最初のタスクの作成。
    • ワーカーの起動とタスクの呼び出し。
    • タスクの状態変化を追跡し、戻り値を検査する方法。
  • チュートリアルの目的

    • 初心者でも簡単に始められるよう、意図的にシンプルに保たれている。
    • このチュートリアルを終えた後、さらなる機能や詳細を学ぶためにドキュメントの残りを閲覧することを推奨する。

とりあえず作業ディレクトリ作成

$ mkdir celery-tutorial
$ cd  celery-tutorial

ブローカーの選択

Celeryはメッセージを送受信するためのソリューションを必要とする。通常、これはメッセージ・ブローカーと呼ばれる別のサービスの形で提供される。

選択肢は以下

  • RabbitMQ
  • Redis
  • 他のブローカー
  • Amazon SQSなど

今回はRedisを使う。

$ docker run -d -p 6379:6379 redis

Celeryのインストール

CeleryとRedisのpythonモジュールをインストール。なお、仮想環境を事前に作成しておくこと。

$ pip install celery redis

Celeryアプリケーションの作成

Celeryインスタンス=Celeryアプリを作成する。これがタスクの作成やワーカー管理などを行う。
tasksがモジュール名となる

ここではシンプルに足し算を行う関数をCeleryのタスクとして登録している。

tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def add(x, y):
    return x + y

Celeryワーカーサーバの起動

ワーカーを起動する。

$ celery -A tasks worker --loglevel=INFO
 -------------- celery@example.local v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-6.5.0-28-generic-x86_64-with-glibc2.35 2024-05-10 05:05:27
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7396374aa850
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 32 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2024-05-10 05:05:27,915: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-05-10 05:05:27,916: INFO/MainProcess] mingle: searching for neighbors
[2024-05-10 05:05:28,922: INFO/MainProcess] mingle: all alone
[2024-05-10 05:05:28,928: INFO/MainProcess] celery@example.local ready.

立ち上がってきたみたい。

ではタスクを呼び出してみる。Python REPLで。

$ python

タスクの呼び出しは.delay()を使う。

>>> from tasks import add
>>> add.delay(4,4)

実行されたっぽい?

<AsyncResult: 32da7e3e-f9ab-4e58-89f0-bd9170dc81f3>

ワーカーの出力を見ると実行結果が出ていた。

[2024-05-10 05:08:42,701: INFO/MainProcess] Task tasks.add[32da7e3e-f9ab-4e58-89f0-bd9170dc81f3] received
[2024-05-10 05:08:42,702: INFO/ForkPoolWorker-32] Task tasks.add[32da7e3e-f9ab-4e58-89f0-bd9170dc81f3] succeeded in 0.0002071061171591282s: 8
  • タスクを呼び出すと、AsyncResultインスタンスが返される。これを使って、タスクの状態確認、タスクの終了を待つ、戻り値の取得、などを行う。
    • タスクが失敗した場合は、例外とトレースバックを取得できる。
  • 結果はデフォルトでは有効になっていない。結果を受けて、リモートプロシージャコールの実行やデータベースへ保存する場合は、Celeryに結果バックエンドを使用するよう設定する必要がある。

らしい。

結果の保存

タスクの状態を記録する場合は結果バックエンドが必要になる。以下が使える。

  • SQLAlchemy/Django ORM
  • MongoDB
  • Memcached
  • Redis
  • RPC(RabbitMQ/AMQP)

今回はRedisをそのまま結果バックエンドで使う。ワーカーサーバーを一旦止めて、tasks.pyを書き換える。

tasks.py
from celery import Celery

# backendを指定
app = Celery('tasks', backend='redis://localhost:6379', broker='redis://localhost:6379')

@app.task
def add(x, y):
    return x + y

ワーカーサーバーを再度上げる。

$ celery -A tasks worker --loglevel=INFO

再度REPLでタスクを呼び出し。タスク呼び出しで返されたAsyncResultインスタンスをresultに入れている。

>>> from tasks import add
>>> result = add.delay(4,4)

で結果が返ってきているかを確認するにはこのインスタンスに対して.ready()を使う。

>>> result.ready()

Trueなら終わっている。

True

結果を取得するには.get()を使う。timeoutを指定すると待つことができる。

result.get(timeout=1)

結果が得られた。

8

なおタスクが失敗した場合。意図的に失敗させてみる。

>>> result = add.delay(4,"a")
>>> result.ready()
True
>>> result.get()
Traceback (most recent call last):
(snip)
TypeError: unsupported operand type(s) for +: 'int' and 'str'

.get()でトレースバック付きで例外を再度発生させる。propagate=Falseをつけて.get()を行うと、

>>> result = add.delay(4,"a")
>>> result.ready()
True
>>> result.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")

例外だけが返る。トレースバックも取得できる。

>>> result.traceback
'Traceback (most recent call last):\n (snip)  ~~^~~\nTypeError: unsupported operand type(s) for +: \'int\' and \'str\'\n

設定

設定は設定ファイルとして読み込むことができる。といってもPythonファイルなのだけど。

broker_url = 'redis://localhost:6379'
result_backend = 'redis://localhost:6379',
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Tokyo'
enable_utc = True

設定の確認。何も出なければOK。

$ python -m config.celeryconfig

アプリの方で読み込む。

from celery import Celery
# インポート
from config import celeryconfig

app = Celery('tasks')

# 設定読み込み
app.config_from_object(celeryconfig)

@app.task
def add(x, y):
    return x + y

いくつか設定例が用意されているので少し試してみる。

タスクにレート制限をつけてみる。

broker_url = 'redis://localhost:6379'
result_backend = 'redis://localhost:6379'
task_serializer = 'json'
result_serializer = 'json'
accept_content  = ['json']
timezone = 'Asia/Tokyo'
enable_utc = True
# 以下を追加
task_annotations = {
    'tasks.add': {'rate_limit': '1/m'}
}

ワーカーサーバーを立ち上げ直す

$ celery -A tasks worker --loglevel=INFO

REPLで。

>>> from tasks import add
>>> result = add.delay(4, 4)
>>> result.ready()
True
>>> result.get()
8
>>> result = add.delay(4, 4)
>>> result.ready()
False
>>> result.ready()
False
(しばらく待つ)
>>> result.ready()
True
>>> result.get()
8

レート制限が効いているのがわかる。

RabbitMQやRedisがブローカーの場合は、ワーカーサーバー起動時のオプションでも設定できるみたいだけど、設定ファイルでやりたいよね。

$ celery -A tasks control rate_limit tasks.add 1/m

タスクもディレクトリ切って分けたいよねーと思ってたら、以下の記事でクラスベースで書くやり方が書かれていた。こちらのほうがわかりやすくて良さそうに思える。

https://blog.nomott.com/celery-tutorial/

kun432kun432

Next Steps

続き。「First Steps with Celery」をもう少し掘り下げた内容になっているらしい。

https://docs.celeryq.dev/en/stable/getting-started/next-steps.html

アプリケーションでCeleryを使う

実際にアプリケーションで使う場合の例。

こういうレイアウト例。

$ mkdir -p src/proj
$ touch src/proj/{__init__,celery,tasks}.py
$ tree src
src
└── proj
    ├── __init__.py
    ├── celery.py
    └── tasks.py

1 directory, 3 files

src/proj/celery.pyを以下の内容で作成。

src/proj/celery.py
from celery import Celery

app = Celery('proj',
          backend='redis://localhost:6379',
          broker='redis://localhost:6379',
          include=['proj.tasks'],
)

# オプションで設定を追加、詳細はアプリケーションユーザガイドを参照。
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

次にタスクをsrc/proj/tasks.pyに追加する。

src/proj/tasks.py
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

ワーカーサーバを起動

$ cd src
$ celery -A proj worker -l INFO

起動時のメッセージに各種情報が表示されている。

 -------------- celery@example.local v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-6.5.0-28-generic-x86_64-with-glibc2.35 2024-05-10 15:03:43
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x70e9d51f0690
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost:6379/
- *** --- * --- .> concurrency: 32 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

一部ドキュメントとは違うけども、多分こんな感じ

  • transport
    • (おそらく)ブローカーのURL。
  • concurrency
    • タスクの同時処理に使用するワーカープロセスの数。
      • この数だけ同時に処理ができる。これ以上の処理がある場合は前の処理が終わるまで待たされる。
      • デフォルトはCPU(コアを含む)数。
      • celely worker -cで設定できる。
      • IOバウンドな処理が多ければ増やすと良い(とあるけども、I/Oバウンドな処理ばかりだとIO waitでしんどくないかな・・・?)
      • CPUの2倍以上を指定指定も効果はなく、むしろパフォーマンスは悪くなるという実験結果があるとのこと。
    • なお、プールの種類には以下がある様子。
    • prefork
      • デフォルト。
      • 各ワーカープロセスが独立してタスクを処理。
      • CPU負荷の高いタスクやほとんどのユースケースに最適。
      • 他のモデルを使う特別な必要性がない限りは堅牢でおすすめ。
    • eventlet/gevent
      • 非同期I/Oサポートにより、IOバウンドタスクのために設計されているモデル。
      • greenletsを使用して高い並行性を可能にする。
      • 特定の機能(soft_timeout等)は利用できない場合がある
      • eventletgeventの違いは実装の違いらしい。
    • solo
      • メインスレッドでタスクを順次実行する。
      • 開発やデバッグ向けらしい。
    • thread
      • concurrent.futuresモジュールがあれば利用できる。
      • マルチスレッドを使用してタスクを並行処理する。
    • custom
      • カスタムワーカープールの実装を環境変数で指定できるようにする。`
    • processes
      • ドキュメントにはないけど、ヘルプで出てくる。
      • 複数のプロセスを使用してタスクを並行処理する。
  • task events
    • Celery が Worker で発生したアクションに対して監視メッセージ (イベント) を送信するためのオプション。
    • celery eventsやリアルタイムCeleryモニタ Flower などで使用される。
  • queues
    • Worker がタスクを消費するキューのリスト
    • Workerは一度に複数のキューからタスクを消費するように指示することができる
    • QoS、Separation of Concern、Prioritizationの手段として特定のWorkerにメッセージをルーティングするために使われる

プールの種類については以下も参照。

https://docs.celeryq.dev/en/stable/userguide/concurrency/index.html#concurrency

以下の記事には詳しい説明があった。

https://blog.symdon.info/posts/1617189961/

https://celery.school/celery-worker-pools

ワーカーのデーモン化

チュートリアルではワーカーサーバをそのまま上げているけど、実サービスで使う場合はデーモン化する。以下にinitスクリプトなどいくつかの方法が記載されている。

https://docs.celeryq.dev/en/stable/userguide/daemonizing.html#daemonizing

これらのスクリプトはcelery multiコマンドを使っている。で、ドキュメント通りにcelery multiで起動すると、

$ celery multi start w1 -A proj -l INFO

怒られる。

PermissionError: [Errno 13] Permission denied: '/var/run/celery'

PIDファイルとログファイルはデフォルトだとワーカーを実行したディレクトリに作成される、とドキュメントにはあるのだが、今は違うのかもしれない。
一般的なデーモンと同じく/varの下だと権限等面倒なので、今回はオプションを追加。

$ mkdir run log
$ celery multi start worker1 -A proj -l INFO \
    --pidfile=./run/%n.pid \
    --logfile=./log/%n%I.log
celery multi v5.4.0 (opalescent)
> Starting nodes...
	> worker1@example.local: OK

プロセスを見てみる

$ ps -eo pid,ppid,user,cmd | grep celery
2941940       1 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip)
2941977 2941940 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip)
2941978 2941940 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip)
2941979 2941940 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip)
$ ps -eo pid,ppid,user,cmd | grep celery | grep -v grep | wc -l
33

最初のプロセスがスーパーバイザー的なプロセスだと思うので、実際に処理を行うワーカーは32個立ち上がっている思われる。

止めるときはstop/stopwaitを使う。stopwaitの場合は実行中のタスク終了を待ってからワーカーが終了する。

$ celery multi stopwait worker1 -A proj -l INFO \
    --pidfile=./run/%n.pid \
    --logfile=./log/%n%I.log
celery multi v5.4.0 (opalescent)
> worker@example.local: DOWN

ワーカー数を指定してみる。

$ celery multi start 5 -A proj -l INFO \
    --pidfile=./run/%n.pid \
    --logfile=./log/%n%I.log
celery multi v5.4.0 (opalescent)
> Starting nodes...
	> celery1@example.local: OK
	> celery2@example.local: OK
	> celery3@example.local: OK
	> celery4@example.local: OK
	> celery5@example.local: OK

プロセス確認

$ ps -eo pid,ppid,user,cmd | grep celery | grep -v grep | wc -l
165

なんと!プロセスを見てみるとこんな感じ。

2967798       1 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery1@example.local (snip)
2967802       1 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery2@example.local (snip)
2967805       1 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery3@example.local (snip)
2967808       1 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery4@example.local (snip)
2967810       1 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery5@example.local (snip)
2967844 2967808 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery4@example.local (snip)
2967847 2967810 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery5@example.local (snip)
2967849 2967802 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery2@example.local (snip)
2967850 2967798 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery1@example.local (snip)
2967851 2967808 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery4@example.local (snip)

通常はcelery multi start ワーカー名で起動すると、ワーカーの親プロセス=スーパーバイザー的プロセスが生成されて、そこから子プロセス≒じっさいに処理を行うワーカープロセスが生える、みたいなイメージ。
で、celery multi start Nで数値を指定すると、ワーカー名が自動的にceleryNみたいになって、さらにそこから子プロセスが生えるので、生成される子プロセスは 5 x 32 = 160、親プロセスも含めると165ってことなんだろう。

子プロセスの数は-cで設定する。

一旦ストップしておいて、

$ celery multi stopwait 5 -A proj -l INFO \
    --pidfile=./run/%n.pid \
    --logfile=./log/%n%I.log

-cをつけて実行。ワーカー名もつけてある。

$ celery multi start worker1 -c 5 -A proj -l INFO \
    --pidfile=./run/%n.pid \
    --logfile=./log/%n%I.log
$ ps -eo pid,ppid,user,cmd | grep celery | grep -v grep
2974348       1 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974381 2974348 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974382 2974348 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974383 2974348 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974384 2974348 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974385 2974348 kun432   /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)

なるほど。じゃあ複数のワーカーを立ち上げる意味は何なんだろうか?というところで元々紹介されていたコマンドはこんな感じで、どうやらワーカーごとに役割を変えたりするのに使えるらしい。なるほど。

$ celery multi start 10 -c 2 -A proj -l INFO \
    -Q:1-3 images,video \
    -Q:4,5 data \
    -Q default \
    --pidfile=./run/%n.pid \
    --logfile=./log/%n%I.log

ちなみに-Lオプションでワーカーごとのログレベルを指定することができるとあるけれどもこうなった。

Error: No such option: -L

うーん、ちょいちょいドキュメントと違うところがあるのはちょっとつらい。

celery multiのオプションは以下を見ると良さそう。

https://docs.celeryq.dev/en/stable/reference/celery.bin.multi.html#module-celery.bin.multi

タスクの呼び出し

"First Steps ・・・"と重複する部分もあるけれど一応。

まずワーカーを起動しておく。

$ celery multi start worker1 -c 5 -A proj -l INFO \
    --pidfile=./run/%n.pid \
    --logfile=./log/%n%I.log

REPLからdelay()でタスクを呼び出し。

>>> from proj.tasks import add
>>> result = add.delay(2, 2)
<AsyncResult: 4ae5feda-f8d9-4c0e-87ed-855636fd8901>

でこれはapply_async()を使っても同じことになる。

>>> from proj.tasks import add
>>> add.apply_async((2, 2))
<AsyncResult: abbd74ad-0a95-4733-8be0-f7e3eb16a5f1>

実行時間を指定するとともできる。以下の例だと10秒後に実行される(取り出すには結果バックエンドが必要)

>>> result = add.apply_async((2, 2), countdown=10)
>>> result.ready()
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get()
4

同期的にタスクを実行することもできる。この場合は現在のプロセスで実行されるため、メッセージは送信されない。

>>> add(2, 2)
4

タスクのIDの取り出し

>>> result = add.apply_async((2, 2), countdown=10)
>>> result.id
'c384b7bc-5465-4e75-92fc-8dfb920b38a0'

タスクで例外が発生した場合は、.get()でエラーが伝播される

>>> result = add.delay(2, '2')
>>> result.get()
Traceback (most recent call last):
(snip)
TypeError: unsupported operand type(s) for +: 'int' and 'str'

.get(propagate=False)でエラーの伝播を無効化できる

>>> result = add.delay(2, '2')
>>> result.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")

タスクの成功。失敗をチェックすることもできる。

>>> result = add.delay(2, 2)
>>> result.successful()
True
>>> result = add.delay(2, "2")
>>> result.failed()
True

タスクの状態を確認することもできる。

>>> result = add.apply_async((2, 2), countdown=10)
>>> result.state
'PENDING'
>>> result.state
'PENDING'
>>> result.state
'SUCCESS'

失敗したときはFAILEDになる。あと、task_track_startedを有効にするか、タスクに@task(track_started=True)をつけると、PENDING->STARTED->SUCCESSみたいな感じで、STARTEDのステータスが取れるらしい。

なお、「PENDINGはまだ記録されていない状態であり、未知のIDに対するデフォルトの状態」と書いてあるけれども、ここは正直どういう意味なのかよくわからない。

>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.id
'this-id-does-not-exist'
>>> res.state
'PENDING'

まあこれはそうなんでしょう。ただ同じPENDINGでもこの状態だとIDが取れる。

>>> result = add.apply_async((2, 2), countdown=10)
>>> result.state
'PENDING'
>>> result.id
'11364e31-40fe-4f62-a920-aedc75456c0a'

IDに対して何かしらの状態は紐づいていない→PENDINGという風に言いたいのかな?とはいえ、あとあと実行されるわけだし、そういう意味では実行待ちという状態を持ってるんじゃないかなーという気がするのだけども。

なお、リトライ機構もあるようで、その場合はこんな感じになったりするらしい。

PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

なんとなくtask_track_startedはデフォルトで有効にしてもいいんじゃないかなぁという気がした。

Canvas: ワークフローをデザインする

Signatureを使うと、タスクの引数と実行時のオプションをラップして、後で呼び出すことができる。ここ最初文章読んだだけだとさっぱりわからなかったけど、やってみるほうが早い。

Signatureでaddタスクをラップ。

>>> s1 = add.signature((2,2), countdown=10)

作成したSignatureを.delay()メソッドで呼び出し

>>> result = s1.delay()

あとは普通に使える

>>> result.ready()
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get()
4

.signature().s()とも書ける。

>>> s2 = add.s(2,2)
>>> result = s2.delay()
>>> result.get()
4

面白いなと思うのはここから。addタスクは引数が2つ必要だけど、1つだけ指定してSignatureを定義する。これをPartialと呼ぶらしい。

>>> s2 = add.s(4)

delay()するときにもう一つの引数を渡してやる。

>>> result = s2.delay(4)

結果を取得してみる。

>>> result.get()
8

つまり実行時に内容を書き換えるようなことができる。
これはオプションでも同様。以下のようなタスクを用意する。

proj/tasks.py
@app.task
def add_sleep(x, y, sleep=False):
    if sleep:
        time.sleep(10)
    return x + y

ワーカーを再起動して反映

$ celery multi restart worker1 -c 5 -A proj -l INFO \
    --pidfile=./run/%n.pid \
    --logfile=./log/%n%I.log

REPLで実行。

>>> from proj.tasks import add_sleep
>>> s3 = add_sleep.s(5, 5, sleep=True)
>>> result = s3.delay()
>>> result.ready()
False
>>> result.ready()
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get()
10

sleepが効いているのがわかる。これをdelay()時に書き換える。

>>> result = s3.delay(sleep=False)
>>> result.ready()
True
>>> result.get()
10

なお、delay()apply_async()ではオプションが異なる。タスクの引数以外のオプションの場合(例えばcountdownとか)は apply_async()を使う場合しか書き換えできないってことかなと思っている。

sig.apply_async(args=(), kwargs={}, **options)
sig.delay(*args, **kwargs)

でこれらをより有効に活用するために以下のようなPrimitivesがある。

  • group
  • chain
  • chord
  • map
  • starmap
  • chunks

ドキュメントで紹介されているものだけ少し触れてみる。

group

groupは、タスクのリストを渡すと並列実行を行い、それぞれの結果をGroupResultオブジェクトとして返し、結果を取り出す際にはリストの順序どおりに返す

>>> from celery import group
>>> from proj.tasks import add

>>> result = group(add.s(i, i) for i in range(10))()
<GroupResult: 77cc8f3d-eca2-4d40-a6f6-134df507ba00 [4dc23bab-a7bd-4cd5-b701-3664b42dcb35, a2e4d3d2-b53f-4b30-8972-d3c351637255, 8055f8cf-056f-4564-8cef-84ce30098ed8, b08add7d-b7a9-4afc-8c53-95373c1adc40, 6e496546-4708-450d-be32-3af14ea7761c, ce7c83f6-1eb4-4f68-8244-8f07f806ea83, a1833bab-89d1-4ccf-a35f-022edb184638, d6bf0848-4449-450b-90fc-f2a336ae2acf, 6dbb2a0b-03da-4bff-bec8-910f00b25b32, 9751f4db-a9b2-404a-a30f-2b8799764e7e]>

>>> result.get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Partialの場合

>>> g = group(add.s(i) for i in range(10))
>>> g
group(<_regen: [proj.tasks.add(0), add(1), add(2), add(3), add(4), add(5), add(6), add(7), add(8), add(9)]>)
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

chain

chainはその名の通り順番に実行していき、前のタスクの結果が後続のタスクに渡される。

>>> from celery import chain
>>> from proj.tasks import add, mul

>>> chain(add.s(4, 4) | mul.s(8))().get()
64

mul.s(8)がPartialになっていて、add.s(4, 4)の結果が渡されるような感じになっている。

全部Partialだとこう。

>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64

chainは省略できる。

>>> g = (add.s(4) | mul.s(8))
>>> g(4).get()
64

chord

chordはグループにコールバックを付与したもの。

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90

(add.s(i, i) for i in range(10))の結果をxsum.s()に渡している感じ。

以下は同じ意味になる。

(group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90

これらのPrimitivesを使って作成したワークフローをCanvasというらしい。

https://docs.celeryq.dev/en/stable/userguide/canvas.html#guide-canvas

ルーティング

デーモン化のところで少し触れたけども、キューに名前をつけて、メッセージを別々のキューに振り分けることができる。

例えばこんな風にワーカーを上げておく

$ celery multi start 4 -c 1 -A proj -l INFO \
    -Q:1-2 foo \
    -Q:3-4 bar \
    --pidfile=./run/%n.pid \
    --logfile=./log/%n%I.log
  • 親ワーカープロセスは合計4
  • 子ワーカープロセスは各1
  • キュー名fooはワーカー1と2に割り当て
  • キュー名barはワーカー3と4に割り当て

で、実行時にキューを振り分ける場合はapply_async()を使ってオプションで指定する

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='foo')

もしくはtask_routesを設定しておくと、タスク名で自動的に振り分けられる

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'foo'},
    },
)

その他ルーティングについては以下参照。

https://docs.celeryq.dev/en/stable/userguide/routing.html#guide-routing

kun432kun432

リモートからの制御

celeryコマンドにはいくつか運用に役立つようなサブコマンドが用意されている。

RabbitMQ、Redis、Qpidをブローカーに使っている場合は、ワーカーの状態を確認することができる。
celery inspectを使う。

$ celery -A proj inspect active
->  celery3@example.local: OK
    - empty -
->  celery4@example.local: OK
    - empty -
->  celery2@example.local: OK
    - empty -
->  celery1@example.local: OK
    - empty -

4 nodes online.

特定のワーカーだけを確認したい場合は--destinationオプションで指定する。

$ celery -A proj inspect active --destination celery2@example.local
->  celery2@example.local: OK
    - empty -

1 node online.

上記はactive可どうかをチェックしているが、他にも以下が確認できる。

$ celery -A proj inspect --list
Inspect Commands:
  active                          List of tasks currently being executed.
  active_queues                   List the task queues a worker is currently
                                  consuming from.
  clock                           Get current logical clock value.
  conf [include_defaults=False]   List configuration.
  memdump [n_samples=10]          Dump statistics of previous memsample
                                  requests.
  memsample                       Sample current RSS memory usage.
  objgraph [object_type=Request] [num=200 [max_depth=10]]
                                  Create graph of uncollected objects (memory-
                                  leak debugging).
  ping                            Ping worker(s).
  query_task [id1 [id2 [... [idN]]]]
                                  Query for task information by id.
  registered [attr1 [attr2 [... [attrN]]]]
                                  List of registered tasks.
  report                          Information about Celery installation for
                                  bug reports.
  reserved                        List of currently reserved tasks, not
                                  including scheduled/active.
  revoked                         List of revoked task-ids.
  scheduled                       List of currently scheduled ETA/countdown
                                  tasks.
  stats                           Request worker statistics/information.

次にinspectは状態確認を行うのに対して、controlは実際に制御を行う。

$ celery -A proj control --list
Control Commands:
  add_consumer <queue> [exchange [type [routing_key]]]
                                  Tell worker(s) to consume from task queue by
                                  name.
  autoscale [max [min]]           Modify autoscale settings.
  cancel_consumer <queue>         Tell worker(s) to stop consuming from task
                                  queue by name.
  disable_events                  Tell worker(s) to stop sending task-related
                                  events.
  election                        Hold election.
  enable_events                   Tell worker(s) to send task-related events.
  heartbeat                       Tell worker(s) to send event heartbeat
                                  immediately.
  pool_grow [N=1]                 Grow pool by n processes/threads.
  pool_restart                    Restart execution pool.
  pool_shrink [N=1]               Shrink pool by n processes/threads.
  rate_limit <task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>
                                  Tell worker(s) to modify the rate limit for
                                  a task by type.
  revoke [id1 [id2 [... [idN]]]]  Revoke task by task id (or list of ids).
  revoke_by_stamped_headers [key1=value1 [key2=value2 [... [keyN=valueN]]]]
                                  Revoke task by header (or list of headers).
  shutdown                        Shutdown worker(s).
  terminate <signal> [id1 [id2 [... [idN]]]]
                                  Terminate task by task id (or list of ids).
  time_limit <task_name> <soft_secs> [hard_secs]
                                  Tell worker(s) to modify the time limit for
                                  task by type.

例えば、ワーカー2を停止してみる。ちょっとエラーにはなるのだけども、

$ celery -A proj control shutdown --destination celery2@example.local
Error: No nodes replied within time constraint

inspect activeで確認してみるとワーカー2がいなくなっていた。

$ celery -A proj inspect active
->  celery3@rtx40example90.local: OK
    - empty -
->  celery4@example.local: OK
    - empty -
->  celery1@example.local: OK
    - empty -

3 nodes online.

もう一つeventというサブコマンドがある。ワーカーで発生したイベントを確認できる。

$ celery -A proj events --help
Usage: celery events [OPTIONS]

  Event-stream utilities.

Dumper:
  -d, --dump

Snapshot:
  -c, --camera TEXT
  -d, --detach
  -F, --frequency, --freq FLOAT
  -r, --maxrate TEXT
  -l, --loglevel [DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL]
                                  Logging level.

Daemonization Options:
  -f, --logfile TEXT  Log destination; defaults to stderr
  --pidfile TEXT      PID file path; defaults to no PID file
  --uid TEXT          Drops privileges to this user ID
  --gid TEXT          Drops privileges to this group ID
  --umask TEXT        Create files and directories with this umask
  --executable TEXT   Override path to the Python executable

Options:
  --help  Show this message and exit.

これを使うにはcontrolでまずイベントを有効化する必要がある。

$ celery -A proj control enable_events
->  celery4@rtx4090.local: OK
        task events enabled
->  celery3@rtx4090.local: OK
        task events enabled
->  celery1@rtx4090.local: OK
        task events enabled

events --dumpを実行するとtail -fのような感じでログが流れる。

$ celery -A proj events --dump
celery3@example.local [2024-05-10 14:33:05.876307] heartbeat: active=0, clock=18204, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351585.8782148, pid=3036874, processed=0, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9
celery4@example.local [2024-05-10 14:33:05.877166] heartbeat: active=0, clock=18204, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351585.8786154, pid=3036876, processed=0, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9
celery1@example.local [2024-05-10 14:33:05.877706] heartbeat: active=0, clock=18205, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351585.8790622, pid=3036861, processed=19, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9
celery1@example.local [2024-05-10 14:33:06.341848] task received: proj.tasks.add(18ad0903-422f-4cda-9d9c-629727a8a035) args=(2, 2) kwargs={} clock=18209, eta=None, expires=None, local_received=1715351586.342242, parent_id=None, pid=3036861, retries=0, root_id=18ad0903-422f-4cda-9d9c-629727a8a035, utcoffset=-9
celery1@example.local [2024-05-10 14:33:06.342527] task started: proj.tasks.add(18ad0903-422f-4cda-9d9c-629727a8a035) args=(2, 2) kwargs={} clock=18210, local_received=1715351586.3427393, pid=3036861, utcoffset=-9
celery1@example.local [2024-05-10 14:33:06.343118] task succeeded: proj.tasks.add(18ad0903-422f-4cda-9d9c-629727a8a035) args=(2, 2) kwargs={} clock=18211, local_received=1715351586.3433118, pid=3036861, result=4, runtime=0.0002800379879772663, utcoffset=-9
celery3@example.local [2024-05-10 14:33:07.879026] heartbeat: active=0, clock=18208, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351587.880925, pid=3036874, processed=0, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9
celery4@example.local [2024-05-10 14:33:07.879799] heartbeat: active=0, clock=18208, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351587.8815742, pid=3036876, processed=0, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9

ハートビート的なログや、あとはタスクの呼び出しとかのログが取得できる。

--dumpなしだとTUIで表示される

$ celery -A proj events

終わったらcontrolでイベントを無効化しておく

$ celery -A proj control disable_events

最後にstatusサブコマンド。これはワーカーの状態だけを確認するコマンドになっているっぽい。

$ celery -A proj status
->  celery4@example.local: OK
->  celery1@example.local: OK
->  celery3@example.local: OK

3 nodes online.
kun432kun432

まとめ

タスクキュー、今回初めて触った。やる前はタスクキューってよくわからなくて難しそうと思っていたのだけど、そういえばUnix/Linuxのatコマンドで似たようなことはやっていたなと。

ただ、atコマンド、プログラマブルに使ったことはほとんどなくて、ワンショットで時限でやらせたいこととかを登録するぐらいのもので、使いこなせてたわけではないし、やはりプログラムから制御できるってのが今は重要なんだろうなと感じた。

ただ、実際にきちんと運用するにはそれなりに大変そうな気はしたので、このあたりもう少しいろいろ確認しておきたい。

https://qiita.com/hankehly/items/c3e0496eb04327a53ac4

とりあえず何かしら実装して使ってみたいのだけど、ユースケースが思いつかないなw

kun432kun432

いまいちな点も上げておく、っていうかやっぱりドキュメントかなぁ

  • どうやら最新化されていないところがちらほらある模様。以下個人的な意見。
    • リファレンスとかは追いついてないのはしょうがないし、リファレンス読むような人はある程度調べて解決できることが多いので、最悪いいや
    • 初めて触る人がGetting Startedで躓くと、もう触ってくれなくなる可能性が高くなると思う
    • なのでGetting Startedはきちんと最新化しておいたほうが良いと思う。

なんか調べてみるとドキュメントがーみたいな声はちらほらあるみたい。でもまあOSSの宿命的なところはあるのかもしれない。

あと用語の定義がほしいな、「ワーカー」が何を指すのかを明確にしておきたい。

このスクラップは2024/05/10にクローズされました