Promiseベースの処理をsocketry/asyncでどう書く?
結果は別にいらない、非同期で動いてくれさえすればいいパターン
require 'async'
require 'async/http/internet'
require 'async/io'
require 'net/http'
puts "START"
Async do
# これやっといてくれーという非同期処理とする
Async do
puts 'async > start'
internet = Async::HTTP::Internet.new
resp = internet.get('http://example.com/')
puts "async > HTTP #{resp.status}"
puts 'async > done'
end
# メインストリーム処理とする。(stdinを読み続ける)
Async do
stdin = Async::IO::Stream.new(Async::IO::Generic.new($stdin))
while line = stdin.gets
puts "MAIN > line: #{line}"
end
end
end
puts "DONE"
$ ruby play1.rb
START
async > start
async > HTTP 200
async > done
a (キーボード入力+Enter)
MAIN > line: a
a (キーボード入力+Enter)
MAIN > line: a
hoge (キーボード入力+Enter)
MAIN > line: hoge
(Ctrl+D)
DONE
Promise.all
的な処理
puts "START"
Async do
puts "SUB-start"
Async do
Async do
puts 'async > start'
internet = Async::HTTP::Internet.new
resp = internet.get('http://example.com/')
puts "async > HTTP #{resp.status}"
puts 'async > done'
end
Async do
stdin = Async::IO::Stream.new(Async::IO::Generic.new($stdin))
while line = stdin.gets
puts "MAIN > line: #{line}"
end
end
puts "SUB-end1"
end
puts "SUB-end2"
Async do
sleep 3
puts "END"
end
end
puts "DONE"
一番外側のAsyncは全部のタスクが終わるのをまつが、内側のAsyncはたんなるお仕事の塊を表すに過ぎず、まつことはしない
$ ruby play1.rb
START
SUB-start
async > start
SUB-end1
SUB-end2
async > HTTP 200
async > done
(3秒まつ)
END
(Ctrl+D)
DONE
https://github.com/socketry/async-http のサンプルコードにあるように、semaphoreとかbarrierを使うようだ。
- https://github.com/socketry/async/blob/main/lib/async/barrier.md
- https://github.com/socketry/async/blob/main/lib/async/semaphore.md
barrierはちょうどPromise.all的なもので、semaphoreは同時実行数制限をかけるもの。
require 'async'
require 'async/barrier'
require 'async/http/internet'
require 'async/io'
require 'net/http'
puts "START"
Async do
puts "SUB-start"
barrier = Async::Barrier.new
barrier.async do
puts 'async > start'
internet = Async::HTTP::Internet.new
resp = internet.get('http://example.com/')
puts "async > HTTP #{resp.status}"
puts 'async > done'
end
barrier.async do
stdin = Async::IO::Stream.new(Async::IO::Generic.new($stdin))
while line = stdin.gets
puts "MAIN > line: #{line}"
end
end
barrier.wait
puts "SUB-end2"
Async do
sleep 3
puts "END"
end
end
puts "DONE"
START
SUB-start
async > start
async > HTTP 200
async > done
hoge (キーボード入力+Enter)
MAIN > line: hoge
(Ctrl+D)
SUB-end2
(3秒まつ)
END
DONE
asyncしたものをPromise.allまとめるイメージとは結構違い、親Barrierに対して子Asyncをぶら下げていき、親をwaitするみたいなイメージ
Promiseで値を返すパターン
Async::Condition, Async::Notification, Async::Queue, Async::Variable という似たようで違う4つがある。
require 'async'
require 'async/condition'
Async do
cond = Async::Condition.new
Async do
puts "waiter > START"
result = cond.wait
puts "waiter > result = #{result}"
end
Async do
puts "worker > START"
sleep 1
cond.signal('This is the result')
puts "worker > END"
end
end
$ ruby play2.rb
waiter > START
worker > START
(1秒まつ)
waiter > result = This is the result
worker > END
Conditionは、かならずwaitが先じゃないといけないらしく、signalが先の場合には捨てられ、waitはsignalこないかなーーと待ち続ける。
require 'async'
require 'async/condition'
Async do
cond = Async::Condition.new
Async do
puts "waiter > START"
sleep 2
result = cond.wait
puts "waiter > result = #{result}"
end
Async do
puts "worker > START"
sleep 1
cond.signal('This is the first result')
sleep 1
cond.signal('This is the second result')
puts "worker > END"
end
end
$ ruby play2.rb
waiter > START
worker > START
waiter > result = This is the second result
worker > END
Async::Notificationは、signalする側がFiber Schedulerを使うように(ノンブロッキングになるように?)工夫されたもの。らしい。よくわからんけど、Async::Queueが内部でこれを使っている。
じゃあ、waitする側がすでにあるsignalを受け取るには...?
Async::Queueを使う。
require 'async'
require 'async/queue'
Async do
cond = Async::Queue.new
Async do
puts "waiter > START"
sleep 2
puts "waiter > consume"
result = cond.dequeue
puts "waiter > result = #{result}"
end
Async do
puts "worker > START"
sleep 1
puts "emit first result"
cond << 'This is the first result'
sleep 2
puts "emit second result"
cond << 'This is the second result'
puts "worker > END"
end
end
$ ruby play2.rb
waiter > START
worker > START
(1秒まつ)
emit first result
(1秒まつ)
waiter > consume
waiter > result = This is the first result
(1秒まつ)
emit second result
worker > END
これぞPromiseっぽい動き。
ちなみに、 LimitedQueue(1)
を使うと、dequeueする者が現れるまでenqueueする側を待たせることもできるようだ。
ここまで話に出てこなかったAsync::Variableだが、これもまたPromiseっぽい。Async::Queueとは違い、Async::Notification継承のクラスではなく、Async::Conditionを保持するユーティリティクラス。
ただ、v1.30では使えず、v2.0 (Ruby 3.1必須!! )から使える。
require 'async'
require 'async/variable'
Async do
cond = Async::Variable.new
Async do
puts "waiter > START"
sleep 2
puts "waiter > consume"
result = cond.wait
puts "waiter > result = #{result}"
end
Async do
puts "worker > START"
sleep 1
puts "emit first result"
cond.resolve('This is the first result')
puts "worker > END"
end
end
$ ruby play2.rb
waiter > STARTworker > START
emit first result
worker > END
waiter > consume
waiter > result = This is the first result