LangChainのRate Limiterが何をしているかコード読んでみた
先日LangChainがRate Limitによるエラーを回避する機能をリリースしました。
次のようにマックスの値を指定するとRate Limitエラーが出ないようにしてくれます。
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_anthropic import ChatAnthropic
rate_limiter = InMemoryRateLimiter(
requests_per_second=0.1, # <-- Super slow! We can only make a request once every 10 seconds!!
check_every_n_seconds=0.1, # Wake up every 100 ms to check whether allowed to make a request,
max_bucket_size=10, # Controls the maximum burst size.
)
model = ChatAnthropic(model_name="claude-3-opus-20240229", rate_limiter=rate_limiter)
とは言えRate Limitは各種LLMプロバイダーが設けているものであり、LangChainがどうこうできるものではありません。なので活用を検討するにあたって、実際には何をやっているかを理解するために下記のコードを読んでみました。
トークンバケットアルゴリズム
コードの中身を見てみるとトークンバケットアルゴリズムを使用していました。
内容は至極シンプルで次のような挙動を取ります。
- バケットには一定のレートでトークンが追加される
- バケットは定義された最大容量まで満たす
- リクエストが来るたびに、必要な数のトークンがバケットから消費される
- バケットに十分なトークンがある場合、リクエストは許可される
- トークンが不足している場合、リクエストは拒否またはキューに入れられる
実際のコードを見てみます。
class InMemoryRateLimiter(BaseRateLimiter):
def __init__(
self,
*,
requests_per_second: float = 1,
check_every_n_seconds: float = 0.1,
max_bucket_size: float = 1,
) -> None:
self.requests_per_second = requests_per_second
self.available_tokens = 0.0
self.max_bucket_size = max_bucket_size
self._consume_lock = threading.Lock()
self.last: Optional[float] = None
self.check_every_n_seconds = check_every_n_seconds
このInMemoryRateLimiterクラスは、トークンバケットの基本的な要素を全部持っています。
- requests_per_second: これは1秒あたりに追加されるトークンの数
- available_tokens: 現在バケットにあるトークンの数
- max_bucket_size: バケットが保持できるトークンの最大数
トークンの消費は _consume メソッドで行われます:
def _consume(self) -> bool:
"""Try to consume a token.
Returns:
True means that the tokens were consumed, and the caller can proceed to
make the request. A False means that the tokens were not consumed, and
the caller should try again later.
"""
with self._consume_lock:
now = time.monotonic()
# initialize on first call to avoid a burst
if self.last is None:
self.last = now
elapsed = now - self.last
if elapsed * self.requests_per_second >= 1:
self.available_tokens += elapsed * self.requests_per_second
self.last = now
# Make sure that we don't exceed the bucket size.
# This is used to prevent bursts of requests.
self.available_tokens = min(self.available_tokens, self.max_bucket_size)
# As long as we have at least one token, we can proceed.
if self.available_tokens >= 1:
self.available_tokens -= 1
return True
return False
このメソッドは、前回のチェックからの経過時間に基づいてトークンを追加し、リクエストが可能かどうかを判断します。トークンが利用可能な場合、1つのトークンを消費してTrueを返します。
使う時の注意点
という感じの内容だったのですが、以下の点に注意が必要だなと思いました。
ユーザの待機時間の増加:
Rate Limiterはエラーを防ぐ代わりに、必要に応じてリクエストを遅延させます。
システム的にエラーが出なくとも何十秒もかかってしまうのであればユーザ体験はどちらにしろ大きく毀損されます。(なんなら素直にスッとエラー出した方がいい可能性もあるかもしれない)
なので、ユーザに提供するところで使う場合はそういった観点で注意が必要です。
一方で「評価時に大量のデータセットに対して一度にドカッと送る」みたいな時は「時間はかかっても良く、全部がきっちり終わるのが大事」みたいなユースケースにはハマるでしょう。
プロセス間での共有はなし:
Rate Limiterはメモリ内で動作するため、異なるプロセス間でのレート制限は行いません。なので分散システムでは機能しません。。
以上、気になったので中身を読んでみました、便利な機能なのは間違い無いですが、ユースケースによっては本当に求めているものではないかもしれないのでちょっと注意が必要そうだなという所感でした。
Discussion