なるほどTCPソケットを読んで、ネットワークアーキテクチャパターンを理解した。
なるほどTCPソケットを読んで、ネットワークアーキテクチャパターンを理解した。
前提
サーバーソケットは接続を開始せず、待機する。接続の開始はクライアント起点となり、サーバーライフサイクルは次のようになる
- 作成(create)
- 割当(bind)
- 待機(listen)
- 受付(accept)
- クローズ(close)
TCPServer#newで、新しいサーバー接続を作成・割当・待機。
TCPServer#acceptで、新しいクライアント接続を受け付ける。
require "socket"
server = TCPServer.new(4481) # 1.作成 2.割当 3.待機
loop do
connection = server.accept # 4. 受付
print 'Connection class: '
p connection.class
print 'Server fileno: '
p server.fileno
print 'Connection fileno: '
p connection.fileno
print 'Local address: '
p connection.local_address
print 'Remote address: '
p connection.remote_address
connection.close # 5. クローズ
end
acceptで返された新しいクライアント接続はサーバーソケットとは異なるファイルディスクリプタ番号を持っており、サーバソケットとは異なる新しいソケットインスタンスが作成されることが確認できる。
Connection class: TCPSocket
Server fileno: 7
Connection fileno: 8
Local address: #<Addrinfo: [::1]:4481 TCP>
Remote address: #<Addrinfo: [::1]:51799 TCP>
ネットワークアーキテクチャを説明するため、簡易なFTPプログラムを想定して、クライアント接続の処理に対するパターンごとの差分を把握する。
利用するFTPプログラムは記載の通り。
module FTP
class CommandHandler
CRLF = "\r\n"
attr_reader :connection
def initialize(connection)
@connection = connection
end
def pwd
@pwd || Dir.pwd
end
def handle(data)
cmd = data[0..3].strip.upcase
options = data[4..-1].strip
case cmd
when 'USER'
# 匿名を受け付ける
"230 Logged in anonymously"
when 'SYST'
# このシステムが何と言う名前か?
"215 UNIX Working With FTP"
when 'CWD'
if File.directory?(options)
@pwd = options
"250 directory changed to #{pwd}"
else
"550 directory not found"
end
when 'PWD'
"257 \"#{pwd}\" is the current directory"
when 'PORT'
parts = options.split(',')
ip_address = parts[0..3].join('.')
port = Integer(parts[4]) * 256 + Integer(parts[5])
@data_socket = TCPSocket.new(ip_address, port)
"200 Active connection established (#{port})"
when 'RETR'
file = File.open(File.join(pwd, options), 'r')
connection.respond "125 Data transfer starting #{file.size} bytes"
bytes = IO.copy_stream(file, @data_socket)
@data_socket.close
"226 Closing data connection, sent #{bytes} bytes"
when 'LIST'
connection.respond "125 Opening data connection for file list"
result = Dir.entries(pwd).join(CRLF)
@data_socket.write(result)
@data_socket.close
"226 Closing data connection, sent #{result.size} bytes"
when 'QUIT'
"221 Ciao"
else
"502 Don't know how to respond to #{cmd}"
end
end
end
end
シリアル
全てのクライアント接続はシリアルに処理される。
- クライアントが接続する。
- クライアント/サーバーでリクエストとレスポンスをやりとりする。
- クライアントが切断する。
- 1 に戻る
require 'socket'
module FTP
CRLF = "\r\n"
class Serial
def initialize(port = 21)
@control_socket = TCPServer.new(port)
trap(:INT) { exit }
end
def gets
@client.gets(CRLF)
end
def respond(message)
@client.write(message)
@client.write(CRLF)
end
def run
loop do
@client = @control_socket.accept
respond "220 OHAI"
handler = CommandHandler.new(self)
loop do
request = gets # クライアントからリクエストが来るまで同期処理で待つ。そのため並列処理は不可。
if request
respond handler.handle(request)
else
@client.close
break
end
end
end
end
end
end
考察
すべての接続はシリアルに行われるため並列に処理されない。後から来たクライアント接続は、先のクライアント接続の処理が終わるまで接続開始できない。
コネクションごとのプロセス
クライアント接続はプロセスで並行に処理される。
- 接続がサーバーにやってくる
- サーバーの主プロセスが接続を受け付ける
- サーバーの主プロセスは新しい子プロセスをforkする。子プロセスはサーバープロセスの完全なコピーを持つ
- 子プロセスは接続に対する処理を継続し、それと平行して、サーバーの主プロセスはステップ 1 に戻る。
常に1つの親プロセスが接続を受け付けるために待機し、複数の子プロセスがそれぞれ処理される。
module FTP
CRLF = "\r\n"
class ProcessPerConnection
def initialize(port = 21)
@control_socket = TCPServer.new(port)
trap(:INT) { exit }
end
def gets
@client.gets(CRLF)
end
def respond(message)
@client.write(message)
@client.write(CRLF)
end
def run
loop do
@client = @control_socket.accept
# 接続を受け付けた後、子プロセスが接続を処理するためにforkする。
# 親プロセスはforkブロック内の処理は実施しない。
pid = fork do
respond "220 OHAI"
handler = CommandHandler.new(self)
loop do
request = gets # forkされた子プロセスが同期処理を行う
if request
respond handler.handle(request)
else
@client.close
break
end
end
end
Process.detach(pid)
end
end
end
end
Kernel.#fork
ブロックを指定して呼び出した場合には、生成した子プロセスでブロックを評価します。
考察
プロセスの生成はリソースを多く消費する可能性がある。
また、生成可能なプロセス数の制限がなく、数千数百のプロセスを生成するとシステムがダウンすることになる可能性がある。
生成可能なプロセス数の制限は、「prefork」により行う。
コネクションごとのスレッド
クライアント接続はスレッドで並行に処理される。
require 'socket'
require 'thread'
module FTP
Connection = Struct.new(:client) do
CRLF = "\r\n"
def gets
client.gets(CRLF)
end
def respond(message)
client.write(message)
client.write(CRLF)
end
def close
client.close
end
end
class ThreadPerConnection
def initialize(port = 21)
@control_socket = TCPServer.new(port)
trap(:INT) { exit }
end
def run
Thread.abort_on_exception = true
loop do
# acceptが返ったクライアントソケットをスレッドごとに管理するためにConnection.newの引数に渡す
conn = Connection.new(@control_socket.accept)
Thread.new do
conn.respond "220 OHAI"
handler = CommandHandler.new(self)
loop do
request = conn.gets
if request
conn.respond handler.handle(request)
else
conn.close
break
end
end
end
end
end
end
end
考察
スレッドはプロセスより軽量だが、共有状態の問題が発生する可能性がある。
ただし、スレッド数はシステムをダウンさせるまで増やすことができる。
prefork
接続の到着毎にクライアント接続を作成せずに、サーバーを起動するタイミングなどで事前にプロセスを指定数作成しておく。
- メインのサーバープロセスは接続を待機するソケットを作成する。
- メインのサーバープロセスは子プロセスの群れを fork する。
- それぞれの子プロセスは、共有しているソケット上の接続を受け付け、個々にそれらを処理する。
- メインのサーバープロセスは子プロセスを監視する。
module FTP
class Preforking
CRLF = "\r\n"
CONCURRENCY = 4
def initialize(port = 21)
@control_socket = TCPServer.new(port)
trap(:INT) { exit }
end
def gets
@client.gets(CRLF)
end
def respond(message)
@client.write(message)
@client.write(CRLF)
end
def run
child_pids = []
CONCURRENCY.times do
child_pids << spawn_child
end
trap(:INT) { # 親プロセスが死んだタイミングで子プロセスへ伝播
child_pids.each do |cpid|
begin
Process.kill(:INT, cpid)
rescue Errno::ESRCH
end
end
exit
}
loop do
puts "Process.wait" # ここで親プロセスが子プロセスを監視している。子プロセスは終了しないので、終了した場合は異常終了に相当。
pid = Process.wait
$stderr.puts "Process #{pid} quit unexpectedly"
child_pids.delete(pid)
child_pids << spawn_child
end
end
def spawn_child
fork do
loop do
@client = @control_socket.accept # fork後にacceptしている、子プロセス側で処理がきたらaccept処理
respond "220 OHAI"
handler = CommandHandler.new(self)
loop do
request = gets
if request
respond handler.handle(request)
else
@client.close
break
end
end
end
end
end
end
end
考察
fork後に各プロセスでacceptし、クライアント接続を開始する。接続のたびにforkしないので、処理が軽量となる。
スレッドプール
preforkパターンのスレッド版。
module FTP
Connection = Struct.new(:client) do
CRLF = "\r\n"
def gets
client.gets(CRLF)
end
def respond(message)
client.write(message)
client.write(CRLF)
end
def close
client.close
end
end
class ThreadPool
CONCURRENCY = 25
def initialize(port = 21)
@control_socket = TCPServer.new(port)
trap(:INT) { exit }
end
def run
Thread.abort_on_exception = true
threads = ThreadGroup.new
CONCURRENCY.times do
threads.add spawn_thread
end
sleep
end
def spawn_thread
Thread.new do
loop do
# 1つのスレッドで1つのクライアント接続を受け付ける
conn = Connection.new(@control_socket.accept)
conn.respond "220 OHAI"
handler = CommandHandler.new(self)
loop do
request = conn.gets
if request
conn.respond handler.handle(request)
else
conn.close
break
end
end
end
end
end
end
end
考察
スレッドはプロセスよりも軽量のため、CONCURRENCYの回数を増やせる。
イベント(Reactor)
接続を多重化しすべてシングルスレッドで行われる。
- サーバーは接続を待機するソケットを監視する。
- 新しい接続がやってくると、サーバーは監視するソケットのリストにその接続を追加する。
- 接続を待機するソケットと同様に、サーバーはアクティブな接続を監視する。
- アクティブな接続が読み込み可能だという通知を受けると、サーバーは接続からデータの固まりを読み込み、関連するコールバックを呼び出す。
- アクティブな接続がまだ読み込み可能だという通知を受けると、サーバーは再び接続からデータの固まりを読み込み、関連するコールバックを呼び出す。
- サーバーは別の接続を受信すると、監視するソケットのリストにその接続を追加する。
- サーバーは最初の接続が書き込み可能だという通知を受けると、その接続にレスポンスを書き込む。
module FTP
class Evented
# イベントパターンはシングルスレッドで複数の接続を処理する。1つの接続をこのクラスに集約するために作成した。
# クライアントからの接続はことなるユーザーからの接続となるため、お互いの状態に知る必要がないため独自のオブジェクトを使った表現をする。
class Connection
CRLF = "\r\n"
attr_reader :client
def initialize(io)
@client = io
@request, @response = "", ""
@handler = CommandHandler.new(self)
respond "220 OHAI"
on_writable
end
def on_data(data)
@request << data
if @request.end_with?(CRLF)
respond @handler.handle(@request)
@request = ""
end
end
def respond(message) # "220 OHAI"のデータが@responseに格納される。
@response << message + CRLF
on_writable
end
def on_writable
bytes = client.write_nonblock(@response)
@response.slice!(0, bytes) # wirte_noblockで書き込んだ分だけ@responseデータを削除
end
def monitor_for_reading?
true
end
def monitor_for_writing?
!(@response.empty?)
end
end
def initialize(port = 21)
@control_socket = TCPServer.new(port)
trap(:INT) { exit }
end
def run
@handles = {}
loop do
to_read = @handles.values.select(&:monitor_for_reading?).map(&:client)
to_write = @handles.values.select(&:monitor_for_writing?).map(&:client)
readbles, writables = IO.select(to_read + [@control_socket], to_write)
readbles.each do |socket|
if socket == @control_socket
io = @control_socket.accept
connection = Connection.new(io) # 新しいクライアント接続が来た場合は、クライアント接続を引数にConnectを作成。control_socketが読み込み可能は新しいClient接続が来たことを意味する
@handles[io.fileno] = connection # クライアント接続ごとのハッシュを作成する
else
connection = @handles[socket.fileno]
begin
data = socket.read_nonblock(CHUNK_SIZE) # socketはacceptで作成されたクライアント接続。ノンブロッキングIOでデータを読み込み、接続Connectionクラスへdataを渡す。
connection.on_data(data)
rescue Errno::EAGAIN
rescue EOFError
@handles.delete(socket.fileno)
end
end
end
writables.each do |socket|
connection = @handles[socket.fileno]
connection.on_writable
end
end
end
end
end
考察
Reactorパターンは他のパターンとは異なっている。
プロセスやスレッドに依存しないため、数千、数万の同時接続を捌くことができる。5000の同時接続を捌くため、5000のプロセスをforkすることは難しい。
ブロッキング処理を実施すると、1つのクライアント接続が遅い場合に全体が遅延するため、io処理を見直す必要がある。
Reactorパターンは特にI/Oバウンドのアプリケーションに効果的で、数千、数万の同時接続を捌くことができる。
ハイブリッド
nginxは、「prefork」パターンを使用している。しかし、forkしたそれぞれのプロセスの内側は「Reactor」パターンとなっている。[1]
Pumaのメインスレッドは「Reactor」パターンで処理され、個別のリクエスト処理は、「スレッドプール」となっている。[2]
Discussion