
socketry/async notes

Yusuke Iwaki

Async { } = Async::Reactor.run { }

The outermost call creates the asynchronous job execution environment (a reactor); nested calls create asynchronous tasks inside it.

https://github.com/socketry/async/blob/master/lib/async/reactor.rb

		# The preferred method to invoke asynchronous behavior at the top level.
		#
		# - When invoked within an existing reactor task, it will run the given block
		# asynchronously. Will return the task once it has been scheduled.
		# - When invoked at the top level, will create and run a reactor, and invoke
		# the block as an asynchronous task. Will block until the reactor finishes
		# running.
		def self.run(*arguments, **options, &block)
			if current = Task.current?
				reactor = current.reactor
				
				return reactor.async(*arguments, **options, &block)
			else
				reactor = self.new
				
				begin
					return reactor.run(*arguments, **options, &block)
				ensure
					reactor.close
				end
			end
		end
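The branch in Async::Reactor.run can be modeled with a small toy class. This is only a sketch: ToyReactor, its queue, and the thread-local lookup are hypothetical stand-ins, not the gem's implementation. In particular, the real reactor runs a nested block immediately until its first blocking operation, whereas this toy simply queues it.

```ruby
# Toy model only: ToyReactor is hypothetical and is NOT Async::Reactor.
class ToyReactor
  attr_reader :closed

  # The reactor that is currently running on this thread, if any.
  def self.current
    Thread.current[:toy_reactor]
  end

  # Same shape as the quoted Async::Reactor.run:
  # nested call -> schedule a task; top-level call -> create, run, close.
  def self.run(&block)
    if (reactor = current)
      reactor.async(&block)
    else
      reactor = new
      begin
        reactor.run(&block)
      ensure
        reactor.close
      end
    end
  end

  def initialize
    @queue = []
    @closed = false
  end

  # Queue a block as a "task" and return it without running it.
  def async(&block)
    @queue << block
    block
  end

  # Run queued blocks until the queue is empty, then return the reactor.
  def run(&block)
    async(&block) if block
    Thread.current[:toy_reactor] = self
    @queue.shift.call until @queue.empty?
    self
  ensure
    Thread.current[:toy_reactor] = nil
  end

  def close
    @closed = true
  end
end

events = []
reactor = ToyReactor.run do
  events << :outer_start
  ToyReactor.run { events << :inner } # nested: only scheduled here
  events << :outer_end
end

p events         # => [:outer_start, :outer_end, :inner]
p reactor.closed # => true
```

The top-level call blocks until every queued task has finished and closes the reactor in the ensure clause; the nested call returns right away.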

Sometimes the execution environment has to be set up with explicit start and stop calls instead of a block. In that case, it seems to be enough to do:

@reactor = Async::Reactor.new; @reactor.run { ... }; @reactor.close
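A minimal sketch of that start/stop shape, for example for test setup and teardown hooks. ReactorSession and FakeReactor are hypothetical names introduced here for illustration; the only assumption about the real reactor is that it responds to run and close, as in the Async::Reactor source quoted above.

```ruby
# Hypothetical helper; not part of the async gem.
class ReactorSession
  # The factory block builds the reactor lazily, at start time.
  def initialize(&factory)
    @factory = factory
    @reactor = nil
  end

  def start
    raise "already started" if @reactor
    @reactor = @factory.call
    self
  end

  # Delegates to the reactor's own run.
  def run(&block)
    raise "not started" unless @reactor
    @reactor.run(&block)
  end

  # Always forget the reactor, even if close raises.
  def stop
    @reactor&.close
  ensure
    @reactor = nil
  end
end

# Stand-in so the sketch runs without the gem installed.
class FakeReactor
  attr_reader :calls

  def initialize
    @calls = []
  end

  def run
    @calls << :run
    yield if block_given?
  end

  def close
    @calls << :close
  end
end

fake = FakeReactor.new
session = ReactorSession.new { fake }
session.start
session.run { puts "work goes here" }
session.stop
p fake.calls # => [:run, :close]
```

With the gem available, the factory would presumably be ReactorSession.new { Async::Reactor.new }; that combination is untested here.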

Yusuke Iwaki

What happens if the Reactor's close call is forgotten?

require 'async'

reactor = Async::Reactor.new

puts "start -- #{Time.now}"
reactor.run {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
puts "end -- #{Time.now}"
#reactor.close
$ bundle exec ruby a.rb 
start -- 2021-04-01 23:39:32 +0900
2--2021-04-01 23:39:33 +0900
1--2021-04-01 23:39:34 +0900
3--2021-04-01 23:39:35 +0900
end -- 2021-04-01 23:39:35 +0900
$

→ The nio selector is never closed, but there seems to be no real harm?

Yusuke Iwaki

https://github.com/socketry/async/blob/master/lib/async/reactor.rb

Judging from the definition of Reactor#run, the previous example behaves the same as the following:

require 'async'

reactor = Async::Reactor.new

puts "start -- #{Time.now}"
reactor.async {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}
puts "run -- #{Time.now}"
reactor.run
puts "end -- #{Time.now}"
reactor.close
$ bundle exec ruby a.rb 
start -- 2021-04-01 23:57:53 +0900
run -- 2021-04-01 23:57:53 +0900
2--2021-04-01 23:57:54 +0900
1--2021-04-01 23:57:55 +0900
3--2021-04-01 23:57:56 +0900
end -- 2021-04-01 23:57:56 +0900
$

Yusuke Iwaki

Reactor#run only processes the async tasks that have accumulated up to the point where it is called.

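In the following, nothing is printed: run returns immediately because no tasks exist yet, and the tasks added afterwards never get past their sleep before close.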

require 'async'

reactor = Async::Reactor.new

reactor.run

reactor.async {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}

reactor.async {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}

reactor.close

In the following, only the first group runs to completion.

require 'async'

reactor = Async::Reactor.new

reactor.async {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}

reactor.run

reactor.async {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}

reactor.close

In the following, the two groups run concurrently.

require 'async'

reactor = Async::Reactor.new

reactor.async {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}

reactor.async {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}

reactor.run

reactor.close
$ bundle exec ruby a.rb 
2--2021-04-02 00:22:25 +0900
2--2021-04-02 00:22:25 +0900
1--2021-04-02 00:22:26 +0900
1--2021-04-02 00:22:26 +0900
3--2021-04-02 00:22:27 +0900
3--2021-04-02 00:22:27 +0900

In the following, the two groups run sequentially.

require 'async'

reactor = Async::Reactor.new

reactor.async {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}

reactor.run

reactor.async {
  Async { |t| t.sleep 2; puts "1--#{Time.now}" }
  Async { |t| t.sleep 1; puts "2--#{Time.now}" }
  Async { |t| t.sleep 3; puts "3--#{Time.now}" }
}

reactor.run

reactor.close
$ bundle exec ruby a.rb 
2--2021-04-02 00:23:32 +0900
1--2021-04-02 00:23:33 +0900
3--2021-04-02 00:23:34 +0900
2--2021-04-02 00:23:35 +0900
1--2021-04-02 00:23:36 +0900
3--2021-04-02 00:23:37 +0900
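
The orderings observed above can be reproduced with a toy reactor built on plain Ruby Fibers and a virtual clock. This is a sketch under loud assumptions: ToyReactor and add_group are hypothetical names, the toy conflates reactor and task into one object, and it is not how the async gem is implemented. It only models "async runs a task until its first sleep, run drains whatever is sleeping".

```ruby
# Toy model only: NOT the async gem. Time is a counter, nothing really sleeps.
class ToyReactor
  attr_reader :now, :log

  def initialize
    @now = 0
    @sleeping = [] # [wake_time, fiber] pairs for suspended tasks
    @log = []
  end

  # Start the block in a fiber and run it until its first sleep.
  def async(&block)
    fiber = Fiber.new { block.call(self) }
    step(fiber)
    fiber
  end

  # Called from inside a task: suspend for `seconds` of virtual time.
  def sleep(seconds)
    Fiber.yield(@now + seconds)
  end

  # Resume sleeping tasks in wake-time order until none remain.
  def run
    until @sleeping.empty?
      entry = @sleeping.min_by(&:first)
      @sleeping.delete(entry)
      @now = entry[0]
      step(entry[1])
    end
  end

  private

  # Resume a fiber; if it suspended again, remember when to wake it.
  def step(fiber)
    wake_at = fiber.resume
    @sleeping << [wake_at, fiber] if fiber.alive?
  end
end

# Same three tasks as in the experiments above, tagged with a group label.
def add_group(reactor, label)
  reactor.async do |r|
    r.async { |t| t.sleep 2; t.log << "#{label}1 at t=#{t.now}" }
    r.async { |t| t.sleep 1; t.log << "#{label}2 at t=#{t.now}" }
    r.async { |t| t.sleep 3; t.log << "#{label}3 at t=#{t.now}" }
  end
end

concurrent = ToyReactor.new
add_group(concurrent, "a")
add_group(concurrent, "b")
concurrent.run
puts concurrent.log # a2, b2 at t=1; a1, b1 at t=2; a3, b3 at t=3

sequential = ToyReactor.new
add_group(sequential, "a")
sequential.run
add_group(sequential, "b")
sequential.run
puts sequential.log # group a at t=1..3, then group b at t=4..6
```

Calling run before any add_group leaves the log empty in this model, which matches the "nothing happens" case.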

Yusuke Iwaki

Can the top-level reactor.async { ... } be written without using a block?

		# Start an asynchronous task within the specified reactor. The task will be
		# executed until the first blocking call, at which point it will yield and
		# and this method will return.
		#
		# This is the main entry point for scheduling asynchronus tasks.
		#
		# @yield [Task] Executed within the task.
		# @return [Task] The task that was scheduled into the reactor.
		def async(*arguments, **options, &block)
			task = Task.new(self, **options, &block)
			
			# I want to take a moment to explain the logic of this.
			# When calling an async block, we deterministically execute it until the
			# first blocking operation. We don't *have* to do this - we could schedule
			# it for later execution, but it's useful to:
			# - Fail at the point of the method call where possible.
			# - Execute determinstically where possible.
			# - Avoid scheduler overhead if no blocking operation is performed.
			task.run(*arguments)
			
			# logger.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..."
			return task
		end

In other words, async boils down to Task.new { ... }.run

		# Create a new task.
		# @param reactor [Async::Reactor] the reactor this task will run within.
		# @param parent [Async::Task] the parent task.
		def initialize(reactor, parent = Task.current?, logger: nil, finished: nil, **options, &block)
			super(parent || reactor, **options)
			
			@reactor = reactor
			
			@status = :initialized
			@result = nil
			@finished = finished
			
			@logger = logger
			
			@fiber = make_fiber(&block)
		end

The block is passed straight through to make_fiber.

		def make_fiber(&block)
			Fiber.new do |*arguments|
				set!
				
				begin
					@result = yield(self, *arguments)
					@status = :complete
					# logger.debug(self) {"Task was completed with #{@children.size} children!"}
				rescue Stop
					stop!
				rescue StandardError => error
					fail!(error, false)
				rescue Exception => exception
					fail!(exception, true)
				ensure
					# logger.debug(self) {"Task ensure $!=#{$!} with #{@children.size} children!"}
					finish!
				end
			end
		end

→ Not possible.
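
The reason the block cannot be avoided shows up with a plain Fiber from the Ruby core library: the block is the fiber's body, and resume runs it only until the first Fiber.yield. The sketch below is not the gem's code; trace_fiber is a hypothetical helper, and Fiber.yield stands in for a task's first blocking call.

```ruby
# Plain core-Ruby Fiber; no gem involved.
def trace_fiber
  log = []

  # The block given to Fiber.new IS the body of the fiber.
  fiber = Fiber.new do |name|
    log << "#{name}: started"
    Fiber.yield :blocked # stands in for the first blocking call
    log << "#{name}: resumed"
    :done
  end

  # First resume: runs the body up to Fiber.yield and returns its argument.
  log << "resume returned #{fiber.resume('task').inspect}"
  log << "alive: #{fiber.alive?}"

  # Second resume: continues after Fiber.yield and runs to the end.
  log << "resume returned #{fiber.resume.inspect}"
  log << "alive: #{fiber.alive?}"
  log
end

puts trace_fiber
```

The first resume corresponds to the task.run call inside async: control comes back to the caller as soon as the body suspends, and something else (the reactor, in the gem) has to resume it later.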

This scrap was closed on 2021/04/02.