💎
Rubyで書くスレッドのデザインパターン
名前 | 挙動 |
---|---|
Single Threaded Execution | 排他制御 → する |
Immutable | 排他制御 → しないでよくなる |
Guarded Suspension | 処理できるまで → 待つ (別に増えない) |
Worker Thread | 処理できるまで → 待つ (増えると速い) |
Balking | 処理できるまで → 待たない |
Thread Per Message | 処理を投げる → 戻値いらん |
Future | 処理を投げる → 戻値いる |
Producer-Consumer | 作る → キュー → 使う |
Read-Write Lock | 読み込み中は書き込まない |
Two-Phase Termination | 外から殺さない |
Thread Specific Storage | スレッド個別のグローバル変数 |
Active Object | 非同期メッセージを受け取る |
Single Threaded Execution
排他制御。
mutex = Mutex.new
a = 0
b = 0
2.times.collect {
Thread.start do
2.times do
mutex.synchronize do
a += 1
Thread.pass
b += 1
p [a, b, (a == b)]
end
end
end
}.each(&:join)
# > [1, 1, true]
# > [2, 2, true]
# > [3, 3, true]
# > [4, 4, true]
明示的にパスしても synchronize ブロック内はスレッドが切り替わらないことがわかる。
Immutable
Object.new.freeze
- 不変にしておけば排他制御に気をつかわなくてもよくなる
- 値オブジェクトに適用することが多い
Guarded Suspension
実行できるまで待つ。
queue = Queue.new
N = 10
sender = Thread.start do
Thread.current[:data] = []
N.times do |i|
sleep(rand(0..0.01))
queue << i
Thread.current[:data] << i
end
end
receiver = Thread.start do
Thread.current[:data] = []
N.times do
sleep(rand(0..0.001))
# shift出来ないとスレッドが自動停止してくれる
Thread.current[:data] << queue.shift
end
end
sender.join
receiver.join
# 正常にデータが受け取れている
sender[:data] # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
receiver[:data] # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Worker Thread
仕事がくるまで待って仕事がきたら働く。増えると速い。
class Channel < SizedQueue
attr_reader :threads
def initialize(size)
super(size)
@threads = size.times.collect do |i|
Thread.start(i) do |i|
loop do
request = shift
p "スレッド#{i}が#{request}を担当"
sleep(1)
end
end
end
end
end
# 1つのワーカーだけだと 3.3 秒。
# 4秒になってないのは、たぶん最後の sleep(1) が開始した時点で status == "sleep" になってるからみたい
channel = Channel.new(1)
t = Time.now
4.times { |i| channel << i }
nil until channel.size.zero? && channel.threads.all? { |t| t.status == "sleep" }
puts "%.1f s" % (Time.now - t)
# > "スレッド0が0を担当"
# > "スレッド0が1を担当"
# > "スレッド0が2を担当"
# > "スレッド0が3を担当"
# > 3.1 s
# 4つのワーカーだと処理が分散してすぐ終わる
channel = Channel.new(4)
t = Time.now
4.times { |i| channel << i }
nil until channel.size.zero? && channel.threads.all? { |t| t.status == "sleep" }
puts "%.1f s" % (Time.now - t)
# > "スレッド0が0を担当"
# > "スレッド1が1を担当"
# > "スレッド2が2を担当"
# > "スレッド3が3を担当"
# > 0.1 s
Balking
実行できるまで待たない
- 待つのではなく、すぐにリターンする
- 待つ場合は Guarded Suspention になる
- 一つのインスタンスの複数のスレッドで実行しているとき一部だけ排他制御を行うには synchronize ブロックで囲む
以下の例は a b c を順番に発動していく。ただ a の処理が 0.1 秒かかっているため、直後に発動した b は a が処理中のためリターンしている。aの処理が終わったころに発動した c は実行できていることがわかる。
class Foo
include Mutex_m
def initialize
super
@change = false
end
def execute(str, t)
synchronize do
if @change
p "処理中のため#{str}はスキップ"
return
end
@change = true
p str
sleep(t) # sleep は synchronize の中で行わないとエラーになる
@change = false
end
end
end
x = Foo.new
threads = []
threads << Thread.start { x.execute("a", 0.1) }
threads << Thread.start { x.execute("b", 0) }
sleep(0.1)
threads << Thread.start { x.execute("c", 0) }
threads.collect(&:join)
# > "a"
# > "処理中のためbはスキップ"
# > "c"
Thread Per Message
戻値は要らない。
def request(x)
Thread.start(x) { |x| p x }
end
request("a")
request("b")
(Thread.list - [Thread.main]).each(&:join)
# > "a"
# > "b"
Future
戻値は要る。
def request(x)
Thread.start(x) { |x| x }
end
t = []
t << request("A")
t << request("B")
t.collect(&:value) # => ["A", "B"]
Producer-Consumer
作る → キュー → 使う
queue = SizedQueue.new(1)
producer = Thread.start do
4.times do |i|
p ["作成", i]
queue.push(i)
end
p "作成側は先に終了"
end
consumer = Thread.start do
4.times do
p ["使用", queue.shift]
sleep(0.01)
end
end
producer.join
consumer.join
# > ["作成", 0]
# > ["作成", 1]
# > ["使用", 0]
# > ["作成", 2]
# > ["使用", 1]
# > ["作成", 3]
# > ["使用", 2]
# > "作成側は先に終了"
# > ["使用", 3]
- 生産スレッドが作ってキューに入れて使用スレッドが shift する
- SizedQueue のサイズが小さいほど流れが悪くなる
- 上の例は SizedQueue のサイズが 1 しかないので非常に効率が悪い
- consumer が shift してくれないと次を push できないことがわかる
Read-Write Lock
class Buffer
def initialize
@sync = Sync.new
@str = ""
end
def write(str)
@sync.synchronize(:EX) do
str.chars.with_index do |c, i|
sleep(0.0001)
@str[i] = c
end
end
end
def read
@sync.synchronize(:SH) do
@str.size.times.collect { |i|
sleep(0.001)
@str[i]
}.join
end
end
end
書き込みスレッドと読み込みスレッドを並列で動かして干渉させてみると、
buffer = Buffer.new
w = Thread.start do
("A".."Z").cycle { |c|
buffer.write(c.to_s * 64)
sleep(0.001)
}
end
r = Thread.start do
10.times do
sleep(0.001)
p buffer.read
end
end
r.join
w.kill
結果は壊れてない。
# > "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
# > "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
# > "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"
# > "DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD"
# > "EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE"
# > "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
# > "GGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG"
# > "HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH"
# > "IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII"
# > "JJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJ"
@sync.synchronize
ブロックを使わなかった場合は壊れる。
# > AAAAAAA
# > BBBBBBBBBBCCCCCCCCCCCDDDDDDDDDDDEEEEEEEEEEEFFFFFFFFFFGGGGGGGGGGG
# > IIIIIIIIIIJJJJJJJJJJJKKKKKKKKKKKLLLLLLLLLLMMMMMMMMMMMNNNNNNNNNNO
# > PPPPPPPPPPQQQQQQQQQQQRRRRRRRRRRRSSSSSSSSSSTTTTTTTTTTTUUUUUUUUUUU
# > WWWWWWWWWWXXXXXXXXXXXYYYYYYYYYYYYZZZZZZZZZZAAAAAAAAAAABBBBBBBBBB
# > DDDDDDDDDDDEEEEEEEEEEEEFFFFFFFFFFFGGGGGGGGGGHHHHHHHHHHHIIIIIIIII
# > JJKKKKKKKKKKKLLLLLLLLLLLMMMMMMMMMMNNNNNNNNNNNOOOOOOOOOOOPPPPPPPP
# > QQRRRRRRRRRRRSSSSSSSSSSTTTTTTTTTTTUUUUUUUUUUVVVVVVVVVVVWWWWWWWWW
# > XXYYYYYYYYYYYZZZZZZZZZZZAAAAAAAAAAABBBBBBBBBBBCCCCCCCCCCCDDDDDDD
# > EEEEFFFFFFFFFFFGGGGGGGGGGGHHHHHHHHHHHIIIIIIIIIIIJJJJJJJJJJJKKKKK
Two Phase Termination
外から Thread.kill するんじゃなくて止まるように指示する。
t = Thread.start do
2.times do |i|
if Thread.current["interrupt"]
break
end
p "処理中: #{i}"
sleep(0.2)
end
p "終了処理"
end
sleep(0.1)
t["interrupt"] = true
t.join
# > "処理中: 0"
# > "終了処理"
Thread Specific Storage
スレッド内グローバル変数。
Thread.start { Thread.current["a"] = 1 }.join
Thread.start { Thread.current["a"] }.value # => nil
異なるスレッドなので値が共有されていないのがわかる。
Active Object
非同期メッセージを受け取る。
この 1 + 2 の処理が重すぎるとする。
class Foo
def process
p 1 + 2
end
end
obj = Foo.new
obj.process
# > 3
改善後。
class Foo
attr_accessor :queue
def initialize
@queue = Queue.new
Thread.start do
loop { @queue.shift.call } # バックグランド処理を永遠と回す
end
end
def process
@queue << proc { p 1 + 2 }
end
end
obj = Foo.new
obj.process
nil until obj.queue.empty?
# > 3
process 内の処理が変わっただけでインタフェースはそのままである。
気をつけること
Queue は pop ではなく shift を使いたい。どちらも同じ動作だけど push / pop のペアで使うと Queue なのに Stack なイメージになって混乱する。enq / deq のペアでもいい。
参照
Discussion