Closed6

Pythonで並列パイプラインを作成するライブラリ「Pypeln」を試す

kun432kun432

GitHubレポジトリ

https://github.com/cgarciae/pypeln/

Pypeln

Pypeln(発音は「パイプライン」)は、並列および同時実行が必要な中規模のデータ処理タスク向けに設計された、シンプルでありながら強力な Python ライブラリです。

主な特徴

  • シンプル: Pypeln は、Spark や Dask のようなフレームワークを使用するのが過剰または不自然に感じられる場合に、並列性と同時実行が必要な 中規模 データタスクを解決するために設計されました。
  • 使いやすい: Pypeln は、通常の Python コードと互換性のある親しみやすい関数型 API を提供します。
  • 柔軟: Pypeln は、プロセス、スレッド、asyncio.Task を同一の API を通じて使用することで、パイプラインを構築できるようにします。
  • 細かい制御: Pypeln は、パイプラインの各段階で使用されるメモリと CPU リソースを制御することを可能にします。

詳細については、ドキュメント をご覧ください。

diagram
referred from https://github.com/cgarciae/pypeln/

公式ドキュメント

https://cgarciae.github.io/pypeln/

kun432kun432

インストール

Colaboratoryで。

!pip install pypeln
!pip freeze | grep -i pypeln
出力
pypeln==0.4.9
kun432kun432

基本的な使い方

基本的には複数のステージで構成されるパイプラインを作るためのもの。で、ワーカーの実装は3種類。

  1. プロセス
  2. スレッド
  3. 非同期タスク

これに加えて、同期ジェネレータを使う方法もある。

