🤞
Promise.all で同時に実行される数を制限したい
-
Promise.all
で同時に実行される数を制限したい- Promise Pool のようなライブラリがある
- それに加えて、100 並行で動かすとして突然 100 個の処理を実行し始めるのでなく、少しずつ 100 に向けて増やしていきたい
- 例えば最小実行間隔を設けるようにしたい
実装
export class ConcurrencyLock {
private readonly concurrency: number;
private readonly interval: number;
private running = 0;
private waitingResolves: Array<() => void> = [];
private lastRunAt: Date | null = null;
constructor({
concurrency,
interval,
}: {
concurrency: number;
interval?: number;
}) {
this.concurrency = concurrency;
this.interval = interval ?? 0;
}
async run<T>(func: () => PromiseLike<T> | T): Promise<T> {
await this.get(new Date());
const result = await func();
await this.release(new Date());
return result;
}
private async get(calledAt: Date) {
await new Promise<void>(resolve => {
if (this.running >= this.concurrency) {
this.waitingResolves.push(resolve);
return;
}
this.running += 1;
this.schedule(resolve, calledAt);
});
}
private async release(calledAt: Date) {
if (this.running === 0) {
console.warn('ConcurrencyLock#release was called but has no runnings');
return;
}
if (this.waitingResolves.length === 0) {
this.running -= 1;
return;
}
const popped = this.waitingResolves.shift();
this.schedule(popped, calledAt);
}
private schedule(func: () => void, calledAt: Date) {
const willRunAt = !this.lastRunAt
? calledAt
: new Date(
Math.max(
calledAt.getTime(),
this.lastRunAt.getTime() + this.interval,
),
);
this.lastRunAt = willRunAt;
setTimeout(func, willRunAt.getTime() - calledAt.getTime());
}
}
使い方
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
// 最大 5 並行、最短でも 1 秒間隔で実行
const lock = new ConcurrencyLock({ concurrency: 5, interval: 1000 });
const numbers = [...new Array(10)].map((_, i) => i);
const duration = 10000; // 一つの処理は 10 秒要する
const startedAt = new Date();
const processed = await Promise.all(
numbers.map(async number =>
lock.run(async () => {
const elapsed = Math.round(
(new Date().getTime() - startedAt.getTime()) / 1000,
);
console.log(`${number} started after ${elapsed}s.`);
await sleep(duration);
return number;
}),
),
);
// 0 started after 0s.
// 1 started after 1s.
// 2 started after 2s.
// 3 started after 3s.
// 4 started after 4s.
// 5 started after 10s.
// 6 started after 11s.
// 7 started after 12s.
// 8 started after 13s.
// 9 started after 14s.
Discussion
Radashのparallelを使って少しデモを作ってみました。
demo code.
簡単ですが、以上です。