Open1

Celeryを使う

m10k1m10k1

first step

https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html

このチュートリアルで得られるもの

  • メッセージトランスポート(ブローカー)の選択とインストール
  • Celeryのインストールと最初のタスクの作成
  • ワーカーの起動とタスクの呼び出し
  • タスクがさまざまな状態に移行する際の追跡と、戻り値の検査

ブローカーの選択

利用可能なブローカー

  • RabbitMQ
  • Redis
  • AmazonSQS

celeryのインストール

pip install celery

アプリケーション

Celery インスタンス作成

まず、Celeryインスタンスを作成(Celery アプリケーション)
このインスタンスがエントリーポイントとして使用される
このインスタンスを通じてタスクを作成したり、ワーカーを管理します。

tasks.py
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@task
def app(x, y):
    return x + y

Celeryのコンストラクタの最初の引数は、モジュール名を指定します。
この例ではtasks.pyがモジュールなので、"tasks"を引数に渡しています。
2つの目の引数では、ブローカーを指定するためのURLになります。
RabbitMQ: amqp://localhost
Radis : raddis://localhost

Celeryワーカーサーバーの実行

worker引数を指定してワーカーを実行します。

celery -A tasks worker --loglevel=INFO

利用可能なコマンドラインオプションを確認したければ

celery worker --help

その他のコマンドについて知りたければ

celery --help

タスクをコールする

delayメソッド
apply_async()メソッドをより簡単に使えるようにしたもの
タスクをバックグランドで実行できる。
AsyncResultインスタンスが返る、このインスタンスを通じてタスクの状態を確認できる。
タスクの状態確認:

  • ready()
    タスクの結果の取得:
  • get()
    例外処理:
    タスクが例外を発生させた場合、ge()メソッドはそのまま例外を再発生させるが、propagete引数をfalseにすることで、例外を抑制できる。

Resultを保持する

タスクの状態をトラックしたい場合、Celeryは状態を保持するか状態をどこかね送信する必要があります。いくつかのバックエンドがあり、SQLAlchemy/Django ORM, MongoDB, Mecached, Redis, RPC

RPCをバックエンドとして使用したい場合

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

Radisを使う場合

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

バックエンドの設定が出来たら、現在のPythoセッションを閉じます。
tasksモジュールをインポートして、変更を反映させます。

```python
from tasks import add
result = add.delay(4,4)

ready()メソッドでタスクが完了しているか確認

result.ready()

結果が完了するまで待つこともできる。

result.get(timeout=1)

タスクが例外を発生する場合、get()は例外を再発生させるがpropagate引数で抑制できる

result.get(propagete=false)

タスクの例外のスタックトレースを確認する場合

result.traceback

設定

大半のユーザーにとってデフォルトの設定で十分

Configuration and defaultsで確認できる

設定方法
アプリケーションオブジェクトでの直接設定: app.conf オブジェクトを通して設定できます。例えば、タスクペイロードのシリアライザーを JSON に設定するには、app.conf.task_serializer = 'json' と記述します。複数の設定をまとめて行うには、app.conf.update() メソッドを使用できます。

設定モジュール: 大規模なプロジェクトでは、専用の設定モジュールを使用することが推奨されます。設定モジュールを使用すると、設定を一箇所に集中管理でき、コードの見通しがよくなります。

例えば、app.config_from_object('celeryconfig') を呼び出すことで、celeryconfig.py というモジュールから設定を読み込むことができます。

celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

ブローカーの URL、結果バックエンド、シリアライザー、タイムゾーンなどの設定を行っています

設定モジュールが正しく動作するかを確認する方法

設定ファイルに構文エラーがないかを確認するには、

python -m celeryconfig

コマンドを実行してモジュールをインポートしてみます

設定モジュールの読込

app.config._from_objectで読み込むことができる

app.config._from_object('celeryconfig')