🦁

Sidekiqをコードリーディングしてみた(後編)

2023/01/27に公開

挨拶

ポートのSREを担当している @s.yanada です。
前回、Sidekiq の キューイング部分について書いたので今回はデキュー(ジョブを Redis から取り出して実行する部分)についてコードリーディングをしていきたいと思います。

前提

本記事では、キューイングを担うSidekiqのプログラムを Sidekiq server と呼称しています。

執筆日 2023/01/10 時点で Sidekiq の最新バージョンは 7.0.2 ですが、本記事ではバージョン 6.5.8 を対象にしています。

全部を見ていくとかなり長くなってしまうので、主要な部分にのみフォーカスしています。

要約

Sideiq server起動時、以下の3つのスレッドを起動する

スレッド 役割
heartbeat 実行ジョブ数、失敗ジョブ数の計測。死活監視。
worker redisからジョブを取り出し実行。また、失敗したジョブをリトライジョブ用のキューにエンキュー。
poller 定期実行ジョブやリトライジョブを管理する。worker スレッドがキューイングしたリトライジョブを実行すべき時間が訪れた時に worker スレッド用のキューにエンキューする。(ジョブの実行は行わない)

Sidekiq serverの起動

bundle exec sidekiq で Sidekiq server を起動できます。
以下のファイルが実行されます。
https://github.com/mperham/sidekiq/blob/v6.5.8/bin/sidekiq

Sidekiq::CLI インスタンスを作成し、主に以下の処理を行っています。

以降Sidekiq server のメイン処理である、heartbeat、worker、poller についてお話していきます。

heartbeat

主に死活監視のための処理を 5 秒ごとに行っており、以下のメソッドで実装されています。

https://github.com/mperham/sidekiq/blob/v6.5.8/lib/sidekiq/launcher.rb#L128-L195

以下をフィールド名として 60 秒の有効期限でredisに保存しています。

"#{hostname}:#{::Process.pid}:#{process_nonce}"

もし、このフィールドが存在しなくなっていた場合、ハートビートが動いていないということになるので死活監視に使えるのだと思います。

worker

worker スレッドは、redis からジョブを取り出し実行するスレッドです。通常複数のスレッドが立ち上がりデフォルトでは 10 スレッドです。

この worker スレッドを管理するクラスが存在しそれが Sidekiq::Manager クラスです。
このクラスのインスタンス初期化時に worker スレッドの役割を担う Sidekiq::Processor クラスのインスタンスを作成・保持し、 start メソッドにより Sidekiq::Processor#start メソッドを呼び出し、スレッドの作成・デキューの処理が走ります。

デキュー処理の詳細は以下
まず、 Sidekiq::Processor#run から #process_one が呼ばれ redis からジョブを取得します。

https://github.com/mperham/sidekiq/blob/v6.5.8/lib/sidekiq/processor.rb#L76-L80

ジョブが存在していた場合に、ジョブの実行が行われます。
https://github.com/mperham/sidekiq/blob/v6.5.8/lib/sidekiq/processor.rb#L201-L203

ジョブが正常に終了した後は、 Sidekiq::Manager クラスから渡された以下の処理が実行されます。

https://github.com/mperham/sidekiq/blob/v6.5.8/lib/sidekiq/manager.rb#L71-L80

ジョブの処理が終了したので、 当該の worker スレッド がSidekiq::Manager の管理下から削除され新しい worker スレッドを作成し管理下においています。

ジョブ失敗時について

ジョブ実行処理の過程で以下のメソッドを経由しています。
https://github.com/mperham/sidekiq/blob/v6.5.8/lib/sidekiq/processor.rb#L114-L143

@retrierSidekiq::JobRetry クラスのインスタンス。

Sidekiq::JobRetry#local のメソッドのブロックとしてジョブの実行が行われており、ジョブが例外を発した際に、 #local が 例外を補足し、 #process_retry を実行しています。

https://github.com/mperham/sidekiq/blob/ad0f13cac5138b891d54ef2f2df3460d5151e34c/lib/sidekiq/job_retry.rb#L112-L133

#process_retry では、リトライ回数を超えていない場合に限り、リトライ用のキューに例外を発したジョブを入れています。
その後、後述する poller スレッドにより worker スレッドがデキューするキューに入れ直し、 worker スレッドが再びジョブの実行を行うという流れになります。

poller

定期実行ジョブやリトライジョブの管理を担っているジョブです。
メインとなる処理は以下で、この処理が Sidekiq が起動している間ループ処理として実行されます。
https://github.com/mperham/sidekiq/blob/v6.5.8/lib/sidekiq/scheduled.rb#L25-L40

引数である sorted_sets は、定期実行ジョブやリトライジョブが格納されているキー名でありデフォルトでは以下となっています。

https://github.com/mperham/sidekiq/blob/v6.5.8/lib/sidekiq/scheduled.rb#L7-L8

このキューの中から、ジョブ実行時刻が現在時刻の前となっているジョブを zrangebyscore で取得し、 worker スレッド用のキューにエンキューしています。

poller の役割としてはここまでで、その後は worker スレッドにジョブを実行してもらい成功したら終了、また失敗したら retry キューに入れ直して poller が処理するといった流れになります。

最後に

細かい部分については省略してしまったんですが、Sidekiq serverが行っている処理の大枠はつかめたでしょうか。
詳細部分を見てみると、connection_pool gem を用いてループ処理の wait をしていたり redis で LUA を使用していたりと知識の幅が増えると思うので、より詳細に深ぼってみたいと思います。
ご参考になれば幸いです。
ありがとうございました。

Discussion