ProcessPoolExecutorでExcelファイルを処理する
ProcessPoolExecutorでExcelファイルを処理する
ProcessPoolExecutorとは
concurrent.futuresモジュールに含まれる並列処理を行うためのクラス。
何ができるのか?
- マルチプロセス処理ができる
- プロセスを自動的に管理し、タスクの分配や結果の取得ができる
- 簡単
ThreadPoolExecutorとの違い
同様に並列処理を実現するクラスにThreadPoolExecutorがあるが、ThreadPoolExecutorとの違いは、ThreadPoolExecutorはスレッドベース、ProcessPoolExecutorはプロセスベースでありスレッド間ではメモリーを共有できるがプロセス間ではメモリーの共有はできない。スレッドベースは1つのCPUコアを共有して処理するが、プロセスベースでは複数のCPUコアを使用する。
スレッドの作成は軽量であるが、プロセスの作成は重い処理くメモリーの消費も多い。
サンプルコード
ベースのスクリプト
import os
import time
def process_task(name):
print(f"Task {name} started")
time.sleep(2)
print(f"Task {name} finished")
def main():
start_time = time.time()
tasks = [
"A", "B", "C", "D", "E"
]
for task in tasks:
process_task(task)
end_time = time.time()
print(f"Total time: {end_time - start_time}")
if __name__ == '__main__':
main()
出力結果
Task A started
Task A finished
Task B started
Task B finished
Task C started
Task C finished
Task D started
Task D finished
Task E started
Task E finished
Total time: 10.031582355499268
tasksの順番通りに1つづつ実行されているのがわかる。
このコードをProcessPoolExecutorをつかって並列化してみる
修正箇所
- concurrent.futuresモジュールをインポート
- process_taskを直接実行するのではなく、executor.mapを利用して、executorに実行差せるように修正をする。
import os
import time
import concurrent.futures
def process_task(name):
print(f"Task {name} started")
time.sleep(2)
print(f"Task {name} finished")
def main():
start_time = time.time()
tasks = ["A", "B", "C", "D", "E"]
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(process_task, tasks)
end_time = time.time()
print(f"Total time: {end_time - start_time}")
if __name__ == '__main__':
main()
Task A started
Task B startedTask C started
Task D startedTask E started
Task B finishedTask C finishedTask A finished
Task E finishedTask D finished
Total time: 3.2148828506469727
配列の要素順に実行が行われるが、終了を待たずに次々とprocess_taskが実行されているのがわかる。
すべてのプロセスが終了してから、print(f"Total time: {end_time - start_time}")が実行されているのがわかる
応用
各プロセスの処理結果を取得するには
ProcessPoolExecutorに渡す関数を戻り値が返るようにすればよい。
import os
import time
import concurrent.futures
def process_task(name):
print(f"Task {name} started")
time.sleep(2)
print(f"Task {name} finished")
return f"Task {name} finished"
def main():
tasks = ["A", "B", "C", "D", "E"]
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(process_task, tasks)
print("All Processes is Completed. Results:")
for result in results:
print(result)
if __name__ == '__main__':
main()
Task A started
Task B startedTask C started
Task D started
Task E started
Task A finished
Task B finished
Task C finished
Task D finished
Task E finished
All Processes is Completed. Results:
Task A finished
Task B finished
Task C finished
Task D finished
Task E finished
tqdmをつかって進捗を表示させる
mapメソッドでまとめて実行させた場合
mapメソッドの戻り値は実行した関数の戻り値のリスト
import os
import time
import concurrent.futures
from tqdm import tqdm
def process_task(name):
time.sleep(2)
return f"Task {name} finished"
def main():
tasks = ["A", "B", "C", "D", "E"]
with concurrent.futures.ProcessPoolExecutor() as executor:
results = list(tqdm(executor.map(process_task, tasks), total=len(tasks), desc="Processing Tasks"))
print("All Processes is Completed. Results:")
for result in results:
print(result)
if __name__ == '__main__':
main()
出力
Processing Tasks: 100%|████████████████████████████████████████████████████████████| 5/5 [00:02<00:00, 1.95it/s]
All Processes is Completed. Results:
Task A finished
Task B finished
Task C finished
Task D finished
Task E finished
submit()で個別に実行させた場合
submitで関数をサブプロセスで実行する場合、戻り値はfutureオブジェクトとなる。
completed_as()関数はfutureオブジェクトの処理が終わったfutureオブジェクトのイテレータを返す。
import os
import time
import concurrent.futures
from concurrent.futures import as_completed
from tqdm import tqdm
def process_task(name):
time.sleep(2)
return f"Task {name} finished"
def main():
tasks = ["A", "B", "C", "D", "E"]
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(process_task, task) for task in tasks]
for future in tqdm(as_completed(futures), total=len(futures), desc="Processing", leave=True):
pass
print("All Processes is Completed. Results:")
for future in futures:
print(future.result())
if __name__ == '__main__':
main()
Processing: 100%|██████████████████████████████████████████████████████████████████| 5/5 [00:02<00:00, 1.98it/s]
All Processes is Completed. Results:
Task A finished
Task B finished
Task C finished
Task D finished
Task E finished
実用例
指定したフォルダ配下にあるエクセル(.xlsx or .xlsm)を読み込んで
ワークブックに含まれるワークシートのリストを作成する
tqdmとopenxlを使用するのでインストールしておく
pip install tqdm openpyxl
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from openpyxl import load_workbook, Workbook
from openpyxl.utils import get_column_letter
import logging
import tqdm
# 指定したワークブックに含まれるワークシート名のリストを返す
def process_workbook(process_name, workbook_path):
result = False
sheets = []
try:
wb = load_workbook(workbook_path, read_only=True, data_only=True)
sheets = [name for name in wb.sheetnames]
result = True
except Exception as e:
print(e)
return result, workbook_path, sheets
def main():
target_dir = r"Path/to/target/directory" # 処理対象のディレクトリを指定
files_to_process = [(file, os.path.join(root, file)) for root, dirs, files in os.walk(target_dir) for file in files if( (file.endswith('.xlsx') or file.endswith('.xlsm') ) and not file.startswith('~$'))]
results = []
with ProcessPoolExecutor() as executor:
futures = [executor.submit(process_workbook, *file_path) for file_path in files_to_process]
for future in tqdm.tqdm(as_completed(futures), total=len(futures), desc="Processing: "):
results.append(future.result())
output_wb = Workbook()
output_ws = output_wb.active
output_ws.title = "ワークシートリスト"
output_ws.append(["No", "ワークブック", "ワークシート名"])
index = 1
for result in results:
if result[0]:
sheets = result[2]
for sheet in sheets:
output_ws.append([index, result[1], sheet])
index += 1
else:
logging.error(f"失敗 {result[1]}")
output_wb.save("output.xlsx")
if __name__ == '__main__':
main()
Discussion