🦂
Redisと排他制御
以下「スレッド」と書いているのはすべて「プロセス」と置き換えてもいい。なのでスレッド特有のライブラリやデザインパターンは用いない。
共通の前処理
require "redis"
expires_in = 2
redis = Redis.new
redis.flushdb
Redis::VERSION # => "5.0.6"
RUBY_VERSION # => "3.2.1"
毎回やってしまうダメな例
if redis.exists?("key1")
# すでに他で実行中だった
else
# 実行できるのでフラグを立てる
redis.set("key1", "true")
# API実行
# 完了
redis.del("key1")
end
これはスレッドセーフになっていない。最初のスレッドが exists?
から set
の間を実行している間に他のスレッドが exists?
を通ると、同様に else 側にいくため API の実行が干渉する恐れがある。
判定とフラグ立ての間に隙間を開けてはならない
setnx 編
redis.del("key1")
redis.setnx("key1", "true") # => true
redis.setnx("key1", "true") # => false
- そういうときのためにちょうどよい命令 setnx がある
- 指定のキーがないときだけフラグを立てて true を返す
-
nx
は Not eXists の略- なんでそんな極端に略す必要があるのかはわからない
- TTL を設定できないので使いにくい
- キーの有無だけが重要で値に意味はない
set 編
nx オプションで setnx 相当になる。
redis.del("key1")
redis.set("key1", "true", nx: true) # => true
redis.set("key1", "true", nx: true) # => false
その上 ex オプションで TTL も設定できる。
redis.del("key1")
redis.set("key1", "true", nx: true, ex: 1) # => true
sleep(0.5)
redis.get("key1") # => "true"
sleep(0.5)
redis.get("key1") # => nil
つまり setnx
も、引数の順番が難しい setex
も不要だった。
最終的にこのように書ける。
if redis.set("key1", "true", nx: true, ex: expires_in)
# API実行
# 完了
redis.del("key1")
else
# すでに他で実行中だった
end
カウンターで排他制御する例
キーの有無ではなく 0 から始まる早押しカウンターボタンを一斉に叩いて 1 を獲得したものだけに権利があるとする方法。
count_next = -> {
redis.multi { |e|
e.incr("key1")
e.expire("key1", expires_in)
}.first
}
count_next.() # => 1
count_next.() # => 2
redis.del("key1")
count_next.() # => 1
count_next.() # => 2
- デメリット
-
incr
がまた TTL を設定できたないためコードがやや複雑になる
-
- メリット
- カウンタの上昇度合いでAPIに打ち寄せるアクセスの勢いや排他制御の効果が(ログに出すようにしたときなどに)わかりやすい
排他制御された側が他スレッドのAPI実行終了を待つ例
前提条件を
- API
-
values.sum
を計算する - 実行に1秒かかるとする
- 1回呼ぶのに1万円かかる
-
- 複数のスレッドがAPIを同時に叩く
- 最初に実行の権利を獲得したスレッドだけがAPIを実行する
- 権利がなかったスレッドは終わるのを待って結果を共有してもらう
- 結果を共有してもらって整合性が取れるのはAPIへの入力が同じだから
とした場合の方法になる。
values = [1, 2]
x = nil
key = Digest::MD5.hexdigest(values.to_s)
redis.del(key)
f = -> {
if redis.set(key, "true", nx: true, ex: 2)
p "API実行 - 開始"
sleep(1)
x = values.sum
p "API実行 - 終了"
redis.del(key)
else
while redis.exists?(key)
p "待っている"
sleep(0.5)
end
end
x
}
2.times.collect { |i|
Thread.start { p [i, f.call] }
}.each(&:join)
# > "API実行 - 開始"
# > "待っている"
# > "待っている"
# > "API実行 - 終了"
# > [0, 3]
# > [1, 3]
- key は API に渡すパラメータ
values
から作る -
x
は単に変数としたが実際はどのスレッドからも key に結びつけて参照できる - 排他制御された側は、その key が Redis から消えるまで待機する
- APIが異常終了したときに while がループしそうだけど
ex: 2
の指定があるため最大2秒で while を抜ける - タイムアウトで抜けたとき
x
に値は入っていないため実際は例外を出すなどした方がいい
APIと排他制御を分離すると使いやすくなる
合計値を $answer に格納する API
require "digest/md5"
class API
def initialize(values)
@values = values
end
def call
key = Digest::MD5.hexdigest(@values.to_s)
sleep(1)
$answer[key] = @values.sum
$api_execute_count += 1
end
end
排他制御なしの場合
実行手順
$answer = {}
$api_execute_count = 0
5.times.collect { |i|
Thread.start do
values = [1, 2]
key = Digest::MD5.hexdigest(values.to_s)
unless $answer[key]
API.new(values).call
end
p "[#{i}] #{values}.sum => #{$answer[key]}"
end
}.each(&:join)
$api_execute_count # => 5
# > "[1] [1, 2].sum => 3"
# > "[0] [1, 2].sum => 3"
# > "[4] [1, 2].sum => 3"
# > "[3] [1, 2].sum => 3"
# > "[2] [1, 2].sum => 3"
APIが5回呼ばれてしまっているのがわかる。
排他制御ありの場合
実行手順
require "redis"
Redis.new.flushdb
class ExclusiveAccess
def initialize(key)
@key = key
end
def call
redis = Redis.new
if redis.set(@key, "true", nx: true, ex: 2)
yield
redis.del(@key)
else
while redis.exists?(@key)
sleep(0.1)
end
end
end
end
$answer = {}
$api_execute_count = 0
5.times.collect { |i|
Thread.start do
values = [1, 2]
key = Digest::MD5.hexdigest(values.to_s)
unless $answer[key]
ExclusiveAccess.new(key).call do
API.new(values).call
end
end
p "[#{i}] #{values}.sum => #{$answer[key]}"
end
}.each(&:join)
$api_execute_count # => 1
# > "[2] [1, 2].sum => 3"
# > "[3] [1, 2].sum => 3"
# > "[0] [1, 2].sum => 3"
# > "[1] [1, 2].sum => 3"
# > "[4] [1, 2].sum => 3"
5連続でリクエストが来てもAPIは1回しか叩いていない。APIを叩いていないスレッドも正しい結果を返している。「排他制御なし」の場合とのコードの違いは ExclusiveAccess.new(key).call
の有無だけである。API呼び出しと排他制御が疎結合になっている。
Discussion