🐡

Sidekiq Batchesの紹介|Offers Tech Blog

2023/12/05に公開

はじめに

こんにちは!
Offers, Offers MGR を運営している株式会社 overflow のバックエンドエンジニアの takkun7171 です。龍が如く 7 外伝を最近クリアしまして、エンディングで感動してしまいました。。やはり桐生ちゃん最高ですね。

今回は最近業務で使うことが多い Sidekiq Batches の紹介をしたいと思います。
内容はシンプルですが、日本語の記事が少なかったのでちょうど良いのかなと思っております。

Sidekiq EnterpriseとProについて

Sidekiq は言わずと知れた Ruby 製の非同期処理フレームワークですが、Sidekiq Enterprise という
エンタープライズ向けの機能拡張が行われた有料版の Sidekiq があります。

Sidekiq Enterprise を導入すると Sidekiq Pro の機能も使用できるようになるのですが、その
Pro の中の機能として Sidekiq Batches が提供されています。

Sidekiq Enterprise の導入方法はこちらの記事が詳しいです。ご興味ある方は参照してください。
https://tech.medpeer.co.jp/entry/2020/04/01/090000

Sidekiq Batchesとは

それで Sidekiq Batches とは何なのかですが、
Sidekiq Batches は Batch として複数の Job の実行をまとめられる機能で、
これにより並行処理だけでなく直列で処理できます。

具体的には A の処理が完全に処理されてから B の処理を実行する、
のような複雑なワークフロー処理が可能です。

こいつを使えば、
集計処理などのような順番がキーとなる
複雑な処理も簡単にさばけます。

Sidekiq Batches の概要
https://github.com/sidekiq/sidekiq/wiki/Batches

サンプルを使った説明

下記の記事は具体的な使用方法に踏み込んでおり、良いサンプルです。
まずはこちらの記事を読んでもらうと、以降の説明が理解しやすと思います。
https://github.com/sidekiq/sidekiq/wiki/Really-Complex-Workflows-with-Batches

下図の青い部分が job です。
オレンジ色のボックスが並列処理部分で、こちらが全て完了すると次の処理に移行します。
Sidekiq Batches の workflow のサンプル

記事の内容を踏まえて
なるべく分かりやすく実装したのが、以下のコードです。

test_service.rb
class TestService
  def initialize(oid:)
    @oid = oid
  end

  def execute
    overall = Sidekiq::Batch.new
    overall.on(:success, "TestService::FulfillmentCallbacks#shipped", 'oid' => @oid)
    overall.description = "Fulfillment for #{@oid}"
    overall.jobs do
      StartWorkflow.perform_async(@oid)
    end
  end

  class StartWorkflow
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default

    def perform(oid)
      batch&.jobs do
        step1 = Sidekiq::Batch.new
        step1.on(:success, 'TestService::FulfillmentCallbacks#step1_done', 'oid' => oid)
        step1.jobs do
          AWorker.perform_async
        end
      end
    end
  end

  class FulfillmentCallbacks
    def step1_done(status, arguments)
      oid = arguments['oid']
      overall = Sidekiq::Batch.new(status.parent_bid)
      overall.jobs do
        step2 = Sidekiq::Batch.new
        step2.on(:success, 'TestService::FulfillmentCallbacks#step2_done', 'oid' => oid)
        step2.jobs do
          # ここは並列で処理されるのでB->C->D..などの順番で処理されるわけではない
          BWorker.perform_async
          CWorker.perform_async
          DWorker.perform_async
          EWorker.perform_async
          FWorker.perform_async
        end
      end
    end

    def step2_done(status, arguments)
      oid = arguments['oid']
      overall = Sidekiq::Batch.new(status.parent_bid)
      overall.jobs do
        step3 = Sidekiq::Batch.new
        step3.on(:success, 'TestService::FulfillmentCallbacks#step3_done', 'oid' => oid)
        step3.jobs do
          GWorker.perform_async if rand(2) == 1

          # GWorkerのjobがキューに追加されるとは限らず、もし追加されなければstep3_doneが呼び出されない
          # 対策として何もしないNullWorkerを呼び出し、step3_doneが呼び出されることを確定させる
          NullWorker.perform_async
        end
      end
    end

    def step3_done(status, arguments)
      oid = arguments['oid']
      overall = Sidekiq::Batch.new(status.parent_bid)
      overall.jobs do
        step4 = Sidekiq::Batch.new
        step4.on(:success, 'TestService::FulfillmentCallbacks#step4_done', 'oid' => oid)
        step4.jobs do
          HWorker.perform_async
          IWorker.perform_async
        end
      end
    end

    def step4_done(status, arguments)
      oid = arguments['oid']
      overall = Sidekiq::Batch.new(status.parent_bid)
      overall.jobs do
        JWorker.perform_async
        KWorker.perform_async
        LWorker.perform_async
      end
    end

    def shipped(status, arguments)
      pp arguments
      pp "shipped"
    end
  end

  class AWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "A"
    end
  end

  class BWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "B"
    end
  end

  class CWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "C"
    end
  end

  class DWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "D"
    end
  end

  class EWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "E"
    end
  end

  class FWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "F"
    end
  end

  class GWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "G"
    end
  end

  class HWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "H"
    end
  end

  class IWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default

    def perform
      pp "I"
    end
  end

  class JWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "J"
    end
  end

  class KWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "K"
    end
  end

  class LWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "L"
      # do stuff
      if bid
        # if we belong to a batch, assume we're within the fulfillment workflow
        # and need to kick off job M
        batch.jobs do
          MWorker.perform_async
        end
      end
    end
  end

  class MWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "M"
    end
  end

  class NullWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: :default
  
    def perform
      pp "NULLなので何もしないよん"
    end
  end
