💭

なるほどTCPソケットを読んで、ネットワークアーキテクチャパターンを理解した。

2024/11/17に公開

なるほどTCPソケットを読んで、ネットワークアーキテクチャパターンを理解した。

前提

サーバーソケットは接続を開始せず、待機する。接続の開始はクライアント起点となり、サーバーライフサイクルは次のようになる

  1. 作成(create)
  2. 割当(bind)
  3. 待機(listen)
  4. 受付(accept)
  5. クローズ(close)

TCPServer#newで、新しいサーバー接続を作成・割当・待機。
TCPServer#acceptで、新しいクライアント接続を受け付ける。

require "socket"
server = TCPServer.new(4481) # 1.作成 2.割当 3.待機
loop do
  connection = server.accept # 4. 受付

  print 'Connection class: '
  p connection.class
  
  print 'Server fileno: '
  p server.fileno
  
  print 'Connection fileno: '
  p connection.fileno
  
  print 'Local address: '
  p connection.local_address
  
  print 'Remote address: '
  p connection.remote_address

  connection.close # 5. クローズ
end

acceptで返された新しいクライアント接続はサーバーソケットとは異なるファイルディスクリプタ番号を持っており、サーバソケットとは異なる新しいソケットインスタンスが作成されることが確認できる。

Connection class: TCPSocket
Server fileno: 7
Connection fileno: 8
Local address: #<Addrinfo: [::1]:4481 TCP>
Remote address: #<Addrinfo: [::1]:51799 TCP>

ネットワークアーキテクチャを説明するため、簡易なFTPプログラムを想定して、クライアント接続の処理に対するパターンごとの差分を把握する。
利用するFTPプログラムは記載の通り。

module FTP
  class CommandHandler
    CRLF = "\r\n"

    attr_reader :connection
    def initialize(connection)
      @connection = connection
    end

    def pwd
      @pwd || Dir.pwd
    end

    def handle(data)
      cmd = data[0..3].strip.upcase
      options = data[4..-1].strip

      case cmd
      when 'USER'
        # 匿名を受け付ける
        "230 Logged in anonymously"

      when 'SYST'
        # このシステムが何と言う名前か?
        "215 UNIX Working With FTP"

      when 'CWD'
        if File.directory?(options)
          @pwd = options
          "250 directory changed to #{pwd}"
        else
          "550 directory not found"
        end

      when 'PWD'
        "257 \"#{pwd}\" is the current directory"

      when 'PORT'
        parts = options.split(',')
        ip_address = parts[0..3].join('.')
        port = Integer(parts[4]) * 256 + Integer(parts[5])

        @data_socket = TCPSocket.new(ip_address, port)
        "200 Active connection established (#{port})"

      when 'RETR'
        file = File.open(File.join(pwd, options), 'r')
        connection.respond "125 Data transfer starting #{file.size} bytes"

        bytes = IO.copy_stream(file, @data_socket)
        @data_socket.close

        "226 Closing data connection, sent #{bytes} bytes"

      when 'LIST'
        connection.respond "125 Opening data connection for file list"

        result = Dir.entries(pwd).join(CRLF)
        @data_socket.write(result)
        @data_socket.close

        "226 Closing data connection, sent #{result.size} bytes"

      when 'QUIT'
        "221 Ciao"

      else
        "502 Don't know how to respond to #{cmd}"
      end
    end
  end
end

シリアル

全てのクライアント接続はシリアルに処理される。

  1. クライアントが接続する。
  2. クライアント/サーバーでリクエストとレスポンスをやりとりする。
  3. クライアントが切断する。
  4. 1 に戻る
require 'socket'

module FTP
  CRLF = "\r\n"

  class Serial
    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def gets
      @client.gets(CRLF)
    end

    def respond(message)
      @client.write(message)
      @client.write(CRLF)
    end

    def run
      loop do
        @client = @control_socket.accept
        respond "220 OHAI"

        handler = CommandHandler.new(self)

        loop do
          request = gets # クライアントからリクエストが来るまで同期処理で待つ。そのため並列処理は不可。

          if request
            respond handler.handle(request)
          else
            @client.close
            break
          end
        end
      end
    end
  end
end

考察

すべての接続はシリアルに行われるため並列に処理されない。後から来たクライアント接続は、先のクライアント接続の処理が終わるまで接続開始できない。

コネクションごとのプロセス

クライアント接続はプロセスで並行に処理される。

  1. 接続がサーバーにやってくる
  2. サーバーの主プロセスが接続を受け付ける
  3. サーバーの主プロセスは新しい子プロセスをforkする。子プロセスはサーバープロセスの完全なコピーを持つ
  4. 子プロセスは接続に対する処理を継続し、それと平行して、サーバーの主プロセスはステップ 1 に戻る。

