🏃

大量にAPIを呼ぶときにやること

2024/04/24に公開

バッチ処理などで、大量にAPIを呼ぶときにいつもやっていることを書きます。言語や使用しているライブラリ・フレームワークによって細部は異なりますが、だいたい以下のようなことをします。

  1. スレッドプールを使って一定並列数でアクセスする
  2. Exponential Backoff に基づいてリトライをする
  3. ログとインジケーターを出力する

今回はRubyで実装していきます。

スレッドプールを使って一定並列数でアクセスする

APIを呼ぶというのはIO処理をすることなのでIO待ちが発生します。そのため、大量にアクセスする場合はスレッドなどを使って並列で実行することでIO待ちの時間を有効に使うようにします。

このとき数限りなくスレッドを生成すると、コンピュータリソースを食い尽くしてしまいますし、アクセス先に一度に大量のアクセスをしてしまい一瞬でレートリミットに達したりするので、スレッドプールを使って並列数を制限します。

Ruby だと Thread::Queue もしくは Thread::SizedQueue を使うことで簡単にスレッドプールを実装できます。

class ThreadPool
  def initialize(n)
    @n = n
  end

  def start
    @queue = Thread::SizedQueue.new(@n)
    @threads = @n.times.map do
      Thread.new { self.exec }
    end
  end

  def stop
    @queue.close
    @threads.each(&:join)
  end

  def perform(&block)
    @queue.push(block)
  end

  private

  def exec
    while task = @queue.pop
      task.call
    end
  end
end
thread_pool = ThreadPool.new(10)
thread_pool.start

1_000.times do |i|
  thread_pool.perform do
    API.put({ data: i })
  end
end

thread_pool.stop

Exponential Backoff に基づいてリトライをする

APIを呼ぶときは、レートリミットエラーなどで失敗することがあるのでリトライできるようにします。

いろいろなリトライ戦略がありますが、一般的には Exponential Backoff という戦略が取られることが多いです。これはリトライするまでの時間をベースとなるリトライ時間から上限に達するまで指数関数的に増やしていくことでリクエストの最適化をはかるものとなります。これは以下のような数式で表されます。

  • \rm{base}: ベースとなるリトライ遅延
  • \rm{cap}: リトライ遅延上限
  • \rm{attempt}: リトライ回数
  • \rm{backoff}_{exp}: リトライ遅延
\rm{backoff}_{exp}(\rm{attempt}) = \min(\rm{cap},\;\; 2^{\rm{attempt}} \cdot \rm{base})

基本的にはこの戦略は有効ですが、このままでは以下のようにスパイクが生じてしまいます。

AWS Solutions Architect ブログ: Exponential Backoff And Jitter

