Open3

Promiseベースの処理をsocketry/asyncでどう書く?

Yusuke IwakiYusuke Iwaki

結果は別にいらない、非同期で動いてくれさえすればいいパターン

require 'async'
require 'async/http/internet'
require 'async/io'
require 'net/http'

puts "START"
Async do
  # これやっといてくれーという非同期処理とする
  Async do
    puts 'async > start'
    internet = Async::HTTP::Internet.new
    resp = internet.get('http://example.com/')
    puts "async > HTTP #{resp.status}"
    puts 'async > done'
  end

  # メインストリーム処理とする。(stdinを読み続ける)
  Async do
    stdin = Async::IO::Stream.new(Async::IO::Generic.new($stdin))
    while line = stdin.gets
      puts "MAIN > line: #{line}"
    end
  end
end

puts "DONE"
$ ruby play1.rb 
START
async > start
async > HTTP 200
async > done
a (キーボード入力+Enter)
MAIN > line: a
a (キーボード入力+Enter)
MAIN > line: a
hoge (キーボード入力+Enter)
MAIN > line: hoge
(Ctrl+D)
DONE
Yusuke IwakiYusuke Iwaki

Promise.all 的な処理

puts "START"
Async do
  puts "SUB-start"
  Async do
    Async do
      puts 'async > start'
      internet = Async::HTTP::Internet.new
      resp = internet.get('http://example.com/')
      puts "async > HTTP #{resp.status}"
      puts 'async > done'
    end

    Async do
      stdin = Async::IO::Stream.new(Async::IO::Generic.new($stdin))
      while line = stdin.gets
      puts "MAIN > line: #{line}"
      end
    end
    puts "SUB-end1"
  end
  puts "SUB-end2"

  Async do
    sleep 3
    puts "END"
  end
end
puts "DONE"

一番外側のAsyncは全部のタスクが終わるのをまつが、内側のAsyncはたんなるお仕事の塊を表すに過ぎず、まつことはしない

$ ruby play1.rb 
START
SUB-start
async > start
SUB-end1
SUB-end2
async > HTTP 200
async > done
(3秒まつ)
END
(Ctrl+D)
DONE

https://github.com/socketry/async-http のサンプルコードにあるように、semaphoreとかbarrierを使うようだ。

barrierはちょうどPromise.all的なもので、semaphoreは同時実行数制限をかけるもの。

require 'async'
require 'async/barrier'
require 'async/http/internet'
require 'async/io'
require 'net/http'

puts "START"
Async do
  puts "SUB-start"
  barrier = Async::Barrier.new
  barrier.async do
    puts 'async > start'
    internet = Async::HTTP::Internet.new
    resp = internet.get('http://example.com/')
    puts "async > HTTP #{resp.status}"
    puts 'async > done'
  end

  barrier.async do
    stdin = Async::IO::Stream.new(Async::IO::Generic.new($stdin))
    while line = stdin.gets
    puts "MAIN > line: #{line}"
    end
  end
  barrier.wait
  puts "SUB-end2"

  Async do
    sleep 3
    puts "END"
  end
end
puts "DONE"
START
SUB-start
async > start
async > HTTP 200
async > done
hoge (キーボード入力+Enter)
MAIN > line: hoge
 (Ctrl+D)
SUB-end2
(3秒まつ)
END
DONE

asyncしたものをPromise.allまとめるイメージとは結構違い、親Barrierに対して子Asyncをぶら下げていき、親をwaitするみたいなイメージ

Yusuke IwakiYusuke Iwaki

Promiseで値を返すパターン

Async::Condition, Async::Notification, Async::Queue, Async::Variable という似たようで違う4つがある。

require 'async'
require 'async/condition'

Async do
  cond = Async::Condition.new

  Async do
    puts "waiter > START"
    result = cond.wait
    puts "waiter > result = #{result}"
  end

  Async do
    puts "worker > START"
    sleep 1
    cond.signal('This is the result')
    puts "worker > END"
  end
end
$ ruby play2.rb 
waiter > START
worker > START
(1秒まつ)
waiter > result = This is the result
worker > END

Conditionは、かならずwaitが先じゃないといけないらしく、signalが先の場合には捨てられ、waitはsignalこないかなーーと待ち続ける。

require 'async'
require 'async/condition'

Async do
  cond = Async::Condition.new

  Async do
    puts "waiter > START"
    sleep 2
    result = cond.wait
    puts "waiter > result = #{result}"
  end

  Async do
    puts "worker > START"
    sleep 1
    cond.signal('This is the first result')
    sleep 1
    cond.signal('This is the second result')
    puts "worker > END"
  end
end
$ ruby play2.rb 
waiter > START
worker > START
waiter > result = This is the second result
worker > END

Async::Notificationは、signalする側がFiber Schedulerを使うように(ノンブロッキングになるように?)工夫されたもの。らしい。よくわからんけど、Async::Queueが内部でこれを使っている。

じゃあ、waitする側がすでにあるsignalを受け取るには...?
Async::Queueを使う。

require 'async'
require 'async/queue'

Async do
  cond = Async::Queue.new

  Async do
    puts "waiter > START"
    sleep 2
    puts "waiter > consume"
    result = cond.dequeue
    puts "waiter > result = #{result}"
  end

  Async do
    puts "worker > START"
    sleep 1
    puts "emit first result"
    cond << 'This is the first result'
    sleep 2
    puts "emit second result"
    cond << 'This is the second result'
    puts "worker > END"
  end
end
$ ruby play2.rb 
waiter > START
worker > START
(1秒まつ)
emit first result
(1秒まつ)
waiter > consume
waiter > result = This is the first result
(1秒まつ)
emit second result
worker > END

これぞPromiseっぽい動き。

ちなみに、 LimitedQueue(1) を使うと、dequeueする者が現れるまでenqueueする側を待たせることもできるようだ。

ここまで話に出てこなかったAsync::Variableだが、これもまたPromiseっぽい。Async::Queueとは違い、Async::Notification継承のクラスではなく、Async::Conditionを保持するユーティリティクラス。
ただ、v1.30では使えず、v2.0 (Ruby 3.1必須!! )から使える。

require 'async'
require 'async/variable'

Async do
  cond = Async::Variable.new

  Async do
    puts "waiter > START"
    sleep 2
    puts "waiter > consume"
    result = cond.wait
    puts "waiter > result = #{result}"
  end

  Async do
    puts "worker > START"
    sleep 1
    puts "emit first result"
    cond.resolve('This is the first result')
    puts "worker > END"
  end
end

$ ruby play2.rb 
waiter > STARTworker > START

emit first result
worker > END
waiter > consume
waiter > result = This is the first result