🦁

Sidekiqをコードリーディングしてみた(前編)

2022/10/13に公開約7,800字

挨拶

ポートの就活会議でバックエンドを担当している新卒の @s.yanadaです。
最近、私が所属しているプロジェクトにてsidekiqが導入され良い機会だったのでSidekiqについてコードリーディングしてみました。

Sidekiqについて

サービス開発において、

  • ユーザーにメール送信をする
  • 他のサービスにAPIリクエストを送る

など何かしらの処理を行いたいが、これらの処理が完了するまでユーザーを待たせたくないなどのケースがあると思います。
こういったケースにおいて、Sidekiqを使うことで処理の完了を待たずにユーザーへレスポンスを返す、つまり実質非同期的な処理を行うことが可能になります。

Sidekiqが担う処理

1. ジョブをredisに格納しておく

非同期に処理したい内容を何かしらのデータストアに格納しておきます。
基本的な場合、データストアにはredisが選択されることが多いので本記事ではredisに格納する前提で話を進めます。

実際に格納されるデータについては後述しますが、大まかにいうと

  • Jobクラス
  • 引数

上記2種類をredisに格納しています。

2. redisから処理内容を取り出し、実際の処理を行う

上記で格納していたジョブに関するデータを取り出し、実際の処理を実行させます。

格納していた情報には「クラス名」、「引数」があるのでこれらの情報により処理の実行が可能になります。

実際にコードリーディングしてみる

前述したとおり、Sidekiqには大まかに以下の2種類の役割があります。

  • Sidekiq Client
    • ジョブをredisに格納する処理を担う
  • Sidekiq Server
    • redisからジョブを取り出しジョブの実行を担う

本記事ではSidekiq Clientを焦点にあててコードリーディングしていきたいと思います。

Sidekiq Client

大まかな処理の流れ

  1. redisとのコネクション確立
  2. ジョブに関するデータの正規化と検証
  3. ミドルウェアの実行
  4. redisへジョブのキューイング

登場人物

Sidekiq::Worker

ジョブをキューイングするインタフェースを提供
redisとのコネクションを確立したり、実際にredisへジョブをキューイングする処理などは他のクラス、モジュールにまかせている。

# 標準的な使い方
.perform_async(*args)

# ジョブをスケジューリングしたい時用
.perform_in(interval, *args)

ちなみに、Sidekiq単体を使うときはこのモジュールをincludeさせる必要があるが、 ActiveJobを使用する場合は、includeしてはいけないそうです。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L158-L159

Sidekiq::Client

インスタンス変数として ConnectionPool(connection_pool gemを利用)のインスタンスを持ち、このコネクションプールを使用して、redisへリクエストする処理を担う。

Sidekiq::RedisConnection

コネクションプールの作成を担う。
.adapter = xxx で Adapterを変更可能。
gem内ではredis、redis-clientのアダプターが既に内包されている。
RedisConnection::RedisAdapterと同じインターフェースを持つカスタムクラスを作成すれば、別のアダプターを指定することも可能

.create 関数で、アダプター(redis)のコネクションプールを作成する

Sidekiq::Middleware::Chain

Sidekiqのclient、server両方で使用できるミドルウェア管理モジュール
https://github.com/mperham/sidekiq/wiki/Middleware

それぞれの処理単位は Sidekiq::Middleware::Entry であり、複数のentryの実行順序などを管理するのがSidekiq::Middleware::Chain

Client Middlewareでは、redisへキューイングする前に実行したい処理
Server Middlewareでは、ジョブの実行前後に実行したい処理
を設定することができる。

Sidekiq::JobUtil

redisへ送るデータの検証、正規化を担う

Sidekiq

Sidekiq全体の設定、コネクションプールの保持をしている
Sidekiqにはclientとserverが存在し、それぞれは別プロセスで稼働しているため設定が共有されることはない

処理を追っていく

1. 全ての始まり

Sidekiqを直接利用する際は perform_async メソッドを実行させることから全ては始まります。
今後は、以下のコードを元に説明して行きたいと思います。

class OurWorker
  include Sidekiq::Worker

  def perform(complexity)
    case complexity
    when 'super_hard'
      sleep 300
      puts 'Reallly took quie a bit of effor'
    when 'hard'
      sleep 30
      puts 'That was a bit of work'
    else
      sleep 1
      puts "That wasn't a lot of effor"
    end
  end
end
OurWorker.perform_async('hard')

この perform_async は、Sidekiq::Worker に定義されています。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L158-L162
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L288-L290

2. Setterへ委譲

上記githubのコードのように、perform_asyncSetterクラスに委譲されています。
他のperform_inperform_bulkを見てもわかりますが、実際の処理は全てSetterクラスに任せるという形になっているようです。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L293-L295
ちなみにここでSetterに渡している selfSidekiq::Workerのinclude元のクラス、上記の例でいうと OurWorkerです。

Setterを一旦挟んでいる理由はコメントに書いてました。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L169-L171
また、ActiveJobとの互換性を保つための処理もinitializeでしているようです。

では、Setterではどういう処理を行っているかをたどってみると以下のコードを実行しているようです。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L198

