🐷

Gemini Proを非同期処理で呼び出す

2024/03/25に公開

Geminiについて

Geminiとは、Google提供するLLMの一つです。OpenAIのChatGPT APIのように、APIが用意されています。今回は、そのAPIへのリクエストの非同期実装を行います。
https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/gemini?hl=ja

なぜ非同期で呼び出したいのか?

動機は、ただ単純に、Geminiへの多くのリクエストを短時間で処理したいからです。そういうことで、非同期処理でGemini Proを呼び出すコードを実装します。技術的には、Pythonの標準パッケージであるasyncioを使います。

asyncioを使う理由

Pythonだと、非同期実行を可能にしてくれるパッケージとして、multiprocessingとasyncioの2つがあります。今回は、asyncioを選びました。API呼び出しタスクのようなI/Oタスクでの非同期処理に特化しているからです。ファイル読み込みやAPI呼び出し中に、スレッドが待ちに入ります。asyncioは、そのスレッドを使って、別の処理を実行することを可能にします。こうすることで、全体の処理の高速化が実現できます。

コード紹介

Gemini API 呼び出し編

まずは、パッケージのインポートを行います。

import os

import aiohttp
import asyncio

以下のコードは、非同期でGemini APIを呼び出すコードです。GCP経由の呼び出しなので、os.getenv('GCP_PROJECT_ID')を使っています。aiohttpを用いて、非同期で処理しています。
parse_streaming_response関数は、レスポンスからGeminiによって生成されたテキストを取り出す処理を行っています。


def parse_streaming_response(streaming_response):
    def recursive_parse(part):
        if "candidates" in part:
            return [recursive_parse(candidate) for candidate in part["candidates"]]
        else:
            return part["content"]
    final_response = [
        part["candidates"][0].get("content", {}).get("parts", [{}])[0].get("text", "") for part in streaming_response
    ]
    final_response = "".join(final_response)
    return final_response


async def async_call_gemini_pro(prompt: str):

    url = f"https://us-central1-aiplatform.googleapis.com/v1/projects/{os.getenv('GCP_PROJECT_ID')}/locations/us-central1/publishers/google/models/gemini-1.0-pro:streamGenerateContent"
    header = {
        "Authorization": f"Bearer {os.getenv('GEMINI_API_TOKEN')}",
        "Content-Type": "application/json; charset=utf-8"
    }
    json_body = {
        "contents": {
            "role": "user",
            "parts": {
                "text": prompt
            }
        },
        "safety_settings": {
            "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "threshold": "BLOCK_LOW_AND_ABOVE"
        },
        "generation_config": {
            "temperature": 0.9,
            "topP": 1.0,
            "maxOutputTokens": 200
        }
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            headers=header,
            json=json_body
        ) as response:
            streaming_response = await response.json()
            final_response = parse_streaming_response(streaming_response)
            return final_response

以下がメインの非同期処理を呼び出すコードです。

async def main():
    tasks = []
    for i in range(10):
        tasks.append(async_call_gemini_pro(
            "今の気分を詩にしてください")
        )
    results = await asyncio.gather(*tasks)
    print("Results")
    print(results)

で、結局速くなったのか?

asyncioありとなしで、10回のリクエストを処理する実行時間を比較してみました。

非同期 or Not 実行時間(秒)
非同期 2.9
Not 非同期 21.5

Discussion