🍆

同時実行数の上限を指定できる非同期処理の並行実行JSクラス

2024/07/11に公開

node-fetchなどでスクレイピング処理をしていると、並行実行したくなる場面が出てきます。

並行実行処理は Promise.all()Promise.allSettled() を使えば簡単に実現できますが、同時実行数の上限を指定できません。仮に10,000個の処理をしたくなったときに上限制御しなければ落ちる可能性があります。負荷を下げる目的として、簡単に制御できるJavaScriptのクラスを書いたので掲載しておきます。

asyncPool.ts
export class AsyncPool {
  allPromises: (() => Promise<any>)[] = []; // 残プロセス(pendingPromisesの方が良かったかも...)
  strict = false; // Promise.allで実行するか、Promise.allSettledで実行するか
  limit = 10; // 同時実行上限数
  count = {
    success: 0,
    error: 0,
  };

  constructor({ limit = 10, strict = false } = {}) {
    this.limit = limit;
    this.strict = strict;
  }

  addPromise(promise: () => Promise<any>) {
    this.allPromises.push(promise);
  }

  clearPromises() {
    this.allPromises = [];
    this.count = {
      success: 0,
      error: 0,
    };
  }

  async run() {
    while (this.allPromises.length > 0) {
      const chunkPromises = this.allPromises.slice(0, this.limit).map((thePromise) => thePromise());
      this.allPromises.splice(0, this.limit);

      if (this.strict) {
        await Promise.all(chunkPromises);
      } else {
        const results = await Promise.allSettled(chunkPromises);

        results.map((theResult) => {
          switch (theResult.status) {
            case "fulfilled":
              this.count.success++;
              break;
            case "rejected":
              this.count.error++;
              break;
          }
        });
      }
    }

    console.log(`【AsyncPool】 OK: ${this.count.success} / NG: ${this.count.error}`);
    this.clearPromises();
  }

  getResultCount() {
    if (!this.strict) return this.count;
    console.warn("【注意】getResultCount() は stric: false でのみ利用できます。");
  }
}

使ってみる

今回は下記の関数を並行実行してみます。

function randomSleep() {
  const sleepTime = Math.floor(Math.random() * (8000 - 3000 + 1)) + 3000;
  console.log(sleepTime / 1000 + "秒待機を実行しています");
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      Math.random() < 0.35 ? resolve("") : reject();
    }, sleepTime);
  });
}

この関数は3~8秒ランダムで待機をするものです。
35%ぐらいの確率でresolve(成功)、それ以外はreject(失敗)を返します。

実行するには下記のように書けばOKです。

index
import { AsyncPool } from "@utils/functions/asyncPool";

(async () => {
  const asyncPool = new AsyncPool({
    limit: 4
  });

  for (let i = 0; i < 31; i++) {
    asyncPool.addPromise(randomSleep);
  }

  // ▼ 12秒後に処理を追加する(実行中でも後からプロセスを追加できることを確認する)
  setTimeout(() => {
    for (let i = 0; i < 32; i++) {
      asyncPool.addPromise(randomSleep);
    }
  }, 1000 * 12);

  // このタイミングで実行される
  await asyncPool.run();
})();

limit: 4 で同時処理数を4つまでに制限しています。4つで1つのグループにしてまとめて処理するイメージです。

特にスクレイピングの際に重宝する便利なクラスです。

より柔軟に処理を動的に追加したい場合

まだ必要ではないので試していませんが、Async.jsのqueueを使うと良いでしょう。上記のクラスではグループ内の処理が完了しないと次のグループの処理が開始されないため、グループとしての処理を意識したくない場合は Async.js の方が向いているかもしれません。

https://caolan.github.io/async/v3/docs.html#queue

https://www.npmjs.com/package/async

Discussion