プロセス(multiprocessing.Process

2ステージで構成されるパイプラインの例

  1. データに処理を適用(map()
  2. 条件に合致したデータのみを抽出(filter()

それぞれのステージでmultiprocessing.Processを使ってマルチプロセスで並列処理を行う。動きを確認してみたかったのでloguruでログを取るようにして、あとsleepも固定にした。

import pypeln as pl
import time
from loguru import logger

def slow_add1(x):
    """
    入力値に1を加算して返す関数
    
    Args:
        x (int): 入力値
        
    Returns:
        int: xに1を加えた値
    """
    logger.info("INPUT: {}", x)
    time.sleep(1) # 計算が遅い処理を想定
    ret = x + 1
    logger.info("OUTPUT: {} -> {}", x, ret)
    return ret

def slow_gt3(x):
    """
    入力値が3より大きいかどうかを判定して返す関数
    
    Args:
        x (int): 入力値
        
    Returns:
        bool: xが3より大きければTrue、そうでなければFalse
    """
    logger.info("INPUT: {}", x)
    time.sleep(1) # 計算が遅い処理を想定
    ret = x > 3
    logger.info("OUTPUT: {} -> {}", x, ret)
    return ret

# 0〜9までの数値を入力データとする
data = range(10)

# ステージ1: 入力データの各要素に対して、slow_add1を並列に実行して、適用(map)
stage1 = pl.process.map(
    slow_add1,  # 適用する関数
    data,       # 入力対象のデータ
    workers=3,  # 並列実行するワーカー(プロセス)数
    maxsize=4,  # バッファリングできるデータの最大数
)

# ステージ2: 上の結果の各要素に対して、slow_gt3を並列に実行して、条件抽出(filter)
stage2 = pl.process.filter(
    slow_gt3,   # フィルタを行う関数
    stage1,     # 入力対象のデータ
    workers=2,  # 並列実行するワーカー(プロセス)数
)

# 結果
final_result = list(stage2)
print(final_result)
出力
2025-02-18 15:43:08.204 | INFO     | __main__:slow_add1:16 - INPUT: 1
2025-02-18 15:43:08.200 | INFO     | __main__:slow_add1:16 - INPUT: 0
2025-02-18 15:43:08.209 | INFO     | __main__:slow_add1:16 - INPUT: 2
2025-02-18 15:43:09.215 | INFO     | __main__:slow_add1:19 - OUTPUT: 1 -> 2
2025-02-18 15:43:09.213 | INFO     | __main__:slow_add1:19 - OUTPUT: 0 -> 1
2025-02-18 15:43:09.216 | INFO     | __main__:slow_add1:19 - OUTPUT: 2 -> 3
2025-02-18 15:43:09.223 | INFO     | __main__:slow_add1:16 - INPUT: 3
2025-02-18 15:43:09.231 | INFO     | __main__:slow_add1:16 - INPUT: 4
2025-02-18 15:43:09.234 | INFO     | __main__:slow_add1:16 - INPUT: 5
2025-02-18 15:43:09.225 | INFO     | __main__:slow_gt3:32 - INPUT: 1
2025-02-18 15:43:09.234 | INFO     | __main__:slow_gt3:32 - INPUT: 3
2025-02-18 15:43:10.232 | INFO     | __main__:slow_add1:19 - OUTPUT: 3 -> 4
2025-02-18 15:43:10.235 | INFO     | __main__:slow_add1:16 - INPUT: 6
2025-02-18 15:43:10.238 | INFO     | __main__:slow_add1:19 - OUTPUT: 5 -> 6
2025-02-18 15:43:10.240 | INFO     | __main__:slow_gt3:35 - OUTPUT: 1 -> False
2025-02-18 15:43:10.236 | INFO     | __main__:slow_add1:19 - OUTPUT: 4 -> 5
2025-02-18 15:43:10.242 | INFO     | __main__:slow_add1:16 - INPUT: 7
2025-02-18 15:43:10.243 | INFO     | __main__:slow_gt3:32 - INPUT: 2
2025-02-18 15:43:10.249 | INFO     | __main__:slow_add1:16 - INPUT: 8
2025-02-18 15:43:10.247 | INFO     | __main__:slow_gt3:35 - OUTPUT: 3 -> False
2025-02-18 15:43:10.253 | INFO     | __main__:slow_gt3:32 - INPUT: 4
2025-02-18 15:43:11.238 | INFO     | __main__:slow_add1:19 - OUTPUT: 6 -> 7
2025-02-18 15:43:11.241 | INFO     | __main__:slow_add1:16 - INPUT: 9
2025-02-18 15:43:11.247 | INFO     | __main__:slow_add1:19 - OUTPUT: 7 -> 8
2025-02-18 15:43:11.251 | INFO     | __main__:slow_gt3:35 - OUTPUT: 2 -> False
2025-02-18 15:43:11.252 | INFO     | __main__:slow_add1:19 - OUTPUT: 8 -> 9
2025-02-18 15:43:11.259 | INFO     | __main__:slow_gt3:32 - INPUT: 6
2025-02-18 15:43:11.262 | INFO     | __main__:slow_gt3:35 - OUTPUT: 4 -> True
2025-02-18 15:43:11.266 | INFO     | __main__:slow_gt3:32 - INPUT: 5
2025-02-18 15:43:12.243 | INFO     | __main__:slow_add1:19 - OUTPUT: 9 -> 10
2025-02-18 15:43:12.263 | INFO     | __main__:slow_gt3:35 - OUTPUT: 6 -> True
2025-02-18 15:43:12.268 | INFO     | __main__:slow_gt3:35 - OUTPUT: 5 -> True
2025-02-18 15:43:12.266 | INFO     | __main__:slow_gt3:32 - INPUT: 7
2025-02-18 15:43:12.274 | INFO     | __main__:slow_gt3:32 - INPUT: 8
2025-02-18 15:43:13.272 | INFO     | __main__:slow_gt3:35 - OUTPUT: 7 -> True
2025-02-18 15:43:13.277 | INFO     | __main__:slow_gt3:32 - INPUT: 9
2025-02-18 15:43:13.278 | INFO     | __main__:slow_gt3:35 - OUTPUT: 8 -> True
2025-02-18 15:43:13.288 | INFO     | __main__:slow_gt3:32 - INPUT: 10
2025-02-18 15:43:14.285 | INFO     | __main__:slow_gt3:35 - OUTPUT: 9 -> True
2025-02-18 15:43:14.291 | INFO     | __main__:slow_gt3:35 - OUTPUT: 10 -> True
[4, 6, 5, 7, 8, 9, 10]

workersはまあわかりやすいのだけど、maxsizeがわかりにくい。色々調べてみたところ、以下の記事を見つけた。

https://scrapbox.io/PythonOsaka/Pypelnを使って効率良くデータを操作してみよう

** キュー**

ワーカー同士はQueueを通じて通信します。各Queueが保持できる最大要素数は、pypelnの様々な関数のmaxsize引数で制御されます。この値のデフォルトはは0で、要素数に制限はありません。しかし、maxsize が設定されると、キューがいっぱいになった(maxsizeで指定した数値に達した)ときに前のステージが新しい要素をキューに押し込むのを防ぐメカニズムとして機能します。

ということは、maxsizeが指定されたステージでキューがいっぱいになると前のステージの処理が遅延する、ということになるのかな?このサンプルだと、前のステージは入力データ(が入ってくるキュー)そのものになると思うのでいまいちピンとこないのだけども。

色々パラメータを変えてテストしてみたけど、maxsizeの挙動はいまいち掴めず・・・一旦気にせず進める。

スレッド(threading.Thread

上のコードをスレッドに置き換えたもの。単にpl.processpl.threadに変更しただけ。

import pypeln as pl
import time
from loguru import logger

def slow_add1(x):
    logger.info("INPUT: {}", x)
    time.sleep(1)
    ret = x + 1
    logger.info("OUTPUT: {} -> {}", x, ret)
    return ret

def slow_gt3(x):
    logger.info("INPUT: {}", x)
    time.sleep(1)
    ret = x > 3
    logger.info("OUTPUT: {} -> {}", x, ret)
    return ret

data = range(10)

# スレッドを使う
stage1 = pl.thread.map(
    slow_add1,
    data,
    workers=3,
    maxsize=4,
)

# スレッドを使う
stage2 = pl.thread.filter(
    slow_gt3,
    stage1,
    workers=2,
)

# 結果
final_result = list(stage2)
print(final_result)
出力
2025-02-18 15:53:47.672 | INFO     | __main__:slow_add1:7 - INPUT: 0
2025-02-18 15:53:47.675 | INFO     | __main__:slow_add1:7 - INPUT: 1
2025-02-18 15:53:47.675 | INFO     | __main__:slow_add1:7 - INPUT: 2
2025-02-18 15:53:48.676 | INFO     | __main__:slow_add1:10 - OUTPUT: 0 -> 1
2025-02-18 15:53:48.678 | INFO     | __main__:slow_add1:7 - INPUT: 3
2025-02-18 15:53:48.678 | INFO     | __main__:slow_add1:10 - OUTPUT: 1 -> 2
2025-02-18 15:53:48.686 | INFO     | __main__:slow_add1:7 - INPUT: 4
2025-02-18 15:53:48.684 | INFO     | __main__:slow_add1:10 - OUTPUT: 2 -> 3
2025-02-18 15:53:48.691 | INFO     | __main__:slow_add1:7 - INPUT: 5
2025-02-18 15:53:48.686 | INFO     | __main__:slow_gt3:14 - INPUT: 2
2025-02-18 15:53:48.678 | INFO     | __main__:slow_gt3:14 - INPUT: 1
2025-02-18 15:53:49.685 | INFO     | __main__:slow_add1:10 - OUTPUT: 3 -> 4
2025-02-18 15:53:49.687 | INFO     | __main__:slow_add1:7 - INPUT: 6
2025-02-18 15:53:49.689 | INFO     | __main__:slow_add1:10 - OUTPUT: 4 -> 5
2025-02-18 15:53:49.692 | INFO     | __main__:slow_add1:7 - INPUT: 7
2025-02-18 15:53:49.693 | INFO     | __main__:slow_add1:10 - OUTPUT: 5 -> 6
2025-02-18 15:53:49.694 | INFO     | __main__:slow_add1:7 - INPUT: 8
2025-02-18 15:53:49.697 | INFO     | __main__:slow_gt3:17 - OUTPUT: 2 -> False
2025-02-18 15:53:49.699 | INFO     | __main__:slow_gt3:14 - INPUT: 3
2025-02-18 15:53:49.698 | INFO     | __main__:slow_gt3:17 - OUTPUT: 1 -> False
2025-02-18 15:53:49.701 | INFO     | __main__:slow_gt3:14 - INPUT: 4
2025-02-18 15:53:50.690 | INFO     | __main__:slow_add1:10 - OUTPUT: 6 -> 7
2025-02-18 15:53:50.692 | INFO     | __main__:slow_add1:7 - INPUT: 9
2025-02-18 15:53:50.693 | INFO     | __main__:slow_add1:10 - OUTPUT: 7 -> 8
2025-02-18 15:53:50.698 | INFO     | __main__:slow_add1:10 - OUTPUT: 8 -> 9
2025-02-18 15:53:50.700 | INFO     | __main__:slow_gt3:17 - OUTPUT: 3 -> False
2025-02-18 15:53:50.701 | INFO     | __main__:slow_gt3:14 - INPUT: 5
2025-02-18 15:53:50.702 | INFO     | __main__:slow_gt3:17 - OUTPUT: 4 -> True
2025-02-18 15:53:50.703 | INFO     | __main__:slow_gt3:14 - INPUT: 6
2025-02-18 15:53:51.695 | INFO     | __main__:slow_add1:10 - OUTPUT: 9 -> 10
2025-02-18 15:53:51.702 | INFO     | __main__:slow_gt3:17 - OUTPUT: 5 -> True
2025-02-18 15:53:51.704 | INFO     | __main__:slow_gt3:14 - INPUT: 7
2025-02-18 15:53:51.705 | INFO     | __main__:slow_gt3:17 - OUTPUT: 6 -> True
2025-02-18 15:53:51.706 | INFO     | __main__:slow_gt3:14 - INPUT: 8
2025-02-18 15:53:52.705 | INFO     | __main__:slow_gt3:17 - OUTPUT: 7 -> True
2025-02-18 15:53:52.707 | INFO     | __main__:slow_gt3:14 - INPUT: 9
2025-02-18 15:53:52.710 | INFO     | __main__:slow_gt3:17 - OUTPUT: 8 -> True
2025-02-18 15:53:52.711 | INFO     | __main__:slow_gt3:14 - INPUT: 10
2025-02-18 15:53:53.708 | INFO     | __main__:slow_gt3:17 - OUTPUT: 9 -> True
2025-02-18 15:53:53.714 | INFO     | __main__:slow_gt3:17 - OUTPUT: 10 -> True
[4, 5, 6, 7, 8, 9, 10]

非同期タスク(asyncio.Task

同じように``pl.task`を使って、あと関数はasyncで指定するだけ。

import pypeln as pl
import asyncio
from loguru import logger
import sys


# notebook環境だと以下が必要
import nest_asyncio
nest_asyncio.apply()

# 非同期の場合はenqueue=Trueを付与
logger.remove()
logger.add(sys.stderr, enqueue=True) 


async def slow_add1(x):
    logger.info("INPUT: {}", x)
    await asyncio.sleep(1)
    ret = x + 1
    logger.info("OUTPUT: {} -> {}", x, ret)
    return ret

async def slow_gt3(x):
    logger.info("INPUT: {}", x)
    await asyncio.sleep(1)
    ret = x > 3
    logger.info("OUTPUT: {} -> {}", x, ret)
    return ret

async def main():
    data = range(10)

    # 非同期タスクを使う
    stage1 = pl.task.map(
        slow_add1,
        data,
        workers=3,
        maxsize=4,
    )

    # 非同期タスクを使う
    stage2 = pl.task.filter(
        slow_gt3,
        stage1,
        workers=2,
    )

    # 結果
    final_result = [x async for x in stage2]
    print(final_result)

asyncio.run(main())
出力
2025-02-18 16:07:53.203 | INFO     | __main__:slow_add1:17 - INPUT: 0
2025-02-18 16:07:53.203 | INFO     | __main__:slow_add1:17 - INPUT: 1
2025-02-18 16:07:53.203 | INFO     | __main__:slow_add1:17 - INPUT: 2
2025-02-18 16:07:54.205 | INFO     | __main__:slow_add1:20 - OUTPUT: 0 -> 1
2025-02-18 16:07:54.205 | INFO     | __main__:slow_add1:20 - OUTPUT: 1 -> 2
2025-02-18 16:07:54.205 | INFO     | __main__:slow_add1:20 - OUTPUT: 2 -> 3
2025-02-18 16:07:54.206 | INFO     | __main__:slow_gt3:24 - INPUT: 1
2025-02-18 16:07:54.206 | INFO     | __main__:slow_gt3:24 - INPUT: 2
2025-02-18 16:07:54.206 | INFO     | __main__:slow_add1:17 - INPUT: 3
2025-02-18 16:07:54.207 | INFO     | __main__:slow_add1:17 - INPUT: 4
2025-02-18 16:07:54.207 | INFO     | __main__:slow_add1:17 - INPUT: 5
2025-02-18 16:07:55.208 | INFO     | __main__:slow_gt3:27 - OUTPUT: 1 -> False
2025-02-18 16:07:55.208 | INFO     | __main__:slow_gt3:27 - OUTPUT: 2 -> False
2025-02-18 16:07:55.209 | INFO     | __main__:slow_add1:20 - OUTPUT: 3 -> 4
2025-02-18 16:07:55.209 | INFO     | __main__:slow_add1:20 - OUTPUT: 4 -> 5
2025-02-18 16:07:55.209 | INFO     | __main__:slow_add1:20 - OUTPUT: 5 -> 6
2025-02-18 16:07:55.209 | INFO     | __main__:slow_gt3:24 - INPUT: 3
2025-02-18 16:07:55.210 | INFO     | __main__:slow_gt3:24 - INPUT: 4
2025-02-18 16:07:55.210 | INFO     | __main__:slow_add1:17 - INPUT: 6
2025-02-18 16:07:55.210 | INFO     | __main__:slow_add1:17 - INPUT: 7
2025-02-18 16:07:55.210 | INFO     | __main__:slow_add1:17 - INPUT: 8
2025-02-18 16:07:56.211 | INFO     | __main__:slow_gt3:27 - OUTPUT: 3 -> False
2025-02-18 16:07:56.211 | INFO     | __main__:slow_gt3:27 - OUTPUT: 4 -> True
2025-02-18 16:07:56.212 | INFO     | __main__:slow_add1:20 - OUTPUT: 6 -> 7
2025-02-18 16:07:56.212 | INFO     | __main__:slow_add1:20 - OUTPUT: 7 -> 8
2025-02-18 16:07:56.212 | INFO     | __main__:slow_add1:20 - OUTPUT: 8 -> 9
2025-02-18 16:07:56.212 | INFO     | __main__:slow_gt3:24 - INPUT: 5
2025-02-18 16:07:56.213 | INFO     | __main__:slow_gt3:24 - INPUT: 6
2025-02-18 16:07:56.213 | INFO     | __main__:slow_add1:17 - INPUT: 9
2025-02-18 16:07:57.214 | INFO     | __main__:slow_gt3:27 - OUTPUT: 5 -> True
2025-02-18 16:07:57.215 | INFO     | __main__:slow_gt3:27 - OUTPUT: 6 -> True
2025-02-18 16:07:57.215 | INFO     | __main__:slow_add1:20 - OUTPUT: 9 -> 10
2025-02-18 16:07:57.215 | INFO     | __main__:slow_gt3:24 - INPUT: 7
2025-02-18 16:07:57.216 | INFO     | __main__:slow_gt3:24 - INPUT: 8
2025-02-18 16:07:58.217 | INFO     | __main__:slow_gt3:27 - OUTPUT: 7 -> True
2025-02-18 16:07:58.218 | INFO     | __main__:slow_gt3:27 - OUTPUT: 8 -> True
2025-02-18 16:07:58.218 | INFO     | __main__:slow_gt3:24 - INPUT: 9
2025-02-18 16:07:58.218 | INFO     | __main__:slow_gt3:24 - INPUT: 10
2025-02-18 16:07:59.219 | INFO     | __main__:slow_gt3:27 - OUTPUT: 9 -> True
2025-02-18 16:07:59.220 | INFO     | __main__:slow_gt3:27 - OUTPUT: 10 -> True
[4, 5, 6, 7, 8, 9, 10]

同期ジェネレータ

pl.syncを使うと、同期ジェネレータで処理を行う。この場合

  • デバッグに便利
  • 重たいCPU/IOバウンドなタスクがなく、入力データの順序を保ちたい場合

に使える。この場合、workersmaxsizeなどのパラメータは無視される。

import pypeln as pl
import time
from loguru import logger

def slow_add1(x):
    logger.info("INPUT: {}", x)
    time.sleep(1)
    ret = x + 1
    logger.info("OUTPUT: {} -> {}", x, ret)
    return ret

def slow_gt3(x):
    logger.info("INPUT: {}", x)
    time.sleep(1)
    ret = x > 3
    logger.info("OUTPUT: {} -> {}", x, ret)
    return ret

data = range(10)

# 同期ジェネレータを使う
stage1 = pl.sync.map(
    slow_add1,
    data,
    workers=3,
    maxsize=4,
)

# 同期ジェネレータを使う
stage2 = pl.sync.filter(
    slow_gt3,
    stage1,
    workers=2,
)

# 結果
final_result = list(stage2)
print(final_result)
出力
2025-02-18 16:18:31.218 | INFO     | __main__:slow_add1:6 - INPUT: 0
2025-02-18 16:18:32.224 | INFO     | __main__:slow_add1:9 - OUTPUT: 0 -> 1
2025-02-18 16:18:32.228 | INFO     | __main__:slow_gt3:13 - INPUT: 1
2025-02-18 16:18:33.230 | INFO     | __main__:slow_gt3:16 - OUTPUT: 1 -> False
2025-02-18 16:18:33.234 | INFO     | __main__:slow_add1:6 - INPUT: 1
2025-02-18 16:18:34.236 | INFO     | __main__:slow_add1:9 - OUTPUT: 1 -> 2
2025-02-18 16:18:34.237 | INFO     | __main__:slow_gt3:13 - INPUT: 2
2025-02-18 16:18:35.241 | INFO     | __main__:slow_gt3:16 - OUTPUT: 2 -> False
2025-02-18 16:18:35.243 | INFO     | __main__:slow_add1:6 - INPUT: 2
2025-02-18 16:18:36.245 | INFO     | __main__:slow_add1:9 - OUTPUT: 2 -> 3
2025-02-18 16:18:36.246 | INFO     | __main__:slow_gt3:13 - INPUT: 3
2025-02-18 16:18:37.249 | INFO     | __main__:slow_gt3:16 - OUTPUT: 3 -> False
2025-02-18 16:18:37.250 | INFO     | __main__:slow_add1:6 - INPUT: 3
2025-02-18 16:18:38.252 | INFO     | __main__:slow_add1:9 - OUTPUT: 3 -> 4
2025-02-18 16:18:38.253 | INFO     | __main__:slow_gt3:13 - INPUT: 4
2025-02-18 16:18:39.254 | INFO     | __main__:slow_gt3:16 - OUTPUT: 4 -> True
2025-02-18 16:18:39.257 | INFO     | __main__:slow_add1:6 - INPUT: 4
2025-02-18 16:18:40.258 | INFO     | __main__:slow_add1:9 - OUTPUT: 4 -> 5
2025-02-18 16:18:40.260 | INFO     | __main__:slow_gt3:13 - INPUT: 5
2025-02-18 16:18:41.262 | INFO     | __main__:slow_gt3:16 - OUTPUT: 5 -> True
2025-02-18 16:18:41.264 | INFO     | __main__:slow_add1:6 - INPUT: 5
2025-02-18 16:18:42.265 | INFO     | __main__:slow_add1:9 - OUTPUT: 5 -> 6
2025-02-18 16:18:42.267 | INFO     | __main__:slow_gt3:13 - INPUT: 6
2025-02-18 16:18:43.268 | INFO     | __main__:slow_gt3:16 - OUTPUT: 6 -> True
2025-02-18 16:18:43.270 | INFO     | __main__:slow_add1:6 - INPUT: 6
2025-02-18 16:18:44.272 | INFO     | __main__:slow_add1:9 - OUTPUT: 6 -> 7
2025-02-18 16:18:44.274 | INFO     | __main__:slow_gt3:13 - INPUT: 7
2025-02-18 16:18:45.276 | INFO     | __main__:slow_gt3:16 - OUTPUT: 7 -> True
2025-02-18 16:18:45.278 | INFO     | __main__:slow_add1:6 - INPUT: 7
2025-02-18 16:18:46.280 | INFO     | __main__:slow_add1:9 - OUTPUT: 7 -> 8
2025-02-18 16:18:46.282 | INFO     | __main__:slow_gt3:13 - INPUT: 8
2025-02-18 16:18:47.283 | INFO     | __main__:slow_gt3:16 - OUTPUT: 8 -> True
2025-02-18 16:18:47.285 | INFO     | __main__:slow_add1:6 - INPUT: 8
2025-02-18 16:18:48.288 | INFO     | __main__:slow_add1:9 - OUTPUT: 8 -> 9
2025-02-18 16:18:48.290 | INFO     | __main__:slow_gt3:13 - INPUT: 9
2025-02-18 16:18:49.292 | INFO     | __main__:slow_gt3:16 - OUTPUT: 9 -> True
2025-02-18 16:18:49.294 | INFO     | __main__:slow_add1:6 - INPUT: 9
2025-02-18 16:18:50.296 | INFO     | __main__:slow_add1:9 - OUTPUT: 9 -> 10
2025-02-18 16:18:50.297 | INFO     | __main__:slow_gt3:13 - INPUT: 10
2025-02-18 16:18:51.299 | INFO     | __main__:slow_gt3:16 - OUTPUT: 10 -> True
[4, 5, 6, 7, 8, 9, 10]
kun432kun432

異なるワーカーを組み合わせたパイプライン

複数のワーカータイプを組み合わせたパイプラインを書くこともできる。

from loguru import logger
import pypeln as pl
import time

def get_iterable(x):
    """
    入力データを生成するイテラブルな関数
    """
    for i in range(x):
        logger.info("生成 {}", i)
        yield i

def f1(x):
    """
    入力値を2倍にして返す関数
    """
    logger.info("入力: {}", x)
    time.sleep(0.1)
    result = x * 2
    logger.info("出力: {} -> {}", x, result)
    return result

def f2(x):
    """
    スレッドで実行するflat_map関数
    入力値から、入力値とその次の値の2つのリストを返す
    """
    logger.info("入力: {}", x)
    time.sleep(0.2)
    results = [x, x + 1]
    logger.info("出力: {} -> {}", x, results)
    return results

def f3(x):
    """
    入力値が偶数かどうかを判定する関数
    """
    logger.info("入力: {}", x)
    result = (x % 2 == 0)
    logger.info("結果: {} -> {}", x, result)
    return result

def f4(x):
    """
    入力値に10を加算して返す
    """
    logger.info("入力: {}", x)
    time.sleep(0.3)
    result = x + 10
    logger.info("出力: {} -> {}", x, result)
    return result

# 入力データ
data = get_iterable(5)

# Stage 1: スレッドを使用してf1を適用(入力値を2倍にする)
stage = pl.thread.map(f1, data, workers=5, maxsize=10)

# Stage 2: スレッドを使用してflat_mapでf2を適用(各値から2つの値を生成する)
stage = pl.thread.flat_map(f2, stage, workers=3, maxsize=5)

# Stage 3: 組み込みのfilterを使用してf3を適用(偶数のみを抽出する)
stage = filter(f3, stage)

# Stage 4: プロセスを使用してf4を適用(各値に10を加算する)
stage = pl.process.map(f4, stage, workers=2, maxsize=4)

# すべてのステージがイテラブルとして連結されるので、結果をリストで取得
final_result = list(stage)
logger.info("結果: {}", final_result)
出力
2025-02-18 16:49:48.886 | INFO     | __main__:get_iterable:10 - 生成 0
2025-02-18 16:49:48.894 | INFO     | __main__:get_iterable:10 - 生成 1
2025-02-18 16:49:49.157 | INFO     | __main__:get_iterable:10 - 生成 2
2025-02-18 16:49:49.167 | INFO     | __main__:get_iterable:10 - 生成 3
2025-02-18 16:49:49.176 | INFO     | __main__:get_iterable:10 - 生成 4
2025-02-18 16:49:48.894 | INFO     | __main__:f1:17 - 入力: 0
2025-02-18 16:49:49.168 | INFO     | __main__:f1:17 - 入力: 2
2025-02-18 16:49:49.158 | INFO     | __main__:f1:17 - 入力: 1
2025-02-18 16:49:49.177 | INFO     | __main__:f1:17 - 入力: 3
2025-02-18 16:49:49.182 | INFO     | __main__:f1:17 - 入力: 4
2025-02-18 16:49:49.287 | INFO     | __main__:f1:20 - 出力: 0 -> 0
2025-02-18 16:49:49.294 | INFO     | __main__:f2:28 - 入力: 0
2025-02-18 16:49:49.295 | INFO     | __main__:f1:20 - 出力: 2 -> 4
2025-02-18 16:49:49.300 | INFO     | __main__:f1:20 - 出力: 1 -> 2
2025-02-18 16:49:49.302 | INFO     | __main__:f2:28 - 入力: 4
2025-02-18 16:49:49.305 | INFO     | __main__:f1:20 - 出力: 3 -> 6
2025-02-18 16:49:49.305 | INFO     | __main__:f2:28 - 入力: 2
2025-02-18 16:49:49.310 | INFO     | __main__:f1:20 - 出力: 4 -> 8
2025-02-18 16:49:49.497 | INFO     | __main__:f2:31 - 出力: 0 -> [0, 1]
2025-02-18 16:49:49.499 | INFO     | __main__:f2:28 - 入力: 6
2025-02-18 16:49:49.500 | INFO     | __main__:f3:38 - 入力: 0
2025-02-18 16:49:49.507 | INFO     | __main__:f2:31 - 出力: 4 -> [4, 5]
2025-02-18 16:49:49.508 | INFO     | __main__:f3:40 - 結果: 0 -> True
2025-02-18 16:49:49.512 | INFO     | __main__:f2:28 - 入力: 8
2025-02-18 16:49:49.517 | INFO     | __main__:f2:31 - 出力: 2 -> [2, 3]
2025-02-18 16:49:49.519 | INFO     | __main__:f3:38 - 入力: 1
2025-02-18 16:49:49.522 | INFO     | __main__:f3:40 - 結果: 1 -> False
2025-02-18 16:49:49.524 | INFO     | __main__:f3:38 - 入力: 4
2025-02-18 16:49:49.526 | INFO     | __main__:f3:40 - 結果: 4 -> True
2025-02-18 16:49:49.532 | INFO     | __main__:f3:38 - 入力: 5
2025-02-18 16:49:49.533 | INFO     | __main__:f3:40 - 結果: 5 -> False
2025-02-18 16:49:49.534 | INFO     | __main__:f3:38 - 入力: 2
2025-02-18 16:49:49.535 | INFO     | __main__:f3:40 - 結果: 2 -> True
2025-02-18 16:49:49.536 | INFO     | __main__:f3:38 - 入力: 3
2025-02-18 16:49:49.537 | INFO     | __main__:f3:40 - 結果: 3 -> False
2025-02-18 16:49:49.538 | INFO     | __main__:f4:47 - 入力: 4
2025-02-18 16:49:49.520 | INFO     | __main__:f4:47 - 入力: 0
2025-02-18 16:49:49.703 | INFO     | __main__:f2:31 - 出力: 6 -> [6, 7]
2025-02-18 16:49:49.707 | INFO     | __main__:f3:38 - 入力: 6
2025-02-18 16:49:49.710 | INFO     | __main__:f3:40 - 結果: 6 -> True
2025-02-18 16:49:49.714 | INFO     | __main__:f3:38 - 入力: 7
2025-02-18 16:49:49.717 | INFO     | __main__:f3:40 - 結果: 7 -> False
2025-02-18 16:49:49.717 | INFO     | __main__:f2:31 - 出力: 8 -> [8, 9]
2025-02-18 16:49:49.723 | INFO     | __main__:f3:38 - 入力: 8
2025-02-18 16:49:49.725 | INFO     | __main__:f3:40 - 結果: 8 -> True
2025-02-18 16:49:49.727 | INFO     | __main__:f3:38 - 入力: 9
2025-02-18 16:49:49.730 | INFO     | __main__:f3:40 - 結果: 9 -> False
2025-02-18 16:49:49.854 | INFO     | __main__:f4:50 - 出力: 4 -> 14
2025-02-18 16:49:49.858 | INFO     | __main__:f4:47 - 入力: 2
2025-02-18 16:49:49.861 | INFO     | __main__:f4:50 - 出力: 0 -> 10
2025-02-18 16:49:49.868 | INFO     | __main__:f4:47 - 入力: 6
2025-02-18 16:49:50.161 | INFO     | __main__:f4:50 - 出力: 2 -> 12
2025-02-18 16:49:50.163 | INFO     | __main__:f4:47 - 入力: 8
2025-02-18 16:49:50.170 | INFO     | __main__:f4:50 - 出力: 6 -> 16
2025-02-18 16:49:50.465 | INFO     | __main__:f4:50 - 出力: 8 -> 18
2025-02-18 16:49:50.495 | INFO     | __main__:<cell line: 0>:70 - 結果: [14, 10, 12, 16, 18]

pypeln の各ステージはIterableとして動作するので、Python標準 filter や list への変換などと連携させることもできるが、各ステージごとに異なる並列・並行処理の挙動となる点については考慮が必要。

kun432kun432

Pipeline Operator

パイプ (|) 演算子を使うことで、パイプラインの流れをわかりやすく記述できる。1つ前のパイプラインを例に書くとこうなる。

final_result = (
    get_iterable(5)
    | pl.thread.map(f1, workers=5, maxsize=10)
    | pl.thread.flat_map(f2, workers=3, maxsize=5)
    | (lambda iterable: filter(f3, iterable))
    | pl.process.map(f4, workers=2, maxsize=4)
    | list
)

logger.info("最終結果: {}", final_result)
出力
2025-02-18 17:02:15.722 | INFO     | __main__:get_iterable:10 - 生成 0
2025-02-18 17:02:15.729 | INFO     | __main__:get_iterable:10 - 生成 1
2025-02-18 17:02:15.733 | INFO     | __main__:get_iterable:10 - 生成 2
2025-02-18 17:02:15.735 | INFO     | __main__:get_iterable:10 - 生成 3
2025-02-18 17:02:15.738 | INFO     | __main__:get_iterable:10 - 生成 4
2025-02-18 17:02:15.736 | INFO     | __main__:f1:17 - 入力: 2
2025-02-18 17:02:15.734 | INFO     | __main__:f1:17 - 入力: 1
2025-02-18 17:02:15.729 | INFO     | __main__:f1:17 - 入力: 0
2025-02-18 17:02:15.739 | INFO     | __main__:f1:17 - 入力: 3
2025-02-18 17:02:15.744 | INFO     | __main__:f1:17 - 入力: 4
2025-02-18 17:02:15.849 | INFO     | __main__:f1:20 - 出力: 2 -> 4
2025-02-18 17:02:15.851 | INFO     | __main__:f2:28 - 入力: 4
2025-02-18 17:02:15.855 | INFO     | __main__:f1:20 - 出力: 1 -> 2
2025-02-18 17:02:15.859 | INFO     | __main__:f2:28 - 入力: 2
2025-02-18 17:02:15.860 | INFO     | __main__:f1:20 - 出力: 0 -> 0
2025-02-18 17:02:15.863 | INFO     | __main__:f1:20 - 出力: 3 -> 6
2025-02-18 17:02:15.869 | INFO     | __main__:f2:28 - 入力: 0
2025-02-18 17:02:15.869 | INFO     | __main__:f1:20 - 出力: 4 -> 8
2025-02-18 17:02:16.055 | INFO     | __main__:f2:31 - 出力: 4 -> [4, 5]
2025-02-18 17:02:16.056 | INFO     | __main__:f2:28 - 入力: 6
2025-02-18 17:02:16.057 | INFO     | __main__:f3:38 - 入力: 4
2025-02-18 17:02:16.063 | INFO     | __main__:f2:31 - 出力: 2 -> [2, 3]
2025-02-18 17:02:16.069 | INFO     | __main__:f2:28 - 入力: 8
2025-02-18 17:02:16.067 | INFO     | __main__:f3:40 - 結果: 4 -> True
2025-02-18 17:02:16.076 | INFO     | __main__:f3:38 - 入力: 5
2025-02-18 17:02:16.077 | INFO     | __main__:f3:40 - 結果: 5 -> False
2025-02-18 17:02:16.081 | INFO     | __main__:f3:38 - 入力: 2
2025-02-18 17:02:16.076 | INFO     | __main__:f2:31 - 出力: 0 -> [0, 1]
2025-02-18 17:02:16.083 | INFO     | __main__:f3:40 - 結果: 2 -> True
2025-02-18 17:02:16.077 | INFO     | __main__:f4:47 - 入力: 4
2025-02-18 17:02:16.089 | INFO     | __main__:f3:38 - 入力: 3
2025-02-18 17:02:16.091 | INFO     | __main__:f3:40 - 結果: 3 -> False
2025-02-18 17:02:16.093 | INFO     | __main__:f3:38 - 入力: 0
2025-02-18 17:02:16.095 | INFO     | __main__:f3:40 - 結果: 0 -> True
2025-02-18 17:02:16.097 | INFO     | __main__:f3:38 - 入力: 1
2025-02-18 17:02:16.091 | INFO     | __main__:f4:47 - 入力: 2
2025-02-18 17:02:16.099 | INFO     | __main__:f3:40 - 結果: 1 -> False
2025-02-18 17:02:16.262 | INFO     | __main__:f2:31 - 出力: 6 -> [6, 7]
2025-02-18 17:02:16.270 | INFO     | __main__:f3:38 - 入力: 6
2025-02-18 17:02:16.273 | INFO     | __main__:f2:31 - 出力: 8 -> [8, 9]
2025-02-18 17:02:16.274 | INFO     | __main__:f3:40 - 結果: 6 -> True
2025-02-18 17:02:16.282 | INFO     | __main__:f3:38 - 入力: 7
2025-02-18 17:02:16.284 | INFO     | __main__:f3:40 - 結果: 7 -> False
2025-02-18 17:02:16.288 | INFO     | __main__:f3:38 - 入力: 8
2025-02-18 17:02:16.290 | INFO     | __main__:f3:40 - 結果: 8 -> True
2025-02-18 17:02:16.293 | INFO     | __main__:f3:38 - 入力: 9
2025-02-18 17:02:16.296 | INFO     | __main__:f3:40 - 結果: 9 -> False
2025-02-18 17:02:16.386 | INFO     | __main__:f4:50 - 出力: 4 -> 14
2025-02-18 17:02:16.389 | INFO     | __main__:f4:47 - 入力: 0
2025-02-18 17:02:16.397 | INFO     | __main__:f4:50 - 出力: 2 -> 12
2025-02-18 17:02:16.400 | INFO     | __main__:f4:47 - 入力: 6
2025-02-18 17:02:16.694 | INFO     | __main__:f4:50 - 出力: 0 -> 10
2025-02-18 17:02:16.697 | INFO     | __main__:f4:47 - 入力: 8
2025-02-18 17:02:16.704 | INFO     | __main__:f4:50 - 出力: 6 -> 16
2025-02-18 17:02:16.999 | INFO     | __main__:f4:50 - 出力: 8 -> 18
2025-02-18 17:02:17.021 | INFO     | __main__:<cell line: 0>:10 - 最終結果: [14, 12, 10, 16, 18]
kun432kun432

まとめ

なるほど、シンプルなパイプラインなら、大げさなツールは必要なくて、これで十分書けそう。ただ、キューを複数用意して、とか、順序をきちんと守って、とかになると別のツールを使うことになると思う。

上で紹介した日本語記事の解説がとても詳しいので参考になる。

このスクラップは2025/02/19にクローズされました