Celeryでタスクキューことはじめ
Celeryは、膨大な量のメッセージを処理するためのシンプルで柔軟かつ信頼性の高い分散システムであり、そのようなシステムを維持するために必要なツールをオペレーションに提供する。
リアルタイム処理に重点を置いたタスク・キューであり、タスク・スケジューリングもサポートしている。
First Steps with Celery
まずは公式の「First Steps with Celery」を進めてみる。
Celeryの初歩
- Celeryは多機能なタスクキューシステム。
- 使用が簡単で、複雑な部分を学ばずともすぐに始められる。
- ベストプラクティスを基に設計されており、他言語との統合やスケールが可能。
- 本番環境での運用に必要なツールとサポートが含まれている。
このチュートリアルで学ぶこと
- メッセージトランスポート(ブローカー)の選択とインストール。
- Celeryのインストールと最初のタスクの作成。
- ワーカーの起動とタスクの呼び出し。
- タスクの状態変化を追跡し、戻り値を検査する方法。
チュートリアルの目的
- 初心者でも簡単に始められるよう、意図的にシンプルに保たれている。
- このチュートリアルを終えた後、さらなる機能や詳細を学ぶためにドキュメントの残りを閲覧することを推奨する。
とりあえず作業ディレクトリ作成
$ mkdir celery-tutorial
$ cd celery-tutorial
ブローカーの選択
Celeryはメッセージを送受信するためのソリューションを必要とする。通常、これはメッセージ・ブローカーと呼ばれる別のサービスの形で提供される。
選択肢は以下
- RabbitMQ
- Redis
- 他のブローカー
- Amazon SQSなど
今回はRedisを使う。
$ docker run -d -p 6379:6379 redis
Celeryのインストール
CeleryとRedisのpythonモジュールをインストール。なお、仮想環境を事前に作成しておくこと。
$ pip install celery redis
Celeryアプリケーションの作成
Celeryインスタンス=Celeryアプリを作成する。これがタスクの作成やワーカー管理などを行う。
tasks
がモジュール名となる
ここではシンプルに足し算を行う関数をCeleryのタスクとして登録している。
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def add(x, y):
return x + y
Celeryワーカーサーバの起動
ワーカーを起動する。
$ celery -A tasks worker --loglevel=INFO
-------------- celery@example.local v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-6.5.0-28-generic-x86_64-with-glibc2.35 2024-05-10 05:05:27
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7396374aa850
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 32 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2024-05-10 05:05:27,915: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-05-10 05:05:27,916: INFO/MainProcess] mingle: searching for neighbors
[2024-05-10 05:05:28,922: INFO/MainProcess] mingle: all alone
[2024-05-10 05:05:28,928: INFO/MainProcess] celery@example.local ready.
立ち上がってきたみたい。
ではタスクを呼び出してみる。Python REPLで。
$ python
タスクの呼び出しは.delay()
を使う。
>>> from tasks import add
>>> add.delay(4,4)
実行されたっぽい?
<AsyncResult: 32da7e3e-f9ab-4e58-89f0-bd9170dc81f3>
ワーカーの出力を見ると実行結果が出ていた。
[2024-05-10 05:08:42,701: INFO/MainProcess] Task tasks.add[32da7e3e-f9ab-4e58-89f0-bd9170dc81f3] received
[2024-05-10 05:08:42,702: INFO/ForkPoolWorker-32] Task tasks.add[32da7e3e-f9ab-4e58-89f0-bd9170dc81f3] succeeded in 0.0002071061171591282s: 8
- タスクを呼び出すと、AsyncResultインスタンスが返される。これを使って、タスクの状態確認、タスクの終了を待つ、戻り値の取得、などを行う。
- タスクが失敗した場合は、例外とトレースバックを取得できる。
- 結果はデフォルトでは有効になっていない。結果を受けて、リモートプロシージャコールの実行やデータベースへ保存する場合は、Celeryに結果バックエンドを使用するよう設定する必要がある。
らしい。
結果の保存
タスクの状態を記録する場合は結果バックエンドが必要になる。以下が使える。
- SQLAlchemy/Django ORM
- MongoDB
- Memcached
- Redis
- RPC(RabbitMQ/AMQP)
今回はRedisをそのまま結果バックエンドで使う。ワーカーサーバーを一旦止めて、tasks.py
を書き換える。
from celery import Celery
# backendを指定
app = Celery('tasks', backend='redis://localhost:6379', broker='redis://localhost:6379')
@app.task
def add(x, y):
return x + y
ワーカーサーバーを再度上げる。
$ celery -A tasks worker --loglevel=INFO
再度REPLでタスクを呼び出し。タスク呼び出しで返されたAsyncResultインスタンスをresultに入れている。
>>> from tasks import add
>>> result = add.delay(4,4)
で結果が返ってきているかを確認するにはこのインスタンスに対して.ready()
を使う。
>>> result.ready()
Trueなら終わっている。
True
結果を取得するには.get()
を使う。timeout
を指定すると待つことができる。
result.get(timeout=1)
結果が得られた。
8
なおタスクが失敗した場合。意図的に失敗させてみる。
>>> result = add.delay(4,"a")
>>> result.ready()
True
>>> result.get()
Traceback (most recent call last):
(snip)
TypeError: unsupported operand type(s) for +: 'int' and 'str'
.get()
でトレースバック付きで例外を再度発生させる。propagate=False
をつけて.get()
を行うと、
>>> result = add.delay(4,"a")
>>> result.ready()
True
>>> result.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")
例外だけが返る。トレースバックも取得できる。
>>> result.traceback
'Traceback (most recent call last):\n (snip) ~~^~~\nTypeError: unsupported operand type(s) for +: \'int\' and \'str\'\n
設定
設定は設定ファイルとして読み込むことができる。といってもPythonファイルなのだけど。
broker_url = 'redis://localhost:6379'
result_backend = 'redis://localhost:6379',
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Tokyo'
enable_utc = True
設定の確認。何も出なければOK。
$ python -m config.celeryconfig
アプリの方で読み込む。
from celery import Celery
# インポート
from config import celeryconfig
app = Celery('tasks')
# 設定読み込み
app.config_from_object(celeryconfig)
@app.task
def add(x, y):
return x + y
いくつか設定例が用意されているので少し試してみる。
タスクにレート制限をつけてみる。
broker_url = 'redis://localhost:6379'
result_backend = 'redis://localhost:6379'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Tokyo'
enable_utc = True
# 以下を追加
task_annotations = {
'tasks.add': {'rate_limit': '1/m'}
}
ワーカーサーバーを立ち上げ直す
$ celery -A tasks worker --loglevel=INFO
REPLで。
>>> from tasks import add
>>> result = add.delay(4, 4)
>>> result.ready()
True
>>> result.get()
8
>>> result = add.delay(4, 4)
>>> result.ready()
False
>>> result.ready()
False
(しばらく待つ)
>>> result.ready()
True
>>> result.get()
8
レート制限が効いているのがわかる。
RabbitMQやRedisがブローカーの場合は、ワーカーサーバー起動時のオプションでも設定できるみたいだけど、設定ファイルでやりたいよね。
$ celery -A tasks control rate_limit tasks.add 1/m
タスクもディレクトリ切って分けたいよねーと思ってたら、以下の記事でクラスベースで書くやり方が書かれていた。こちらのほうがわかりやすくて良さそうに思える。
Next Steps
続き。「First Steps with Celery」をもう少し掘り下げた内容になっているらしい。
アプリケーションでCeleryを使う
実際にアプリケーションで使う場合の例。
こういうレイアウト例。
$ mkdir -p src/proj
$ touch src/proj/{__init__,celery,tasks}.py
$ tree src
src
└── proj
├── __init__.py
├── celery.py
└── tasks.py
1 directory, 3 files
src/proj/celery.py
を以下の内容で作成。
from celery import Celery
app = Celery('proj',
backend='redis://localhost:6379',
broker='redis://localhost:6379',
include=['proj.tasks'],
)
# オプションで設定を追加、詳細はアプリケーションユーザガイドを参照。
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
次にタスクをsrc/proj/tasks.py
に追加する。
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
ワーカーサーバを起動
$ cd src
$ celery -A proj worker -l INFO
起動時のメッセージに各種情報が表示されている。
-------------- celery@example.local v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-6.5.0-28-generic-x86_64-with-glibc2.35 2024-05-10 15:03:43
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x70e9d51f0690
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost:6379/
- *** --- * --- .> concurrency: 32 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
一部ドキュメントとは違うけども、多分こんな感じ
-
transport
- (おそらく)ブローカーのURL。
-
concurrency
- タスクの同時処理に使用するワーカープロセスの数。
- この数だけ同時に処理ができる。これ以上の処理がある場合は前の処理が終わるまで待たされる。
- デフォルトはCPU(コアを含む)数。
-
celely worker -c
で設定できる。 - IOバウンドな処理が多ければ増やすと良い(とあるけども、I/Oバウンドな処理ばかりだとIO waitでしんどくないかな・・・?)
- CPUの2倍以上を指定指定も効果はなく、むしろパフォーマンスは悪くなるという実験結果があるとのこと。
- なお、プールの種類には以下がある様子。
-
prefork
- デフォルト。
- 各ワーカープロセスが独立してタスクを処理。
- CPU負荷の高いタスクやほとんどのユースケースに最適。
- 他のモデルを使う特別な必要性がない限りは堅牢でおすすめ。
-
eventlet
/gevent
- 非同期I/Oサポートにより、IOバウンドタスクのために設計されているモデル。
- greenletsを使用して高い並行性を可能にする。
- 特定の機能(soft_timeout等)は利用できない場合がある
-
eventlet
とgevent
の違いは実装の違いらしい。
-
solo
- メインスレッドでタスクを順次実行する。
- 開発やデバッグ向けらしい。
-
thread
-
concurrent.futures
モジュールがあれば利用できる。 - マルチスレッドを使用してタスクを並行処理する。
-
-
custom
- カスタムワーカープールの実装を環境変数で指定できるようにする。`
-
processes
- ドキュメントにはないけど、ヘルプで出てくる。
- 複数のプロセスを使用してタスクを並行処理する。
- タスクの同時処理に使用するワーカープロセスの数。
-
task events
- Celery が Worker で発生したアクションに対して監視メッセージ (イベント) を送信するためのオプション。
-
celery events
やリアルタイムCeleryモニタ Flower などで使用される。
-
queues
- Worker がタスクを消費するキューのリスト
- Workerは一度に複数のキューからタスクを消費するように指示することができる
- QoS、Separation of Concern、Prioritizationの手段として特定のWorkerにメッセージをルーティングするために使われる
プールの種類については以下も参照。
以下の記事には詳しい説明があった。
ワーカーのデーモン化
チュートリアルではワーカーサーバをそのまま上げているけど、実サービスで使う場合はデーモン化する。以下にinitスクリプトなどいくつかの方法が記載されている。
これらのスクリプトはcelery multi
コマンドを使っている。で、ドキュメント通りにcelery multi
で起動すると、
$ celery multi start w1 -A proj -l INFO
怒られる。
PermissionError: [Errno 13] Permission denied: '/var/run/celery'
PIDファイルとログファイルはデフォルトだとワーカーを実行したディレクトリに作成される、とドキュメントにはあるのだが、今は違うのかもしれない。
一般的なデーモンと同じく/varの下だと権限等面倒なので、今回はオプションを追加。
$ mkdir run log
$ celery multi start worker1 -A proj -l INFO \
--pidfile=./run/%n.pid \
--logfile=./log/%n%I.log
celery multi v5.4.0 (opalescent)
> Starting nodes...
> worker1@example.local: OK
プロセスを見てみる
$ ps -eo pid,ppid,user,cmd | grep celery
2941940 1 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip)
2941977 2941940 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip)
2941978 2941940 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip)
2941979 2941940 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip)
$ ps -eo pid,ppid,user,cmd | grep celery | grep -v grep | wc -l
33
最初のプロセスがスーパーバイザー的なプロセスだと思うので、実際に処理を行うワーカーは32個立ち上がっている思われる。
止めるときはstop
/stopwait
を使う。stopwait
の場合は実行中のタスク終了を待ってからワーカーが終了する。
$ celery multi stopwait worker1 -A proj -l INFO \
--pidfile=./run/%n.pid \
--logfile=./log/%n%I.log
celery multi v5.4.0 (opalescent)
> worker@example.local: DOWN
ワーカー数を指定してみる。
$ celery multi start 5 -A proj -l INFO \
--pidfile=./run/%n.pid \
--logfile=./log/%n%I.log
celery multi v5.4.0 (opalescent)
> Starting nodes...
> celery1@example.local: OK
> celery2@example.local: OK
> celery3@example.local: OK
> celery4@example.local: OK
> celery5@example.local: OK
プロセス確認
$ ps -eo pid,ppid,user,cmd | grep celery | grep -v grep | wc -l
165
なんと!プロセスを見てみるとこんな感じ。
2967798 1 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery1@example.local (snip)
2967802 1 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery2@example.local (snip)
2967805 1 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery3@example.local (snip)
2967808 1 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery4@example.local (snip)
2967810 1 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery5@example.local (snip)
2967844 2967808 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery4@example.local (snip)
2967847 2967810 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery5@example.local (snip)
2967849 2967802 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery2@example.local (snip)
2967850 2967798 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery1@example.local (snip)
2967851 2967808 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker (snip) -n celery4@example.local (snip)
通常はcelery multi start ワーカー名
で起動すると、ワーカーの親プロセス=スーパーバイザー的プロセスが生成されて、そこから子プロセス≒じっさいに処理を行うワーカープロセスが生える、みたいなイメージ。
で、celery multi start N
で数値を指定すると、ワーカー名が自動的にceleryN
みたいになって、さらにそこから子プロセスが生えるので、生成される子プロセスは 5 x 32 = 160、親プロセスも含めると165ってことなんだろう。
子プロセスの数は-c
で設定する。
一旦ストップしておいて、
$ celery multi stopwait 5 -A proj -l INFO \
--pidfile=./run/%n.pid \
--logfile=./log/%n%I.log
-c
をつけて実行。ワーカー名もつけてある。
$ celery multi start worker1 -c 5 -A proj -l INFO \
--pidfile=./run/%n.pid \
--logfile=./log/%n%I.log
$ ps -eo pid,ppid,user,cmd | grep celery | grep -v grep
2974348 1 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974381 2974348 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974382 2974348 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974383 2974348 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974384 2974348 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
2974385 2974348 kun432 /SOMEWHERE/celery-tutorial/.venv/bin/python -m celery -A proj worker -c 5 (snip) -n worker1@example.local (snip)
なるほど。じゃあ複数のワーカーを立ち上げる意味は何なんだろうか?というところで元々紹介されていたコマンドはこんな感じで、どうやらワーカーごとに役割を変えたりするのに使えるらしい。なるほど。
$ celery multi start 10 -c 2 -A proj -l INFO \
-Q:1-3 images,video \
-Q:4,5 data \
-Q default \
--pidfile=./run/%n.pid \
--logfile=./log/%n%I.log
ちなみに-L
オプションでワーカーごとのログレベルを指定することができるとあるけれどもこうなった。
Error: No such option: -L
うーん、ちょいちょいドキュメントと違うところがあるのはちょっとつらい。
celery multi
のオプションは以下を見ると良さそう。
タスクの呼び出し
"First Steps ・・・"と重複する部分もあるけれど一応。
まずワーカーを起動しておく。
$ celery multi start worker1 -c 5 -A proj -l INFO \
--pidfile=./run/%n.pid \
--logfile=./log/%n%I.log
REPLからdelay()
でタスクを呼び出し。
>>> from proj.tasks import add
>>> result = add.delay(2, 2)
<AsyncResult: 4ae5feda-f8d9-4c0e-87ed-855636fd8901>
でこれはapply_async()
を使っても同じことになる。
>>> from proj.tasks import add
>>> add.apply_async((2, 2))
<AsyncResult: abbd74ad-0a95-4733-8be0-f7e3eb16a5f1>
実行時間を指定するとともできる。以下の例だと10秒後に実行される(取り出すには結果バックエンドが必要)
>>> result = add.apply_async((2, 2), countdown=10)
>>> result.ready()
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get()
4
同期的にタスクを実行することもできる。この場合は現在のプロセスで実行されるため、メッセージは送信されない。
>>> add(2, 2)
4
タスクのIDの取り出し
>>> result = add.apply_async((2, 2), countdown=10)
>>> result.id
'c384b7bc-5465-4e75-92fc-8dfb920b38a0'
タスクで例外が発生した場合は、.get()
でエラーが伝播される
>>> result = add.delay(2, '2')
>>> result.get()
Traceback (most recent call last):
(snip)
TypeError: unsupported operand type(s) for +: 'int' and 'str'
.get(propagate=False)
でエラーの伝播を無効化できる
>>> result = add.delay(2, '2')
>>> result.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")
タスクの成功。失敗をチェックすることもできる。
>>> result = add.delay(2, 2)
>>> result.successful()
True
>>> result = add.delay(2, "2")
>>> result.failed()
True
タスクの状態を確認することもできる。
>>> result = add.apply_async((2, 2), countdown=10)
>>> result.state
'PENDING'
>>> result.state
'PENDING'
>>> result.state
'SUCCESS'
失敗したときはFAILED
になる。あと、task_track_started
を有効にするか、タスクに@task(track_started=True)
をつけると、PENDING
->STARTED
->SUCCESS
みたいな感じで、STARTED
のステータスが取れるらしい。
なお、「PENDING
はまだ記録されていない状態であり、未知のIDに対するデフォルトの状態」と書いてあるけれども、ここは正直どういう意味なのかよくわからない。
>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.id
'this-id-does-not-exist'
>>> res.state
'PENDING'
まあこれはそうなんでしょう。ただ同じPENDINGでもこの状態だとIDが取れる。
>>> result = add.apply_async((2, 2), countdown=10)
>>> result.state
'PENDING'
>>> result.id
'11364e31-40fe-4f62-a920-aedc75456c0a'
IDに対して何かしらの状態は紐づいていない→PENDINGという風に言いたいのかな?とはいえ、あとあと実行されるわけだし、そういう意味では実行待ちという状態を持ってるんじゃないかなーという気がするのだけども。
なお、リトライ機構もあるようで、その場合はこんな感じになったりするらしい。
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
なんとなくtask_track_started
はデフォルトで有効にしてもいいんじゃないかなぁという気がした。
Canvas: ワークフローをデザインする
Signatureを使うと、タスクの引数と実行時のオプションをラップして、後で呼び出すことができる。ここ最初文章読んだだけだとさっぱりわからなかったけど、やってみるほうが早い。
Signatureでaddタスクをラップ。
>>> s1 = add.signature((2,2), countdown=10)
作成したSignatureを.delay()
メソッドで呼び出し
>>> result = s1.delay()
あとは普通に使える
>>> result.ready()
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get()
4
.signature()
は.s()
とも書ける。
>>> s2 = add.s(2,2)
>>> result = s2.delay()
>>> result.get()
4
面白いなと思うのはここから。addタスクは引数が2つ必要だけど、1つだけ指定してSignatureを定義する。これをPartialと呼ぶらしい。
>>> s2 = add.s(4)
でdelay()
するときにもう一つの引数を渡してやる。
>>> result = s2.delay(4)
結果を取得してみる。
>>> result.get()
8
つまり実行時に内容を書き換えるようなことができる。
これはオプションでも同様。以下のようなタスクを用意する。
@app.task
def add_sleep(x, y, sleep=False):
if sleep:
time.sleep(10)
return x + y
ワーカーを再起動して反映
$ celery multi restart worker1 -c 5 -A proj -l INFO \
--pidfile=./run/%n.pid \
--logfile=./log/%n%I.log
REPLで実行。
>>> from proj.tasks import add_sleep
>>> s3 = add_sleep.s(5, 5, sleep=True)
>>> result = s3.delay()
>>> result.ready()
False
>>> result.ready()
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get()
10
sleepが効いているのがわかる。これをdelay()
時に書き換える。
>>> result = s3.delay(sleep=False)
>>> result.ready()
True
>>> result.get()
10
なお、delay()
とapply_async()
ではオプションが異なる。タスクの引数以外のオプションの場合(例えばcountdownとか)は apply_async()
を使う場合しか書き換えできないってことかなと思っている。
sig.apply_async(args=(), kwargs={}, **options)
sig.delay(*args, **kwargs)
でこれらをより有効に活用するために以下のようなPrimitivesがある。
group
chain
chord
map
starmap
chunks
ドキュメントで紹介されているものだけ少し触れてみる。
group
group
は、タスクのリストを渡すと並列実行を行い、それぞれの結果をGroupResultオブジェクトとして返し、結果を取り出す際にはリストの順序どおりに返す
>>> from celery import group
>>> from proj.tasks import add
>>> result = group(add.s(i, i) for i in range(10))()
<GroupResult: 77cc8f3d-eca2-4d40-a6f6-134df507ba00 [4dc23bab-a7bd-4cd5-b701-3664b42dcb35, a2e4d3d2-b53f-4b30-8972-d3c351637255, 8055f8cf-056f-4564-8cef-84ce30098ed8, b08add7d-b7a9-4afc-8c53-95373c1adc40, 6e496546-4708-450d-be32-3af14ea7761c, ce7c83f6-1eb4-4f68-8244-8f07f806ea83, a1833bab-89d1-4ccf-a35f-022edb184638, d6bf0848-4449-450b-90fc-f2a336ae2acf, 6dbb2a0b-03da-4bff-bec8-910f00b25b32, 9751f4db-a9b2-404a-a30f-2b8799764e7e]>
>>> result.get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Partialの場合
>>> g = group(add.s(i) for i in range(10))
>>> g
group(<_regen: [proj.tasks.add(0), add(1), add(2), add(3), add(4), add(5), add(6), add(7), add(8), add(9)]>)
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
chain
chain
はその名の通り順番に実行していき、前のタスクの結果が後続のタスクに渡される。
>>> from celery import chain
>>> from proj.tasks import add, mul
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
mul.s(8)
がPartialになっていて、add.s(4, 4)
の結果が渡されるような感じになっている。
全部Partialだとこう。
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
chain
は省略できる。
>>> g = (add.s(4) | mul.s(8))
>>> g(4).get()
64
chord
chord
はグループにコールバックを付与したもの。
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90
(add.s(i, i) for i in range(10))
の結果をxsum.s()
に渡している感じ。
以下は同じ意味になる。
(group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90
これらのPrimitivesを使って作成したワークフローをCanvasというらしい。
ルーティング
デーモン化のところで少し触れたけども、キューに名前をつけて、メッセージを別々のキューに振り分けることができる。
例えばこんな風にワーカーを上げておく
$ celery multi start 4 -c 1 -A proj -l INFO \
-Q:1-2 foo \
-Q:3-4 bar \
--pidfile=./run/%n.pid \
--logfile=./log/%n%I.log
- 親ワーカープロセスは合計4
- 子ワーカープロセスは各1
- キュー名
foo
はワーカー1と2に割り当て - キュー名
bar
はワーカー3と4に割り当て
で、実行時にキューを振り分ける場合はapply_async()
を使ってオプションで指定する
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='foo')
もしくはtask_routes
を設定しておくと、タスク名で自動的に振り分けられる
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'foo'},
},
)
その他ルーティングについては以下参照。
リモートからの制御
celeryコマンドにはいくつか運用に役立つようなサブコマンドが用意されている。
RabbitMQ、Redis、Qpidをブローカーに使っている場合は、ワーカーの状態を確認することができる。
celery inspect
を使う。
$ celery -A proj inspect active
-> celery3@example.local: OK
- empty -
-> celery4@example.local: OK
- empty -
-> celery2@example.local: OK
- empty -
-> celery1@example.local: OK
- empty -
4 nodes online.
特定のワーカーだけを確認したい場合は--destination
オプションで指定する。
$ celery -A proj inspect active --destination celery2@example.local
-> celery2@example.local: OK
- empty -
1 node online.
上記はactive
可どうかをチェックしているが、他にも以下が確認できる。
$ celery -A proj inspect --list
Inspect Commands:
active List of tasks currently being executed.
active_queues List the task queues a worker is currently
consuming from.
clock Get current logical clock value.
conf [include_defaults=False] List configuration.
memdump [n_samples=10] Dump statistics of previous memsample
requests.
memsample Sample current RSS memory usage.
objgraph [object_type=Request] [num=200 [max_depth=10]]
Create graph of uncollected objects (memory-
leak debugging).
ping Ping worker(s).
query_task [id1 [id2 [... [idN]]]]
Query for task information by id.
registered [attr1 [attr2 [... [attrN]]]]
List of registered tasks.
report Information about Celery installation for
bug reports.
reserved List of currently reserved tasks, not
including scheduled/active.
revoked List of revoked task-ids.
scheduled List of currently scheduled ETA/countdown
tasks.
stats Request worker statistics/information.
次にinspect
は状態確認を行うのに対して、control
は実際に制御を行う。
$ celery -A proj control --list
Control Commands:
add_consumer <queue> [exchange [type [routing_key]]]
Tell worker(s) to consume from task queue by
name.
autoscale [max [min]] Modify autoscale settings.
cancel_consumer <queue> Tell worker(s) to stop consuming from task
queue by name.
disable_events Tell worker(s) to stop sending task-related
events.
election Hold election.
enable_events Tell worker(s) to send task-related events.
heartbeat Tell worker(s) to send event heartbeat
immediately.
pool_grow [N=1] Grow pool by n processes/threads.
pool_restart Restart execution pool.
pool_shrink [N=1] Shrink pool by n processes/threads.
rate_limit <task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>
Tell worker(s) to modify the rate limit for
a task by type.
revoke [id1 [id2 [... [idN]]]] Revoke task by task id (or list of ids).
revoke_by_stamped_headers [key1=value1 [key2=value2 [... [keyN=valueN]]]]
Revoke task by header (or list of headers).
shutdown Shutdown worker(s).
terminate <signal> [id1 [id2 [... [idN]]]]
Terminate task by task id (or list of ids).
time_limit <task_name> <soft_secs> [hard_secs]
Tell worker(s) to modify the time limit for
task by type.
例えば、ワーカー2を停止してみる。ちょっとエラーにはなるのだけども、
$ celery -A proj control shutdown --destination celery2@example.local
Error: No nodes replied within time constraint
inspect active
で確認してみるとワーカー2がいなくなっていた。
$ celery -A proj inspect active
-> celery3@rtx40example90.local: OK
- empty -
-> celery4@example.local: OK
- empty -
-> celery1@example.local: OK
- empty -
3 nodes online.
もう一つevent
というサブコマンドがある。ワーカーで発生したイベントを確認できる。
$ celery -A proj events --help
Usage: celery events [OPTIONS]
Event-stream utilities.
Dumper:
-d, --dump
Snapshot:
-c, --camera TEXT
-d, --detach
-F, --frequency, --freq FLOAT
-r, --maxrate TEXT
-l, --loglevel [DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL]
Logging level.
Daemonization Options:
-f, --logfile TEXT Log destination; defaults to stderr
--pidfile TEXT PID file path; defaults to no PID file
--uid TEXT Drops privileges to this user ID
--gid TEXT Drops privileges to this group ID
--umask TEXT Create files and directories with this umask
--executable TEXT Override path to the Python executable
Options:
--help Show this message and exit.
これを使うにはcontrol
でまずイベントを有効化する必要がある。
$ celery -A proj control enable_events
-> celery4@rtx4090.local: OK
task events enabled
-> celery3@rtx4090.local: OK
task events enabled
-> celery1@rtx4090.local: OK
task events enabled
events --dump
を実行するとtail -f
のような感じでログが流れる。
$ celery -A proj events --dump
celery3@example.local [2024-05-10 14:33:05.876307] heartbeat: active=0, clock=18204, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351585.8782148, pid=3036874, processed=0, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9
celery4@example.local [2024-05-10 14:33:05.877166] heartbeat: active=0, clock=18204, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351585.8786154, pid=3036876, processed=0, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9
celery1@example.local [2024-05-10 14:33:05.877706] heartbeat: active=0, clock=18205, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351585.8790622, pid=3036861, processed=19, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9
celery1@example.local [2024-05-10 14:33:06.341848] task received: proj.tasks.add(18ad0903-422f-4cda-9d9c-629727a8a035) args=(2, 2) kwargs={} clock=18209, eta=None, expires=None, local_received=1715351586.342242, parent_id=None, pid=3036861, retries=0, root_id=18ad0903-422f-4cda-9d9c-629727a8a035, utcoffset=-9
celery1@example.local [2024-05-10 14:33:06.342527] task started: proj.tasks.add(18ad0903-422f-4cda-9d9c-629727a8a035) args=(2, 2) kwargs={} clock=18210, local_received=1715351586.3427393, pid=3036861, utcoffset=-9
celery1@example.local [2024-05-10 14:33:06.343118] task succeeded: proj.tasks.add(18ad0903-422f-4cda-9d9c-629727a8a035) args=(2, 2) kwargs={} clock=18211, local_received=1715351586.3433118, pid=3036861, result=4, runtime=0.0002800379879772663, utcoffset=-9
celery3@example.local [2024-05-10 14:33:07.879026] heartbeat: active=0, clock=18208, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351587.880925, pid=3036874, processed=0, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9
celery4@example.local [2024-05-10 14:33:07.879799] heartbeat: active=0, clock=18208, freq=2.0, loadavg=[0.0, 0.03, 0.04], local_received=1715351587.8815742, pid=3036876, processed=0, sw_ident=py-celery, sw_sys=Linux, sw_ver=5.4.0, utcoffset=-9
ハートビート的なログや、あとはタスクの呼び出しとかのログが取得できる。
--dump
なしだとTUIで表示される
$ celery -A proj events
終わったらcontrol
でイベントを無効化しておく
$ celery -A proj control disable_events
最後にstatus
サブコマンド。これはワーカーの状態だけを確認するコマンドになっているっぽい。
$ celery -A proj status
-> celery4@example.local: OK
-> celery1@example.local: OK
-> celery3@example.local: OK
3 nodes online.
まとめ
タスクキュー、今回初めて触った。やる前はタスクキューってよくわからなくて難しそうと思っていたのだけど、そういえばUnix/Linuxのatコマンドで似たようなことはやっていたなと。
ただ、atコマンド、プログラマブルに使ったことはほとんどなくて、ワンショットで時限でやらせたいこととかを登録するぐらいのもので、使いこなせてたわけではないし、やはりプログラムから制御できるってのが今は重要なんだろうなと感じた。
ただ、実際にきちんと運用するにはそれなりに大変そうな気はしたので、このあたりもう少しいろいろ確認しておきたい。
とりあえず何かしら実装して使ってみたいのだけど、ユースケースが思いつかないなw
いまいちな点も上げておく、っていうかやっぱりドキュメントかなぁ
- どうやら最新化されていないところがちらほらある模様。以下個人的な意見。
- リファレンスとかは追いついてないのはしょうがないし、リファレンス読むような人はある程度調べて解決できることが多いので、最悪いいや
- 初めて触る人がGetting Startedで躓くと、もう触ってくれなくなる可能性が高くなると思う
- なのでGetting Startedはきちんと最新化しておいたほうが良いと思う。
なんか調べてみるとドキュメントがーみたいな声はちらほらあるみたい。でもまあOSSの宿命的なところはあるのかもしれない。
あと用語の定義がほしいな、「ワーカー」が何を指すのかを明確にしておきたい。