▶️

Python 並列処理を Progress bar で可視化する方法

2025/03/12に公開

概要

  • Python で並列処理を行うときに Progress bar を表示する方法について
  • 今回は ThreadPoolExecutor を使用

ThreadPoolExecutor の動作

  • コンストラクタ (__init__) で指定した数のスレッドをあらかじめ起動する。
  • submit(fn, *args) で仕事をキューに投入すると、ワーカースレッドが順次取り出してfn(*args) を実行し、結果を Future に格納。
  • 最終的に shutdown() されることでキューに None が投入され、ワーカースレッドが停止処理を行い、すべてのタスクが完了するとプールを閉じる。

progress bar と並列処理の競合

  • スレッドが同時に進捗を更新すると表示が競合する。
  • タスクがいつ完了したかを正確に把握し、完了タイミングでのみ進捗を進める必要がある。
  • as_completedを使用して、タスクが完了するたびにイベントを受け取り、進捗を1ステップずつ更新する。

as_completed の動作

  • 複数の Future を引数に取り、完了した順に取得できるイテレータを返す。
  • タスクが終わるごとにイベント通知され、as_completed 側では順次 yield する。
  • 完了したタスクを随時取得できるため、正しいタイミングで進捗を更新できる。

実装例

from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.progress import Progress

def do_work(item):
    # 並列実行する処理(例:重い計算、I/Oなど)
    return item

def main():
    data = range(10)
    with Progress() as progress:
        task = progress.add_task("[green]Processing...", total=len(data))
        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(do_work, x) for x in data]
            for future in as_completed(futures):
                _ = future.result()  # 結果を受け取る
                progress.advance(task, 1)  # 完了ごとに進捗を1つ進める

if __name__ == "__main__":
    main()
  • Progress コンテキスト内でタスクを作成
  • ThreadPoolExecutor により並列にタスクを実行
  • as_completed で完了を検知し、progress.advance(...) で進捗を1つ更新

参考URL

Discussion