😎
OpenAIのRateLimitErrorに向き合う
結論:RateLimit終了まで待機するコンテクストマネージャーを作った
使い方
async with ratelimiter:
response = await self.client.chat.completions.create(
messages=msgs,
model="gpt4-vision-preview",
)
return response
内部実装
class RateLimiter:
"""period_seconds秒あたりfrequency回のリクエストまで投げられる非同期リクエストコンテクストマネジャー。長さfrequencyの最新の利用タイミングにおけるタイムスタンプを保持し、最古のタイムスタンプが現在よりperiod_seconds秒以上前であればリクエストが投げられる。
"""
def __init__(self, frequency: int, period_seconds: int):
self.frequency = frequency
self.recent_accesses: deque[datetime] = deque(maxlen=frequency)
self.period_seconds = timedelta(seconds=period_seconds)
async def acquire(self):
"""リクエスト可能になるまで待機するメソッド
"""
while True:
now = datetime.now()
# 最古のタイムスタンプをみてリクエスト可能になるまで待機する
if (
len(self.recent_accesses) == self.frequency
and now - self.period_seconds < self.recent_accesses[0]
):
await asyncio.sleep(
(self.recent_accesses[0] - now + self.period_seconds).seconds
)
else:
self.recent_accesses.append(now)
return
async def __aenter__(self):
"""__aenter__を定義することでAsync withと合わせて使えるようになる。リクエスト可能になるまで待機する。
"""
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
pass
@classmethod
def per_minute(cls, frequency: int):
return cls(frequency, 60)
@classmethod
def per_second(cls, frequency: int):
return cls(frequency, 1)
※プロセスセーフではありません。
※APIコール回数に対するリミットしか管理しません。
動機
結論で終わりです。以下蛇足です。
LLMやSaaSの進化によってPythonにおける非同期通信の重要性が極めて高まりました。みなさんのサービスの中にも非同期通信でAIをガシガシ呼び出すコードが書かれているはず。そんなときに厄介なのがRateLimitError。
RateLimitErrorのレスポンスが返ってきたときにRetry機構を取り付けるなどでエラーをハンドルすることはできるんですが
- APIごとにレスポンスをParseしてエラーハンドルしなければならない
- SaaSプロバイダへの負担が大きくなるようなリクエストはそもそも投げたくない
などの事情があり、RateLimitErrorは起こってから対処するのではなく未然に防ぐ必要があると考えました。
意外なことにPythonの主要な通信ライブラリにこの機能を簡単に実装する方法が見当たらなかったので自分で実装してソースコードを公開することにしました。
軽量でAPI仕様への依存度がなくAPIごとに簡単にレートリミットを指定できるところをウリにしています。
例えばOpenAIのクライアントをWrapしてRateLimit付きのクライアントを定義することができます。
@dataclass
class GPTVClient:
client: AsyncOpenAI
ratelimiter: RateLimiter
async def chat_completion(
self, msgs: list[ChatCompletionMessageParam]
) -> ChatCompletion:
"""チャットの補完を行う。
Args:
msgs (list[ChatCompletionMessageParam]): チャットメッセージのリスト。
Returns:
ChatCompletion: チャットの補完結果。
"""
async with self.ratelimiter:
response = await self.client.chat.completions.create(
messages=msgs,
model="gpt4-vision-preview",
)
return response
最後に
以上です。上のコードは気に入っていますが、今後SaaS越しに様々なAIを複合した大きなサービスを作るトレンドがくることは目に見えているのでこうしたことを統一的に管理してくれるOSSが出てきてほしいですね。(もうあったりして)
Discussion