😎

OpenAIのRateLimitErrorに向き合う

2024/03/17に公開

結論:RateLimit終了まで待機するコンテクストマネージャーを作った

使い方

async with ratelimiter:
    response = await self.client.chat.completions.create(
        messages=msgs,
        model="gpt4-vision-preview",
    )
return response

内部実装

class RateLimiter:
    """period_seconds秒あたりfrequency回のリクエストまで投げられる非同期リクエストコンテクストマネジャー。長さfrequencyの最新の利用タイミングにおけるタイムスタンプを保持し、最古のタイムスタンプが現在よりperiod_seconds秒以上前であればリクエストが投げられる。
    """
    def __init__(self, frequency: int, period_seconds: int):
        self.frequency = frequency
        self.recent_accesses: deque[datetime] = deque(maxlen=frequency)
        self.period_seconds = timedelta(seconds=period_seconds)

    async def acquire(self):
        """リクエスト可能になるまで待機するメソッド
        """
        while True:
            now = datetime.now()
            # 最古のタイムスタンプをみてリクエスト可能になるまで待機する
            if (
                len(self.recent_accesses) == self.frequency
                and now - self.period_seconds < self.recent_accesses[0]
            ):
                await asyncio.sleep(
                    (self.recent_accesses[0] - now + self.period_seconds).seconds
                )
            else:
                self.recent_accesses.append(now)
                return

    async def __aenter__(self):
        """__aenter__を定義することでAsync withと合わせて使えるようになる。リクエスト可能になるまで待機する。
        """
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        pass

    @classmethod
    def per_minute(cls, frequency: int):
        return cls(frequency, 60)

    @classmethod
    def per_second(cls, frequency: int):
        return cls(frequency, 1)

※プロセスセーフではありません。
※APIコール回数に対するリミットしか管理しません。

動機

結論で終わりです。以下蛇足です。
LLMやSaaSの進化によってPythonにおける非同期通信の重要性が極めて高まりました。みなさんのサービスの中にも非同期通信でAIをガシガシ呼び出すコードが書かれているはず。そんなときに厄介なのがRateLimitError。

RateLimitErrorのレスポンスが返ってきたときにRetry機構を取り付けるなどでエラーをハンドルすることはできるんですが

  • APIごとにレスポンスをParseしてエラーハンドルしなければならない
  • SaaSプロバイダへの負担が大きくなるようなリクエストはそもそも投げたくない
    などの事情があり、RateLimitErrorは起こってから対処するのではなく未然に防ぐ必要があると考えました。

意外なことにPythonの主要な通信ライブラリにこの機能を簡単に実装する方法が見当たらなかったので自分で実装してソースコードを公開することにしました。

軽量でAPI仕様への依存度がなくAPIごとに簡単にレートリミットを指定できるところをウリにしています。

例えばOpenAIのクライアントをWrapしてRateLimit付きのクライアントを定義することができます。

@dataclass
class GPTVClient:
    client: AsyncOpenAI
    ratelimiter: RateLimiter

    async def chat_completion(
        self, msgs: list[ChatCompletionMessageParam]
    ) -> ChatCompletion:
        """チャットの補完を行う。

        Args:
            msgs (list[ChatCompletionMessageParam]): チャットメッセージのリスト。

        Returns:
            ChatCompletion: チャットの補完結果。
        """
        async with self.ratelimiter:
            response = await self.client.chat.completions.create(
                messages=msgs,
                model="gpt4-vision-preview",
            )
        return response

最後に

以上です。上のコードは気に入っていますが、今後SaaS越しに様々なAIを複合した大きなサービスを作るトレンドがくることは目に見えているのでこうしたことを統一的に管理してくれるOSSが出てきてほしいですね。(もうあったりして)

Discussion