常に1つの親プロセスが接続を受け付けるために待機し、複数の子プロセスがそれぞれ処理される。

module FTP
  CRLF = "\r\n"

  class ProcessPerConnection
    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def gets
      @client.gets(CRLF)
    end

    def respond(message)
      @client.write(message)
      @client.write(CRLF)
    end

    def run
      loop do
        @client = @control_socket.accept
        # 接続を受け付けた後、子プロセスが接続を処理するためにforkする。
        # 親プロセスはforkブロック内の処理は実施しない。
        pid = fork do
          respond "220 OHAI"
  
          handler = CommandHandler.new(self)
  
          loop do
            request = gets # forkされた子プロセスが同期処理を行う
  
            if request
              respond handler.handle(request)
            else
              @client.close
              break
            end
          end
        end
        Process.detach(pid)
      end
    end
  end
end

Kernel.#fork
ブロックを指定して呼び出した場合には、生成した子プロセスでブロックを評価します。

考察

プロセスの生成はリソースを多く消費する可能性がある。
また、生成可能なプロセス数の制限がなく、数千数百のプロセスを生成するとシステムがダウンすることになる可能性がある。
生成可能なプロセス数の制限は、「prefork」により行う。

コネクションごとのスレッド

クライアント接続はスレッドで並行に処理される。

require 'socket'
require 'thread'

module FTP
  Connection = Struct.new(:client) do
    CRLF = "\r\n"
    
    def gets
      client.gets(CRLF)
    end

    def respond(message)
      client.write(message)
      client.write(CRLF)
    end

    def close
      client.close
    end
  end

  class ThreadPerConnection
    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def run
      Thread.abort_on_exception = true

      loop do
        # acceptが返ったクライアントソケットをスレッドごとに管理するためにConnection.newの引数に渡す
        conn = Connection.new(@control_socket.accept)
        Thread.new do
          conn.respond "220 OHAI"
  
          handler = CommandHandler.new(self)
  
          loop do
            request = conn.gets
  
            if request
              conn.respond handler.handle(request)
            else
              conn.close
              break
            end
          end
        end
      end
    end
  end
end

考察

スレッドはプロセスより軽量だが、共有状態の問題が発生する可能性がある。
ただし、スレッド数はシステムをダウンさせるまで増やすことができる。

prefork

接続の到着毎にクライアント接続を作成せずに、サーバーを起動するタイミングなどで事前にプロセスを指定数作成しておく。

  1. メインのサーバープロセスは接続を待機するソケットを作成する。
  2. メインのサーバープロセスは子プロセスの群れを fork する。
  3. それぞれの子プロセスは、共有しているソケット上の接続を受け付け、個々にそれらを処理する。
  4. メインのサーバープロセスは子プロセスを監視する。
module FTP
  class Preforking
    CRLF = "\r\n"
    CONCURRENCY = 4
    
    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def gets
      @client.gets(CRLF)
    end

    def respond(message)
      @client.write(message)
      @client.write(CRLF)
    end

    def run
      child_pids = []
      
      CONCURRENCY.times do
        child_pids << spawn_child
      end

      trap(:INT) { # 親プロセスが死んだタイミングで子プロセスへ伝播
        child_pids.each do |cpid|
          begin
            Process.kill(:INT, cpid)
          rescue Errno::ESRCH
          end
        end
        exit
      }

      loop do
        puts "Process.wait" # ここで親プロセスが子プロセスを監視している。子プロセスは終了しないので、終了した場合は異常終了に相当。
        pid = Process.wait
        $stderr.puts "Process #{pid} quit unexpectedly"
        child_pids.delete(pid)
        child_pids << spawn_child
      end
    end

    def spawn_child
      fork do
        loop do
          @client = @control_socket.accept # fork後にacceptしている、子プロセス側で処理がきたらaccept処理
          respond "220 OHAI"
          handler = CommandHandler.new(self)
  
          loop do
            request = gets
  
            if request
              respond handler.handle(request)
            else
              @client.close
              break
            end
          end
        end
      end
    end
  end
end

考察

fork後に各プロセスでacceptし、クライアント接続を開始する。接続のたびにforkしないので、処理が軽量となる。

スレッドプール

preforkパターンのスレッド版。

