🦁

初心者向け Celery 入門ガイド

に公開

初心者向け Celery 入門ガイド

Webアプリケーションや業務システムを作っていると、どうしても 「時間がかかる処理」 が出てきてしまいますよね?
例えば、ユーザー登録後のメール送信、画像のリサイズ、外部APIとの通信、大量データの集計など、これらをリクエスト処理の中で直に行うと、レスポンスが遅くなりUXを損ねます。

特に、昨今、大規模言語モデルとのAPI連携が盛んに行われています。単純なストリーミングチャットなどであれば良いですが、例えばAIエージェントとの連携 であったり、フロントエンド側から実行待ち状態にしなくてはならないタスクがある場合などには、リクエスト処理に含めるのはあまり良い構成とは言えません。

そこで役立つのが Celery です。Celery は Python 製の分散タスクキューで、処理をバックグラウンドに回したり、スケジュール実行したり、複数ワーカーに分散したりすることが可能です。
公式ドキュメントも「小さなツールで大きな可能性」と表現しているとおり、規模の大小を問わず活躍します。
https://docs.celeryq.dev/en/v5.5.3/getting-started/introduction.html

本記事では、初心者向けに、Celery の基礎から実践的な使い方までを整理し、解説したいと思います。

Celery の基本構造

実際の使用方法の説明に入る前に、Celeryの基本構造を見ておきましょう。ここを先に抑えておくことで理解しやすくなると思います。
Celery では大きく分けて3つの要素が登場します。

  1. Broker(ブローカー)
    役割:タスクを一時的に預かる「受付係」
    Broker は、アプリケーションから渡された「やってほしい仕事(タスク)」を受け取って、キューに積んで保管します。
    Celery ではこの役割を Redis や RabbitMQ といった専用ソフトが担います。

  2. Worker(ワーカー)
    役割:タスクを実際に実行する「作業員」
    Worker は Broker からタスクを取り出し、プログラムとして実行します。後ほど詳しく解説しますが、Python で定義した関数に @app.task を付けておくと、それがワーカーにとって「やるべき仕事」になります。

    たとえば「メールを送る」というタスクを登録しておけば、ワーカーは裏でその関数を呼び出し、メールを実際に送信します。ワーカーは複数立ち上げることができ、処理を並列に進めることも可能です。

  3. Result Backend(結果バックエンド)
    役割:タスクの実行結果を保存する「記録係」
    タスクの実行結果が必要な場合、その結果を保存しておく仕組みが Result Backend です。こちらも Redis やデータベースを使うことができます。

    例えば「計算タスクを投げて、終わったら結果を受け取りたい」という場合に使います。一方で「結果は特に必要ない。終わってさえくれればいい」というタスク(例:通知メール送信)であれば、Result Backend を省略することも可能です。

環境構築と初期設定

  1. 事前にredisをインストールし、起動しておきます。
    MacOSの場合

    brew install redis
    brew services start redis
    

    Linuxの場合

    sudo apt-get install redis-server
    sudo service redis-server start
    
  2. 次にpythonライブラリをインストールします。

    pip install celery[redis]
    
  3. タスクを登録しましょう。tasks.pyというファイルを定義し、以下のように記述します。

    # demo.py
    from celery import Celery
    import time
    
    # Celery アプリケーションの生成
    celery_app = Celery(
        "demo",
        broker="redis://localhost:6379/0",   # Redis を Broker に指定
        backend="redis://localhost:6379/0"   # 実行結果も Redis に保存
    )
    
    # 時間のかかる処理を Celery タスク化
    @celery_app.task
    def long_task(x):
        time.sleep(10)  # 擬似的に10秒かかる処理
        return f"10秒待ちました! 入力は {x} でした"
    

    celeryに実行させたい処理は関数として定義し、@celery_app.task デコレーターをつける必要があります。これによってタスク化できます。

  4. 新規ターミナルで Celery ワーカーを起動します

    celery -A tasks worker --loglevel=info
    
  5. Pythonインタプリタからタスクを呼び出します。

    from tasks import long_task
     
    result = long_task.delay(42)
    print(result.get())  # 10秒後に "10秒待ったよ! 入力は 42 でした" が返る
    

    celeryタスクは、{先ほど定義した関数名}.delay(引数)の形で実行します。delay()はタスクを非同期キューへ積み、裏でワーカーが実行するように登録する処理です。

