Sidekiq Batchesの紹介|Offers Tech Blog
はじめに
こんにちは!
Offers, Offers MGR を運営している株式会社 overflow のバックエンドエンジニアの takkun7171 です。龍が如く 7 外伝を最近クリアしまして、エンディングで感動してしまいました。。やはり桐生ちゃん最高ですね。
今回は最近業務で使うことが多い Sidekiq Batches の紹介をしたいと思います。
内容はシンプルですが、日本語の記事が少なかったのでちょうど良いのかなと思っております。
Sidekiq EnterpriseとProについて
Sidekiq は言わずと知れた Ruby 製の非同期処理フレームワークですが、Sidekiq Enterprise という
エンタープライズ向けの機能拡張が行われた有料版の Sidekiq があります。
Sidekiq Enterprise を導入すると Sidekiq Pro の機能も使用できるようになるのですが、その
Pro の中の機能として Sidekiq Batches が提供されています。
Sidekiq Enterprise の導入方法はこちらの記事が詳しいです。ご興味ある方は参照してください。
Sidekiq Batchesとは
それで Sidekiq Batches とは何なのかですが、
Sidekiq Batches は Batch として複数の Job の実行をまとめられる機能で、
これにより並行処理だけでなく直列で処理できます。
具体的には A の処理が完全に処理されてから B の処理を実行する、
のような複雑なワークフロー処理が可能です。
こいつを使えば、
集計処理などのような順番がキーとなる
複雑な処理も簡単にさばけます。
Sidekiq Batches の概要
サンプルを使った説明
下記の記事は具体的な使用方法に踏み込んでおり、良いサンプルです。
まずはこちらの記事を読んでもらうと、以降の説明が理解しやすと思います。
下図の青い部分が job です。
オレンジ色のボックスが並列処理部分で、こちらが全て完了すると次の処理に移行します。
記事の内容を踏まえて
なるべく分かりやすく実装したのが、以下のコードです。
class TestService
def initialize(oid:)
@oid = oid
end
def execute
overall = Sidekiq::Batch.new
overall.on(:success, "TestService::FulfillmentCallbacks#shipped", 'oid' => @oid)
overall.description = "Fulfillment for #{@oid}"
overall.jobs do
StartWorkflow.perform_async(@oid)
end
end
class StartWorkflow
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform(oid)
batch&.jobs do
step1 = Sidekiq::Batch.new
step1.on(:success, 'TestService::FulfillmentCallbacks#step1_done', 'oid' => oid)
step1.jobs do
AWorker.perform_async
end
end
end
end
class FulfillmentCallbacks
def step1_done(status, arguments)
oid = arguments['oid']
overall = Sidekiq::Batch.new(status.parent_bid)
overall.jobs do
step2 = Sidekiq::Batch.new
step2.on(:success, 'TestService::FulfillmentCallbacks#step2_done', 'oid' => oid)
step2.jobs do
# ここは並列で処理されるのでB->C->D..などの順番で処理されるわけではない
BWorker.perform_async
CWorker.perform_async
DWorker.perform_async
EWorker.perform_async
FWorker.perform_async
end
end
end
def step2_done(status, arguments)
oid = arguments['oid']
overall = Sidekiq::Batch.new(status.parent_bid)
overall.jobs do
step3 = Sidekiq::Batch.new
step3.on(:success, 'TestService::FulfillmentCallbacks#step3_done', 'oid' => oid)
step3.jobs do
GWorker.perform_async if rand(2) == 1
# GWorkerのjobがキューに追加されるとは限らず、もし追加されなければstep3_doneが呼び出されない
# 対策として何もしないNullWorkerを呼び出し、step3_doneが呼び出されることを確定させる
NullWorker.perform_async
end
end
end
def step3_done(status, arguments)
oid = arguments['oid']
overall = Sidekiq::Batch.new(status.parent_bid)
overall.jobs do
step4 = Sidekiq::Batch.new
step4.on(:success, 'TestService::FulfillmentCallbacks#step4_done', 'oid' => oid)
step4.jobs do
HWorker.perform_async
IWorker.perform_async
end
end
end
def step4_done(status, arguments)
oid = arguments['oid']
overall = Sidekiq::Batch.new(status.parent_bid)
overall.jobs do
JWorker.perform_async
KWorker.perform_async
LWorker.perform_async
end
end
def shipped(status, arguments)
pp arguments
pp "shipped"
end
end
class AWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "A"
end
end
class BWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "B"
end
end
class CWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "C"
end
end
class DWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "D"
end
end
class EWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "E"
end
end
class FWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "F"
end
end
class GWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "G"
end
end
class HWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "H"
end
end
class IWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "I"
end
end
class JWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "J"
end
end
class KWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "K"
end
end
class LWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "L"
# do stuff
if bid
# if we belong to a batch, assume we're within the fulfillment workflow
# and need to kick off job M
batch.jobs do
MWorker.perform_async
end
end
end
end
class MWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "M"
end
end
class NullWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: :default
def perform
pp "NULLなので何もしないよん"
end
end
end
実行結果のログは以下の通りです
TestService.new(oid: 1).execute
2023-11-20T17:40:09.374Z pid=1 tid=3uip class=TestService::StartWorkflow jid=7113e0a849b44d7985c6c44e bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:09.453Z pid=1 tid=3ujt class=TestService::AWorker jid=31b8a46f4165696f40a1469a bid=C8apSx0mNMTS6w INFO: start
2023-11-20T17:40:09.454Z pid=1 tid=3uip class=TestService::StartWorkflow jid=7113e0a849b44d7985c6c44e bid=0319-6rBoraknQ elapsed=0.08 INFO: done
"A"
2023-11-20T17:40:09.531Z pid=1 tid=46pl class=Sidekiq::Batch::Callback jid=aca1365c87bd00d9c95c52b1 INFO: start
2023-11-20T17:40:09.532Z pid=1 tid=3ujt class=TestService::AWorker jid=31b8a46f4165696f40a1469a bid=C8apSx0mNMTS6w elapsed=0.079 INFO: done
2023-11-20T17:40:09.596Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=b0b66467310118213dfe851f INFO: start
2023-11-20T17:40:09.597Z pid=1 tid=3xyl class=Sidekiq::Batch::Callback jid=6ffd82b057eeadec1801ba36 INFO: start
2023-11-20T17:40:09.597Z pid=1 tid=46pl class=Sidekiq::Batch::Callback jid=aca1365c87bd00d9c95c52b1 elapsed=0.066 INFO: done
2023-11-20T17:40:09.705Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=b0b66467310118213dfe851f elapsed=0.11 INFO: done
2023-11-20T17:40:09.716Z pid=1 tid=4601 class=TestService::FWorker jid=f03dca722f1f543d2340d508 bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.716Z pid=1 tid=3z49 class=TestService::DWorker jid=28922246a8a5dd819890f6bf bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.717Z pid=1 tid=46nd class=TestService::EWorker jid=1196f25ead345d09749fdd6f bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.717Z pid=1 tid=45yx class=TestService::BWorker jid=398a74386963999a6b439aeb bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.719Z pid=1 tid=45zh class=TestService::CWorker jid=69d436dab3d3e81e01fa540d bid=pj_vHj2cgcTFJw INFO: start
2023-11-20T17:40:09.720Z pid=1 tid=3xyl class=Sidekiq::Batch::Callback jid=6ffd82b057eeadec1801ba36 elapsed=0.123 INFO: done
"F"
2023-11-20T17:40:09.905Z pid=1 tid=4601 class=TestService::FWorker jid=f03dca722f1f543d2340d508 bid=pj_vHj2cgcTFJw elapsed=0.188 INFO: done
"D"
2023-11-20T17:40:09.976Z pid=1 tid=3z49 class=TestService::DWorker jid=28922246a8a5dd819890f6bf bid=pj_vHj2cgcTFJw elapsed=0.26 INFO: done
"E"
2023-11-20T17:40:09.999Z pid=1 tid=46nd class=TestService::EWorker jid=1196f25ead345d09749fdd6f bid=pj_vHj2cgcTFJw elapsed=0.282 INFO: done
"C"
2023-11-20T17:40:10.007Z pid=1 tid=45zh class=TestService::CWorker jid=69d436dab3d3e81e01fa540d bid=pj_vHj2cgcTFJw elapsed=0.288 INFO: done
"B"
2023-11-20T17:40:10.008Z pid=1 tid=3uip class=Sidekiq::Batch::Callback jid=d4ea6edae4edeb7b685f00da INFO: start
2023-11-20T17:40:10.008Z pid=1 tid=45yx class=TestService::BWorker jid=398a74386963999a6b439aeb bid=pj_vHj2cgcTFJw elapsed=0.291 INFO: done
2023-11-20T17:40:10.073Z pid=1 tid=3ujt class=Sidekiq::Batch::Callback jid=cb196a22464e1d27be7d3f0e INFO: start
2023-11-20T17:40:10.073Z pid=1 tid=3uip class=Sidekiq::Batch::Callback jid=d4ea6edae4edeb7b685f00da elapsed=0.065 INFO: done
2023-11-20T17:40:10.129Z pid=1 tid=46pl class=TestService::NullWorker jid=8113832b2630b5a86fb9e2da bid=0XzmoKIMj5RrlA INFO: start
2023-11-20T17:40:10.130Z pid=1 tid=3ujt class=Sidekiq::Batch::Callback jid=cb196a22464e1d27be7d3f0e elapsed=0.057 INFO: done
"NULLなので何もしないよん"
2023-11-20T17:40:10.224Z pid=1 tid=46pl class=TestService::NullWorker jid=8113832b2630b5a86fb9e2da bid=0XzmoKIMj5RrlA elapsed=0.095 INFO: done
2023-11-20T17:40:10.224Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=e9c186dfc1bc3e44b9422e9c INFO: start
2023-11-20T17:40:10.282Z pid=1 tid=3xyl class=Sidekiq::Batch::Callback jid=a3aca55d65e5d53504989c97 INFO: start
2023-11-20T17:40:10.282Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=e9c186dfc1bc3e44b9422e9c elapsed=0.058 INFO: done
2023-11-20T17:40:10.337Z pid=1 tid=3z49 class=TestService::IWorker jid=5e73bb32de31736277e0c243 bid=4mtlklFF_8ZSgA INFO: start
2023-11-20T17:40:10.337Z pid=1 tid=4601 class=TestService::HWorker jid=5c77b2260e4eeeb27721092e bid=4mtlklFF_8ZSgA INFO: start
2023-11-20T17:40:10.337Z pid=1 tid=3xyl class=Sidekiq::Batch::Callback jid=a3aca55d65e5d53504989c97 elapsed=0.055 INFO: done
"H"
2023-11-20T17:40:10.416Z pid=1 tid=4601 class=TestService::HWorker jid=5c77b2260e4eeeb27721092e bid=4mtlklFF_8ZSgA elapsed=0.079 INFO: done
"I"
2023-11-20T17:40:10.434Z pid=1 tid=46nd class=Sidekiq::Batch::Callback jid=98f27b599ab60adfcc7958b1 INFO: start
2023-11-20T17:40:10.435Z pid=1 tid=3z49 class=TestService::IWorker jid=5e73bb32de31736277e0c243 bid=4mtlklFF_8ZSgA elapsed=0.098 INFO: done
2023-11-20T17:40:10.498Z pid=1 tid=45zh class=Sidekiq::Batch::Callback jid=7d4a98dabde86439c5148efb INFO: start
2023-11-20T17:40:10.498Z pid=1 tid=46nd class=Sidekiq::Batch::Callback jid=98f27b599ab60adfcc7958b1 elapsed=0.063 INFO: done
2023-11-20T17:40:10.574Z pid=1 tid=3ujt class=TestService::LWorker jid=deded5257663d75dc2ea8fb6 bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:10.574Z pid=1 tid=3uip class=TestService::KWorker jid=9e6738586497424425e78dee bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:10.575Z pid=1 tid=45yx class=TestService::JWorker jid=1f10dab3211a6bfd87ff3968 bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:10.575Z pid=1 tid=45zh class=Sidekiq::Batch::Callback jid=7d4a98dabde86439c5148efb elapsed=0.078 INFO: done
"J"
2023-11-20T17:40:10.736Z pid=1 tid=45yx class=TestService::JWorker jid=1f10dab3211a6bfd87ff3968 bid=0319-6rBoraknQ elapsed=0.161 INFO: done
"K"
2023-11-20T17:40:10.736Z pid=1 tid=3uip class=TestService::KWorker jid=9e6738586497424425e78dee bid=0319-6rBoraknQ elapsed=0.162 INFO: done
"L"
2023-11-20T17:40:10.738Z pid=1 tid=46pl class=TestService::MWorker jid=1838d64f780c3dcc38601be8 bid=0319-6rBoraknQ INFO: start
2023-11-20T17:40:10.739Z pid=1 tid=3ujt class=TestService::LWorker jid=deded5257663d75dc2ea8fb6 bid=0319-6rBoraknQ elapsed=0.164 INFO: done
"M"
2023-11-20T17:40:10.794Z pid=1 tid=46pl class=TestService::MWorker jid=1838d64f780c3dcc38601be8 bid=0319-6rBoraknQ elapsed=0.055 INFO: done
2023-11-20T17:40:10.794Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=ce4b2621d24a30306b63b223 INFO: start
{"oid"=>1}
"shipped"
2023-11-20T17:40:10.848Z pid=1 tid=3z5d class=Sidekiq::Batch::Callback jid=ce4b2621d24a30306b63b223 elapsed=0.054 INFO: done
図のように直列であって欲しい所は直列に、並列で問題ない所は並列に
ワークフローに従って処理されました。非常に便利です。
注意点の解説
基本的な説明は元記事を見てもらうとして、
ここでは注意点の解説をします。
StartWorkflow や AWorker のように、worker を呼びだしワークフロー処理を行いたい場合は
batch に大元の Sidekiq::Batch.new
bid に batch の id がセットされているので、
以下のように batch.jobs で囲みます。
もし worker を通常状態でも呼び出すことがありうるなら
bid に batch の id がセットされてるので
以下のように bid の有無で処理を変更します。
def perform(oid)
if bid
batch.jobs do
step1 = Sidekiq::Batch.new
step1.on(:success, 'FulfillmentCallbacks#step1_done', 'oid' => oid)
step1.jobs do
A.perform_async(oid)
end
end
end
end
step1_done のようにコールバックを呼ぶ場合は
最初の引数 status がバッチの status オブジェクトで、2 番目の引数 arguments が
コールバックの宣言時に指定したオプションです。
status に parent_bid があるのでそれを
Sidekiq::Batch.new の引数にすれば、
batch と同じになり、以下のように囲みます。
更にワークフローを追加する場合はブロックの中で Sidekiq::Batch.new を使って囲みます。
なおコメントにあるように BWorker や CWorker は直列処理ではなく並列処理です。
順番に処理される保証はなく、非同期で処理されるので誤解してはいけません。
def step1_done(status, arguments)
oid = arguments['oid']
overall = Sidekiq::Batch.new(status.parent_bid)
overall.jobs do
step2 = Sidekiq::Batch.new
step2.on(:success, 'TestService::FulfillmentCallbacks#step2_done', 'oid' => oid)
step2.jobs do
# ここは並列で処理されるのでB->C->D..などの順番で処理されるわけではない
BWorker.perform_async
CWorker.perform_async
DWorker.perform_async
EWorker.perform_async
FWorker.perform_async
end
end
end
以下のようなコードの場合、
User.where(company_id: 1)の結果がブランクだと、
job が 1 つも追加されず、step1_done が呼び出されないという罠があります。
それを防止するために User.where(company_id: 1)の結果がブランクの可能性があるなら、
少なくとも 1 つの job を確保するために、
NullWorker のようなダミーの worker を呼び出す必要があります。
batch.jobs do
step1 = Sidekiq::Batch.new
step1.on(:success, 'FulfillmentCallbacks#step1_done', 'oid' => order_id)
step1.jobs do
User.where(company_id: 1).each do |row|
CWorker.perform_async(row.id)
end
# 少なくとも1つのjobを確保
NullWorker.perform_async
end
end
その他 Tips
step1.on(:success, 'FulfillmentCallbacks#step1_done', 'oid' => order_id)
のような on の引数は success、complete、death があります。
success は全ての job が成功した場合
complete はバッチ内のすべてのジョブが 1 回実行されたとき(成功か失敗かは無関係)
death はバッチ内のジョブが初めて死んだ時に
メソッドが呼び出されます。
まとめ
簡単ですが、Sidekiq Batches の紹介でした。
システムが大きくなってきて、ちょっと複雑なワークフロー処理を行いたい場合は
Sidekiq の有料版に登録して、ぜひ Sidekiq Batches を使ってみてください。
最初は取っ付きづらいですが、慣れると便利です。
関連記事
エンジニア採用強化中
株式会社 overflow では Offers の開発メンバーを大募集中です。正社員はもちろん、副業でのジョインも歓迎です。とりあえず話を聞いてみたい!という方には カジュアル面談 がオススメです。
副業転職の Offers 開発チームがお送りするテックブログです。【エンジニア積極採用中】カジュアル面談、副業からのトライアル etc 承っております💪 jobs.overflow.co.jp
Discussion