💪

LLMの効率的な実行方法:ThreadPoolExecutorとasyncio.gather

2024/10/01に公開

はじめに

近年、LLM(大規模言語モデル)を利用する機会が増えています。しかし、複数のリクエストを順次実行すると、処理時間が長くなり効率的ではありません。本記事では、PythonのThreadPoolExecutorasyncio.gatherを使用して、LLMの複数実行を効率化する方法を解説します。

LLMの基本的な実行方法

今回は簡単のためLangChainを使います。

ドキュメントは以下です。

https://python.langchain.com/docs/integrations/llms/openai/

まず環境変数OPENAI_API_KEYを設定し、langchain_openaiライブラリをインストールします。

LLMの呼び出し方法は以下です。

from langchain_openai import OpenAI

llm = OpenAI()
response = llm.invoke("Hello how are you?")
print(response)

このコードでは、llm.invoke()メソッドを使用してモデルにプロンプトを送信し、その応答を取得しています。

ThreadPoolExecutorを使用したマルチスレッド実行

ThreadPoolExecutorは、複数のスレッドでタスクを並行して実行するためのクラスです。これにより、複数の処理を同時に実行できます。

from concurrent.futures import ThreadPoolExecutor
from langchain_openai import OpenAI

llm = OpenAI()

def invoke_llm(prompt):
    print(f"Prompt: {prompt}") #逐次実行されていないか確認
    return llm.invoke(prompt)

prompts = [
    "What is the capital of France?",
    "Explain the theory of relativity."
]

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(invoke_llm, prompt) for prompt in prompts]
    for future in futures:
        response = future.result()
        print(response)

コードの解説は以下です。

  • ThreadPoolExecutor(max_workers=2):同時に実行するワーカースレッドの数を指定
  • executor.submit():各プロンプトを非同期に実行するためにタスクをスケジュール
  • future.result():タスクの実行結果を取得

asyncio.gatherを使用した非同期実行

asyncioはPythonの非同期I/Oライブラリで、asyncio.gatherを使用すると複数の非同期タスクを同時に実行できます。

import asyncio
from langchain_openai import OpenAI

llm = OpenAI()

async def invoke_llm_async(prompt):
    print(f"Prompt: {prompt}") #逐次実行されていないか確認
    return await llm.ainvoke(prompt)

async def main():
    prompts = [
        "What is the capital of France?",
        "Explain the theory of relativity."
    ]
    tasks = [invoke_llm_async(prompt) for prompt in prompts]
    responses = await asyncio.gather(*tasks)
    for response in responses:
        print(response)

asyncio.run(main())

コードの解説は以下です。

  • llm.ainvoke(): 非同期版のllm.invoke()メソッド
  • asyncio.gather(*tasks):複数の非同期タスクを同時に実行

ThreadPoolExecutorとasyncio.gatherの使い分け

今回挙げたLangChainを用いてOpenAIを呼ぶI/Oバウンドな処理の場合は、スレッドを占有する必要性がないため、asyncio.gatherを採用するのが良いでしょう。

ただプロジェクトの要件、技術選定によって使い分けが必要となります。

まとめ

ThreadPoolExecutorasyncio.gatherは簡潔に以下のようにまとめられます。

  • ThreadPoolExecutor:マルチスレッドによる並行処理
  • asyncio.gather:非同期でタスクを複数実行

Discussion