module FTP
  Connection = Struct.new(:client) do
    CRLF = "\r\n"
    
    def gets
      client.gets(CRLF)
    end

    def respond(message)
      client.write(message)
      client.write(CRLF)
    end

    def close
      client.close
    end
  end

  class ThreadPool
    CONCURRENCY = 25

    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def run
      Thread.abort_on_exception = true
      threads = ThreadGroup.new

      CONCURRENCY.times do
        threads.add spawn_thread
      end
      sleep
    end

    def spawn_thread
      Thread.new do
        loop do
          # 1つのスレッドで1つのクライアント接続を受け付ける
          conn = Connection.new(@control_socket.accept)
          conn.respond "220 OHAI"
  
          handler = CommandHandler.new(self)
  
          loop do
            request = conn.gets
  
            if request
              conn.respond handler.handle(request)
            else
              conn.close
              break
            end
          end
        end
      end
    end
  end
end

考察

スレッドはプロセスよりも軽量のため、CONCURRENCYの回数を増やせる。

イベント(Reactor)

接続を多重化しすべてシングルスレッドで行われる。

  1. サーバーは接続を待機するソケットを監視する。
  2. 新しい接続がやってくると、サーバーは監視するソケットのリストにその接続を追加する。
  3. 接続を待機するソケットと同様に、サーバーはアクティブな接続を監視する。
  4. アクティブな接続が読み込み可能だという通知を受けると、サーバーは接続からデータの固まりを読み込み、関連するコールバックを呼び出す。
  5. アクティブな接続がまだ読み込み可能だという通知を受けると、サーバーは再び接続からデータの固まりを読み込み、関連するコールバックを呼び出す。
  6. サーバーは別の接続を受信すると、監視するソケットのリストにその接続を追加する。
  7. サーバーは最初の接続が書き込み可能だという通知を受けると、その接続にレスポンスを書き込む。
module FTP
  class Evented
    # イベントパターンはシングルスレッドで複数の接続を処理する。1つの接続をこのクラスに集約するために作成した。
    # クライアントからの接続はことなるユーザーからの接続となるため、お互いの状態に知る必要がないため独自のオブジェクトを使った表現をする。
    class Connection
      CRLF = "\r\n"
      attr_reader :client
    
      def initialize(io)
        @client = io
        @request, @response = "", ""
        
        @handler = CommandHandler.new(self)
        respond "220 OHAI"
        on_writable
      end

      def on_data(data)
        @request << data
        if @request.end_with?(CRLF)
          respond @handler.handle(@request)
          @request = ""
        end
      end

      def respond(message) # "220 OHAI"のデータが@responseに格納される。
        @response << message + CRLF
        on_writable
      end

      def on_writable
        bytes = client.write_nonblock(@response)
        @response.slice!(0, bytes) # wirte_noblockで書き込んだ分だけ@responseデータを削除
      end

      def monitor_for_reading?
        true
      end

      def monitor_for_writing?
        !(@response.empty?)
      end
    end

    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def run
      @handles = {}

      loop do
        to_read = @handles.values.select(&:monitor_for_reading?).map(&:client)
        to_write = @handles.values.select(&:monitor_for_writing?).map(&:client)

        readbles, writables = IO.select(to_read + [@control_socket], to_write)
        readbles.each do |socket|
          if socket == @control_socket
            io = @control_socket.accept
            connection = Connection.new(io)  # 新しいクライアント接続が来た場合は、クライアント接続を引数にConnectを作成。control_socketが読み込み可能は新しいClient接続が来たことを意味する
            @handles[io.fileno] = connection # クライアント接続ごとのハッシュを作成する
          else
            connection = @handles[socket.fileno]
            begin
              data = socket.read_nonblock(CHUNK_SIZE) # socketはacceptで作成されたクライアント接続。ノンブロッキングIOでデータを読み込み、接続Connectionクラスへdataを渡す。
              connection.on_data(data)
            rescue Errno::EAGAIN
            rescue EOFError
              @handles.delete(socket.fileno)
            end
          end
        end
        writables.each do |socket|
          connection = @handles[socket.fileno]
          connection.on_writable
        end
      end
    end
  end
end

考察

Reactorパターンは他のパターンとは異なっている。
プロセスやスレッドに依存しないため、数千、数万の同時接続を捌くことができる。5000の同時接続を捌くため、5000のプロセスをforkすることは難しい。
ブロッキング処理を実施すると、1つのクライアント接続が遅い場合に全体が遅延するため、io処理を見直す必要がある。
Reactorパターンは特にI/Oバウンドのアプリケーションに効果的で、数千、数万の同時接続を捌くことができる。

ハイブリッド

nginxは、「prefork」パターンを使用している。しかし、forkしたそれぞれのプロセスの内側は「Reactor」パターンとなっている。[1]
Pumaのメインスレッドは「Reactor」パターンで処理され、個別のリクエスト処理は、「スレッドプール」となっている。[2]

脚注
  1. https://aosabook.org/en/v2/nginx.html ↩︎

  2. https://github.com/puma/puma/blob/master/docs/architecture.md ↩︎

Discussion