Pythonで並列パイプラインを作成するライブラリ「Pypeln」を試す
GitHubレポジトリ
Pypeln
Pypeln(発音は「パイプライン」)は、並列および同時実行が必要な中規模のデータ処理タスク向けに設計された、シンプルでありながら強力な Python ライブラリです。
主な特徴
- シンプル: Pypeln は、Spark や Dask のようなフレームワークを使用するのが過剰または不自然に感じられる場合に、並列性と同時実行が必要な 中規模 データタスクを解決するために設計されました。
- 使いやすい: Pypeln は、通常の Python コードと互換性のある親しみやすい関数型 API を提供します。
- 柔軟: Pypeln は、プロセス、スレッド、asyncio.Task を同一の API を通じて使用することで、パイプラインを構築できるようにします。
- 細かい制御: Pypeln は、パイプラインの各段階で使用されるメモリと CPU リソースを制御することを可能にします。
詳細については、ドキュメント をご覧ください。
referred from https://github.com/cgarciae/pypeln/
公式ドキュメント
インストール
Colaboratoryで。
!pip install pypeln
!pip freeze | grep -i pypeln
pypeln==0.4.9
基本的な使い方
基本的には複数のステージで構成されるパイプラインを作るためのもの。で、ワーカーの実装は3種類。
- プロセス
- スレッド
- 非同期タスク
これに加えて、同期ジェネレータを使う方法もある。
multiprocessing.Process
)
プロセス(2ステージで構成されるパイプラインの例
- データに処理を適用(
map()
) - 条件に合致したデータのみを抽出(
filter()
)
それぞれのステージでmultiprocessing.Process
を使ってマルチプロセスで並列処理を行う。動きを確認してみたかったのでloguruでログを取るようにして、あとsleepも固定にした。
import pypeln as pl
import time
from loguru import logger
def slow_add1(x):
"""
入力値に1を加算して返す関数
Args:
x (int): 入力値
Returns:
int: xに1を加えた値
"""
logger.info("INPUT: {}", x)
time.sleep(1) # 計算が遅い処理を想定
ret = x + 1
logger.info("OUTPUT: {} -> {}", x, ret)
return ret
def slow_gt3(x):
"""
入力値が3より大きいかどうかを判定して返す関数
Args:
x (int): 入力値
Returns:
bool: xが3より大きければTrue、そうでなければFalse
"""
logger.info("INPUT: {}", x)
time.sleep(1) # 計算が遅い処理を想定
ret = x > 3
logger.info("OUTPUT: {} -> {}", x, ret)
return ret
# 0〜9までの数値を入力データとする
data = range(10)
# ステージ1: 入力データの各要素に対して、slow_add1を並列に実行して、適用(map)
stage1 = pl.process.map(
slow_add1, # 適用する関数
data, # 入力対象のデータ
workers=3, # 並列実行するワーカー(プロセス)数
maxsize=4, # バッファリングできるデータの最大数
)
# ステージ2: 上の結果の各要素に対して、slow_gt3を並列に実行して、条件抽出(filter)
stage2 = pl.process.filter(
slow_gt3, # フィルタを行う関数
stage1, # 入力対象のデータ
workers=2, # 並列実行するワーカー(プロセス)数
)
# 結果
final_result = list(stage2)
print(final_result)
2025-02-18 15:43:08.204 | INFO | __main__:slow_add1:16 - INPUT: 1
2025-02-18 15:43:08.200 | INFO | __main__:slow_add1:16 - INPUT: 0
2025-02-18 15:43:08.209 | INFO | __main__:slow_add1:16 - INPUT: 2
2025-02-18 15:43:09.215 | INFO | __main__:slow_add1:19 - OUTPUT: 1 -> 2
2025-02-18 15:43:09.213 | INFO | __main__:slow_add1:19 - OUTPUT: 0 -> 1
2025-02-18 15:43:09.216 | INFO | __main__:slow_add1:19 - OUTPUT: 2 -> 3
2025-02-18 15:43:09.223 | INFO | __main__:slow_add1:16 - INPUT: 3
2025-02-18 15:43:09.231 | INFO | __main__:slow_add1:16 - INPUT: 4
2025-02-18 15:43:09.234 | INFO | __main__:slow_add1:16 - INPUT: 5
2025-02-18 15:43:09.225 | INFO | __main__:slow_gt3:32 - INPUT: 1
2025-02-18 15:43:09.234 | INFO | __main__:slow_gt3:32 - INPUT: 3
2025-02-18 15:43:10.232 | INFO | __main__:slow_add1:19 - OUTPUT: 3 -> 4
2025-02-18 15:43:10.235 | INFO | __main__:slow_add1:16 - INPUT: 6
2025-02-18 15:43:10.238 | INFO | __main__:slow_add1:19 - OUTPUT: 5 -> 6
2025-02-18 15:43:10.240 | INFO | __main__:slow_gt3:35 - OUTPUT: 1 -> False
2025-02-18 15:43:10.236 | INFO | __main__:slow_add1:19 - OUTPUT: 4 -> 5
2025-02-18 15:43:10.242 | INFO | __main__:slow_add1:16 - INPUT: 7
2025-02-18 15:43:10.243 | INFO | __main__:slow_gt3:32 - INPUT: 2
2025-02-18 15:43:10.249 | INFO | __main__:slow_add1:16 - INPUT: 8
2025-02-18 15:43:10.247 | INFO | __main__:slow_gt3:35 - OUTPUT: 3 -> False
2025-02-18 15:43:10.253 | INFO | __main__:slow_gt3:32 - INPUT: 4
2025-02-18 15:43:11.238 | INFO | __main__:slow_add1:19 - OUTPUT: 6 -> 7
2025-02-18 15:43:11.241 | INFO | __main__:slow_add1:16 - INPUT: 9
2025-02-18 15:43:11.247 | INFO | __main__:slow_add1:19 - OUTPUT: 7 -> 8
2025-02-18 15:43:11.251 | INFO | __main__:slow_gt3:35 - OUTPUT: 2 -> False
2025-02-18 15:43:11.252 | INFO | __main__:slow_add1:19 - OUTPUT: 8 -> 9
2025-02-18 15:43:11.259 | INFO | __main__:slow_gt3:32 - INPUT: 6
2025-02-18 15:43:11.262 | INFO | __main__:slow_gt3:35 - OUTPUT: 4 -> True
2025-02-18 15:43:11.266 | INFO | __main__:slow_gt3:32 - INPUT: 5
2025-02-18 15:43:12.243 | INFO | __main__:slow_add1:19 - OUTPUT: 9 -> 10
2025-02-18 15:43:12.263 | INFO | __main__:slow_gt3:35 - OUTPUT: 6 -> True
2025-02-18 15:43:12.268 | INFO | __main__:slow_gt3:35 - OUTPUT: 5 -> True
2025-02-18 15:43:12.266 | INFO | __main__:slow_gt3:32 - INPUT: 7
2025-02-18 15:43:12.274 | INFO | __main__:slow_gt3:32 - INPUT: 8
2025-02-18 15:43:13.272 | INFO | __main__:slow_gt3:35 - OUTPUT: 7 -> True
2025-02-18 15:43:13.277 | INFO | __main__:slow_gt3:32 - INPUT: 9
2025-02-18 15:43:13.278 | INFO | __main__:slow_gt3:35 - OUTPUT: 8 -> True
2025-02-18 15:43:13.288 | INFO | __main__:slow_gt3:32 - INPUT: 10
2025-02-18 15:43:14.285 | INFO | __main__:slow_gt3:35 - OUTPUT: 9 -> True
2025-02-18 15:43:14.291 | INFO | __main__:slow_gt3:35 - OUTPUT: 10 -> True
[4, 6, 5, 7, 8, 9, 10]
workers
はまあわかりやすいのだけど、maxsize
がわかりにくい。色々調べてみたところ、以下の記事を見つけた。
** キュー**
ワーカー同士はQueueを通じて通信します。各Queueが保持できる最大要素数は、pypelnの様々な関数の
maxsize
引数で制御されます。この値のデフォルトはは0で、要素数に制限はありません。しかし、maxsize
が設定されると、キューがいっぱいになった(maxsize
で指定した数値に達した)ときに前のステージが新しい要素をキューに押し込むのを防ぐメカニズムとして機能します。
ということは、maxsize
が指定されたステージでキューがいっぱいになると前のステージの処理が遅延する、ということになるのかな?このサンプルだと、前のステージは入力データ(が入ってくるキュー)そのものになると思うのでいまいちピンとこないのだけども。
色々パラメータを変えてテストしてみたけど、maxsize
の挙動はいまいち掴めず・・・一旦気にせず進める。
threading.Thread
)
スレッド(上のコードをスレッドに置き換えたもの。単にpl.process
をpl.thread
に変更しただけ。
import pypeln as pl
import time
from loguru import logger
def slow_add1(x):
logger.info("INPUT: {}", x)
time.sleep(1)
ret = x + 1
logger.info("OUTPUT: {} -> {}", x, ret)
return ret
def slow_gt3(x):
logger.info("INPUT: {}", x)
time.sleep(1)
ret = x > 3
logger.info("OUTPUT: {} -> {}", x, ret)
return ret
data = range(10)
# スレッドを使う
stage1 = pl.thread.map(
slow_add1,
data,
workers=3,
maxsize=4,
)
# スレッドを使う
stage2 = pl.thread.filter(
slow_gt3,
stage1,
workers=2,
)
# 結果
final_result = list(stage2)
print(final_result)
2025-02-18 15:53:47.672 | INFO | __main__:slow_add1:7 - INPUT: 0
2025-02-18 15:53:47.675 | INFO | __main__:slow_add1:7 - INPUT: 1
2025-02-18 15:53:47.675 | INFO | __main__:slow_add1:7 - INPUT: 2
2025-02-18 15:53:48.676 | INFO | __main__:slow_add1:10 - OUTPUT: 0 -> 1
2025-02-18 15:53:48.678 | INFO | __main__:slow_add1:7 - INPUT: 3
2025-02-18 15:53:48.678 | INFO | __main__:slow_add1:10 - OUTPUT: 1 -> 2
2025-02-18 15:53:48.686 | INFO | __main__:slow_add1:7 - INPUT: 4
2025-02-18 15:53:48.684 | INFO | __main__:slow_add1:10 - OUTPUT: 2 -> 3
2025-02-18 15:53:48.691 | INFO | __main__:slow_add1:7 - INPUT: 5
2025-02-18 15:53:48.686 | INFO | __main__:slow_gt3:14 - INPUT: 2
2025-02-18 15:53:48.678 | INFO | __main__:slow_gt3:14 - INPUT: 1
2025-02-18 15:53:49.685 | INFO | __main__:slow_add1:10 - OUTPUT: 3 -> 4
2025-02-18 15:53:49.687 | INFO | __main__:slow_add1:7 - INPUT: 6
2025-02-18 15:53:49.689 | INFO | __main__:slow_add1:10 - OUTPUT: 4 -> 5
2025-02-18 15:53:49.692 | INFO | __main__:slow_add1:7 - INPUT: 7
2025-02-18 15:53:49.693 | INFO | __main__:slow_add1:10 - OUTPUT: 5 -> 6
2025-02-18 15:53:49.694 | INFO | __main__:slow_add1:7 - INPUT: 8
2025-02-18 15:53:49.697 | INFO | __main__:slow_gt3:17 - OUTPUT: 2 -> False
2025-02-18 15:53:49.699 | INFO | __main__:slow_gt3:14 - INPUT: 3
2025-02-18 15:53:49.698 | INFO | __main__:slow_gt3:17 - OUTPUT: 1 -> False
2025-02-18 15:53:49.701 | INFO | __main__:slow_gt3:14 - INPUT: 4
2025-02-18 15:53:50.690 | INFO | __main__:slow_add1:10 - OUTPUT: 6 -> 7
2025-02-18 15:53:50.692 | INFO | __main__:slow_add1:7 - INPUT: 9
2025-02-18 15:53:50.693 | INFO | __main__:slow_add1:10 - OUTPUT: 7 -> 8
2025-02-18 15:53:50.698 | INFO | __main__:slow_add1:10 - OUTPUT: 8 -> 9
2025-02-18 15:53:50.700 | INFO | __main__:slow_gt3:17 - OUTPUT: 3 -> False
2025-02-18 15:53:50.701 | INFO | __main__:slow_gt3:14 - INPUT: 5
2025-02-18 15:53:50.702 | INFO | __main__:slow_gt3:17 - OUTPUT: 4 -> True
2025-02-18 15:53:50.703 | INFO | __main__:slow_gt3:14 - INPUT: 6
2025-02-18 15:53:51.695 | INFO | __main__:slow_add1:10 - OUTPUT: 9 -> 10
2025-02-18 15:53:51.702 | INFO | __main__:slow_gt3:17 - OUTPUT: 5 -> True
2025-02-18 15:53:51.704 | INFO | __main__:slow_gt3:14 - INPUT: 7
2025-02-18 15:53:51.705 | INFO | __main__:slow_gt3:17 - OUTPUT: 6 -> True
2025-02-18 15:53:51.706 | INFO | __main__:slow_gt3:14 - INPUT: 8
2025-02-18 15:53:52.705 | INFO | __main__:slow_gt3:17 - OUTPUT: 7 -> True
2025-02-18 15:53:52.707 | INFO | __main__:slow_gt3:14 - INPUT: 9
2025-02-18 15:53:52.710 | INFO | __main__:slow_gt3:17 - OUTPUT: 8 -> True
2025-02-18 15:53:52.711 | INFO | __main__:slow_gt3:14 - INPUT: 10
2025-02-18 15:53:53.708 | INFO | __main__:slow_gt3:17 - OUTPUT: 9 -> True
2025-02-18 15:53:53.714 | INFO | __main__:slow_gt3:17 - OUTPUT: 10 -> True
[4, 5, 6, 7, 8, 9, 10]
asyncio.Task
)
非同期タスク(同じように``pl.task`を使って、あと関数はasyncで指定するだけ。
import pypeln as pl
import asyncio
from loguru import logger
import sys
# notebook環境だと以下が必要
import nest_asyncio
nest_asyncio.apply()
# 非同期の場合はenqueue=Trueを付与
logger.remove()
logger.add(sys.stderr, enqueue=True)
async def slow_add1(x):
logger.info("INPUT: {}", x)
await asyncio.sleep(1)
ret = x + 1
logger.info("OUTPUT: {} -> {}", x, ret)
return ret
async def slow_gt3(x):
logger.info("INPUT: {}", x)
await asyncio.sleep(1)
ret = x > 3
logger.info("OUTPUT: {} -> {}", x, ret)
return ret
async def main():
data = range(10)
# 非同期タスクを使う
stage1 = pl.task.map(
slow_add1,
data,
workers=3,
maxsize=4,
)
# 非同期タスクを使う
stage2 = pl.task.filter(
slow_gt3,
stage1,
workers=2,
)
# 結果
final_result = [x async for x in stage2]
print(final_result)
asyncio.run(main())
2025-02-18 16:07:53.203 | INFO | __main__:slow_add1:17 - INPUT: 0
2025-02-18 16:07:53.203 | INFO | __main__:slow_add1:17 - INPUT: 1
2025-02-18 16:07:53.203 | INFO | __main__:slow_add1:17 - INPUT: 2
2025-02-18 16:07:54.205 | INFO | __main__:slow_add1:20 - OUTPUT: 0 -> 1
2025-02-18 16:07:54.205 | INFO | __main__:slow_add1:20 - OUTPUT: 1 -> 2
2025-02-18 16:07:54.205 | INFO | __main__:slow_add1:20 - OUTPUT: 2 -> 3
2025-02-18 16:07:54.206 | INFO | __main__:slow_gt3:24 - INPUT: 1
2025-02-18 16:07:54.206 | INFO | __main__:slow_gt3:24 - INPUT: 2
2025-02-18 16:07:54.206 | INFO | __main__:slow_add1:17 - INPUT: 3
2025-02-18 16:07:54.207 | INFO | __main__:slow_add1:17 - INPUT: 4
2025-02-18 16:07:54.207 | INFO | __main__:slow_add1:17 - INPUT: 5
2025-02-18 16:07:55.208 | INFO | __main__:slow_gt3:27 - OUTPUT: 1 -> False
2025-02-18 16:07:55.208 | INFO | __main__:slow_gt3:27 - OUTPUT: 2 -> False
2025-02-18 16:07:55.209 | INFO | __main__:slow_add1:20 - OUTPUT: 3 -> 4
2025-02-18 16:07:55.209 | INFO | __main__:slow_add1:20 - OUTPUT: 4 -> 5
2025-02-18 16:07:55.209 | INFO | __main__:slow_add1:20 - OUTPUT: 5 -> 6
2025-02-18 16:07:55.209 | INFO | __main__:slow_gt3:24 - INPUT: 3
2025-02-18 16:07:55.210 | INFO | __main__:slow_gt3:24 - INPUT: 4
2025-02-18 16:07:55.210 | INFO | __main__:slow_add1:17 - INPUT: 6
2025-02-18 16:07:55.210 | INFO | __main__:slow_add1:17 - INPUT: 7
2025-02-18 16:07:55.210 | INFO | __main__:slow_add1:17 - INPUT: 8
2025-02-18 16:07:56.211 | INFO | __main__:slow_gt3:27 - OUTPUT: 3 -> False
2025-02-18 16:07:56.211 | INFO | __main__:slow_gt3:27 - OUTPUT: 4 -> True
2025-02-18 16:07:56.212 | INFO | __main__:slow_add1:20 - OUTPUT: 6 -> 7
2025-02-18 16:07:56.212 | INFO | __main__:slow_add1:20 - OUTPUT: 7 -> 8
2025-02-18 16:07:56.212 | INFO | __main__:slow_add1:20 - OUTPUT: 8 -> 9
2025-02-18 16:07:56.212 | INFO | __main__:slow_gt3:24 - INPUT: 5
2025-02-18 16:07:56.213 | INFO | __main__:slow_gt3:24 - INPUT: 6
2025-02-18 16:07:56.213 | INFO | __main__:slow_add1:17 - INPUT: 9
2025-02-18 16:07:57.214 | INFO | __main__:slow_gt3:27 - OUTPUT: 5 -> True
2025-02-18 16:07:57.215 | INFO | __main__:slow_gt3:27 - OUTPUT: 6 -> True
2025-02-18 16:07:57.215 | INFO | __main__:slow_add1:20 - OUTPUT: 9 -> 10
2025-02-18 16:07:57.215 | INFO | __main__:slow_gt3:24 - INPUT: 7
2025-02-18 16:07:57.216 | INFO | __main__:slow_gt3:24 - INPUT: 8
2025-02-18 16:07:58.217 | INFO | __main__:slow_gt3:27 - OUTPUT: 7 -> True
2025-02-18 16:07:58.218 | INFO | __main__:slow_gt3:27 - OUTPUT: 8 -> True
2025-02-18 16:07:58.218 | INFO | __main__:slow_gt3:24 - INPUT: 9
2025-02-18 16:07:58.218 | INFO | __main__:slow_gt3:24 - INPUT: 10
2025-02-18 16:07:59.219 | INFO | __main__:slow_gt3:27 - OUTPUT: 9 -> True
2025-02-18 16:07:59.220 | INFO | __main__:slow_gt3:27 - OUTPUT: 10 -> True
[4, 5, 6, 7, 8, 9, 10]
同期ジェネレータ
pl.sync
を使うと、同期ジェネレータで処理を行う。この場合
- デバッグに便利
- 重たいCPU/IOバウンドなタスクがなく、入力データの順序を保ちたい場合
に使える。この場合、workers
やmaxsize
などのパラメータは無視される。
import pypeln as pl
import time
from loguru import logger
def slow_add1(x):
logger.info("INPUT: {}", x)
time.sleep(1)
ret = x + 1
logger.info("OUTPUT: {} -> {}", x, ret)
return ret
def slow_gt3(x):
logger.info("INPUT: {}", x)
time.sleep(1)
ret = x > 3
logger.info("OUTPUT: {} -> {}", x, ret)
return ret
data = range(10)
# 同期ジェネレータを使う
stage1 = pl.sync.map(
slow_add1,
data,
workers=3,
maxsize=4,
)
# 同期ジェネレータを使う
stage2 = pl.sync.filter(
slow_gt3,
stage1,
workers=2,
)
# 結果
final_result = list(stage2)
print(final_result)
2025-02-18 16:18:31.218 | INFO | __main__:slow_add1:6 - INPUT: 0
2025-02-18 16:18:32.224 | INFO | __main__:slow_add1:9 - OUTPUT: 0 -> 1
2025-02-18 16:18:32.228 | INFO | __main__:slow_gt3:13 - INPUT: 1
2025-02-18 16:18:33.230 | INFO | __main__:slow_gt3:16 - OUTPUT: 1 -> False
2025-02-18 16:18:33.234 | INFO | __main__:slow_add1:6 - INPUT: 1
2025-02-18 16:18:34.236 | INFO | __main__:slow_add1:9 - OUTPUT: 1 -> 2
2025-02-18 16:18:34.237 | INFO | __main__:slow_gt3:13 - INPUT: 2
2025-02-18 16:18:35.241 | INFO | __main__:slow_gt3:16 - OUTPUT: 2 -> False
2025-02-18 16:18:35.243 | INFO | __main__:slow_add1:6 - INPUT: 2
2025-02-18 16:18:36.245 | INFO | __main__:slow_add1:9 - OUTPUT: 2 -> 3
2025-02-18 16:18:36.246 | INFO | __main__:slow_gt3:13 - INPUT: 3
2025-02-18 16:18:37.249 | INFO | __main__:slow_gt3:16 - OUTPUT: 3 -> False
2025-02-18 16:18:37.250 | INFO | __main__:slow_add1:6 - INPUT: 3
2025-02-18 16:18:38.252 | INFO | __main__:slow_add1:9 - OUTPUT: 3 -> 4
2025-02-18 16:18:38.253 | INFO | __main__:slow_gt3:13 - INPUT: 4
2025-02-18 16:18:39.254 | INFO | __main__:slow_gt3:16 - OUTPUT: 4 -> True
2025-02-18 16:18:39.257 | INFO | __main__:slow_add1:6 - INPUT: 4
2025-02-18 16:18:40.258 | INFO | __main__:slow_add1:9 - OUTPUT: 4 -> 5
2025-02-18 16:18:40.260 | INFO | __main__:slow_gt3:13 - INPUT: 5
2025-02-18 16:18:41.262 | INFO | __main__:slow_gt3:16 - OUTPUT: 5 -> True
2025-02-18 16:18:41.264 | INFO | __main__:slow_add1:6 - INPUT: 5
2025-02-18 16:18:42.265 | INFO | __main__:slow_add1:9 - OUTPUT: 5 -> 6
2025-02-18 16:18:42.267 | INFO | __main__:slow_gt3:13 - INPUT: 6
2025-02-18 16:18:43.268 | INFO | __main__:slow_gt3:16 - OUTPUT: 6 -> True
2025-02-18 16:18:43.270 | INFO | __main__:slow_add1:6 - INPUT: 6
2025-02-18 16:18:44.272 | INFO | __main__:slow_add1:9 - OUTPUT: 6 -> 7
2025-02-18 16:18:44.274 | INFO | __main__:slow_gt3:13 - INPUT: 7
2025-02-18 16:18:45.276 | INFO | __main__:slow_gt3:16 - OUTPUT: 7 -> True
2025-02-18 16:18:45.278 | INFO | __main__:slow_add1:6 - INPUT: 7
2025-02-18 16:18:46.280 | INFO | __main__:slow_add1:9 - OUTPUT: 7 -> 8
2025-02-18 16:18:46.282 | INFO | __main__:slow_gt3:13 - INPUT: 8
2025-02-18 16:18:47.283 | INFO | __main__:slow_gt3:16 - OUTPUT: 8 -> True
2025-02-18 16:18:47.285 | INFO | __main__:slow_add1:6 - INPUT: 8
2025-02-18 16:18:48.288 | INFO | __main__:slow_add1:9 - OUTPUT: 8 -> 9
2025-02-18 16:18:48.290 | INFO | __main__:slow_gt3:13 - INPUT: 9
2025-02-18 16:18:49.292 | INFO | __main__:slow_gt3:16 - OUTPUT: 9 -> True
2025-02-18 16:18:49.294 | INFO | __main__:slow_add1:6 - INPUT: 9
2025-02-18 16:18:50.296 | INFO | __main__:slow_add1:9 - OUTPUT: 9 -> 10
2025-02-18 16:18:50.297 | INFO | __main__:slow_gt3:13 - INPUT: 10
2025-02-18 16:18:51.299 | INFO | __main__:slow_gt3:16 - OUTPUT: 10 -> True
[4, 5, 6, 7, 8, 9, 10]
異なるワーカーを組み合わせたパイプライン
複数のワーカータイプを組み合わせたパイプラインを書くこともできる。
from loguru import logger
import pypeln as pl
import time
def get_iterable(x):
"""
入力データを生成するイテラブルな関数
"""
for i in range(x):
logger.info("生成 {}", i)
yield i
def f1(x):
"""
入力値を2倍にして返す関数
"""
logger.info("入力: {}", x)
time.sleep(0.1)
result = x * 2
logger.info("出力: {} -> {}", x, result)
return result
def f2(x):
"""
スレッドで実行するflat_map関数
入力値から、入力値とその次の値の2つのリストを返す
"""
logger.info("入力: {}", x)
time.sleep(0.2)
results = [x, x + 1]
logger.info("出力: {} -> {}", x, results)
return results
def f3(x):
"""
入力値が偶数かどうかを判定する関数
"""
logger.info("入力: {}", x)
result = (x % 2 == 0)
logger.info("結果: {} -> {}", x, result)
return result
def f4(x):
"""
入力値に10を加算して返す
"""
logger.info("入力: {}", x)
time.sleep(0.3)
result = x + 10
logger.info("出力: {} -> {}", x, result)
return result
# 入力データ
data = get_iterable(5)
# Stage 1: スレッドを使用してf1を適用(入力値を2倍にする)
stage = pl.thread.map(f1, data, workers=5, maxsize=10)
# Stage 2: スレッドを使用してflat_mapでf2を適用(各値から2つの値を生成する)
stage = pl.thread.flat_map(f2, stage, workers=3, maxsize=5)
# Stage 3: 組み込みのfilterを使用してf3を適用(偶数のみを抽出する)
stage = filter(f3, stage)
# Stage 4: プロセスを使用してf4を適用(各値に10を加算する)
stage = pl.process.map(f4, stage, workers=2, maxsize=4)
# すべてのステージがイテラブルとして連結されるので、結果をリストで取得
final_result = list(stage)
logger.info("結果: {}", final_result)
2025-02-18 16:49:48.886 | INFO | __main__:get_iterable:10 - 生成 0
2025-02-18 16:49:48.894 | INFO | __main__:get_iterable:10 - 生成 1
2025-02-18 16:49:49.157 | INFO | __main__:get_iterable:10 - 生成 2
2025-02-18 16:49:49.167 | INFO | __main__:get_iterable:10 - 生成 3
2025-02-18 16:49:49.176 | INFO | __main__:get_iterable:10 - 生成 4
2025-02-18 16:49:48.894 | INFO | __main__:f1:17 - 入力: 0
2025-02-18 16:49:49.168 | INFO | __main__:f1:17 - 入力: 2
2025-02-18 16:49:49.158 | INFO | __main__:f1:17 - 入力: 1
2025-02-18 16:49:49.177 | INFO | __main__:f1:17 - 入力: 3
2025-02-18 16:49:49.182 | INFO | __main__:f1:17 - 入力: 4
2025-02-18 16:49:49.287 | INFO | __main__:f1:20 - 出力: 0 -> 0
2025-02-18 16:49:49.294 | INFO | __main__:f2:28 - 入力: 0
2025-02-18 16:49:49.295 | INFO | __main__:f1:20 - 出力: 2 -> 4
2025-02-18 16:49:49.300 | INFO | __main__:f1:20 - 出力: 1 -> 2
2025-02-18 16:49:49.302 | INFO | __main__:f2:28 - 入力: 4
2025-02-18 16:49:49.305 | INFO | __main__:f1:20 - 出力: 3 -> 6
2025-02-18 16:49:49.305 | INFO | __main__:f2:28 - 入力: 2
2025-02-18 16:49:49.310 | INFO | __main__:f1:20 - 出力: 4 -> 8
2025-02-18 16:49:49.497 | INFO | __main__:f2:31 - 出力: 0 -> [0, 1]
2025-02-18 16:49:49.499 | INFO | __main__:f2:28 - 入力: 6
2025-02-18 16:49:49.500 | INFO | __main__:f3:38 - 入力: 0
2025-02-18 16:49:49.507 | INFO | __main__:f2:31 - 出力: 4 -> [4, 5]
2025-02-18 16:49:49.508 | INFO | __main__:f3:40 - 結果: 0 -> True
2025-02-18 16:49:49.512 | INFO | __main__:f2:28 - 入力: 8
2025-02-18 16:49:49.517 | INFO | __main__:f2:31 - 出力: 2 -> [2, 3]
2025-02-18 16:49:49.519 | INFO | __main__:f3:38 - 入力: 1
2025-02-18 16:49:49.522 | INFO | __main__:f3:40 - 結果: 1 -> False
2025-02-18 16:49:49.524 | INFO | __main__:f3:38 - 入力: 4
2025-02-18 16:49:49.526 | INFO | __main__:f3:40 - 結果: 4 -> True
2025-02-18 16:49:49.532 | INFO | __main__:f3:38 - 入力: 5
2025-02-18 16:49:49.533 | INFO | __main__:f3:40 - 結果: 5 -> False
2025-02-18 16:49:49.534 | INFO | __main__:f3:38 - 入力: 2
2025-02-18 16:49:49.535 | INFO | __main__:f3:40 - 結果: 2 -> True
2025-02-18 16:49:49.536 | INFO | __main__:f3:38 - 入力: 3
2025-02-18 16:49:49.537 | INFO | __main__:f3:40 - 結果: 3 -> False
2025-02-18 16:49:49.538 | INFO | __main__:f4:47 - 入力: 4
2025-02-18 16:49:49.520 | INFO | __main__:f4:47 - 入力: 0
2025-02-18 16:49:49.703 | INFO | __main__:f2:31 - 出力: 6 -> [6, 7]
2025-02-18 16:49:49.707 | INFO | __main__:f3:38 - 入力: 6
2025-02-18 16:49:49.710 | INFO | __main__:f3:40 - 結果: 6 -> True
2025-02-18 16:49:49.714 | INFO | __main__:f3:38 - 入力: 7
2025-02-18 16:49:49.717 | INFO | __main__:f3:40 - 結果: 7 -> False
2025-02-18 16:49:49.717 | INFO | __main__:f2:31 - 出力: 8 -> [8, 9]
2025-02-18 16:49:49.723 | INFO | __main__:f3:38 - 入力: 8
2025-02-18 16:49:49.725 | INFO | __main__:f3:40 - 結果: 8 -> True
2025-02-18 16:49:49.727 | INFO | __main__:f3:38 - 入力: 9
2025-02-18 16:49:49.730 | INFO | __main__:f3:40 - 結果: 9 -> False
2025-02-18 16:49:49.854 | INFO | __main__:f4:50 - 出力: 4 -> 14
2025-02-18 16:49:49.858 | INFO | __main__:f4:47 - 入力: 2
2025-02-18 16:49:49.861 | INFO | __main__:f4:50 - 出力: 0 -> 10
2025-02-18 16:49:49.868 | INFO | __main__:f4:47 - 入力: 6
2025-02-18 16:49:50.161 | INFO | __main__:f4:50 - 出力: 2 -> 12
2025-02-18 16:49:50.163 | INFO | __main__:f4:47 - 入力: 8
2025-02-18 16:49:50.170 | INFO | __main__:f4:50 - 出力: 6 -> 16
2025-02-18 16:49:50.465 | INFO | __main__:f4:50 - 出力: 8 -> 18
2025-02-18 16:49:50.495 | INFO | __main__:<cell line: 0>:70 - 結果: [14, 10, 12, 16, 18]
pypeln の各ステージはIterableとして動作するので、Python標準 filter や list への変換などと連携させることもできるが、各ステージごとに異なる並列・並行処理の挙動となる点については考慮が必要。
Pipeline Operator
パイプ (|
) 演算子を使うことで、パイプラインの流れをわかりやすく記述できる。1つ前のパイプラインを例に書くとこうなる。
final_result = (
get_iterable(5)
| pl.thread.map(f1, workers=5, maxsize=10)
| pl.thread.flat_map(f2, workers=3, maxsize=5)
| (lambda iterable: filter(f3, iterable))
| pl.process.map(f4, workers=2, maxsize=4)
| list
)
logger.info("最終結果: {}", final_result)
2025-02-18 17:02:15.722 | INFO | __main__:get_iterable:10 - 生成 0
2025-02-18 17:02:15.729 | INFO | __main__:get_iterable:10 - 生成 1
2025-02-18 17:02:15.733 | INFO | __main__:get_iterable:10 - 生成 2
2025-02-18 17:02:15.735 | INFO | __main__:get_iterable:10 - 生成 3
2025-02-18 17:02:15.738 | INFO | __main__:get_iterable:10 - 生成 4
2025-02-18 17:02:15.736 | INFO | __main__:f1:17 - 入力: 2
2025-02-18 17:02:15.734 | INFO | __main__:f1:17 - 入力: 1
2025-02-18 17:02:15.729 | INFO | __main__:f1:17 - 入力: 0
2025-02-18 17:02:15.739 | INFO | __main__:f1:17 - 入力: 3
2025-02-18 17:02:15.744 | INFO | __main__:f1:17 - 入力: 4
2025-02-18 17:02:15.849 | INFO | __main__:f1:20 - 出力: 2 -> 4
2025-02-18 17:02:15.851 | INFO | __main__:f2:28 - 入力: 4
2025-02-18 17:02:15.855 | INFO | __main__:f1:20 - 出力: 1 -> 2
2025-02-18 17:02:15.859 | INFO | __main__:f2:28 - 入力: 2
2025-02-18 17:02:15.860 | INFO | __main__:f1:20 - 出力: 0 -> 0
2025-02-18 17:02:15.863 | INFO | __main__:f1:20 - 出力: 3 -> 6
2025-02-18 17:02:15.869 | INFO | __main__:f2:28 - 入力: 0
2025-02-18 17:02:15.869 | INFO | __main__:f1:20 - 出力: 4 -> 8
2025-02-18 17:02:16.055 | INFO | __main__:f2:31 - 出力: 4 -> [4, 5]
2025-02-18 17:02:16.056 | INFO | __main__:f2:28 - 入力: 6
2025-02-18 17:02:16.057 | INFO | __main__:f3:38 - 入力: 4
2025-02-18 17:02:16.063 | INFO | __main__:f2:31 - 出力: 2 -> [2, 3]
2025-02-18 17:02:16.069 | INFO | __main__:f2:28 - 入力: 8
2025-02-18 17:02:16.067 | INFO | __main__:f3:40 - 結果: 4 -> True
2025-02-18 17:02:16.076 | INFO | __main__:f3:38 - 入力: 5
2025-02-18 17:02:16.077 | INFO | __main__:f3:40 - 結果: 5 -> False
2025-02-18 17:02:16.081 | INFO | __main__:f3:38 - 入力: 2
2025-02-18 17:02:16.076 | INFO | __main__:f2:31 - 出力: 0 -> [0, 1]
2025-02-18 17:02:16.083 | INFO | __main__:f3:40 - 結果: 2 -> True
2025-02-18 17:02:16.077 | INFO | __main__:f4:47 - 入力: 4
2025-02-18 17:02:16.089 | INFO | __main__:f3:38 - 入力: 3
2025-02-18 17:02:16.091 | INFO | __main__:f3:40 - 結果: 3 -> False
2025-02-18 17:02:16.093 | INFO | __main__:f3:38 - 入力: 0
2025-02-18 17:02:16.095 | INFO | __main__:f3:40 - 結果: 0 -> True
2025-02-18 17:02:16.097 | INFO | __main__:f3:38 - 入力: 1
2025-02-18 17:02:16.091 | INFO | __main__:f4:47 - 入力: 2
2025-02-18 17:02:16.099 | INFO | __main__:f3:40 - 結果: 1 -> False
2025-02-18 17:02:16.262 | INFO | __main__:f2:31 - 出力: 6 -> [6, 7]
2025-02-18 17:02:16.270 | INFO | __main__:f3:38 - 入力: 6
2025-02-18 17:02:16.273 | INFO | __main__:f2:31 - 出力: 8 -> [8, 9]
2025-02-18 17:02:16.274 | INFO | __main__:f3:40 - 結果: 6 -> True
2025-02-18 17:02:16.282 | INFO | __main__:f3:38 - 入力: 7
2025-02-18 17:02:16.284 | INFO | __main__:f3:40 - 結果: 7 -> False
2025-02-18 17:02:16.288 | INFO | __main__:f3:38 - 入力: 8
2025-02-18 17:02:16.290 | INFO | __main__:f3:40 - 結果: 8 -> True
2025-02-18 17:02:16.293 | INFO | __main__:f3:38 - 入力: 9
2025-02-18 17:02:16.296 | INFO | __main__:f3:40 - 結果: 9 -> False
2025-02-18 17:02:16.386 | INFO | __main__:f4:50 - 出力: 4 -> 14
2025-02-18 17:02:16.389 | INFO | __main__:f4:47 - 入力: 0
2025-02-18 17:02:16.397 | INFO | __main__:f4:50 - 出力: 2 -> 12
2025-02-18 17:02:16.400 | INFO | __main__:f4:47 - 入力: 6
2025-02-18 17:02:16.694 | INFO | __main__:f4:50 - 出力: 0 -> 10
2025-02-18 17:02:16.697 | INFO | __main__:f4:47 - 入力: 8
2025-02-18 17:02:16.704 | INFO | __main__:f4:50 - 出力: 6 -> 16
2025-02-18 17:02:16.999 | INFO | __main__:f4:50 - 出力: 8 -> 18
2025-02-18 17:02:17.021 | INFO | __main__:<cell line: 0>:10 - 最終結果: [14, 12, 10, 16, 18]
まとめ
なるほど、シンプルなパイプラインなら、大げさなツールは必要なくて、これで十分書けそう。ただ、キューを複数用意して、とか、順序をきちんと守って、とかになると別のツールを使うことになると思う。
上で紹介した日本語記事の解説がとても詳しいので参考になる。