そのため、乱数成分Jitterを加えスパイクを抑制する Exponential Backoff And Jitter[1][2] という手法が実際には使われます。Jitter にはいくつか種類がありますが、 ここでは Equal Jitter という極端に短いリトライ間隔を回避しつつ、全体をできるだけ小さくするものを採用します。これにより、改良したリトライ遅延 \rm{backoff}_{exp'} は以下のようになります。

\rm{backoff}_{exp'}(\rm{attempt}) = \frac{\rm{backoff}_{exp}(\rm{attempt})}{2} + \rm{random\_between}(0, \; \frac{\rm{backoff}_{exp}(\rm{attempt})}{2})

Ruby で実装するときはリトライする処理をブロックで囲み、指定した例外を投げたときに上記のリトライ戦略に基づきリトライ処理するようにします。このとき、たとえばアクセス先が障害を起こしたときに無限にリトライし続けるのを防ぐために、最大試行回数を超えたときは処理を中断できるようにしておきます。

class Retryer
  class RetryError < StandardError; end

  class Backoff
    def call(attempt)
      raise NotImplementedError
    end
  end

  class ExponentialBackoff < Backoff
    def initialize(base:, cap:)
      @base = base
      @cap = cap
    end

    def call(attempt)
      [@cap, 2 ** attempt * @base].min
    end
  end

  class ExponentialBackoffAndEqualJitter < ExponentialBackoff
    def call(attempt)
      exp = super
      exp / 2.0 + random_between(0, exp / 2.0)
    end

    private

    def random_between(min, max)
      rand * (max - min) + min
    end
  end

  def initialize(
    *retryable_exceptions,
    max_attempt: 10,
    backoff: ExponentialBackoffAndEqualJitter.new(base: 0.1, cap: 10),
    on_retry: ->(attempt, wait) {}
  )
    @retryable_exceptions = retryable_exceptions.empty? ? [RetryError] : retryable_exceptions
    @max_attempt = max_attempt
    @backoff = backoff
    @on_retry = on_retry
  end

  def retryable(&block)
    attempt = 0
    begin
      return block.call
    rescue *@retryable_exceptions
      raise if attempt >= @max_attempt
      attempt += 1
      wait = @backoff.call(attempt)
      @on_retry.call(attempt, wait)
      sleep wait
      retry
    end
  end
end
retryer = Retryer.new(
  API::RateLimitError,
  max_attempt: 10,
  backoff: Retryer::ExponentialBackoffAndEqualJitter.new(base: 0.1, cap: 10),
)
thread_pool = ThreadPool.new(10)
thread_pool.start

1_000.times do |i|
  thread_pool.perform do
    retryer.retryable do
      API.put({ data: i })
    end
  end
end

thread_pool.stop

ログとインジケーターを出力する

大量にAPIを呼ぶときは、進捗を確認するためのインジケーターの出力とエラーが発生したときにログを出力したいです。

インジケーターにはよくドット.を表示することをしますが、このときにログを出すと以下のように改行の問題がでることがあります。

....I, [2024-04-19T09:23:00.742595 #23681]  INFO -- : message1
I, [2024-04-19T09:24:00.742595 #23681]  INFO -- : message2
..

そのため、以下のようにログとインジケーターを出しつつ、適切な改行を入れるようにします。今回は Ruby の Logger<< という渡された文字列をそのまま出力するだけのメソッドがあるので、Loggerを拡張する形で実装します。

<< が呼ばれると改行が必要かどうかのフラグを立てて、ログを出力するときにこのフラグが立っていれば改行を出力するようにします。なお、複数スレッドからアクセスされるので Thread::Mutex を使って排他処理をしてスレッドセーフにしておきます。

class LoggerWithIndicator
  def initialize(logger)
    @base = logger
    @mutex = Thread::Mutex.new
    @need_newline = false
  end

  %i[debug info warn error fatal unknown].each do |method|
    define_method(method) do |message|
      output { @base.send(method, message) }
    end
  end

  %i[add log].each do |method|
    define_method(method) do |severity, message = nil, progname = nil, &block|
      output { @base.send(method, severity, message, progname, &block) }
    end
  end

  def <<(msg)
    @mutex.synchronize do
      @need_newline = msg.last != "\n"
      @base << msg
    end
  end

  def method_missing(method, *args, &block)
    @base.send(method, *args, &block)
  end

  private

  def output
    @mutex.synchronize do
      if @need_newline
        @base << "\n"
        @need_newline = false
      end
      yield
    end
  end
end
logger = Logger.new($stdout)
logger = LoggerWithIndicator.new(logger)

retryer = Retryer.new(
  API::RateLimitError,
  max_attempt: 10,
  backoff: Retryer::ExponentialBackoffAndEqualJitter.new(base: 0.1, cap: 10),
  on_retry: ->(attempt, wait) {
    logger.info "Retrying #{attempt}th time(waiting for #{wait.round(3)} seconds)..."
  }
)
thread_pool = ThreadPool.new(10)
thread_pool.start
logger.info 'Start'

1_000.times do |i|
  thread_pool.perform do
    retryer.retryable do
      API.put({ data: i })
    end
    logger << '.'
  end
end

logger.info 'Stopping...'
thread_pool.stop
logger.info 'Stopped'

以上で、効率的にAPIにアクセスすることができるようになりました。このように、スレッドプールを使って一定並列数でアクセスし、Exponential Backoff に基づいてリトライをすることで、効率的にアクセスできるようになります。

なお、APIによってはレートリミット解除時間がレスポンスに含まれることがあるので、その場合はそれを使ってリトライ遅延を設定します。

脚注
  1. AWS Solutions Architect ブログ: Exponential Backoff And Jitter ↩︎

  2. クラウド・ネイティブのお作法(2)「リトライ」~効率的なリトライ手法「Exponential Backoff and jitter」とは何か (1/3):CodeZine(コードジン) ↩︎

ハートレイルズ

Discussion