🤞

Promise.all で同時に実行される数を制限したい

2021/09/25に公開
1
  • Promise.all で同時に実行される数を制限したい
  • それに加えて、100 並行で動かすとして突然 100 個の処理を実行し始めるのでなく、少しずつ 100 に向けて増やしていきたい
    • 例えば最小実行間隔を設けるようにしたい

実装

/**
 * Limits how many tasks run at the same time and, optionally, enforces a
 * minimum gap between consecutive task starts so that load ramps up gradually
 * instead of all slots being filled at once.
 *
 * @example
 * const lock = new ConcurrencyLock({ concurrency: 5, interval: 1000 });
 * const results = await Promise.all(items.map(item => lock.run(() => work(item))));
 */
export class ConcurrencyLock {
  /** Maximum number of tasks allowed to hold a slot simultaneously. */
  private readonly concurrency: number;
  /** Minimum number of milliseconds between two consecutive task starts. */
  private readonly interval: number;

  /** Number of slots currently held (running or scheduled to start). */
  private running = 0;
  /** FIFO queue of callers waiting for a slot; calling an entry starts that caller. */
  private waitingResolves: Array<() => void> = [];
  /** Start time assigned to the most recently scheduled task, if any. */
  private lastRunAt: Date | null = null;

  /**
   * @param options.concurrency Maximum number of simultaneously running tasks (must be >= 1).
   * @param options.interval Minimum milliseconds between task starts. Defaults to 0.
   * @throws RangeError when `concurrency` is not a number >= 1. Such a value
   *   would either queue every task forever or silently disable the limit.
   */
  constructor({
    concurrency,
    interval,
  }: {
    concurrency: number;
    interval?: number;
  }) {
    // Written as a negated comparison so that NaN is rejected as well.
    if (!(concurrency >= 1)) {
      throw new RangeError(
        `ConcurrencyLock: concurrency must be >= 1, got ${concurrency}`,
      );
    }

    this.concurrency = concurrency;
    this.interval = interval ?? 0;
  }

  /**
   * Waits for a free slot (and for the minimum interval), runs `func`, and
   * frees the slot afterwards.
   *
   * @param func Task to execute; may be synchronous or return a promise.
   * @returns The value produced by `func`.
   * @throws Whatever `func` throws or rejects with. The slot is still released.
   */
  async run<T>(func: () => PromiseLike<T> | T): Promise<T> {
    await this.get(new Date());

    try {
      return await func();
    } finally {
      // Release even on failure; otherwise every failing task would leak a
      // slot and the lock would eventually stop admitting new tasks.
      this.release(new Date());
    }
  }

  /** Resolves once the caller has been granted a slot and its start time has arrived. */
  private get(calledAt: Date): Promise<void> {
    return new Promise<void>(resolve => {
      if (this.running >= this.concurrency) {
        // No free slot: park the caller until `release` hands one over.
        this.waitingResolves.push(resolve);
        return;
      }

      this.running += 1;
      this.schedule(resolve, calledAt);
    });
  }

  /** Gives the finished task's slot to the oldest waiter, or frees it if nobody waits. */
  private release(calledAt: Date): void {
    if (this.running === 0) {
      console.warn('ConcurrencyLock#release was called but has no runnings');
      return;
    }

    const next = this.waitingResolves.shift();
    if (next === undefined) {
      this.running -= 1;
      return;
    }

    // The slot is transferred directly, so `running` stays unchanged.
    this.schedule(next, calledAt);
  }

  /**
   * Starts `func` no earlier than `interval` ms after the previously
   * scheduled start, and records the chosen start time for the next caller.
   */
  private schedule(func: () => void, calledAt: Date): void {
    const willRunAt = !this.lastRunAt
      ? calledAt
      : new Date(
          Math.max(
            calledAt.getTime(),
            this.lastRunAt.getTime() + this.interval,
          ),
        );

    this.lastRunAt = willRunAt;

    setTimeout(func, willRunAt.getTime() - calledAt.getTime());
  }
}

使い方

/** Returns a promise that resolves (with no value) after roughly `ms` milliseconds. */
const sleep = (ms: number) =>
  new Promise(done => {
    setTimeout(done, ms);
  });

// At most 5 tasks at a time, with task starts spaced at least 1 second apart
const lock = new ConcurrencyLock({ interval: 1000, concurrency: 5 });
const numbers = Array.from({ length: 10 }, (_, i) => i);
const duration = 10000; // each task takes 10 seconds
const startedAt = new Date();

const processed = await Promise.all(
  numbers.map(async number => {
    // The task logs how long after `startedAt` it actually began, then idles.
    const task = async () => {
      const elapsedMs = Date.now() - startedAt.getTime();
      const elapsed = Math.round(elapsedMs / 1000);
      console.log(`${number} started after ${elapsed}s.`);
      await sleep(duration);
      return number;
    };

    return lock.run(task);
  }),
);

