👋

RedisとCeleryを用いたバックグラウンド処理を徹底解説: 基本知識から実装まで

2024/11/22に公開

はじめに

私は現在,大学4年生でWeb開発の長期インターンに参加しています.APIを用いてデータベースに保存する処理は長時間かかるため,バックグラウンド処理が必要になります.
かなり実装に手間取ってしまったため,同じようなバックグラウンド処理を行いたい人の参考になれば幸いです.

対象読者

  • バックエンドエンジニア
  • バックグラウンド処理とは何かを知りたい人

バックグラウンド処理とは

バックグラウンド処理とはユーザの操作(フロントエンドやAPIリクエスト)とは切り離されており,非同期で実行することができるので時間のかかる処理や即時応答が必要なタスクを効率的に処理するために用いられる.

今回実装した概要

APIを使ってデータを取得し,集計後にデータベースに保存する仕組みを実装した.APIから取得するデータは1万以上になることもあり,フロントエンドからのリクエストを受けて処理を開始し,途中結果を逐一返す必要がある(Herokuにデプロイしたのですが,30秒以内に応答しなければいけないっぽい)

実装の詳細

バックグラウンド処理では,長時間のかかる処理の進行状況を管理するために,「Redis」 がよく用いられる.

Redisとは

RedisとはデータをハードディスクやSSDなどのディスクではなく,メモリ上に保存するインメモリ型データストア(インメモリ型データストアとはRAM(メモリ)上にデータを保存することで、データの読み書きを高速化したデータストア)(データストとはキーと値のペアでデータを保存するデータベースの一種)
データの読み書きが非常に高速で,リアルタイム処理やキャッシュ,セッションの管理に適している.

Redisを用いたバックグラウンド処理の管理

例えば,APIを1万回リクエストを送る処理があった場合,この処理はフロントエンドからのリクエストを受けてバックグラウンド処理を開始する.
処理の進行状況の管理は以下のように行われる.

  1. 進行状況の保存
    Redisをキーと値の形式で利用し,以下の情報を保存
  • 進行状況のステータス(ex. PENDING, IN_PROGRESS, COMPLETED)
  • 処理の完了割合(1回/100回 = 1%)
  1. フロントエンドへの進行状況の通知
    フロントエンドからの進行状況を確認するリクエストが送られてきた際に,Redisに保存されている(進行状況のステータス,処理の完了割合)のペアで値を返す.
    ex. {"status": "IN_PROGRESS", "progress": "1%"}

Pythonを用いた実装

Celery を用いてRedisを扱っていく.
Celery とはバックグラウンドタスク処理をサポートするPythonライブラリで、タスクの非同期実行やスケジューリングに優れている.
Celeryを使用する理由はFastAPIを用いて開発しているため.

Celeryの実装ファイルは以下のようになります.

from celery import Celery

celery_app = Celery('analytics_app', include=['tasks.api_to_db', 'tasks.get_from_llm'])
redis_url = os.environ.get('REDIS_TLS_URL', 'redis://localhost:6379')
url = urlparse(redis_url)

celery_app.conf.update(
    # ブローカーの設定
    broker_url=f"{broker_url}/0",
    broker_use_ssl=broker_ssl_options,
    
    # バックエンドの設定
    result_backend=f"{broker_url}/1",
    redis_backend_use_ssl=broker_ssl_options, 
    
    # タイムアウトと再試行の設定
    broker_connection_retry=True,
    broker_connection_retry_on_startup=True,
    broker_connection_max_retries=5,
    broker_connection_timeout=30,
    
    # タスク設定
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    
    # ワーカー設定
    worker_max_tasks_per_child=1000,
    worker_prefetch_multiplier=1,
    result_expires=3600,
)

コードの詳細を解説

環境変数にRedisのURLが指定されている場合,それを用いるが,開発環境においてはlocalhostになるようにしている

redis_url = os.environ.get('REDIS_TLS_URL', 'redis://localhost:6379')

ブローカ:タスクをどの順番で処理されるかを制御
バックエンド:タスクの進行状況を保存する場所
URLの後の"0"とはRedisのデータベース番号を指す.
Redisには通常,15-16個のデータベースがあり,それぞれ独立したデータを保存する.

# ブローカーの設定
    broker_url=f"{broker_url}/0",
    broker_use_ssl=broker_ssl_options,

コードの残りの設定箇所は基本的なものであり,必要に応じて修正する必要あり

結論

Redisとはキーと値を保存できるメモリ上のデータベースであり,Celeryはバックグラウンド処理を行うためのPythonのライブラリであり,Redisを扱うことが可能.

Discussion