💎

Rubyで書くスレッドのデザインパターン

2022/12/17に公開
名前 挙動
Single Threaded Execution 排他制御 → する
Immutable 排他制御 → しないでよくなる
Guarded Suspension 処理できるまで → 待つ (別に増えない)
Worker Thread 処理できるまで → 待つ (増えると速い)
Balking 処理できるまで → 待たない
Thread Per Message 処理を投げる → 戻値いらん
Future 処理を投げる → 戻値いる
Producer-Consumer 作る → キュー → 使う
Read-Write Lock 読み込み中は書き込まない
Two-Phase Termination 外から殺さない
Thread Specific Storage スレッド個別のグローバル変数
Active Object 非同期メッセージを受け取る

Single Threaded Execution

排他制御。

mutex = Mutex.new
a = 0
b = 0
2.times.collect {
  Thread.start do
    2.times do
      mutex.synchronize do
        a += 1
        Thread.pass
        b += 1
        p [a, b, (a == b)]
      end
    end
  end
}.each(&:join)
# > [1, 1, true]
# > [2, 2, true]
# > [3, 3, true]
# > [4, 4, true]

明示的にパスしても synchronize ブロック内はスレッドが切り替わらないことがわかる。

Immutable

Object.new.freeze
  • 不変にしておけば排他制御に気をつかわなくてもよくなる
  • 値オブジェクトに適用することが多い

Guarded Suspension

実行できるまで待つ。

queue = Queue.new

N = 10

sender = Thread.start do
  Thread.current[:data] = []
  N.times do |i|
    sleep(rand(0..0.01))
    queue << i
    Thread.current[:data] << i
  end
end

receiver = Thread.start do
  Thread.current[:data] = []
  N.times do
    sleep(rand(0..0.001))
    # shift出来ないとスレッドが自動停止してくれる
    Thread.current[:data] << queue.shift
  end
end

sender.join
receiver.join

# 正常にデータが受け取れている
sender[:data]    # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
receiver[:data]  # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Worker Thread

仕事がくるまで待って仕事がきたら働く。増えると速い。

class Channel < SizedQueue
  attr_reader :threads

  def initialize(size)
    super(size)
    @threads = size.times.collect do |i|
      Thread.start(i) do |i|
        loop do
          request = shift
          p "スレッド#{i}#{request}を担当"
          sleep(1)
        end
      end
    end
  end
end

# 1つのワーカーだけだと 3.3 秒。
# 4秒になってないのは、たぶん最後の sleep(1) が開始した時点で status == "sleep" になってるからみたい
channel = Channel.new(1)
t = Time.now
4.times { |i| channel << i }
nil until channel.size.zero? && channel.threads.all? { |t| t.status == "sleep" }
puts "%.1f s" % (Time.now - t)
# > "スレッド0が0を担当"
# > "スレッド0が1を担当"
# > "スレッド0が2を担当"
# > "スレッド0が3を担当"
# > 3.1 s

# 4つのワーカーだと処理が分散してすぐ終わる
channel = Channel.new(4)
t = Time.now
4.times { |i| channel << i }
nil until channel.size.zero? && channel.threads.all? { |t| t.status == "sleep" }
puts "%.1f s" % (Time.now - t)
# > "スレッド0が0を担当"
# > "スレッド1が1を担当"
# > "スレッド2が2を担当"
# > "スレッド3が3を担当"
# > 0.1 s

Balking

実行できるまで待たない

  • 待つのではなく、すぐにリターンする
  • 待つ場合は Guarded Suspention になる
  • 一つのインスタンスの複数のスレッドで実行しているとき一部だけ排他制御を行うには synchronize ブロックで囲む

以下の例は a b c を順番に発動していく。ただ a の処理が 0.1 秒かかっているため、直後に発動した b は a が処理中のためリターンしている。aの処理が終わったころに発動した c は実行できていることがわかる。

class Foo
  include Mutex_m

  def initialize
    super
    @change = false
  end

  def execute(str, t)
    synchronize do
      if @change
        p "処理中のため#{str}はスキップ"
        return
      end
      @change = true

      p str
      sleep(t) # sleep は synchronize の中で行わないとエラーになる

      @change = false
    end
  end
end

x = Foo.new
threads = []
threads << Thread.start { x.execute("a", 0.1) }
threads << Thread.start { x.execute("b", 0) }
sleep(0.1)
threads << Thread.start { x.execute("c", 0) }
threads.collect(&:join)
# > "a"
# > "処理中のためbはスキップ"
# > "c"

Thread Per Message

戻値は要らない。

def request(x)
  Thread.start(x) { |x| p x }
end

request("a")
request("b")

(Thread.list - [Thread.main]).each(&:join)
# > "a"
# > "b"

Future

戻値は要る。

def request(x)
  Thread.start(x) { |x| x }
end

t = []
t << request("A")
t << request("B")
t.collect(&:value)  # => ["A", "B"]

Producer-Consumer

作る → キュー → 使う

queue = SizedQueue.new(1)

