🦂

Redisと排他制御

2023/03/28に公開

以下「スレッド」と書いているのはすべて「プロセス」と置き換えてもいい
なのでスレッド特有のライブラリやデザインパターンは用いない

共通の前処理
require "redis"

expires_in = 2

redis = Redis.new
redis.flushdb

Redis::VERSION # => "5.0.6"
RUBY_VERSION   # => "3.2.1"

毎回やってしまうダメな例

if redis.exists?("key1")
  # すでに他で実行中だった
else
  # 実行できるのでフラグを立てる
  redis.set("key1", "true")

  # API実行

  # 完了
  redis.del("key1")
end

これはスレッドセーフになっていない。最初のスレッドが exists? から set の間を実行している間に他のスレッドが exists? を通ると、同様に else 側にいくため API の実行が干渉する恐れがある

判定とフラグ立ての間に隙間を開けてはならない

setnx 編

redis.del("key1")
redis.setnx("key1", "true") # => true
redis.setnx("key1", "true") # => false
  • そういうときのためにちょうどよい命令 setnx がある
    • 指定のキーがないときだけフラグを立てて true を返す
    • nxNot eXists の略
      • なんでそんな極端に略す必要があるのかはわからない
    • TTL を設定できないので使いにくい
  • キーの有無だけが重要で値に意味はない

set 編

nx オプションで setnx 相当になった

redis.del("key1")
redis.set("key1", "true", nx: true) # => true
redis.set("key1", "true", nx: true) # => false

その上 ex オプションで TTL も設定できた

redis.del("key1")
redis.set("key1", "true", nx: true, ex: 1) # => true
sleep(0.5)
redis.get("key1")               # => "true"
sleep(0.5)
redis.get("key1")               # => nil

setnx も、引数の順番が難しい setex もまったくいらなかった

最終的にこのように書ける

if redis.set("key1", "true", nx: true, ex: expires_in)
  # API実行

  # 完了
  redis.del("key1")
else
  # すでに他で実行中だった
end

カウンターで排他制御する例

キーの有無ではなく 0 から始まる早押しカウンターボタンを一斉に叩いて 1 を獲得したものだけに権利があるとする方法

count_next = -> {
  redis.multi { |e|
    e.incr("key1")
    e.expire("key1", expires_in)
  }.first
}
count_next.() # => 1
count_next.() # => 2

redis.del("key1")
count_next.() # => 1
count_next.() # => 2
  • デメリット
    • incr がまた TTL を設定できたないためコードが複雑になってしまう
  • メリット
    • カウンタの上昇度合いでAPIに打ち寄せるアクセスの勢いや排他制御の効果が(ログに出すようにしたときなどに)わかりやすい

排他制御された側が他スレッドのAPI実行終了を待つ例

前提条件

  • API
    • values.sum を計算する
    • 実行に1秒かかるとする
    • 1回呼ぶのに1万円かかる
  • 複数のスレッドがAPIを同時に叩く
  • 最初に実行の権利を獲得したスレッドだけがAPIを実行する
  • 権利がなかったスレッドは終わるのを待って結果を共有してもらう
    • 結果を共有してもらって整合性が取れるのはAPIへの入力が同じだから

とした場合の方法

values = [1, 2]
x = nil
key = Digest::MD5.hexdigest(values.to_s)
redis.del(key)
f = -> {
  if redis.set(key, "true", nx: true, ex: 2)
    p "API実行 - 開始"
    sleep(1)
    x = values.sum
    p "API実行 - 終了"
    redis.del(key)
  else
    while redis.exists?(key)
      p "待っている"
      sleep(0.5)
    end
  end
  x
}

2.times.collect { |i|
  Thread.start { p [i, f.call] }
}.each(&:join)
# >> "API実行 - 開始"
# >> "待っている"
# >> "待っている"
# >> "API実行 - 終了"
# >> [0, 3]
# >> [1, 3]
  • key は API に渡すパラメータ values から作る
  • x は単に変数としたが実際はどのスレッドからも key に結びつけて参照できる
  • 排他制御された側は、その key が Redis から消えるまで待機する
  • APIが異常終了したときに while がループしそうだけど ex: 2 の指定があるため最大2秒で while を抜ける
  • タイムアウトで抜けたとき x に値は入っていないため実際は例外を出すなどした方がいい

APIと排他制御を分離すると使いやすくなる

合計値を $answer に格納する API
require "digest/md5"

class API
  def initialize(values)
    @values = values
  end

  def call
    key = Digest::MD5.hexdigest(@values.to_s)
    sleep(1)
    $answer[key] = @values.sum
    $api_execute_count += 1
  end
end

排他制御なしの場合

実行手順
$answer = {}
$api_execute_count = 0

5.times.collect { |i|
  Thread.start do
    values = [1, 2]
    key = Digest::MD5.hexdigest(values.to_s)
    unless $answer[key]
      API.new(values).call
    end
    p "[#{i}] #{values}.sum => #{$answer[key]}"
  end
}.each(&:join)

$api_execute_count # => 5

# >> "[1] [1, 2].sum => 3"
# >> "[0] [1, 2].sum => 3"
# >> "[4] [1, 2].sum => 3"
# >> "[3] [1, 2].sum => 3"
# >> "[2] [1, 2].sum => 3"
  • APIが5回呼ばれてしまっているのがわかる

排他制御ありの場合

実行手順
require "redis"
Redis.new.flushdb

class ExclusiveAccess
  def initialize(key)
    @key = key
  end

  def call
    redis = Redis.new
    if redis.set(@key, "true", nx: true, ex: 2)
      yield
      redis.del(@key)
    else
      while redis.exists?(@key)
        sleep(0.1)
      end
    end
  end
end

$answer = {}
$api_execute_count = 0

5.times.collect { |i|
  Thread.start do
    values = [1, 2]
    key = Digest::MD5.hexdigest(values.to_s)
    unless $answer[key]
      ExclusiveAccess.new(key).call do
        API.new(values).call
      end
    end
    p "[#{i}] #{values}.sum => #{$answer[key]}"
  end
}.each(&:join)

$api_execute_count # => 1

# >> "[2] [1, 2].sum => 3"
# >> "[3] [1, 2].sum => 3"
# >> "[0] [1, 2].sum => 3"
# >> "[1] [1, 2].sum => 3"
# >> "[4] [1, 2].sum => 3"
  • 5連続でリクエストが来てもAPIは1回しか叩いていない
  • APIを叩いていないスレッドも正しい結果を返している
  • 「排他制御なし」の場合とのコードの違いは ExclusiveAccess.new(key).call の有無だけ
  • API呼び出しと排他制御が疎結合になっている

Redis 命令リファレンス

Discussion