Closed5
socketry/asyncめも
Async { }
= Async::Reactor.run { }
一番外側は非同期ジョブ実行環境を作る、ネストされたものは非同期タスクを作る。
# The preferred method to invoke asynchronous behavior at the top level.
#
# - When invoked within an existing reactor task, it will run the given block
# asynchronously. Will return the task once it has been scheduled.
# - When invoked at the top level, will create and run a reactor, and invoke
# the block as an asynchronous task. Will block until the reactor finishes
# running.
def self.run(*arguments, **options, &block)
if current = Task.current?
reactor = current.reactor
return reactor.async(*arguments, **options, &block)
else
reactor = self.new
begin
return reactor.run(*arguments, **options, &block)
ensure
reactor.close
end
end
end
非同期ジョブ実行環境を作る際に、ブロックではなく、開始・終了で書かないといけないケースもあるが、そういうときは
@reactor = Async::Reactor.new
・ @reactor.run { ... }
と @reactor.close
すればいいみたい。
Reactorのcloseもれたらどうなるか
require 'async'
reactor = Async::Reactor.new
puts "start -- #{Time.now}"
reactor.run {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
puts "end -- #{Time.now}"
#reactor.close
$ bundle exec ruby a.rb
start -- 2021-04-01 23:39:32 +0900
2--2021-04-01 23:39:33 +0900
1--2021-04-01 23:39:34 +0900
3--2021-04-01 23:39:35 +0900
end -- 2021-04-01 23:39:35 +0900
$
→nioのselectorのcloseがされないが、実害はない?
Reactor#run
の定義を見るに、↓と同じ動き
require 'async'
reactor = Async::Reactor.new
puts "start -- #{Time.now}"
reactor.async {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
puts "run -- #{Time.now}"
reactor.run
puts "end -- #{Time.now}"
reactor.close
$ bundle exec ruby a.rb
start -- 2021-04-01 23:57:53 +0900
run -- 2021-04-01 23:57:53 +0900
2--2021-04-01 23:57:54 +0900
1--2021-04-01 23:57:55 +0900
3--2021-04-01 23:57:56 +0900
end -- 2021-04-01 23:57:56 +0900
$
Reactor#run
は、その時点までに溜まっているasyncタスクを履くだけ。
以下は、何も実行されない。
require 'async'
reactor = Async::Reactor.new
reactor.run
reactor.async {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
reactor.async {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
reactor.close
以下は、1つ目の塊だけ実行される
require 'async'
reactor = Async::Reactor.new
reactor.async {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
reactor.run
reactor.async {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
reactor.close
以下は、2つの塊が非同期に実行される
require 'async'
reactor = Async::Reactor.new
reactor.async {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
reactor.async {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
reactor.run
reactor.close
$ bundle exec ruby a.rb
2--2021-04-02 00:22:25 +0900
2--2021-04-02 00:22:25 +0900
1--2021-04-02 00:22:26 +0900
1--2021-04-02 00:22:26 +0900
3--2021-04-02 00:22:27 +0900
3--2021-04-02 00:22:27 +0900
以下は、2つの塊が直列に実行される
require 'async'
reactor = Async::Reactor.new
reactor.async {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
reactor.run
reactor.async {
Async { |t| t.sleep 2; puts "1--#{Time.now}" }
Async { |t| t.sleep 1; puts "2--#{Time.now}" }
Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
reactor.run
reactor.close
$ bundle exec ruby a.rb
2--2021-04-02 00:23:32 +0900
1--2021-04-02 00:23:33 +0900
3--2021-04-02 00:23:34 +0900
2--2021-04-02 00:23:35 +0900
1--2021-04-02 00:23:36 +0900
3--2021-04-02 00:23:37 +0900
reactor.async { ... }
はブロックを使わずに書けないか?
トップレベルの # Start an asynchronous task within the specified reactor. The task will be
# executed until the first blocking call, at which point it will yield and
# and this method will return.
#
# This is the main entry point for scheduling asynchronus tasks.
#
# @yield [Task] Executed within the task.
# @return [Task] The task that was scheduled into the reactor.
def async(*arguments, **options, &block)
task = Task.new(self, **options, &block)
# I want to take a moment to explain the logic of this.
# When calling an async block, we deterministically execute it until the
# first blocking operation. We don't *have* to do this - we could schedule
# it for later execution, but it's useful to:
# - Fail at the point of the method call where possible.
# - Execute determinstically where possible.
# - Avoid scheduler overhead if no blocking operation is performed.
task.run(*arguments)
# logger.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..."
return task
end
Task.new { ... }.run
# Create a new task.
# @param reactor [Async::Reactor] the reactor this task will run within.
# @param parent [Async::Task] the parent task.
def initialize(reactor, parent = Task.current?, logger: nil, finished: nil, **options, &block)
super(parent || reactor, **options)
@reactor = reactor
@status = :initialized
@result = nil
@finished = finished
@logger = logger
@fiber = make_fiber(&block)
end
make_fiberにブロックがそのまま渡っている
def make_fiber(&block)
Fiber.new do |*arguments|
set!
begin
@result = yield(self, *arguments)
@status = :complete
# logger.debug(self) {"Task was completed with #{@children.size} children!"}
rescue Stop
stop!
rescue StandardError => error
fail!(error, false)
rescue Exception => exception
fail!(exception, true)
ensure
# logger.debug(self) {"Task ensure $!=#{$!} with #{@children.size} children!"}
finish!
end
end
end
→むり
このスクラップは2021/04/02にクローズされました