🐥

[TypeScript]非同期処理(promise)の最大並列実行数を5行で制限

2021/09/12に公開2

非同期処理の並列実行数に関して

大量の処理をAPIに投げるような場合、非同期処理が行われているとすぐに次の処理の実行が可能になります。そのためDoS攻撃のようなリクエストを投げてしまう場合があります。かといって一件ずつ処理待ちをしていると、待機時間が馬鹿になりません。

こういう場合に必要になってくるのが並列実行回数の制限です。

よくある非同期待ちに関して

最大並列実行数を制御する場合、よくある処理としては実行処理そのものをキューに積んで、最大数を制限しながらキューの中の処理を実行していく方式です。この方法の問題点は、処理内容が10万件とか大規模な処理が必要な場合に、10万件の実行処理を生成してキューに積むので、無駄に大きなメモリを持って行かれてしまうことです。

そういった無駄を発生させないためには、実行処理生成のタイミングで上限を超えないように待機の必要があります。ということで、処理を書いてみました。

今回のソースコード

https://github.com/SoraKumo001/promise-parallels

最大並列の処理待ち関数

5行で書けました。
コード量を減らすため変な記法になっていますが、きちんと動きます。
使用する人が滅多にいないPromise.race()を使っています。

const Parallels = (ps = new Set<Promise<unknown>>()) => ({
  add: (p: Promise<unknown>) => ps.add(!!p.then(() => ps.delete(p)).catch(() => ps.delete(p)) && p),
  wait: (limit: number) => ps.size >= limit && Promise.race(ps),
  all: () => Promise.all(ps),
});

使い方

10個並列実行して、最大並列数を5に制限しています。
並列5つでループが一時停止し、一つ実行が終わると、一つ実行が始まります。

const main = async () => {
  const ps = Parallels();
  for (let i = 0; i < 10; i++) {
    // ランダムに終了する処理を実行し、Promiseを保存
    ps.add(new Promise((resolve) => setTimeout(() => resolve(i), Math.random() * 100)));
    // 最大並列実行数を指定し待機
    // 戻り値 false:制限値内で処理が終了していない
    // 戻り値 resolveで戻した処理のindex
    const v = await ps.wait(5);
    console.log(`${i}:${v}`); // ループ回数:終了した関数を表示
  }
  // 並列5未満でループを抜けた場合の残存処理待ち
  (await ps.all()).forEach((v) => console.log(`*:${v}`));
};

main();

内容的には以下のようになっています

  • Parallels()でインスタンス生成
  • add()でPromise格納
  • wait()で最大数を指定して待機
  • all()で残存処理の待機

実行結果

ループ状態と、実行された処理の番号が表示されています。
0~3までは実行が終わっていないのでfalseになります。
その後、ループ4で処理番号3が終了しています。
*はループから抜けた後の待機状態で、残り4つの処理の終了待ちになります

0:false
1:false
2:false
3:false
4:3
5:4
6:0
7:6
8:2
9:7
*:1
*:5
*:8
*:9

まとめ

今回のプログラムは別記事を書こうとしていたときに発生した副産物です。
本来書こうとしていた記事はMicroCMSのエクスポートしたファイルからTypeScriptの型を生成して、さらにそれを利用して型カッチリ状態でAPIにアクセスするという内容です。
https://www.npmjs.com/package/microcms-typescript
https://www.npmjs.com/package/microcms-lib

ネタが色々あって処理が追いつきません。

GitHubで編集を提案

Discussion

大学生だった.大学生だった.

コメント失礼します。
Promiseの並列実行数が多くてメモリ不足を起こしている処理があるので制御したいと思いこの記事にたどり着いたのですが、コードが理解出来ないです。
教えて頂けないでしょうか?
add関数のps.delete(p)の部分 ps はsetオブジェクトになると思うのですが、そもそもこのpsにプロミスのオブジェクトが入ってる様子はないのに何を削除しているのかいまいち理解出来ないです。 && pで実行している理由も分からないです。
wait関数も Promise.race(ps) でawaitが効く原理も分からないです。
実行結果も残り4つの処理終了待ちとなっており、それはキューが5未満になったので ps.all で実行されるという事でしょうか?

空雲空雲

記述量を短くするためにこう書いていますが
ps.add(!!p.then(() => ps.delete(p)).catch(() => ps.delete(p)) && p)

本来こうなります
p.then(() => ps.delete(p)).catch(() => ps.delete(p))
ps.add(p)

(limit: number) => ps.size >= limit && Promise.race(ps)
は、並列数上限を確認して上限を超えていた場合、非同期処理の一つが終わりまで待つのに使っています

実行結果も残り4つの処理終了待ちとなっており、それはキューが5未満になったので ps.all で実行されるという事でしょうか?

その通りです