🙆

Sidekiqでキューの重み付けの挙動を理解する

2024/03/17に公開

Sidekiqでは、キューに重み付けが設定できます。
挙動は公式ドキュメントに記載されているのですが、細かい挙動を理解したかったので、検証してコードも読んでみました。

https://github.com/sidekiq/sidekiq/wiki/Advanced-Options#queues

動作検証

環境

  • ruby: 3.1.4
  • sidekiq: 7.2.2

検証用コード

スレッド数は1、キューは2つ用意

:concurrency: 1
:queues:
  - critical
  - normal

各キューに5つずつジョブをエンキューする。

class EnqueJob
  def self.run
    5.times do |i|
      NormalJob.perform_async(i + 1)
      CriticalJob.perform_async(i + 1)
    end
  end
end

それぞれのキューから取り出して、job_numberを出力するジョブ。

class CriticalJob
    include Sidekiq::Job
    sidekiq_options queue: 'critical'

    def perform(job_number)
      puts "Critical Job #{job_number} processed"
    end
end

class NormalJob
    include Sidekiq::Job
    sidekiq_options queue: 'normal'
    
    def perform(job_number)
      puts "Normal Job #{job_number} processed"
    end
end

検証

sidekiq実行前に、ジョブをエンキューしておく。

# bundle exec rails c
irb(main):003:0> EnqueJob.run
重み付けをしない
# ...
:queues:
  - critical
  - normal

queuesで指定した順にキューが処理される。criticalキューが空になってから、normalキューが処理される。

Critical Job 1 processed
Critical Job 2 processed
Critical Job 3 processed
Critical Job 4 processed
Critical Job 5 processed
Normal Job 1 processed
Normal Job 2 processed
Normal Job 3 processed
Normal Job 4 processed
Normal Job 5 processed
同じ重み付けをする
# ...
:queues:
  - [critical, 1]
  - [normal, 1]

ランダムな優先順位でキューが処理される。

Critical Job 1 processed
Critical Job 2 processed
Normal Job 1 processed
Normal Job 2 processed
Critical Job 3 processed
Critical Job 4 processed
Normal Job 3 processed
Critical Job 5 processed
Normal Job 4 processed
Normal Job 5 processed
異なる重み付けをする
# ...
:queues:
  - [critical, 3]
  - [normal, 1]

nomarlキューの3倍の頻度でcriticalキューがチェックされます。

Critical Job 1 processed
Critical Job 2 processed
Normal Job 1 processed
Critical Job 3 processed
Critical Job 4 processed
Normal Job 2 processed
Critical Job 5 processed
Normal Job 3 processed
Normal Job 4 processed
Normal Job 5 processed

ただし、確実に3:1になっているわけではなく、どちらかが偏って処理されるケースもあります。

Critical Job 1 processed
Critical Job 2 processed
Critical Job 3 processed
Critical Job 4 processed
Critical Job 5 processed
Normal Job 1 processed
Normal Job 2 processed
Normal Job 3 processed
Normal Job 4 processed
Normal Job 5 processed

内部の実装はどのようになっているのか、コードを見てみます。

重み付けの実装

まず、config.yml等で指定された、queuesは以下の箇所で処理されます。

capsule.rb
 def queues=(val)
  @weights = {}
  @queues = Array(val).each_with_object([]) do |qstr, memo|
    arr = qstr
    arr = qstr.split(",") if qstr.is_a?(String)
    name, weight = arr
    @weights[name] = weight.to_i
    [weight.to_i, 1].max.times do
      memo << name
    end
  end
  @mode = if @weights.values.all?(&:zero?)
    :strict
  elsif @weights.values.all? { |x| x == 1 }
    :random
  else
    :weighted
  end
end

キューの定義から、キュー名の配列@queuesを生成しています。重み付けの数だけ、キュー名が配列に格納されます。

例えば、queuesの指定が以下のようになっている場合は、

# ...
:queues:
  - [critical, 3]
  - [normal, 1]

以下のように、criticalは配列に3つ格納されます。

@queues: ["critical", "critical", "critical", "normal"]

また、重みに応じて、modeが判定されます。

  • 重みがすべて未指定の場合、:strict
  • 重みがすべて1の場合、:random
  • その他、:weighted

次に、キューからのジョブを取得する箇所を見ていきます。

fetch.rb
def initialize(cap)
  #...
  @strictly_ordered_queues = cap.mode == :strict
end

def retrieve_work
  qs = queues_cmd
  #...
  queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qs, TIMEOUT) }
  #...
end

def queues_cmd
  if @strictly_ordered_queues
    @queues
  else
    permute = @queues.shuffle
    permute.uniq!
    permute
  end
end

initializeで、前述のmodeを元にstrictly_ordered_queuesが設定されます。重みが未指定:strictの場合に、trueを設定します。

queues_cmdで、キューの取得順序が決定されます。

  • strictly_ordered_queuesがtrueの場合は、@queuesをそのまま返す
    • 毎回、キューの順序は同じになります。
  • それ以外の場合は、@queuesをランダムにシャッフルして、重複を排除して返します
    • つまり、@queues内にキュー名の数が多いと、先頭にくる確率が高くなります
    • shuffleなので、確実に先頭にくる保証はありません

retrieve_workで、redisからジョブを取得します。

  • queues_cmdで決定した取得順序に従い、キューからジョブを取得します
  • キューにジョブがない場合に、次のキューからジョブを取得します。

まとめ

  • sidekiqのキューをチェックするモードは3つある
    • 順序固定
    • ランダム
    • 重み付け
  • ランダム・重み付けの場合は、Array#shuffleを使って、順序決定が行われている
    • 重み付けが高いと、配列のキュー名が多く存在するので、高い順序になりやすい

Discussion