タスクの再試行とエラーハンドリング

実際の運用では「APIサーバーが落ちている」「ネットワークが不安定」といったことが起こることもあるでしょう。そのため、Celery には リトライ機能 が備わっています。
以下、メール送信タスクを複数リトライする例です。

@celery_app.task(bind=True, default_retry_delay=600, max_retries=3)
def send_mail_task(self, to, subject, body):
    try:
        send_mail(subject, body, 'noreply@example.com', [to])
    except Exception as exc:
        raise self.retry(exc=exc)

この例では、失敗した場合に600秒(10分)ごとに最大3回までタスクを再試行します。

定期実行(スケジューリング)

Linux の cron のように「月曜7時に処理を実行したい」「1分おきに実行したい」といったニーズも Celery で対応することが可能です。
そのためには、Beatを起動する必要がありますので、CeleryのWorker起動コマンドに以下のように-Bオプションを付けます。

celery -A tasks worker -B --loglevel=info

Pythonコードの例は以下のようになります。

# 毎週月曜の7時に実行
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    'monday-report': {
        'task': 'tasks.send_weekly_report',
        'schedule': crontab(day_of_week=1, hour=7),
    },
}
celery_app.conf.timezone = 'Asia/Tokyo'
#1分おきに実行
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    'monday-report': {
        'task': 'tasks.long_task',
        'schedule': crontab(minute='*/1'),  # 1分ごとに実行
        'args': (42,)
    },
}

注意点として、これらは、tasks.pyに記述しておく必要があります。別ファイルの場合、必ずtasks.pyimportするように設定しないと動きません。

遅延実行(一定時間後に実行)

「ユーザー登録から15分後に確認メールを送りたい」といった場合は apply_async を使います。

send_mail_task.apply_async(
    args=('user@example.com', 'Hello', 'Thanks for joining'),
    countdown=900  # 15分後
)

または実行日時を直接指定することも可能です。

send_mail_task.apply_async(
    args=('user@example.com', 'Hello', 'Thanks for joining'),
    countdown=900  # 15分後
)

キューの分離とスケーリング

タスクが増えてくると「重要な処理が他の重い処理に阻害される」ことがあります。Celery ではタスクを別キューに振り分けて専用ワーカーで処理できます。

app.conf.task_routes = {
    'tasks.send_mail_task': {'queue': 'mail'},
    'tasks.generate_report': {'queue': 'reports'},
}

その場合、ワーカーはキューごとに起動します。

celery -A tasks worker -Q mail -l info
celery -A tasks worker -Q reports -l info

こうすることで、用途ごとに処理を分散できます。

タスクのグループ化と結果の収集

from celery import group

@celery_app.task
def add(x, y):
    return x + y

@celery_app.task
def run_group():
    job = group(add.s(i, i) for i in range(10))()
    return job.get()

並列処理した結果を .get() でまとめて取得できます。

まとめ

以上、Celery の基本構造(Broker / Worker / Backend)、タスクの定義と実行方法、定期実行や遅延実行の仕組みを確認しました。

最初は環境構築や設定の場所で戸惑うかもしれませんが、

  • まずは delay() で非同期処理を体験する
  • Beat を使って定期実行を試す
  • 複数のワーカーやキューに広げていく

と段階的に進めれば、確実に理解が深まります。

Celery はシンプルなサンプルコードから始めても、実運用レベルまでスケールさせられる柔軟なツールです。
今回紹介したポイントを押さえて、ぜひ自分のプロジェクトでも試してみてください。

Discussion