end

実行結果のログは以下の通りです

TestService.new(oid: 1).execute
2023-11-20T17:40:09.374Z pid=1 tid=3uip class=TestService::StartWorkflow jid=7113e0a849b44d7985c6c44e bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:09.453Z pid=1 tid=3ujt class=TestService::AWorker jid=31b8a46f4165696f40a1469a bid=C8apSx0mNMTS6w INFO: start
2023-11-20T17:40:09.454Z pid=1 tid=3uip class=TestService::StartWorkflow jid=7113e0a849b44d7985c6c44e bid=0319-6rBoraknQ elapsed=0.08 INFO: done
"A"
2023-11-20T17:40:09.531Z pid=1 tid=46pl class=Sidekiq::Batch::Callback jid=aca1365c87bd00d9c95c52b1 INFO: start
2023-11-20T17:40:09.532Z pid=1 tid=3ujt class=TestService::AWorker jid=31b8a46f4165696f40a1469a bid=C8apSx0mNMTS6w elapsed=0.079 INFO: done
2023-11-20T17:40:09.596Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=b0b66467310118213dfe851f INFO: start
2023-11-20T17:40:09.597Z pid=1 tid=3xyl class=Sidekiq::Batch::Callback jid=6ffd82b057eeadec1801ba36 INFO: start
2023-11-20T17:40:09.597Z pid=1 tid=46pl class=Sidekiq::Batch::Callback jid=aca1365c87bd00d9c95c52b1 elapsed=0.066 INFO: done
2023-11-20T17:40:09.705Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=b0b66467310118213dfe851f elapsed=0.11 INFO: done
2023-11-20T17:40:09.716Z pid=1 tid=4601 class=TestService::FWorker jid=f03dca722f1f543d2340d508 bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.716Z pid=1 tid=3z49 class=TestService::DWorker jid=28922246a8a5dd819890f6bf bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.717Z pid=1 tid=46nd class=TestService::EWorker jid=1196f25ead345d09749fdd6f bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.717Z pid=1 tid=45yx class=TestService::BWorker jid=398a74386963999a6b439aeb bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.719Z pid=1 tid=45zh class=TestService::CWorker jid=69d436dab3d3e81e01fa540d bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.720Z pid=1 tid=3xyl class=Sidekiq::Batch::Callback jid=6ffd82b057eeadec1801ba36 elapsed=0.123 INFO: done
"F"
2023-11-20T17:40:09.905Z pid=1 tid=4601 class=TestService::FWorker jid=f03dca722f1f543d2340d508 bid=pj_vHj2cgcTFJw elapsed=0.188 INFO: done
"D"
2023-11-20T17:40:09.976Z pid=1 tid=3z49 class=TestService::DWorker jid=28922246a8a5dd819890f6bf bid=pj_vHj2cgcTFJw elapsed=0.26 INFO: done
"E"
2023-11-20T17:40:09.999Z pid=1 tid=46nd class=TestService::EWorker jid=1196f25ead345d09749fdd6f bid=pj_vHj2cgcTFJw elapsed=0.282 INFO: done
"C"
2023-11-20T17:40:10.007Z pid=1 tid=45zh class=TestService::CWorker jid=69d436dab3d3e81e01fa540d bid=pj_vHj2cgcTFJw elapsed=0.288 INFO: done
"B"
2023-11-20T17:40:10.008Z pid=1 tid=3uip class=Sidekiq::Batch::Callback jid=d4ea6edae4edeb7b685f00da INFO: start
2023-11-20T17:40:10.008Z pid=1 tid=45yx class=TestService::BWorker jid=398a74386963999a6b439aeb bid=pj_vHj2cgcTFJw elapsed=0.291 INFO: done
2023-11-20T17:40:10.073Z pid=1 tid=3ujt class=Sidekiq::Batch::Callback jid=cb196a22464e1d27be7d3f0e INFO: start
2023-11-20T17:40:10.073Z pid=1 tid=3uip class=Sidekiq::Batch::Callback jid=d4ea6edae4edeb7b685f00da elapsed=0.065 INFO: done
2023-11-20T17:40:10.129Z pid=1 tid=46pl class=TestService::NullWorker jid=8113832b2630b5a86fb9e2da bid=0XzmoKIMj5RrlA INFO: start
2023-11-20T17:40:10.130Z pid=1 tid=3ujt class=Sidekiq::Batch::Callback jid=cb196a22464e1d27be7d3f0e elapsed=0.057 INFO: done
"NULLなので何もしないよん"
2023-11-20T17:40:10.224Z pid=1 tid=46pl class=TestService::NullWorker jid=8113832b2630b5a86fb9e2da bid=0XzmoKIMj5RrlA elapsed=0.095 INFO: done
2023-11-20T17:40:10.224Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=e9c186dfc1bc3e44b9422e9c INFO: start
2023-11-20T17:40:10.282Z pid=1 tid=3xyl class=Sidekiq::Batch::Callback jid=a3aca55d65e5d53504989c97 INFO: start
2023-11-20T17:40:10.282Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=e9c186dfc1bc3e44b9422e9c elapsed=0.058 INFO: done
2023-11-20T17:40:10.337Z pid=1 tid=3z49 class=TestService::IWorker jid=5e73bb32de31736277e0c243 bid=4mtlklFF_8ZSgA INFO: start
2023-11-20T17:40:10.337Z pid=1 tid=4601 class=TestService::HWorker jid=5c77b2260e4eeeb27721092e bid=4mtlklFF_8ZSgA INFO: start
2023-11-20T17:40:10.337Z pid=1 tid=3xyl class=Sidekiq::Batch::Callback jid=a3aca55d65e5d53504989c97 elapsed=0.055 INFO: done
"H"
2023-11-20T17:40:10.416Z pid=1 tid=4601 class=TestService::HWorker jid=5c77b2260e4eeeb27721092e bid=4mtlklFF_8ZSgA elapsed=0.079 INFO: done
"I"
2023-11-20T17:40:10.434Z pid=1 tid=46nd class=Sidekiq::Batch::Callback jid=98f27b599ab60adfcc7958b1 INFO: start
2023-11-20T17:40:10.435Z pid=1 tid=3z49 class=TestService::IWorker jid=5e73bb32de31736277e0c243 bid=4mtlklFF_8ZSgA elapsed=0.098 INFO: done
2023-11-20T17:40:10.498Z pid=1 tid=45zh class=Sidekiq::Batch::Callback jid=7d4a98dabde86439c5148efb INFO: start
2023-11-20T17:40:10.498Z pid=1 tid=46nd class=Sidekiq::Batch::Callback jid=98f27b599ab60adfcc7958b1 elapsed=0.063 INFO: done
2023-11-20T17:40:10.574Z pid=1 tid=3ujt class=TestService::LWorker jid=deded5257663d75dc2ea8fb6 bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:10.574Z pid=1 tid=3uip class=TestService::KWorker jid=9e6738586497424425e78dee bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:10.575Z pid=1 tid=45yx class=TestService::JWorker jid=1f10dab3211a6bfd87ff3968 bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:10.575Z pid=1 tid=45zh class=Sidekiq::Batch::Callback jid=7d4a98dabde86439c5148efb elapsed=0.078 INFO: done
"J"
2023-11-20T17:40:10.736Z pid=1 tid=45yx class=TestService::JWorker jid=1f10dab3211a6bfd87ff3968 bid=0319-6rBoraknQ elapsed=0.161 INFO: done
"K"
2023-11-20T17:40:10.736Z pid=1 tid=3uip class=TestService::KWorker jid=9e6738586497424425e78dee bid=0319-6rBoraknQ elapsed=0.162 INFO: done
"L"
2023-11-20T17:40:10.738Z pid=1 tid=46pl class=TestService::MWorker jid=1838d64f780c3dcc38601be8 bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:10.739Z pid=1 tid=3ujt class=TestService::LWorker jid=deded5257663d75dc2ea8fb6 bid=0319-6rBoraknQ elapsed=0.164 INFO: done
"M"
2023-11-20T17:40:10.794Z pid=1 tid=46pl class=TestService::MWorker jid=1838d64f780c3dcc38601be8 bid=0319-6rBoraknQ elapsed=0.055 INFO: done
2023-11-20T17:40:10.794Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=ce4b2621d24a30306b63b223 INFO: start
{"oid"=>1}
"shipped"
2023-11-20T17:40:10.848Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=ce4b2621d24a30306b63b223 elapsed=0.054 INFO: done

