▶️
Python 並列処理を Progress bar で可視化する方法
概要
- Python で並列処理を行うときに Progress bar を表示する方法について
- 今回は
ThreadPoolExecutor
を使用
ThreadPoolExecutor
の動作
- コンストラクタ (
__init__
) で指定した数のスレッドをあらかじめ起動する。 -
submit(fn, *args)
で仕事をキューに投入すると、ワーカースレッドが順次取り出してfn(*args) を実行し、結果をFuture
に格納。 - 最終的に
shutdown()
されることでキューにNone
が投入され、ワーカースレッドが停止処理を行い、すべてのタスクが完了するとプールを閉じる。
progress bar と並列処理の競合
- スレッドが同時に進捗を更新すると表示が競合する。
- タスクがいつ完了したかを正確に把握し、完了タイミングでのみ進捗を進める必要がある。
-
as_completed
を使用して、タスクが完了するたびにイベントを受け取り、進捗を1ステップずつ更新する。
as_completed
の動作
- 複数の
Future
を引数に取り、完了した順に取得できるイテレータを返す。 - タスクが終わるごとにイベント通知され、
as_completed
側では順次yield
する。 - 完了したタスクを随時取得できるため、正しいタイミングで進捗を更新できる。
実装例
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.progress import Progress
def do_work(item):
# 並列実行する処理(例:重い計算、I/Oなど)
return item
def main():
data = range(10)
with Progress() as progress:
task = progress.add_task("[green]Processing...", total=len(data))
with ThreadPoolExecutor() as executor:
futures = [executor.submit(do_work, x) for x in data]
for future in as_completed(futures):
_ = future.result() # 結果を受け取る
progress.advance(task, 1) # 完了ごとに進捗を1つ進める
if __name__ == "__main__":
main()
-
Progress
コンテキスト内でタスクを作成 -
ThreadPoolExecutor
により並列にタスクを実行 -
as_completed
で完了を検知し、progress.advance(...)
で進捗を1つ更新
Discussion