🔰

ProcessPoolExecutorでExcelファイルを処理する

2024/04/11に公開

ProcessPoolExecutorでExcelファイルを処理する

ProcessPoolExecutorとは

concurrent.futuresモジュールに含まれる並列処理を行うためのクラス。

何ができるのか?

  • マルチプロセス処理ができる
  • プロセスを自動的に管理し、タスクの分配や結果の取得ができる
  • 簡単

ThreadPoolExecutorとの違い

同様に並列処理を実現するクラスにThreadPoolExecutorがあるが、ThreadPoolExecutorとの違いは、ThreadPoolExecutorはスレッドベース、ProcessPoolExecutorはプロセスベースでありスレッド間ではメモリーを共有できるがプロセス間ではメモリーの共有はできない。スレッドベースは1つのCPUコアを共有して処理するが、プロセスベースでは複数のCPUコアを使用する。
スレッドの作成は軽量であるが、プロセスの作成は重い処理くメモリーの消費も多い。

サンプルコード

ベースのスクリプト

import os
import time

def process_task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} finished")



def main():
    start_time = time.time()

    tasks = [
        "A", "B", "C", "D", "E"
    ]

    for task in tasks:
        process_task(task)
    end_time = time.time()

    print(f"Total time: {end_time - start_time}")

if __name__ == '__main__':
    main()

出力結果

Task A started
Task A finished
Task B started
Task B finished
Task C started
Task C finished
Task D started
Task D finished
Task E started
Task E finished
Total time: 10.031582355499268

tasksの順番通りに1つづつ実行されているのがわかる。

このコードをProcessPoolExecutorをつかって並列化してみる

修正箇所

  • concurrent.futuresモジュールをインポート
  • process_taskを直接実行するのではなく、executor.mapを利用して、executorに実行差せるように修正をする。
import os
import time
import concurrent.futures

def process_task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} finished")

def main():
    start_time = time.time()

    tasks = ["A", "B", "C", "D", "E"]

    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(process_task, tasks)

    end_time = time.time()
    print(f"Total time: {end_time - start_time}")

if __name__ == '__main__':
    main()

Task A started
Task B startedTask C started

Task D startedTask E started

Task B finishedTask C finishedTask A finished


Task E finishedTask D finished

Total time: 3.2148828506469727

配列の要素順に実行が行われるが、終了を待たずに次々とprocess_taskが実行されているのがわかる。
すべてのプロセスが終了してから、print(f"Total time: {end_time - start_time}")が実行されているのがわかる

応用

各プロセスの処理結果を取得するには

ProcessPoolExecutorに渡す関数を戻り値が返るようにすればよい。


import os
import time
import concurrent.futures

def process_task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} finished")

    return f"Task {name} finished"

def main():

    tasks = ["A", "B", "C", "D", "E"]

    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(process_task, tasks)

    print("All Processes is Completed. Results:")
    for result in results:
        print(result)

if __name__ == '__main__':
    main()
Task A started
Task B startedTask C started
Task D started

Task E started
Task A finished
Task B finished
Task C finished
Task D finished
Task E finished
All Processes is Completed. Results:
Task A finished
Task B finished
Task C finished
Task D finished
Task E finished

tqdmをつかって進捗を表示させる

mapメソッドでまとめて実行させた場合
mapメソッドの戻り値は実行した関数の戻り値のリスト

import os
import time
import concurrent.futures
from tqdm import tqdm


def process_task(name):
    time.sleep(2)

    return f"Task {name} finished"

def main():

    tasks = ["A", "B", "C", "D", "E"]

    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(tqdm(executor.map(process_task, tasks), total=len(tasks), desc="Processing Tasks"))

    print("All Processes is Completed. Results:")
    for result in results:
        print(result)

if __name__ == '__main__':
    main()

出力

Processing Tasks: 100%|████████████████████████████████████████████████████████████| 5/5 [00:02<00:00,  1.95it/s]
All Processes is Completed. Results:
Task A finished
Task B finished
Task C finished
Task D finished
Task E finished

submit()で個別に実行させた場合

submitで関数をサブプロセスで実行する場合、戻り値はfutureオブジェクトとなる。
completed_as()関数はfutureオブジェクトの処理が終わったfutureオブジェクトのイテレータを返す。

import os
import time
import concurrent.futures
from concurrent.futures import as_completed
from tqdm import tqdm


def process_task(name):
    time.sleep(2)

    return f"Task {name} finished"

def main():

    tasks = ["A", "B", "C", "D", "E"]

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(process_task, task) for task in tasks]

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing", leave=True):
            pass


    print("All Processes is Completed. Results:")
    for future in futures:
        print(future.result())


if __name__ == '__main__':
    main()
Processing: 100%|██████████████████████████████████████████████████████████████████| 5/5 [00:02<00:00,  1.98it/s]
All Processes is Completed. Results:
Task A finished
Task B finished
Task C finished
Task D finished
Task E finished

実用例

指定したフォルダ配下にあるエクセル(.xlsx or .xlsm)を読み込んで
ワークブックに含まれるワークシートのリストを作成する

tqdmとopenxlを使用するのでインストールしておく

pip install tqdm openpyxl

import os
from  concurrent.futures import ProcessPoolExecutor, as_completed
from openpyxl import load_workbook, Workbook
from openpyxl.utils import get_column_letter
import logging
import tqdm

# 指定したワークブックに含まれるワークシート名のリストを返す
def process_workbook(process_name, workbook_path):
    result = False
    sheets = []
    try:
        wb = load_workbook(workbook_path, read_only=True, data_only=True)
        sheets = [name for name in wb.sheetnames]
        result = True
    except Exception as e:
        print(e)

    return result, workbook_path, sheets

def main():
    target_dir = r"Path/to/target/directory" # 処理対象のディレクトリを指定

    files_to_process = [(file, os.path.join(root, file)) for root, dirs, files in os.walk(target_dir) for file in files if( (file.endswith('.xlsx') or file.endswith('.xlsm') ) and not file.startswith('~$'))]

    results = []
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(process_workbook, *file_path) for file_path in files_to_process]

        for  future in tqdm.tqdm(as_completed(futures), total=len(futures), desc="Processing: "):
            results.append(future.result())


    output_wb = Workbook()
    output_ws = output_wb.active
    output_ws.title = "ワークシートリスト"
    output_ws.append(["No", "ワークブック", "ワークシート名"])
    index = 1
    for result in results:
        if result[0]:
            sheets = result[2]
            for sheet in sheets:
                output_ws.append([index, result[1], sheet])
                index += 1
        else:
            logging.error(f"失敗 {result[1]}")

    output_wb.save("output.xlsx")

if __name__ == '__main__':
    main()

Discussion