Parallel processing ― r-pipeline 2.2.0
概要
なんか一日一機能みたいなリリースしてますが・・・w
r-piplineに並列実行を仕込めるようにしてきました。
もともとkeyedPromiseAllと混ぜようかと思ったのだけど、とりあえずそのまま実行できるのが良いかと、普通のPromise.allで、タプルで型の順番を固定する方向で実装ました。
同期関数・非同期関数の混在も扱えます(async交じるので、streamAsync呼び出しのみですが)
型推論もタプル順に追随しますので、戻り値の扱いも非常に自然です。
こちらから使えますので、よければご利用ください。
使い方
今回の対応で、タプル化した関数群([ ... ] as const)を渡すだけで並列実行できるようになりました。
同期関数混ざってても平気にしておきました。
すべての関数やPipeは、入力値に直前のOutを取りますので、データを一気に流し込んで並列処理なんてのも出来ます。
タプルで入れた関数群の戻り値は、そのまま応答に返ってきます。
- 後続にjointして結果を処理
- streamAsyncの戻り値で使う
お好きな方法で使っていただけます。
サンプルコードはこちら
普通の使い方
関数を3つ並列実行するだけの基本例です。
並列実行した関数でエラーを返すと?
fail fast(最初のエラー検知で処理終了)な感じにしてあります。
並列処理自体は止めませんが、パイプラインとしての実行は停止します。
(こういうの failfastっていうんすね・・・無知。)
また、throw や Promise.reject は、あえてcatchしません。
Pipe 内で握りつぶしてしまうと、予期せぬ挙動を生む可能性があるため、pipeはそのまま外に流す(catchしない)仕組みとしてます。
エラーしても続けたいのよ
そんなあなたに、failFast フラグ(オプション引数)をご用意しました。
このフラグをfalseにすることで、失敗結果も応答にそのまま含まれるようになります。
Pipeも並列でつなげる
勢い余って、pipeの非同期実行をそのまま仕込む仕組みも入れておきました。
タプルで Pipe 群を渡すと、それぞれが同時に streamAsync 実行されます。
なんで並列化を入れたのか
「複数のAPIを同時発行して、応答をまとめて処理したい」って、結構いろんなところで出てくると思ってます。
そんなときに「安全性を考えて、全部awaitで1個ずつ止めながら処理」ってのも全然ありなんですが、レスポンス時間とかリソース効率を考えるなら、並行実行の選択肢は強いと思います。
r-pipeline 2.2.0では、
値の初期化 → 並列実行[Data1取得, Data2取得, Data3取得 ] → 3つのデータを統合処理
といった流れを パイプラインの自然な構文で実現 できます。
これがPipeで出来たらちょっと嬉しいなと思って作ったのです。
まとめ
- parallelJoint() … 関数群を並列実行
- parallelBranch() … Pipe群を並列実行
- failfast … エラー即停止 or 続行の切り替え
シンプルな構文で、Promise.allより安全・明確に並列処理が書けます。
ぜひ機会がございましたら、触ってみてください。
Discussion