PythonでThreadPoolExecutorを使ってマルチスレッド処理を実装する
この記事は何?
ThreadPoolExecutor を使ったマルチスレッド処理の概要と使い所、主な使い方を説明する記事です。
想定読者
- 何らかのPython処理を並列化して高速化したいが、どのような処理をすれば良いのか分からない人
- マルチスレッド処理の特徴を知りたい人
- ThreadPoolExecutor の使い方を知りたい人
TL;DR
- システム外部のI/O待ちが発生するHTTPリクエストやDBアクセスを含む処理はマルチスレッド処理を使った高速化ができる場合があるよ
- ThreadPoolExecutor を使うとマルチスレッド処理が簡単に書けるよ
- ThreadPoolExecutor を使ったマルチスレッド処理は Futureオブジェクト と呼ばれる "未来に実行結果が入るオブジェクト" を扱うよ
- ThreadPoolExecutor を使ったマルチスレッド処理で全件Futureオブジェクトをメモリに保持するとメモリ不足になる可能性があるので定期的に外部に結果を吐き出してリスト内容をクリアするなどしてメモリをリセットすると良いよ
実行環境
- Windows11 24H2
- Python 3.11
マルチスレッド処理を使った高速化が有効なケース
まず最初に、マルチスレッド処理を使うと処理が高速化しやすいユースケースを記載します。
以下に記載するパターンにあてはまる場合はマルチスレッド処理を利用してみてください。
ケース1_大量のHTTPリクエストを発生させるサーバー側処理
システムが利用するデータを加工するため、大量のレコードを入力としてシステム外部のWebAPIにHTTPリクエストをしなければならない場合があります。
入力レコードが10万件、100万件といったオーダーになってくるとシンプルにループしてHTTPリクエストするだけでは100時間以上必要になることもザラです。
このようにHTTPリクエスト処理が遅すぎる!という場合はマルチスレッド処理による高速化が有効です。
ケース2_大量にDBへのクエリやコマンドを発生させるサーバー側処理
システムが利用するデータを加工するため、大量のレコードを入力としてシステム外部のDBに入力データを条件に含むSELECTクエリなどのデータ取得処理をしなければならない場合があります。
同様に、システムが利用するデータを登録・更新するため、大量のレコードを入力としてシステム外部のDBに入力データを条件に含むINSERT/UPDATE などのデータ登録・更新処理をしなければならない場合があります。
入力レコードが10万件、100万件といったオーダーになってくるとシンプルにループしてDBアクセスするだけでは100時間以上必要になることもザラです。
このようにDBアクセスを大量に繰り返すような処理が遅い場合はマルチスレッド処理による高速化が有効です。
※DBへの接続処理もボトルネックになりがちなので、コネクションプール機能があれば併用すると更に高速化できます
TIPS_I/Oバウンド処理とCPUバウンド処理
マルチスレッド処理によって高速化できる処理は I/Oバウンドな処理 と呼ばれます。
HTTPリクエストやDB読み書きなど、プログラム外部の応答待ちになるような処理が当てはまります。
プログラム外部の応答待ち時間はCPUが空いているため、CPU空き時間に次のI/Oバウンドな処理を実行しよう!というのがマルチスレッド処理です。
プログラム内で完結する処理は CPUバウンドな処理 になりがちで、マルチスレッド処理ではあまり高速化できません。
上記のような処理はマルチプロセス処理で実行しましょう。この記事では触れないので調べてみてください。
ThreadPoolExecutor の使い方
ThreadPoolExecutorの概要
concurrent.futures モジュールに入っている並列処理を行うためのPython組み込み機能です。
詳細は以下の公式ドキュメントをご覧ください。
マルチスレッド処理したい関数の準備
Pythonでマルチスレッド処理を作成したい場合、まずは高速化したい関数の準備をしましょう。
1つの入力を元に1つの高速化したい処理を行うように関数を定義してください。
今回の記事で推奨するユースケースでは、HTTPリクエストやDBアクセスを行う関数を準備することになります。
たとえば以下のような関数を準備してみます。
# HTTPリクエストでよく使われるrequestsライブラリを利用します。必要に応じて実行環境にインストールしてください
import requests
def some_http_request(input_param: str) -> str:
"""入力値を元にHTTPリクエストする処理
Args:
input_param (str): 入力パラメータ
Returns:
str: HTTPレスポンスを加工した返り値
"""
input_param_dict = {"some_param": input_param}
# なんらかのHTTPリクエストを実行してレスポンスを受け取る
response = requests.post("https://example.com", json=input_param_dict)
# レスポンスのJSONをdict型に変換
response_json_dict = response.json()
# dict型に含まれるなんらかの返り値を取得する。キーが存在しない場合は空文字を受け取る
result_str = response_json_dict.get("some_json_key", "")
return result_str
マルチスレッド処理で関数を呼び出すコード
ThreadPoolExecutorを使ったマルチスレッド処理はとてもシンプルです。
マルチスレッド処理を使いたい場合、単純なforループでは遅すぎる処理を高速化したいはずです。
そこで、それまでforループで順番に実行していた処理を executor.submit を経由した実行に変更してください。
これでマルチスレッド処理の完成です。
つまり、普通のforループ処理を少し改変するだけでマルチスレッド処理を実装できるということですね。
このコードを実行した際の挙動については後述します。
以下にマルチスレッド処理を呼び出すコードの例を示します。
from concurrent.futures import ThreadPoolExecutor, Future
import time
from typing import Iterator
def some_process_with_multithread(input_param_iter: Iterator[str]) -> list[str]
"""イテレータから供給されるパラメータを元にマルチスレッド処理でHTTPリクエストする
Args:
input_param_iter (Iterator[str]): forループで回すとリクエストパラメータが供給されるイテレータ
forループで回せれば何でも良いのでイテレータ型に限らずリスト型でも良いです。
パラメータが複数のフィールドやカラムを持つ場合は Iterator[なんらかのデータクラス型] にすると扱いやすいです。
Returns:
list[str]: HTTPリクエストの結果が格納されたリスト
レスポンス1つごとに複数フィールドの返り値を持つ場合は list[なんらかのデータクラス型] にすると扱いやすいです。
"""
# max_workersには最大並列数を記載します
# 求める処理時間や、接続先のAPIやDBの性能上限に引っかからないような値を考慮して指定してください
with ThreadPoolExecutor(max_workers=5) as executor:
# Futureオブジェクトリストの初期化
tmp_future_list: list[Future[str]] = []
# イテレータをループしてHTTPリクエストに利用するパラメータを受け取っていく
for request_no, input_param in enumerate(input_param_iter):
# executor.submitの第1引数にはマルチスレッド実行したい関数オブジェクト、第2引数以降にはマルチスレッド実行したい関数の引数を渡します
future: Future[str] = executor.submit(some_http_request, input_param)
tmp_future_list.append(future)
マルチスレッド処理による高速化
executor.submit 関数を実行すると、引数で渡したsome_http_request関数の実行予約が行われます。
予約するだけなので、HTTPリクエストが完了していなくても即座に Future[str] 型の返り値が返却され、次のループ処理が実行されます。
マルチスレッド処理でない場合のコードでは、1件分の some_http_request 関数が完了しない限り次のループ処理を実行することはできませんでした。いわゆる同期処理ですね。
executor.submit 関数を利用することで、1件分の some_http_request 関数が完了する前に、どんどん次の some_http_request 関数を実行予約していくことができます。
実行予約された some_http_request 関数は裏側で同時に実行されるため、1件ずつ処理するよりも高速化することができます。
ワーカー数による実行上限
関数が完了しなくても次のループを処理できるということは、全ての処理をまとめて1回で実行できるのでしょうか?
もちろんそんなことはなく ThreadPoolExecutor インスタンスの生成時に引数で渡した max_workers が同時に実行できる上限です。
大量のHTTPリクエストを同時に実行すると、リクエスト先のサーバーの応答が遅くなるなどリクエスト先が高速化のボトルネックになる場合があります。
処理を高速化する場合にワーカー数をどんどん増やせば良い、という訳ではないので最高効率を目指すためにはチューニングが必要です。
Futureオブジェクトの説明
executor.submit で関数の実行予約をすると即座に Future[str] オブジェクトが返却されます。
Futureオブジェクトは、その時点では実行予約しただけなので実行結果の返り値である str 型の値を保持していません。
実行予約後、しばらく待つと実行予約した関数が終了し、Futureオブジェクトに実行予約した関数の返り値である str 型の値が格納されます。
Futureオブジェクトに実行予約関数の返り値が格納された後は future.result() でstr型の値を取得可能です。
通常のforループであれば、ループを抜けた時点で処理が全て完了していることが確実です。
一方、マルチスレッド処理の場合forループを抜けた時点ではまだ実行予約が全件完了しただけで関数そのものが完了しているとは限らないため注意が必要です。
Future型は "Futureオブジェクトを生成した時点では関数処理が完了していないが、時間が経過して関数処理が完了した未来では返り値が取得できる" というデータ型だと理解すると良いでしょう。
Futureオブジェクトの完了待機
Futureオブジェクトから実行予約した関数の返り値を受け取るためには、Futureオブジェクトの完了を待機する必要があります。
Futureオブジェクトの完了を待機せずに future.result() を実行した場合、そのFutureが完了するまで待機してから返り値を取得することになり、通常のforループのような同期処理と変わらなくなってしまいます。
concurrent.futures モジュールには as_completed という関数があり、これを使うとリスト内のFutureのうち完了したFutureから順不同で返り値を取得できるイテレータが返却されます。
複数の実行予約を活かしつつ、終わった端から結果を処理する場合におすすめです。
以下に例を示します。
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
import time
from typing import Iterator
# : 省略
def some_process_with_multithread(input_param_iter: Iterator[str]) -> list[str]
with ThreadPoolExecutor(max_workers=5) as executor:
# 完了していないFutureを一時的に貯めておくリスト
tmp_future_list: list[Future[str]] = []
# イテレータをループしてHTTPリクエスト関数の実行予約をしていく
for request_no, input_param in enumerate(input_param_iter, 1):
future: Future[str] = executor.submit(some_http_request, input_param)
tmp_future_list.append(future)
# 全ての実行予約が完了後、リスト内のFutureが完了した端から実行予約した関数の返り値を取り出すイテレータを生成する
completed_future_iter: Iterator[Future[str]] = as_completed(tmp_future_list)
# イテレータ内の関数返り値を取得して何らかの処理をしていく
for completed_future in completed_future_iter:
print(f"request_result 結果: {completed_future.result()}")
メモリに優しいマルチスレッド処理の方法
マルチスレッド処理を必要とするような大量の処理を実行する際、全ての実行結果をリストに格納しているとメモリが不足する可能性があります。
n件分の実行予約をするごとに結果を待機してDBやファイル、ログなどに出力することでメモリを節約することが可能です。
以下は100件ごとに途中結果を出力し、完了したFutureをメモリから忘れるサンプルです。
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
import time
from typing import Iterator
# : 省略
def some_process_with_multithread(input_param_iter: Iterator[str]) -> list[str]
with ThreadPoolExecutor(max_workers=5) as executor:
# 完了していないFutureを一時的に貯めておくリスト
tmp_future_list: list[Future[str]] = []
# イテレータをループしてHTTPリクエスト関数の実行予約をしていく
for request_no, input_param in enumerate(input_param_iterm, 1):
future: Future[str] = executor.submit(some_http_request, input_param)
tmp_future_list.append(future)
# 100件の実行予約が完了したら、100件分が全件実行完了するまで待機する
if request_no % 100 == 0:
completed_feature_iter: Iterator[Future[str]] = as_completed(tmp_future_list)
# イテレータ内の関数返り値を取得して外部へ出力するなどの処理を実行していく
for completed_feature in completed_feature_iter:
print(f"request_result 結果: {completed_feature.result()}")
# 結果を取得し終わった tmp_future_list は空にする
tmp_future_list.clear()
Discussion