図のように直列であって欲しい所は直列に、並列で問題ない所は並列に
ワークフローに従って処理されました。非常に便利です。

注意点の解説

基本的な説明は元記事を見てもらうとして、
ここでは注意点の解説をします。

StartWorkflow や AWorker のように、worker を呼びだしワークフロー処理を行いたい場合は
batch に大元の Sidekiq::Batch.new
bid に batch の id がセットされているので、
以下のように batch.jobs で囲みます。

もし worker を通常状態でも呼び出すことがありうるなら
bid に batch の id がセットされてるので
以下のように bid の有無で処理を変更します。

def perform(oid)
  if bid
    batch.jobs do
      step1 = Sidekiq::Batch.new
      step1.on(:success, 'FulfillmentCallbacks#step1_done', 'oid' => oid)
      step1.jobs do
        A.perform_async(oid)
      end
    end
  end
end

step1_done のようにコールバックを呼ぶ場合は
最初の引数 status がバッチの status オブジェクトで、2 番目の引数 arguments が
コールバックの宣言時に指定したオプションです。

status に parent_bid があるのでそれを
Sidekiq::Batch.new の引数にすれば、
batch と同じになり、以下のように囲みます。

更にワークフローを追加する場合はブロックの中で Sidekiq::Batch.new を使って囲みます。
なおコメントにあるように BWorker や CWorker は直列処理ではなく並列処理です。
順番に処理される保証はなく、非同期で処理されるので誤解してはいけません。