// 0 started after 0s.
// 1 started after 1s.
// 2 started after 2s.
// 3 started after 3s.
// 4 started after 4s.
// 5 started after 10s.
// 6 started after 11s.
// 7 started after 12s.
// 8 started after 13s.
// 9 started after 14s.

Discussion

nap5nap5

Radashのparallelを使って少しデモを作ってみました。

import { test, expect } from "vitest";
import { parallel, sleep } from "radash";
import { default as fetch } from "cross-fetch";
import { PromisePool } from '@supercharge/promise-pool'

// Sample story object. Within this snippet its only use is to derive the
// StoryData type below via `typeof`; its values are never read.
// NOTE(review): presumably a captured response from the Hacker News item
// endpoint that fetchStory calls — confirm.
const demoData = {
  by: "bpierre",
  descendants: 48,
  id: 37361050,
  kids: [
    37362222, 37363734, 37362079, 37362429, 37366919, 37366920, 37363171,
    37361240, 37361524, 37361316, 37364299, 37363233, 37361613, 37364963,
    37364330, 37361551, 37365637, 37362068, 37363830, 37361140, 37364871,
    37364746, 37363775,
  ],
  score: 156,
  time: 1693659009,
  title: "Semantic Zoom",
  type: "story",
  url: "https://alexanderobenauer.com/labnotes/038/",
};

// Story shape inferred from the sample object above.
type StoryData = typeof demoData;

/**
 * Requests a single item from the Hacker News Firebase API and returns the
 * parsed JSON body as-is (no validation of status code or payload shape).
 *
 * @param storyId Numeric item id interpolated into the request URL.
 */
const fetchStory = async (storyId: number) => {
  const endpoint = `https://hacker-news.firebaseio.com/v0/item/${storyId}.json?print=pretty`;
  const response = await fetch(endpoint);
  return await response.json();
};

/**
 * Runs `fn` over every element of `input` through radash's `parallel`, using
 * `maximumTaskNumber` as its limit argument. Each element is logged, then the
 * task sleeps `intervalMillSeconds` ms before `fn` is invoked.
 *
 * @param input Items to process.
 * @param fn Async worker applied to each item.
 * @param maximumTaskNumber Limit passed to `parallel` (default 7).
 * @param intervalMillSeconds Delay in ms before each `fn` call (default 700).
 * @returns Whatever `parallel` resolves with for the mapped items.
 */
const doFetchParallel = <T = unknown, D = unknown>(
  input: ReadonlyArray<T>,
  fn: (d: T) => Promise<D>,
  maximumTaskNumber: number = 7,
  intervalMillSeconds: number = 700
) =>
  parallel(maximumTaskNumber, input, async (item) => {
    console.log(item);
    await sleep(intervalMillSeconds);
    const output = await fn(item);
    return output;
  });

// Fetches every story through PromisePool (9 tasks at a time, each delayed by
// 700 ms) and checks that all of them succeeded.
test("PromisePool", async () => {
  const storyIds = [
    37367013, 37361947, 37361409, 37363362, 37358063, 37359193, 37364290,
    37366341, 37361053, 37356119, 37359310, 37362132, 37359250, 37364388,
    37349906, 37362740, 37365485, 37366678, 37364124, 37365468, 37361094,
    37364624, 37361050, 37362151,
  ];
  const { results, errors } = await PromisePool
    .withConcurrency(9)
    .for(storyIds)
    .process(async (storyId) => {
      console.log(storyId)
      await sleep(700)
      return fetchStory(storyId)
    })
  // Assert instead of returning early: an early return would let the test
  // pass even though some fetches failed. The failure output shows the errors.
  expect(errors).toStrictEqual([])
  expect(results.length).toStrictEqual(24)
})

// Fetches every story through doFetchParallel and checks that one result
// came back per requested id.
test("Radash Parallel", async () => {
  const maximumTaskNumber = 13;
  const intervalMillSeconds = 800;
  const storyIds = [
    37367013, 37361947, 37361409, 37363362, 37358063, 37359193, 37364290,
    37366341, 37361053, 37356119, 37359310, 37362132, 37359250, 37364388,
    37349906, 37362740, 37365485, 37366678, 37364124, 37365468, 37361094,
    37364624, 37361050, 37362151,
  ];
  const storiesData = await doFetchParallel<number, StoryData>(
    storyIds,
    fetchStory,
    maximumTaskNumber,
    intervalMillSeconds
  );
  expect(storiesData.length).toStrictEqual(24)
});

demo code.

https://codesandbox.io/p/sandbox/infallible-hermann-hfty6h?file=/src/index.test.ts:1,1

簡単ですが、以上です。