@klassOurWorkerにあたります。
client_pushSidekiq::Workerクラスに定義されているものなので結局
Sidekiq::Worker => Setter => Sidekiq::Worker
というふうに戻ってきました。
前述の通りSetterは、setのためにカプセル化したクラスであるようなので、戻ってくるのは直感的な気もします。

3. redisとのコネクションプール作成

client_pushでは、まずbuild_client関数によってredisとのコネクションを確立 or 取得します。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L364

オプション周りを追うのは辛かったのでここらへんは実際の手元で確認しています。。

irb(main):020:0> OurWorker.sidekiq_options_hash
=> nil
irb(main):021:0> Sidekiq.default_job_options
=> {"retry"=>true, "queue"=>"default"}
irb(main):022:0>

上記のように、Thread.current[:sidekiq_via_pool]get_sidekiq_options["pool"]はfalsyな値になっているようなので、少なくとも初回は、Sidekiq.redis_poolからredisのコネクションプールを取得していると思います。

この処理にあたるのが以下
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq.rb#L197-L199
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/redis_connection.rb#L81

RedisConnection.createの部分は少し長いので割愛しますが、

  • コネクション数
  • 最大コネクション数を超えた問の待機時間(タイムアウト)
  • アダプター(基本redis)

上記の設定を行ったのち、connection_pool gemのConnectionPoolをインスタンス化したものを返り値としています。
以後、このコネクションプールを用いてredisとやりとりをすることになります。

4. Sidekiq::Clientの登場

コネクションプールの作成もできたので、このコネクションプールを利用して実際にredisとのやりとりをする存在が必要です。
それが、Sidekiq::Clientです。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L365-L366
先ほど作成したコネクションプールを渡してインスタンス化させます。

ちなみに、initializeを見た感じ、引数に渡さなくても勝手にコネクションプール作成してくれそう。。笑
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/client.rb#L44-L46

5. redisへ送るデータの正規化

これまでで、build_clientの処理は全て終えました。
この結果を利用していたのがclient_push関数でありここからは以下の部分を追っていきます。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L360
引数のitemは、以下のデータを指しており、例でいうと
{ "args" => ["hard"], "class" => OurWorker }のような値です。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/worker.rb#L198

build_clientの結果がSidekiq::Clientのインスタンスでしたので、push関数は以下の定義部分にあたります。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/client.rb#L72-L82

1行目でnormalize_item関数を呼び出しています。
名前からも想像できるようにここでは、redisへ渡すデータの正規化(redisが扱えるデータへの加工)を行っています。
この関数は、Sidekiq::JobUtilで定義されています。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/job_util.rb#L35
大体、

  • データが想定通りのクラスであるか
  • クラス定数など、redisが扱えるデータ型への変換
    を行っています。詳細は割愛しますが最終結果は大体以下のようになっていると思います。
{ "retry" => true, "args" => ["hard"], "class" => "OurWorker", "jid" => "93e83e0f0426eb7c81d864e2", "queue" => "defualt", "created_at" => 1664349158.5238101 }

6. middlewareの実行

データの正規化も終了したので、このデータを用いたミドルウェアの実行部分に移ります。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/client.rb#L74-L76

このミドルウェアは、Rackのミドルウェアを模しているらしいので、使い方は似ています。
ミドルウェア実行後は、上記で正規化されたデータが返されます。(ミドルウェアとしての処理中データをいじらない限り)
https://github.com/mperham/sidekiq/blob/b2b23bfee413f0efac822d03b5fc1c75bd9066fb/lib/sidekiq/middleware/chain.rb#L171-L184
コードに関しては簡単に、事前に設定されたミドルウェアを順に実行していき、最終的に以下で渡しているブロックが返されるという流れになります。

middleware.invoke(item["class"], normed, normed["queue"], @redis_pool) do
  normed
end

7. redisへキューイング

ミドルウェアの実行も終わりようやくredisへジョブをキューイングするステージにきました。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/client.rb#L77-L81

redisへのリクエストはpipelineで行われています。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/client.rb#L201-L203
pipelineを使用することで複数のリクエストを送る際、前回のリクエストのレスポンスを待たずに次のリクエストを送るので、パフォーマンス向上に繋がります。
このpipeline機能はredis本体の機能ではなく、redis clientによってサポートされている機能です。
https://redis.io/docs/manual/pipelining/

最後にredisへデータを送信する以下の部分が実行され、キューイングが完了となります。
https://github.com/mperham/sidekiq/blob/9fe550dbe9797319de605496e6486eb49c9b1487/lib/sidekiq/client.rb#L234-L235

おわり

いかがだったでしょうか。
初のコードリーディングだったこともあり拙い説明も多々あったと思いますが、sidekiqに処理について少しでも理解していただけたら幸いです。

ご清覧ありがとうございました。

※小ネタ
本記事執筆中にSidekiq7.0のbeta版がリリースされました。
Sidekiq7.0では若干コードが変わっているため、本記事の説明とマッチしない部分がある可能性がございます。ご了承下さい。

Discussion

ログインするとコメントできます