😎

Sidekiqを用いた一括処理の実現

2024/07/12に公開

はじめに

ecforceはエンドユーザーへの販売の仕組み、受注情報や顧客管理を行うことのできるEC構築システムです。
ecforceの管理画面では顧客や定期受注、受注について閲覧・操作ができます。
その中でも「一括で◯◯できる」という機能がいくつかあり、例えば顧客に対して一括でメールを送信したり、一括で決済処理したりできます。
このような一括系の処理は、 Sidekiq の有料版である Sidekiq Pro を使って実装しています。
Sidekiqを使った開発/運用はecforceを提供しはじめた当初から行っており、6〜7年が経過しました。
今回は、Sidekiqを用いた一括処理について、過去の障害や知見・ノウハウをご紹介します。

本題

登場人物

ecforceでのSidekiqは以下のような構成になっています。

  • Sidekiq::Batch
  • Sidekiq::Job
  • batchesレコード

Sidekiq::Batch

Sidekiq::Batchは、以下のようなオブジェクトや情報を管理する の役割を担います。

  • Sidekiq::Job(子)がどれか
  • Sidekiq::Job(子)が全て完了したあと、どのような後処理を実行するか
  • ステータス

Sidekiq::Job

Sidekiq::Jobは、与えられたパラメータを元に実際に処理を行います。
Sidekiq::Batch(親) に紐づいているものと、単独で存在する2パターンがあります。
重い処理を小分けにして実行したい場合や、実行結果を出力した場合は、Sidekiq::Batch(親)が存在しています。

batchesレコード

batchesレコードは、 bid(Sidekiq::BatchのID) やステータスを永続化し、管理画面からの閲覧や操作をしやすくするために作られたテーブルです。
また、ecforceの一括系の処理は最後に実行結果をCSV出力することが多いため、CSVを添付するためにも使っています。

処理の流れ

Sidekiq::Batchあり

一括系の処理を例として、処理の流れを説明します。

  1. 管理画面から対象顧客を指定し、「一括XXX」ボタンを押す
  2. Sidekiq::Batchが生成、Redisに登録 (このタイミングで bid が生成)
  3. bidをbatchesレコードに記録
  4. コールバックを指定 (on_complete)
  5. 500件ずつ区切り、Sidekiq::JobをRedisに登録 (perform_async)
  6. Sidekiq::Jobは、Sidekiq::Batchのステータスを確認 (valid_within_batch?) し、 perform メソッド内の処理を実行
  7. Sidekiq::BatchはSidekiq::Jobが全て処理を終えたことを確認し、コールバックを実行
AsyncJob::BulkUpdate.run(attributes)
class AsyncJob::BulkUpdate
  include Sidekiq::Worker

  def perform(params)
    return unless valid_within_batch?

    # メイン処理
    # (省略)
  end

  def self.run(attrs)
    sidekiq_batch = Sidekiq::Batch.new
    batch = Batch.create(bid: sidekiq_batch.bid, state: :start)

    sidekiq_batch.jobs do
      attrs.each do |params|
        perform_async(params)
      end
    end
  end
end

Sidekiq::Batchなし

Sidekiq::Batchがなく、単独で存在するJobの実行は、以下のように直接 perform_async を呼び出します。
Slack通知の非同期化を例に、処理の流れを説明します。

  1. 何らかのメソッドでSlack通知のJobをRedisに登録 (perform_async)
  2. perform メソッド内の処理を実行
SlackNotificationJob.perform_async(webhook_url)
class SlackNotificationJob
  include Sidekiq::Worker

  def perform(web_hook_url)
    slack_notification = ::Slack::Notifier.new(web_hook_url)
    slack_notification.ping('', attachments: attachment, username: username)
  end
end

新規のジョブを受け付けしないようにする方法

Sidekiqでは、新規のジョブを受け付けしないようにする方法が大きく2つあります。
それが QuietStop です。
ちなみにSidekiqのUIでは「処理終了」「すべて処理終了」が Quiet 、「停止」「すべて停止」が Stop に該当します。

Quiet

QuietはRedisのキューに入っているSidekiq::Jobが、 取り出されないように するための命令です。
したがってQuietしている最中に、一括XXXを実行したり非同期ジョブを登録したりすると、待機中のSidekiq::Jobが増え続けます。

Stop