producer = Thread.start do
  4.times do |i|
    p ["作成", i]
    queue.push(i)
  end
  p "作成側は先に終了"
end

consumer = Thread.start do
  4.times do
    p ["使用", queue.shift]
    sleep(0.01)
  end
end

producer.join
consumer.join
# > ["作成", 0]
# > ["作成", 1]
# > ["使用", 0]
# > ["作成", 2]
# > ["使用", 1]
# > ["作成", 3]
# > ["使用", 2]
# > "作成側は先に終了"
# > ["使用", 3]
  • 生産スレッドが作ってキューに入れて使用スレッドが shift する
  • SizedQueue のサイズが小さいほど流れが悪くなる
  • 上の例は SizedQueue のサイズが 1 しかないので非常に効率が悪い
  • consumer が shift してくれないと次を push できないことがわかる

Read-Write Lock

class Buffer
  def initialize
    @sync = Sync.new
    @str = ""
  end

  def write(str)
    @sync.synchronize(:EX) do
      str.chars.with_index do |c, i|
        sleep(0.0001)
        @str[i] = c
      end
    end
  end

  def read
    @sync.synchronize(:SH) do
      @str.size.times.collect { |i|
        sleep(0.001)
        @str[i]
      }.join
    end
  end
end

書き込みスレッドと読み込みスレッドを並列で動かして干渉させてみると、

buffer = Buffer.new
w = Thread.start do
  ("A".."Z").cycle { |c|
    buffer.write(c.to_s * 64)
    sleep(0.001)
  }
end
r = Thread.start do
  10.times do
    sleep(0.001)
    p buffer.read
  end
end
r.join
w.kill

結果は壊れてない。

# > "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
# > "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
# > "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"
# > "DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD"
# > "EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE"
# > "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
# > "GGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG"
# > "HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH"
# > "IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII"
# > "JJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJJ"

@sync.synchronize ブロックを使わなかった場合は壊れる。

# > AAAAAAA
# > BBBBBBBBBBCCCCCCCCCCCDDDDDDDDDDDEEEEEEEEEEEFFFFFFFFFFGGGGGGGGGGG
# > IIIIIIIIIIJJJJJJJJJJJKKKKKKKKKKKLLLLLLLLLLMMMMMMMMMMMNNNNNNNNNNO
# > PPPPPPPPPPQQQQQQQQQQQRRRRRRRRRRRSSSSSSSSSSTTTTTTTTTTTUUUUUUUUUUU
# > WWWWWWWWWWXXXXXXXXXXXYYYYYYYYYYYYZZZZZZZZZZAAAAAAAAAAABBBBBBBBBB
# > DDDDDDDDDDDEEEEEEEEEEEEFFFFFFFFFFFGGGGGGGGGGHHHHHHHHHHHIIIIIIIII
# > JJKKKKKKKKKKKLLLLLLLLLLLMMMMMMMMMMNNNNNNNNNNNOOOOOOOOOOOPPPPPPPP
# > QQRRRRRRRRRRRSSSSSSSSSSTTTTTTTTTTTUUUUUUUUUUVVVVVVVVVVVWWWWWWWWW
# > XXYYYYYYYYYYYZZZZZZZZZZZAAAAAAAAAAABBBBBBBBBBBCCCCCCCCCCCDDDDDDD
# > EEEEFFFFFFFFFFFGGGGGGGGGGGHHHHHHHHHHHIIIIIIIIIIIJJJJJJJJJJJKKKKK

Two Phase Termination

外から Thread.kill するんじゃなくて止まるように指示する。

t = Thread.start do
  2.times do |i|
    if Thread.current["interrupt"]
      break
    end
    p "処理中: #{i}"
    sleep(0.2)
  end
  p "終了処理"
end
sleep(0.1)
t["interrupt"] = true
t.join
# > "処理中: 0"
# > "終了処理"

Thread Specific Storage

スレッド内グローバル変数。

Thread.start { Thread.current["a"] = 1 }.join
Thread.start { Thread.current["a"] }.value  # => nil

異なるスレッドなので値が共有されていないのがわかる。

Active Object

非同期メッセージを受け取る。

この 1 + 2 の処理が重すぎるとする。

class Foo
  def process
    p 1 + 2
  end
end

obj = Foo.new
obj.process
# > 3

改善後。

class Foo
  attr_accessor :queue

  def initialize
    @queue = Queue.new
    Thread.start do
      loop { @queue.shift.call }  # バックグランド処理を永遠と回す
    end
  end

  def process
    @queue << proc { p 1 + 2 }
  end
end

obj = Foo.new
obj.process

nil until obj.queue.empty?
# > 3

process 内の処理が変わっただけでインタフェースはそのままである。

気をつけること

Queue は pop ではなく shift を使いたい。どちらも同じ動作だけど push / pop のペアで使うと Queue なのに Stack なイメージになって混乱する。enq / deq のペアでもいい。

参照

https://www.amazon.co.jp/dp/4797331623

Discussion