大量にAPIを呼ぶときにやること
バッチ処理などで、大量にAPIを呼ぶときにいつもやっていることを書きます。言語や使用しているライブラリ・フレームワークによって細部は異なりますが、だいたい以下のようなことをします。
- スレッドプールを使って一定並列数でアクセスする
- Exponential Backoff に基づいてリトライをする
- ログとインジケーターを出力する
今回はRubyで実装していきます。
スレッドプールを使って一定並列数でアクセスする
APIを呼ぶというのはIO処理をすることなのでIO待ちが発生します。そのため、大量にアクセスする場合はスレッドなどを使って並列で実行することでIO待ちの時間を有効に使うようにします。
このとき数限りなくスレッドを生成すると、コンピュータリソースを食い尽くしてしまいますし、アクセス先に一度に大量のアクセスをしてしまい一瞬でレートリミットに達したりするので、スレッドプールを使って並列数を制限します。
Ruby だと Thread::Queue
もしくは Thread::SizedQueue
を使うことで簡単にスレッドプールを実装できます。
class ThreadPool
def initialize(n)
@n = n
end
def start
@queue = Thread::SizedQueue.new(@n)
@threads = @n.times.map do
Thread.new { self.exec }
end
end
def stop
@queue.close
@threads.each(&:join)
end
def perform(&block)
@queue.push(block)
end
private
def exec
while task = @queue.pop
task.call
end
end
end
thread_pool = ThreadPool.new(10)
thread_pool.start
1_000.times do |i|
thread_pool.perform do
API.put({ data: i })
end
end
thread_pool.stop
Exponential Backoff に基づいてリトライをする
APIを呼ぶときは、レートリミットエラーなどで失敗することがあるのでリトライできるようにします。
いろいろなリトライ戦略がありますが、一般的には Exponential Backoff という戦略が取られることが多いです。これはリトライするまでの時間をベースとなるリトライ時間から上限に達するまで指数関数的に増やしていくことでリクエストの最適化をはかるものとなります。これは以下のような数式で表されます。
-
: ベースとなるリトライ遅延\rm{base} -
: リトライ遅延上限\rm{cap} -
: リトライ回数\rm{attempt} -
: リトライ遅延\rm{backoff}_{exp}
基本的にはこの戦略は有効ですが、このままでは以下のようにスパイクが生じてしまいます。
— AWS Solutions Architect ブログ: Exponential Backoff And Jitter
そのため、乱数成分Jitterを加えスパイクを抑制する Exponential Backoff And Jitter[1][2] という手法が実際には使われます。Jitter にはいくつか種類がありますが、 ここでは Equal Jitter という極端に短いリトライ間隔を回避しつつ、全体をできるだけ小さくするものを採用します。これにより、改良したリトライ遅延
Ruby で実装するときはリトライする処理をブロックで囲み、指定した例外を投げたときに上記のリトライ戦略に基づきリトライ処理するようにします。このとき、たとえばアクセス先が障害を起こしたときに無限にリトライし続けるのを防ぐために、最大試行回数を超えたときは処理を中断できるようにしておきます。
class Retryer
class RetryError < StandardError; end
class Backoff
def call(attempt)
raise NotImplementedError
end
end
class ExponentialBackoff < Backoff
def initialize(base:, cap:)
@base = base
@cap = cap
end
def call(attempt)
[@cap, 2 ** attempt * @base].min
end
end
class ExponentialBackoffAndEqualJitter < ExponentialBackoff
def call(attempt)
exp = super
exp / 2.0 + random_between(0, exp / 2.0)
end
private
def random_between(min, max)
rand * (max - min) + min
end
end
def initialize(
*retryable_exceptions,
max_attempt: 10,
backoff: ExponentialBackoffAndEqualJitter.new(base: 0.1, cap: 10),
on_retry: ->(attempt, wait) {}
)
@retryable_exceptions = retryable_exceptions.empty? ? [RetryError] : retryable_exceptions
@max_attempt = max_attempt
@backoff = backoff
@on_retry = on_retry
end
def retryable(&block)
attempt = 0
begin
return block.call
rescue *@retryable_exceptions
raise if attempt >= @max_attempt
attempt += 1
wait = @backoff.call(attempt)
@on_retry.call(attempt, wait)
sleep wait
retry
end
end
end
retryer = Retryer.new(
API::RateLimitError,
max_attempt: 10,
backoff: Retryer::ExponentialBackoffAndEqualJitter.new(base: 0.1, cap: 10),
)
thread_pool = ThreadPool.new(10)
thread_pool.start
1_000.times do |i|
thread_pool.perform do
retryer.retryable do
API.put({ data: i })
end
end
end
thread_pool.stop
ログとインジケーターを出力する
大量にAPIを呼ぶときは、進捗を確認するためのインジケーターの出力とエラーが発生したときにログを出力したいです。
インジケーターにはよくドット.
を表示することをしますが、このときにログを出すと以下のように改行の問題がでることがあります。
....I, [2024-04-19T09:23:00.742595 #23681] INFO -- : message1
I, [2024-04-19T09:24:00.742595 #23681] INFO -- : message2
..
そのため、以下のようにログとインジケーターを出しつつ、適切な改行を入れるようにします。今回は Ruby の Logger
に <<
という渡された文字列をそのまま出力するだけのメソッドがあるので、Logger
を拡張する形で実装します。
<<
が呼ばれると改行が必要かどうかのフラグを立てて、ログを出力するときにこのフラグが立っていれば改行を出力するようにします。なお、複数スレッドからアクセスされるので Thread::Mutex
を使って排他処理をしてスレッドセーフにしておきます。
class LoggerWithIndicator
def initialize(logger)
@base = logger
@mutex = Thread::Mutex.new
@need_newline = false
end
%i[debug info warn error fatal unknown].each do |method|
define_method(method) do |message|
output { @base.send(method, message) }
end
end
%i[add log].each do |method|
define_method(method) do |severity, message = nil, progname = nil, &block|
output { @base.send(method, severity, message, progname, &block) }
end
end
def <<(msg)
@mutex.synchronize do
@need_newline = msg.last != "\n"
@base << msg
end
end
def method_missing(method, *args, &block)
@base.send(method, *args, &block)
end
private
def output
@mutex.synchronize do
if @need_newline
@base << "\n"
@need_newline = false
end
yield
end
end
end
logger = Logger.new($stdout)
logger = LoggerWithIndicator.new(logger)
retryer = Retryer.new(
API::RateLimitError,
max_attempt: 10,
backoff: Retryer::ExponentialBackoffAndEqualJitter.new(base: 0.1, cap: 10),
on_retry: ->(attempt, wait) {
logger.info "Retrying #{attempt}th time(waiting for #{wait.round(3)} seconds)..."
}
)
thread_pool = ThreadPool.new(10)
thread_pool.start
logger.info 'Start'
1_000.times do |i|
thread_pool.perform do
retryer.retryable do
API.put({ data: i })
end
logger << '.'
end
end
logger.info 'Stopping...'
thread_pool.stop
logger.info 'Stopped'
以上で、効率的にAPIにアクセスすることができるようになりました。このように、スレッドプールを使って一定並列数でアクセスし、Exponential Backoff に基づいてリトライをすることで、効率的にアクセスできるようになります。
なお、APIによってはレートリミット解除時間がレスポンスに含まれることがあるので、その場合はそれを使ってリトライ遅延を設定します。
Discussion