🐷
Gemini Proを非同期処理で呼び出す
Geminiについて
Geminiとは、Google提供するLLMの一つです。OpenAIのChatGPT APIのように、APIが用意されています。今回は、そのAPIへのリクエストの非同期実装を行います。
なぜ非同期で呼び出したいのか?
動機は、ただ単純に、Geminiへの多くのリクエストを短時間で処理したいからです。そういうことで、非同期処理でGemini Proを呼び出すコードを実装します。技術的には、Pythonの標準パッケージであるasyncioを使います。
asyncioを使う理由
Pythonだと、非同期実行を可能にしてくれるパッケージとして、multiprocessingとasyncioの2つがあります。今回は、asyncioを選びました。API呼び出しタスクのようなI/Oタスクでの非同期処理に特化しているからです。ファイル読み込みやAPI呼び出し中に、スレッドが待ちに入ります。asyncioは、そのスレッドを使って、別の処理を実行することを可能にします。こうすることで、全体の処理の高速化が実現できます。
コード紹介
Gemini API 呼び出し編
まずは、パッケージのインポートを行います。
import os
import aiohttp
import asyncio
以下のコードは、非同期でGemini APIを呼び出すコードです。GCP経由の呼び出しなので、os.getenv('GCP_PROJECT_ID')を使っています。aiohttpを用いて、非同期で処理しています。
parse_streaming_response関数は、レスポンスからGeminiによって生成されたテキストを取り出す処理を行っています。
def parse_streaming_response(streaming_response):
def recursive_parse(part):
if "candidates" in part:
return [recursive_parse(candidate) for candidate in part["candidates"]]
else:
return part["content"]
final_response = [
part["candidates"][0].get("content", {}).get("parts", [{}])[0].get("text", "") for part in streaming_response
]
final_response = "".join(final_response)
return final_response
async def async_call_gemini_pro(prompt: str):
url = f"https://us-central1-aiplatform.googleapis.com/v1/projects/{os.getenv('GCP_PROJECT_ID')}/locations/us-central1/publishers/google/models/gemini-1.0-pro:streamGenerateContent"
header = {
"Authorization": f"Bearer {os.getenv('GEMINI_API_TOKEN')}",
"Content-Type": "application/json; charset=utf-8"
}
json_body = {
"contents": {
"role": "user",
"parts": {
"text": prompt
}
},
"safety_settings": {
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
"threshold": "BLOCK_LOW_AND_ABOVE"
},
"generation_config": {
"temperature": 0.9,
"topP": 1.0,
"maxOutputTokens": 200
}
}
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers=header,
json=json_body
) as response:
streaming_response = await response.json()
final_response = parse_streaming_response(streaming_response)
return final_response
以下がメインの非同期処理を呼び出すコードです。
async def main():
tasks = []
for i in range(10):
tasks.append(async_call_gemini_pro(
"今の気分を詩にしてください")
)
results = await asyncio.gather(*tasks)
print("Results")
print(results)
で、結局速くなったのか?
asyncioありとなしで、10回のリクエストを処理する実行時間を比較してみました。
非同期 or Not | 実行時間(秒) |
---|---|
非同期 | 2.9 |
Not 非同期 | 21.5 |
Discussion