def step1_done(status, arguments)
  oid = arguments['oid']
  overall = Sidekiq::Batch.new(status.parent_bid)
  overall.jobs do
    step2 = Sidekiq::Batch.new
    step2.on(:success, 'TestService::FulfillmentCallbacks#step2_done', 'oid' => oid)
    step2.jobs do
      # ここは並列で処理されるのでB->C->D..などの順番で処理されるわけではない
      BWorker.perform_async
      CWorker.perform_async
      DWorker.perform_async
      EWorker.perform_async
      FWorker.perform_async
    end
  end
end

以下のようなコードの場合、
User.where(company_id: 1)の結果がブランクだと、
job が 1 つも追加されず、step1_done が呼び出されないという罠があります。

それを防止するために User.where(company_id: 1)の結果がブランクの可能性があるなら、
少なくとも 1 つの job を確保するために、
NullWorker のようなダミーの worker を呼び出す必要があります。

batch.jobs do
  step1 = Sidekiq::Batch.new
  step1.on(:success, 'FulfillmentCallbacks#step1_done', 'oid' => order_id)
  step1.jobs do
    User.where(company_id: 1).each do |row|
      CWorker.perform_async(row.id)
    end

    # 少なくとも1つのjobを確保
    NullWorker.perform_async
  end
end

その他 Tips

step1.on(:success, 'FulfillmentCallbacks#step1_done', 'oid' => order_id)
のような on の引数は success、complete、death があります。

success は全ての job が成功した場合
complete はバッチ内のすべてのジョブが 1 回実行されたとき(成功か失敗かは無関係)
death はバッチ内のジョブが初めて死んだ時に
メソッドが呼び出されます。

https://github.com/sidekiq/sidekiq/wiki/Batches#callbacks

まとめ

簡単ですが、Sidekiq Batches の紹介でした。
システムが大きくなってきて、ちょっと複雑なワークフロー処理を行いたい場合は
Sidekiq の有料版に登録して、ぜひ Sidekiq Batches を使ってみてください。
最初は取っ付きづらいですが、慣れると便利です。

関連記事

https://zenn.dev/overflow_offers/articles/20230216-how-to-create-batch-in-rails
https://zenn.dev/overflow_offers/articles/20230130-how-to-use-sidekiq
https://zenn.dev/offers/articles/20220425-universal-attitude

エンジニア採用強化中

株式会社 overflow では Offers の開発メンバーを大募集中です。正社員はもちろん、副業でのジョインも歓迎です。とりあえず話を聞いてみたい!という方には カジュアル面談 がオススメです。

https://jobs.overflow.co.jp

Offers Tech Blog

Discussion