Sidekiqをコードリーディングしてみた(後編)
挨拶
ポートのSREを担当している @s.yanada です。
前回、Sidekiq の キューイング部分について書いたので今回はデキュー(ジョブを Redis から取り出して実行する部分)についてコードリーディングをしていきたいと思います。
前提
本記事では、キューイングを担うSidekiqのプログラムを Sidekiq server と呼称しています。
執筆日 2023/01/10 時点で Sidekiq の最新バージョンは 7.0.2 ですが、本記事ではバージョン 6.5.8 を対象にしています。
全部を見ていくとかなり長くなってしまうので、主要な部分にのみフォーカスしています。
要約
Sideiq server起動時、以下の3つのスレッドを起動する
スレッド | 役割 |
---|---|
heartbeat | 実行ジョブ数、失敗ジョブ数の計測。死活監視。 |
worker | redisからジョブを取り出し実行。また、失敗したジョブをリトライジョブ用のキューにエンキュー。 |
poller | 定期実行ジョブやリトライジョブを管理する。worker スレッドがキューイングしたリトライジョブを実行すべき時間が訪れた時に worker スレッド用のキューにエンキューする。(ジョブの実行は行わない) |
Sidekiq serverの起動
bundle exec sidekiq
で Sidekiq server を起動できます。
以下のファイルが実行されます。
Sidekiq::CLI
インスタンスを作成し、主に以下の処理を行っています。
- Sidekiq に関する設定の反映
- シグナルハンドラの設定
- heartbeat、worker、poller スレッドの作成
以降Sidekiq server のメイン処理である、heartbeat、worker、poller についてお話していきます。
heartbeat
主に死活監視のための処理を 5 秒ごとに行っており、以下のメソッドで実装されています。
以下をフィールド名として 60 秒の有効期限でredisに保存しています。
"#{hostname}:#{::Process.pid}:#{process_nonce}"
もし、このフィールドが存在しなくなっていた場合、ハートビートが動いていないということになるので死活監視に使えるのだと思います。
worker
worker スレッドは、redis からジョブを取り出し実行するスレッドです。通常複数のスレッドが立ち上がりデフォルトでは 10 スレッドです。
この worker スレッドを管理するクラスが存在しそれが Sidekiq::Manager
クラスです。
このクラスのインスタンス初期化時に worker スレッドの役割を担う Sidekiq::Processor
クラスのインスタンスを作成・保持し、 start
メソッドにより Sidekiq::Processor#start
メソッドを呼び出し、スレッドの作成・デキューの処理が走ります。
デキュー処理の詳細は以下
まず、 Sidekiq::Processor#run
から #process_one
が呼ばれ redis からジョブを取得します。
ジョブが存在していた場合に、ジョブの実行が行われます。
ジョブが正常に終了した後は、 Sidekiq::Manager
クラスから渡された以下の処理が実行されます。
ジョブの処理が終了したので、 当該の worker スレッド がSidekiq::Manager
の管理下から削除され新しい worker スレッドを作成し管理下においています。
ジョブ失敗時について
ジョブ実行処理の過程で以下のメソッドを経由しています。
@retrier
は Sidekiq::JobRetry
クラスのインスタンス。
Sidekiq::JobRetry#local
のメソッドのブロックとしてジョブの実行が行われており、ジョブが例外を発した際に、 #local
が 例外を補足し、 #process_retry
を実行しています。
#process_retry
では、リトライ回数を超えていない場合に限り、リトライ用のキューに例外を発したジョブを入れています。
その後、後述する poller スレッドにより worker スレッドがデキューするキューに入れ直し、 worker スレッドが再びジョブの実行を行うという流れになります。
poller
定期実行ジョブやリトライジョブの管理を担っているジョブです。
メインとなる処理は以下で、この処理が Sidekiq が起動している間ループ処理として実行されます。
引数である sorted_sets
は、定期実行ジョブやリトライジョブが格納されているキー名でありデフォルトでは以下となっています。
このキューの中から、ジョブ実行時刻が現在時刻の前となっているジョブを zrangebyscore
で取得し、 worker スレッド用のキューにエンキューしています。
poller の役割としてはここまでで、その後は worker スレッドにジョブを実行してもらい成功したら終了、また失敗したら retry キューに入れ直して poller が処理するといった流れになります。
最後に
細かい部分については省略してしまったんですが、Sidekiq serverが行っている処理の大枠はつかめたでしょうか。
詳細部分を見てみると、connection_pool gem を用いてループ処理の wait をしていたり redis で LUA を使用していたりと知識の幅が増えると思うので、より詳細に深ぼってみたいと思います。
ご参考になれば幸いです。
ありがとうございました。
Discussion