指定した並列度で非同期処理(HTTPリクエストなど)を実行する
はじめに
自作ライブラリ Rotery に新機能を追加しました。
Rotery は、反復処理を効率よく行うためのユーティリティライブラリです。 iterator と asyncIterator のサポート、関数型プログラミングと pipe 関数、TypeScript の強力な型サポートが特徴です。
機能の概要
今回追加した新機能は、「並列度制御」の機能です。例えば、以下のコードは並列で 5 つの HTTP リクエストが実行されるように制御します。
const responses = await Rt.pipe(
urls,
Rt.map.sync(async url => {
const response = await fetch(url);
return await response.json();
}),
Rt.throttle(5), // This maintains up to 5 concurrent HTTP fetch requests.
Rt.toArray.async, // The results are ordered by the completion time.
);
上記のコードでは、 Rt.map.sync(...
がイテレータによる遅延評価で HTTP リクエストを生成し、 throttle(5)
が生成される HTTP リクエストの並列度を 5 に制御しています。
Rotery の実装の特徴
並列度制御の機能を持つライブラリは他にもありますが、 Rotery の並列度制御は非同期ジェネレータによって実装されているため、 pipe()
関数中に自然な形で並列度制御を組み込むことができます。
また、並列実行中の各処理は、完了次第すぐに結果を出力して次の待機中の処理を実行するようになっています。これにより、非同期処理の待機時間を最小に減らすようにしています。
実装のソースコードを紹介します。改行やコメントを含めて 50 行ほどと非常にコンパクトにまとめることができ、個人的に満足しています。
ソースコード
const isNotEmptyElement = () => true;
async function* _buffer<T>(input: Series<T>, size: number): AsyncGenerator<Awaited<T>> {
if (size <= 0 || !Number.isInteger(size))
throw new RangeError(`"size" must be a positive integer (got ${size.toString()}).`);
const awaited = await input;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const iterator = (Array.isArray as (v: unknown) => v is readonly any[])(awaited)
? awaited.values()
: awaited;
const pull = async () => {
const next = iterator.next();
return next instanceof Promise
? await next
: // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
({ value: await next.value, done: next.done } as IteratorResult<MaybePromise<T>>);
};
const pullers = Array.from({ length: size }, (_, k) => pull().then(result => ({ k, result })));
// eslint-disable-next-line unicorn/no-array-callback-reference
while (pullers.some(isNotEmptyElement)) {
// eslint-disable-next-line unicorn/no-array-callback-reference
const item = await Promise.race(pullers.filter(isNotEmptyElement));
if (item.result.done) {
// eslint-disable-next-line @typescript-eslint/no-array-delete, @typescript-eslint/no-dynamic-delete
delete pullers[item.k];
continue;
}
yield await item.result.value;
// eslint-disable-next-line @typescript-eslint/no-floating-promises
pullers.splice(
item.k,
1,
pull().then(result => ({ k: item.k, result })),
);
}
}
Microsoft Copilot によるソースコードの解説
この TypeScript のソースコードは、非同期ジェネレーター関数 _buffer を定義しています。この関数は、入力された非同期シーケンスをバッファリングし、指定されたサイズのバッファを維持しながら要素を生成します。以下に、コードの各部分を解説します。
- isNotEmptyElement 関数
const isNotEmptyElement = () => true;
この関数は、常に true を返します。これは、後で pullers 配列の要素が存在するかどうかを確認するために使用されます。
- _buffer 関数
async function* _buffer<T>(input: Series<T>, size: number): AsyncGenerator<Awaited<T>> {
この関数は、ジェネリック型 T を受け取り、非同期ジェネレーターを返します。input は非同期シーケンスで、size はバッファのサイズです。
- サイズの検証
if (size <= 0 || !Number.isInteger(size))
throw new RangeError(`"size" must be a positive integer (got ${size.toString()}).`);
バッファサイズが正の整数であることを確認し、そうでない場合はエラーをスローします。
- 入力の待機
const awaited = await input;
入力シーケンスが非同期であるため、await を使用してその結果を待ちます。
- イテレータの取得
const iterator = (Array.isArray as (v: unknown) => v is readonly any[])(awaited)
? awaited.values()
: awaited;
入力が配列である場合は、その値のイテレータを取得し、そうでない場合はそのままイテレータとして使用します。
- pull 関数
const pull = async () => {
const next = iterator.next();
return next instanceof Promise
? await next
: ({ value: await next.value, done: next.done } as IteratorResult<MaybePromise<T>>);
};
この関数は、次の要素を取得し、それが Promise である場合は待機し、そうでない場合はそのまま返します。
- pullers 配列の作成
const pullers = Array.from({ length: size }, (_, k) => pull().then(result => ({ k, result })));
バッファサイズに基づいて pullers 配列を作成し、各要素は pull 関数の結果を保持します。
- メインループ
while (pullers.some(isNotEmptyElement)) {
const item = await Promise.race(pullers.filter(isNotEmptyElement));
if (item.result.done) {
delete pullers[item.k];
continue;
}
yield await item.result.value;
pullers.splice(
item.k,
1,
pull().then(result => ({ k: item.k, result })),
);
}
このループは、pullers 配列に要素が存在する限り実行されます。Promise.race を使用して最初に完了した pull を取得し、その結果が完了している場合は pullers から削除し、そうでない場合はその値を生成し、新しい pull を pullers に追加します。
このコードは、非同期シーケンスを効率的にバッファリングし、指定されたサイズのバッファを維持しながら要素を生成するために設計されています。
おわりに
このライブラリについての質問、機能追加リクエスト、バグ報告等を歓迎します。この記事のコメント欄か、 GitHub の Issue, Pull Request までお願いします。
Discussion