Pythonで並列処理キューを使いやすくする「ppqueue」を試す
たまたま見つけた
ppqueue
Python用の並列処理キュー
概要
ppqueue
は、multiprocessing.Process
とthreading.Thread
の両方に対する抽象化レイヤーとして機能するPythonモジュールです。私は、コードの並列化が醜いコードを生み出すことに気づくことが多かったため、ppqueue
を開発しました。このシンプルなキューを使用すると、コードを簡単に魅力的に並列化することができます。ppqueueは以下を提供します。
- プロセスまたはスレッドを使用した並列実行のための単一API
- FIFO優先度キュー
- ジョブ実行のガントチャート(
plotnine
+pandas
のおかげ)- プログレスバー(
tqdm
のおかげ)
referred from https://github.com/fresh2dev/ppqueue and translated into Japanese by kun432
可視化が組み込まれているのが良さそう。
インストール
Colaboratoryで。
!pip install ppqueue[plot]
!pip freeze | grep -i ppqueue
ppqueue==0.4.0
キューの追加・削除
ppqueueを使うと、キューを使ってプロセス(またはスレッド)を並列(スレッドだと平行か)実行してくれる。ドキュメントのサンプルを少し噛み砕いた形にした。
from ppqueue import Queue
from ppqueue.plot import plot_jobs
from time import sleep
def slowly_square(value: int, sleep_for: float) -> int:
"""指定された時間待機後、valueの2乗を返す関数"""
sleep(sleep_for)
return value * value
# Queueオブジェクトを作成。最大同時実行数は3、進捗表示を有効化
with Queue(max_concurrent=3, show_progress=True) as queue:
# ジョブをキューに追加(enqueue)
for i in range(18):
print(f"ジョブを追加: {i} の2乗計算")
queue.enqueue(slowly_square, [i, 1])
# 全てのタスクが完了するまで待機
print("すべてのジョブの完了を待っています...")
queue.wait()
print("すべてのジョブが完了しました。")
# キューからジョブを削除(dequeue)
jobs = []
while queue.size() > 0:
# キューをジョブを1つだけ取り出す場合は
# `dequeue()`と`pop()`が使える
job = queue.dequeue()
print(f"ジョブを削除: 結果は {job.result}") # `result`で結果を取得
jobs.append(job)
# キューから全部取り出す場合は`collect()`も使える
# 全部まとめて取り出す場合は以下のように書ける
# - jobs = [queue.dequeue() for _ in range(queue.size())]
# - jobs = [queue.pop() for _ in range(queue.size())]
# - jobs = list(job for job in queue)
# - jobs = queue.collect()
# 結果をリストとして表示
results = [job.result for job in jobs]
print("全ジョブの計算結果:", results)
# ジョブの実行状況を可視化
display(plot_jobs(jobs, no_legend=True))
ジョブを追加: 0 の2乗計算
ジョブを追加: 1 の2乗計算
ジョブを追加: 2 の2乗計算
ジョブを追加: 3 の2乗計算
ジョブを追加: 4 の2乗計算
ジョブを追加: 5 の2乗計算
ジョブを追加: 6 の2乗計算
ジョブを追加: 7 の2乗計算
ジョブを追加: 8 の2乗計算
ジョブを追加: 9 の2乗計算
ジョブを追加: 10 の2乗計算
ジョブを追加: 11 の2乗計算
ジョブを追加: 12 の2乗計算
ジョブを追加: 13 の2乗計算
ジョブを追加: 14 の2乗計算
ジョブを追加: 15 の2乗計算
ジョブを追加: 16 の2乗計算
ジョブを追加: 17 の2乗計算
すべてのジョブの完了を待っています...
100%
18/18 [00:06<00:00, 2.69op/s]
すべてのジョブが完了しました。
ジョブを削除: 結果は 0
ジョブを削除: 結果は 1
ジョブを削除: 結果は 4
ジョブを削除: 結果は 9
ジョブを削除: 結果は 16
ジョブを削除: 結果は 25
ジョブを削除: 結果は 36
ジョブを削除: 結果は 49
ジョブを削除: 結果は 64
ジョブを削除: 結果は 81
ジョブを削除: 結果は 100
ジョブを削除: 結果は 121
ジョブを削除: 結果は 144
ジョブを削除: 結果は 169
ジョブを削除: 結果は 196
ジョブを削除: 結果は 225
ジョブを削除: 結果は 256
ジョブを削除: 結果は 289
全ジョブの計算結果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
なるほど、ジョブが3つづつ処理されているのがわかる。
優先度とグループ
ジョブに優先度をつけると、優先度の高いジョブから実行され、優先度が同じであれば先にキューに登録したジョブから実行される。
from ppqueue import Queue
from ppqueue.plot import plot_jobs, PlotColorBy
from time import sleep
def slowly_square(value: int, sleep_for: float) -> int:
"""指定された時間待機後、valueの2乗を返す関数"""
sleep(sleep_for)
return value * value
with Queue(max_concurrent=3, show_progress=True) as queue:
# ジョブをキューに追加(enqueue)
for i in range(18):
# 優先度は0が最も高い
# 奇数の場合は優先度0、偶数の場合は優先度1とする
priority = int(i % 2 == 0)
print(f"ジョブを追加: {i} の2乗計算 優先度: {priority}")
queue.enqueue(slowly_square, [i, 1], priority=priority)
# 全てのジョブが完了するまで待機
queue.wait()
# キューからジョブを削除(dequeue)
jobs = queue.collect()
# 結果をリストとして表示
print("全ジョブの計算結果:", [job.result for job in jobs])
# ジョブの実行状況を可視化
plot_jobs(
jobs,
color_by=PlotColorBy.PRIORITY,
color_pal=["red", "blue"],
)
ジョブを追加: 0 の2乗計算 優先度: 1
ジョブを追加: 1 の2乗計算 優先度: 0
ジョブを追加: 2 の2乗計算 優先度: 1
ジョブを追加: 3 の2乗計算 優先度: 0
ジョブを追加: 4 の2乗計算 優先度: 1
ジョブを追加: 5 の2乗計算 優先度: 0
ジョブを追加: 6 の2乗計算 優先度: 1
ジョブを追加: 7 の2乗計算 優先度: 0
ジョブを追加: 8 の2乗計算 優先度: 1
ジョブを追加: 9 の2乗計算 優先度: 0
ジョブを追加: 10 の2乗計算 優先度: 1
ジョブを追加: 11 の2乗計算 優先度: 0
ジョブを追加: 12 の2乗計算 優先度: 1
ジョブを追加: 13 の2乗計算 優先度: 0
ジョブを追加: 14 の2乗計算 優先度: 1
ジョブを追加: 15 の2乗計算 優先度: 0
ジョブを追加: 16 の2乗計算 優先度: 1
ジョブを追加: 17 の2乗計算 優先度: 0
100%
18/18 [00:06<00:00, 2.62op/s]
全ジョブの計算結果: [1, 9, 25, 49, 81, 121, 169, 225, 289, 0, 4, 16, 36, 64, 100, 144, 196, 256]
優先度0、つまり奇数の計算が先に行われて、次に優先度1の偶数の計算が行われているのがわかる。
明示的なグループを指定することもできる。
from ppqueue import Queue
from ppqueue.plot import plot_jobs, PlotColorBy
from time import sleep
def slowly_square(value: int, sleep_for: float) -> int:
sleep(sleep_for)
return value * value
with Queue(max_concurrent=3, show_progress=True) as queue:
# ジョブをキューに追加(enqueue)
for i in range(18):
# 奇数と偶数でグループを分ける(奇数:0、偶数:1)
group = int(i % 2 == 0)
print(f"ジョブを追加: {i} の2乗計算 グループ: {group}")
queue.enqueue(slowly_square, [i, 1], group=group)
# 全てのジョブが完了するまで待機
queue.wait()
jobs = queue.collect()
# 結果をリストとして表示
print("全ジョブの計算結果:", [job.result for job in jobs])
# ジョブの実行状況を可視化
plot_jobs(
jobs,
color_by=PlotColorBy.GROUP,
color_pal=["red", "blue"],
)
ジョブを追加: 0 の2乗計算 グループ: 1
ジョブを追加: 1 の2乗計算 グループ: 0
ジョブを追加: 2 の2乗計算 グループ: 1
ジョブを追加: 3 の2乗計算 グループ: 0
ジョブを追加: 4 の2乗計算 グループ: 1
ジョブを追加: 5 の2乗計算 グループ: 0
ジョブを追加: 6 の2乗計算 グループ: 1
ジョブを追加: 7 の2乗計算 グループ: 0
ジョブを追加: 8 の2乗計算 グループ: 1
ジョブを追加: 9 の2乗計算 グループ: 0
ジョブを追加: 10 の2乗計算 グループ: 1
ジョブを追加: 11 の2乗計算 グループ: 0
ジョブを追加: 12 の2乗計算 グループ: 1
ジョブを追加: 13 の2乗計算 グループ: 0
ジョブを追加: 14 の2乗計算 グループ: 1
ジョブを追加: 15 の2乗計算 グループ: 0
ジョブを追加: 16 の2乗計算 グループ: 1
ジョブを追加: 17 の2乗計算 グループ: 0
100%
18/18 [00:10<00:00, 1.80op/s]
全ジョブの計算結果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
グループからそれぞれジョブが取り出されて並行で処理されるという感じになるのか。
map
/ starmap
/ starmapkw
/ デコレータ
map()
を使うとシンプルに書ける。
from ppqueue import Queue
from ppqueue.plot import plot_jobs
from time import sleep
def slowly_square(value: int, sleep_for: float = 1) -> int:
sleep(sleep_for)
return value * value
with Queue(max_concurrent=3, show_progress=True) as queue:
# map()を使うと以下をまるっとやってくれる
# - iterableから要素を取り出し、それを引数とする関数の実行ジョブをキューに登録
# - すべてのジョブが終了するまで待つ
# - ジョブが完了したらキューから削除して結果を取り出す
jobs = queue.map(slowly_square, range(18))
# 結果をリストとして表示
print("全ジョブの計算結果:", [job.result for job in jobs])
# ジョブの実行状況を可視化
plot_jobs(jobs, no_legend=True)
100%
18/18 [00:07<00:00, 2.78op/s]
全ジョブの計算結果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
複数の引数を渡す場合はstarmap()
from ppqueue import Queue
from ppqueue.plot import plot_jobs
from time import sleep
def slowly_square(value: int, sleep_for: float = 1) -> int:
sleep(sleep_for)
return value * value
with Queue(max_concurrent=3, show_progress=True) as queue:
jobs = queue.starmap(slowly_square, [(i, 1) for i in range(18)])
# 結果をリストとして表示
print("全ジョブの計算結果:", [job.result for job in jobs])
# ジョブの実行状況を可視化
plot_jobs(jobs, no_legend=True)
100%
18/18 [00:06<00:00, 3.04op/s]
全ジョブの計算結果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
複数のキーワード引数の場合はstarmapkw
from ppqueue import Queue
from ppqueue.plot import plot_jobs
from time import sleep
def slowly_square(value: int, sleep_for: float = 1) -> int:
sleep(sleep_for)
return value * value
with Queue(max_concurrent=3, show_progress=True) as queue:
jobs = queue.starmapkw(
slowly_square,
[{"value": i, "sleep_for": 1} for i in range(18)],
)
# 結果をリストとして表示
print("全ジョブの計算結果:", [job.result for job in jobs])
# ジョブの実行状況を可視化
plot_jobs(jobs, no_legend=True)
100%
18/18 [00:06<00:00, 2.92op/s]
全ジョブの計算結果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
また、ここまではwithブロックで書いてきたが、デコレータでも書ける
from ppqueue import Queue
from ppqueue.plot import plot_jobs
from time import sleep
@Queue(max_concurrent=3, show_progress=True)
def sleep_foreach(x: float):
sleep(x)
jobs = sleep_foreach([1] * 18)
plot_jobs(jobs, no_legend=True)
んー、でもあまり実践的な例じゃないように思えるな。上で使ってたslowly_square
を使って複数引数を渡すようなケースだとこう。
from ppqueue import Queue
from ppqueue.plot import plot_jobs
from time import sleep
def slowly_square(value: int, sleep_for: float = 1) -> int:
sleep(sleep_for)
return value * value
@Queue(max_concurrent=3, show_progress=True)
def wrap_slowly_square(args) -> int:
value, sleep_for = args
return slowly_square(value, sleep_for)
jobs = wrap_slowly_square([(i, 1) for i in range(18)])
print("全ジョブの計算結果:", [job.result for job in jobs])
plot_jobs(jobs, no_legend=True)
100%
18/18 [00:07<00:00, 2.83op/s]
全ジョブの計算結果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
んー、できるはできるけど、これが必要なユースケースがあるのかはまあ・・・
スレッドとプロセス
スレッドとプロセスを使い分けることができる。デフォルトはmp.Process
になっている。キュー作成時にengine
で指定する。
from ppqueue import Queue
from ppqueue.plot import plot_jobs, PlotColorBy
from threading import Thread
from multiprocessing import Process
from time import sleep
def slowly_square(value: int, sleep_for: float = 1) -> int:
sleep(sleep_for)
return value * value
# スレッドを使ったキュー
with Queue(
max_concurrent=3,
show_progress=True,
engine=Thread,
name="Thread Queue",
) as queue:
thread_jobs = queue.map(slowly_square, range(18))
# プロセスをを使ったキュー
with Queue(
max_concurrent=3,
show_progress=True,
engine=Process,
name="Process Queue",
) as queue:
process_jobs = queue.map(slowly_square, range(18))
print("スレッドジョブの計算結果:", [job.result for job in thread_jobs])
print("プロセスジョブの計算結果:", [job.result for job in process_jobs])
plot_jobs(
thread_jobs,
process_jobs,
color_by=PlotColorBy.QID,
color_pal=["red", "blue"],
)
100%
18/18 [00:06<00:00, 2.97op/s]
100%
18/18 [00:08<00:00, 2.75op/s]
スレッドジョブの計算結果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
プロセスジョブの計算結果: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
まとめ
シンプルに書けそうなのは良さげ。ただ、シンプルな仕組みのものならまあいいのかもだけど、複数キューを使ってそれぞれを並列でやりたいよなぁ・・・みたいなケースは果たしてできるんだろうか。