Celeryを使う
first step
このチュートリアルで得られるもの
- メッセージトランスポート(ブローカー)の選択とインストール
- Celeryのインストールと最初のタスクの作成
- ワーカーの起動とタスクの呼び出し
- タスクがさまざまな状態に移行する際の追跡と、戻り値の検査
ブローカーの選択
利用可能なブローカー
- RabbitMQ
- Redis
- AmazonSQS
celeryのインストール
pip install celery
アプリケーション
Celery インスタンス作成
まず、Celeryインスタンスを作成(Celery アプリケーション)
このインスタンスがエントリーポイントとして使用される
このインスタンスを通じてタスクを作成したり、ワーカーを管理します。
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@task
def app(x, y):
return x + y
Celeryのコンストラクタの最初の引数は、モジュール名を指定します。
この例ではtasks.pyがモジュールなので、"tasks"を引数に渡しています。
2つの目の引数では、ブローカーを指定するためのURLになります。
RabbitMQ: amqp://localhost
Radis : raddis://localhost
Celeryワーカーサーバーの実行
worker引数を指定してワーカーを実行します。
celery -A tasks worker --loglevel=INFO
利用可能なコマンドラインオプションを確認したければ
celery worker --help
その他のコマンドについて知りたければ
celery --help
タスクをコールする
delayメソッド
apply_async()メソッドをより簡単に使えるようにしたもの
タスクをバックグランドで実行できる。
AsyncResultインスタンスが返る、このインスタンスを通じてタスクの状態を確認できる。
タスクの状態確認:
- ready()
タスクの結果の取得: - get()
例外処理:
タスクが例外を発生させた場合、ge()メソッドはそのまま例外を再発生させるが、propagete引数をfalseにすることで、例外を抑制できる。
Resultを保持する
タスクの状態をトラックしたい場合、Celeryは状態を保持するか状態をどこかね送信する必要があります。いくつかのバックエンドがあり、SQLAlchemy/Django ORM, MongoDB, Mecached, Redis, RPC
RPCをバックエンドとして使用したい場合
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
Radisを使う場合
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
バックエンドの設定が出来たら、現在のPythoセッションを閉じます。
tasksモジュールをインポートして、変更を反映させます。
```python
from tasks import add
result = add.delay(4,4)
ready()メソッドでタスクが完了しているか確認
result.ready()
結果が完了するまで待つこともできる。
result.get(timeout=1)
タスクが例外を発生する場合、get()は例外を再発生させるがpropagate引数で抑制できる
result.get(propagete=false)
タスクの例外のスタックトレースを確認する場合
result.traceback
設定
大半のユーザーにとってデフォルトの設定で十分
Configuration and defaultsで確認できる
設定方法
アプリケーションオブジェクトでの直接設定: app.conf オブジェクトを通して設定できます。例えば、タスクペイロードのシリアライザーを JSON に設定するには、app.conf.task_serializer = 'json' と記述します。複数の設定をまとめて行うには、app.conf.update() メソッドを使用できます。
設定モジュール: 大規模なプロジェクトでは、専用の設定モジュールを使用することが推奨されます。設定モジュールを使用すると、設定を一箇所に集中管理でき、コードの見通しがよくなります。
例えば、app.config_from_object('celeryconfig') を呼び出すことで、celeryconfig.py というモジュールから設定を読み込むことができます。
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
ブローカーの URL、結果バックエンド、シリアライザー、タイムゾーンなどの設定を行っています
設定モジュールが正しく動作するかを確認する方法
設定ファイルに構文エラーがないかを確認するには、
python -m celeryconfig
コマンドを実行してモジュールをインポートしてみます
設定モジュールの読込
app.config._from_objectで読み込むことができる
app.config._from_object('celeryconfig')