Stopは、処理中のジョブを強制的に停止し、処理中かつ未完了のジョブを 再度Redisに戻す ための命令です。
たとえ途中まで実行されていたとしても、実行中にStopし再起動すると、Jobが最初から実行されることになります。
したがって、処理に冪等性がないと障害につながる可能性が高いです。

ジョブの緊急停止

クライアント様が誤って全件に対して一括XXXを行ってしまったり、検索条件を間違えて対象ではないデータに対して一括操作を行ってしまったりした場合、緊急で処理を停止してほしいという依頼がたまにあります。

NG: Redisに対してflushallを実行

Sidekiqのオブジェクトや情報は Redis で管理されています。
具体的には以下が全てRedisで管理されています。

  • キューの一覧
  • キューに溜まっていた待機中のジョブ
  • 実行中のジョブ
  • リトライになったジョブ
  • 成功/失敗したジョブ
  • Sidekiq::Batch
  • sidekiq-cronの情報

つまりRedisに対して flushall を実行してしまうと、Sidekiq関連の全ての情報が消失してしまいます。

ただSidekiqには、Redisを直接いじらず、Rubyクラス経由でジョブを停止したり削除したりできる機構がありますので、以下で説明します。

ジョブを緊急停止したい (Sidekiq::Batchあり)

こちらは Sidekiq::Batch(親) が存在する場合の停止方法です。
Sidekiq::Batchに紐づくジョブの一部が既に実行中で、かつ一部はキューで待機しているという想定です。
手順は以下のとおりです。

  1. SidekiqをQuiet
  2. batchesレコードのステータスをキャンセルに変更
  3. Sidekiq::Batch(親)を invalidate_all し、Sidekiq::Job(子)に処理をさせないようにする
  4. Sidekiqを再起動
  5. 実行中のジョブが再度キューに戻る
  6. Sidekiq::Job(子)が、Sidekiq::Batch(親)のステータスを確認 (valid_within_batch?) する
  7. valid_within_batch? がfalseを返し、ジョブが何も処理をせずに終了する
batch = Batch.find(<対象のID>)
batch.update(state: :canceled)
Sidekiq::Batch.new(batch.bid).invalidate_all
class AsyncJob::BulkUpdate
  include Sidekiq::Worker

  def perform(*args)
    return unless valid_within_batch? # ここでreturnされる
    # メイン処理 (省略)
  end
end

ジョブを緊急停止したい (Sidekiq::Batchなし)

こちらは Sidekiq::Batch(親) が存在しない場合の停止方法です。
Slack通知のジョブが大量にキューに待機していることを想定した手順でになります。

  1. SidekiqをQuiet
  2. 対象のジョブをキューから削除
  3. Sidekiqを再起動

特定のキュー内の指定したジョブのみを削除する場合は以下を実行します。

q = Sidekiq::Queue.new('low')
q.select{ |j| j.klass == 'xxx' }.size # 削除対象となるジョブの数を確認
q.select{ |j| j.klass == 'xxx' }.each(&:delete) # 削除

また、特定のキュー内を全て空にする場合は以下を実行します。

q = Sidekiq::Queue.new('xxx')
q.clear

後者の方が処理は速いものの、キューに待機しているジョブが他にも存在する可能性があるため、基本は前者を採用しています。

まとめ

いかがでしたでしょうか。
今回はSidekiqの各オブジェクトや運用についてまとめました。
Redisを直接いじるのは難易度が高い一方、SidekiqはRubyやUIを経由して様々な操作ができるため、運用ハードルが下がっていることを実感しています。

SUPER STUDIOの採用について

SUPER STUDIOでは、積極的にエンジニアを採用しています。
少しでも興味がありましたら、以下の記事をご覧ください。

https://hrmos.co/pages/superstudio/jobs/0010024
https://hrmos.co/pages/superstudio/jobs/0000400
https://hrmos.co/pages/superstudio/jobs/0000404
https://hrmos.co/pages/superstudio/jobs/0010025

また、下記はSUPER STUDIOで年に一度開催されるKICKOFFイベントにて、社内表彰されたエンジニアの受賞インタビューです。SUPER STUDIOのエンジニア組織についてより理解を深められる内容となっておりますので、ぜひご一読ください。

https://www.wantedly.com/companies/super-studio/post_articles/497997
https://www.wantedly.com/companies/super-studio/post_articles/487617

SUPER STUDIOテックブログ

Discussion