Sidekiqをコードリーディングしてみた(前編)
挨拶
ポートの就活会議でバックエンドを担当している新卒の @s.yanadaです。
最近、私が所属しているプロジェクトにてsidekiqが導入され良い機会だったのでSidekiqについてコードリーディングしてみました。
Sidekiqについて
サービス開発において、
- ユーザーにメール送信をする
- 他のサービスにAPIリクエストを送る
など何かしらの処理を行いたいが、これらの処理が完了するまでユーザーを待たせたくないなどのケースがあると思います。
こういったケースにおいて、Sidekiqを使うことで処理の完了を待たずにユーザーへレスポンスを返す、つまり実質非同期的な処理を行うことが可能になります。
Sidekiqが担う処理
1. ジョブをredisに格納しておく
非同期に処理したい内容を何かしらのデータストアに格納しておきます。
基本的な場合、データストアにはredisが選択されることが多いので本記事ではredisに格納する前提で話を進めます。
実際に格納されるデータについては後述しますが、大まかにいうと
- Jobクラス
- 引数
上記2種類をredisに格納しています。
2. redisから処理内容を取り出し、実際の処理を行う
上記で格納していたジョブに関するデータを取り出し、実際の処理を実行させます。
格納していた情報には「クラス名」、「引数」があるのでこれらの情報により処理の実行が可能になります。
実際にコードリーディングしてみる
前述したとおり、Sidekiqには大まかに以下の2種類の役割があります。
- Sidekiq Client
- ジョブをredisに格納する処理を担う
- Sidekiq Server
- redisからジョブを取り出しジョブの実行を担う
本記事ではSidekiq Client
を焦点にあててコードリーディングしていきたいと思います。
Sidekiq Client
大まかな処理の流れ
- redisとのコネクション確立
- ジョブに関するデータの正規化と検証
- ミドルウェアの実行
- redisへジョブのキューイング
登場人物
Sidekiq::Worker
ジョブをキューイングするインタフェースを提供
redisとのコネクションを確立したり、実際にredisへジョブをキューイングする処理などは他のクラス、モジュールにまかせている。
# 標準的な使い方
.perform_async(*args)
# ジョブをスケジューリングしたい時用
.perform_in(interval, *args)
ちなみに、Sidekiq単体を使うときはこのモジュールをincludeさせる必要があるが、 ActiveJobを使用する場合は、includeしてはいけないそうです。
Sidekiq::Client
インスタンス変数として ConnectionPool(connection_pool gemを利用)のインスタンスを持ち、このコネクションプールを使用して、redisへリクエストする処理を担う。
Sidekiq::RedisConnection
コネクションプールの作成を担う。
.adapter = xxx
で Adapterを変更可能。
gem内ではredis、redis-clientのアダプターが既に内包されている。
RedisConnection::RedisAdapter
と同じインターフェースを持つカスタムクラスを作成すれば、別のアダプターを指定することも可能
.create
関数で、アダプター(redis)のコネクションプールを作成する
Sidekiq::Middleware::Chain
Sidekiqのclient、server両方で使用できるミドルウェア管理モジュール
それぞれの処理単位は Sidekiq::Middleware::Entry
であり、複数のentryの実行順序などを管理するのがSidekiq::Middleware::Chain
Client Middlewareでは、redisへキューイングする前に実行したい処理
Server Middlewareでは、ジョブの実行前後に実行したい処理
を設定することができる。
Sidekiq::JobUtil
redisへ送るデータの検証、正規化を担う
Sidekiq
Sidekiq全体の設定、コネクションプールの保持をしている
Sidekiqにはclientとserverが存在し、それぞれは別プロセスで稼働しているため設定が共有されることはない
処理を追っていく
1. 全ての始まり
Sidekiqを直接利用する際は perform_async
メソッドを実行させることから全ては始まります。
今後は、以下のコードを元に説明して行きたいと思います。
class OurWorker
include Sidekiq::Worker
def perform(complexity)
case complexity
when 'super_hard'
sleep 300
puts 'Reallly took quie a bit of effor'
when 'hard'
sleep 30
puts 'That was a bit of work'
else
sleep 1
puts "That wasn't a lot of effor"
end
end
end
OurWorker.perform_async('hard')
この perform_async
は、Sidekiq::Worker
に定義されています。
2. Setterへ委譲
上記githubのコードのように、perform_async
は Setter
クラスに委譲されています。
他のperform_in
やperform_bulk
を見てもわかりますが、実際の処理は全てSetter
クラスに任せるという形になっているようです。
ちなみにここでSetter
に渡している self
は Sidekiq::Worker
のinclude元のクラス、上記の例でいうと OurWorker
です。
Setterを一旦挟んでいる理由はコメントに書いてました。
また、ActiveJobとの互換性を保つための処理もinitializeでしているようです。では、Setter
ではどういう処理を行っているかをたどってみると以下のコードを実行しているようです。
@klass
はOurWorker
にあたります。
client_push
は Sidekiq::Worker
クラスに定義されているものなので結局
Sidekiq::Worker
=> Setter
=> Sidekiq::Worker
というふうに戻ってきました。
前述の通りSetter
は、set
のためにカプセル化したクラスであるようなので、戻ってくるのは直感的な気もします。
3. redisとのコネクションプール作成
client_push
では、まずbuild_client
関数によってredisとのコネクションを確立 or 取得します。
オプション周りを追うのは辛かったのでここらへんは実際の手元で確認しています。。
irb(main):020:0> OurWorker.sidekiq_options_hash
=> nil
irb(main):021:0> Sidekiq.default_job_options
=> {"retry"=>true, "queue"=>"default"}
irb(main):022:0>
上記のように、Thread.current[:sidekiq_via_pool]
とget_sidekiq_options["pool"]
はfalsyな値になっているようなので、少なくとも初回は、Sidekiq.redis_pool
からredisのコネクションプールを取得していると思います。
この処理にあたるのが以下
RedisConnection.create
の部分は少し長いので割愛しますが、
- コネクション数
- 最大コネクション数を超えた問の待機時間(タイムアウト)
- アダプター(基本redis)
上記の設定を行ったのち、connection_pool gemのConnectionPool
をインスタンス化したものを返り値としています。
以後、このコネクションプールを用いてredisとやりとりをすることになります。
4. Sidekiq::Clientの登場
コネクションプールの作成もできたので、このコネクションプールを利用して実際にredisとのやりとりをする存在が必要です。
それが、Sidekiq::Client
です。
先ほど作成したコネクションプールを渡してインスタンス化させます。
ちなみに、initialize
を見た感じ、引数に渡さなくても勝手にコネクションプール作成してくれそう。。笑
5. redisへ送るデータの正規化
これまでで、build_client
の処理は全て終えました。
この結果を利用していたのがclient_push
関数でありここからは以下の部分を追っていきます。
引数のitem
は、以下のデータを指しており、例でいうと
{ "args" => ["hard"], "class" => OurWorker }
のような値です。
build_client
の結果がSidekiq::Client
のインスタンスでしたので、push
関数は以下の定義部分にあたります。
1行目でnormalize_item
関数を呼び出しています。
名前からも想像できるようにここでは、redisへ渡すデータの正規化(redisが扱えるデータへの加工)を行っています。
この関数は、Sidekiq::JobUtil
で定義されています。
大体、
- データが想定通りのクラスであるか
- クラス定数など、redisが扱えるデータ型への変換
を行っています。詳細は割愛しますが最終結果は大体以下のようになっていると思います。
{ "retry" => true, "args" => ["hard"], "class" => "OurWorker", "jid" => "93e83e0f0426eb7c81d864e2", "queue" => "defualt", "created_at" => 1664349158.5238101 }
6. middlewareの実行
データの正規化も終了したので、このデータを用いたミドルウェアの実行部分に移ります。
このミドルウェアは、Rackのミドルウェアを模しているらしいので、使い方は似ています。
ミドルウェア実行後は、上記で正規化されたデータが返されます。(ミドルウェアとしての処理中データをいじらない限り)
コードに関しては簡単に、事前に設定されたミドルウェアを順に実行していき、最終的に以下で渡しているブロックが返されるという流れになります。
middleware.invoke(item["class"], normed, normed["queue"], @redis_pool) do
normed
end
7. redisへキューイング
ミドルウェアの実行も終わりようやくredisへジョブをキューイングするステージにきました。
redisへのリクエストはpipelineで行われています。
このpipeline機能はredis本体の機能ではなく、redis clientによってサポートされている機能です。
最後にredisへデータを送信する以下の部分が実行され、キューイングが完了となります。
おわり
いかがだったでしょうか。
初のコードリーディングだったこともあり拙い説明も多々あったと思いますが、sidekiqに処理について少しでも理解していただけたら幸いです。
ご清覧ありがとうございました。
※小ネタ
本記事執筆中にSidekiq7.0のbeta版がリリースされました。
Sidekiq7.0では若干コードが変わっているため、本記事の説明とマッチしない部分がある可能性がございます。ご了承下さい。
Discussion