🌹

複数の 非同期イテレータを 発生順に イテレートしたい場合

2024/11/22に公開

課題

例えば 非同期イテレータを返すメソッド loop1() があったとします。

async function* loop1({ signal } = {}) {
  let num = 0;
  const milliseconds = 2000;
  while (!(signal?.aborted ?? false)) {
    await timeout({ signal, milliseconds });
    yield `loop1:${num++}:${milliseconds}ms`;
  }
}

例えば loop1() とはまた違ったタイミングの 非同期イテレータを返すメソッド loop2() があったとします。

async function* loop2({ signal } = {}) {
  let num = 0;
  let milliseconds = 1000;
  const add = 100;
  while (!(signal?.aborted ?? false)) {
    await timeout({ signal, milliseconds });
    yield `loop2:${num++}:${milliseconds}ms`;
    milliseconds += add;
  }
}

さて、どうやって タイミング 発生順に イテレートさせたらいいでしょうか?

というのが今回の問題。

回答のメソッドは一旦 iterator() とでもしておきます。

例えばこんなコードが完成形としておきましょう

const format = Intl.DateTimeFormat("ja-jp", {
  dateStyle: "short",
  timeStyle: "long"
});
try {
  for await (const [v, tag] of iterator()) {
    output(`${format.format(new Date())}: ${v}`, tag);
  }
} catch (e) {
  error.innerText = error;
}

// #region utility
/**
 * @param {{
 *   milliseconds?: number;
 *   signal?: AbortSignal;
 * }} param0
 */
function timeout({ milliseconds, signal } = {}) {
  const { promise, resolve, reject } = Promise.withResolvers();
  let clear;
  if (signal) {
    if (signal.aborted) {
      abort();
      return;
    }
    signal.addEventListener("abort", abort);
    promise.finally(() => signal.removeEventListener("abort", abort));
    function abort() {
      if (typeof clear === "number") clearTimeout(clear);
      reject(signal.reason);
    }
  }
  clear = setTimeout(resolve, milliseconds);
  return promise;
}
function output(message, ...classes) {
  const node = template.content.cloneNode(true);
  const $row = node.querySelector("*");
  $row.innerText = `${message}`;
  if (classes.length > 0) $row.classList.add(...classes);
  target.insertAdjacentElement("afterbegin", $row);
  target.querySelectorAll(":scope > :nth-child(1n + 100 of :not(template))");
}
// #endregion

私の手法

私の回答としては次の方法です。(他にいい方法があったら記事にしてリンクしていただけるととてもありがたい。)

javascript に於いては UI スレッドは シングルスレッドです。
ということは つまり FIして 完了したものを上から順に削除していけばいいのです。

ということで 非同期状態を 管理するのが Promise.withResolvers() では足りないので 完了状態のプロパティ done をつけた makeResolver() を作成します。

/**
 * @template {any} T
 * @returns {{
 *   promise: Promise<T>;
 *   resolve: (v:T) => void;
 *   reject: (v:any) => void;
 *   done: boolean;
 * }}
 */
function makeResolvers() {
  const { promise, resolve, reject } = Promise.withResolvers();
  const resolvers = {
    promise,
    resolve,
    reject,
    done: false
  };
  promise.finally(() => (resolvers.done = true));
  return resolvers;
}

そしてそれを 配列に追加して 先頭側のまだ未完了に設定していく iterate() を 追加します。

/**
 * @param {() => AsyncIterator} getIterator イテレータ取得関数
 * @param {any} tag 識別用タグ
 * @param {ReturnType<typeof makeResolvers>[]} resolvers 登録先リスト
 */
async function iterate(getIterator, tag, resolvers) {
  try {
    resolvers.push(makeResolvers());
    for await (const v of getIterator()) {
      resolvers.find((v) => !v.done).resolve([v, tag]);
      resolvers.push(makeResolvers());
    }
  } catch (e) {
    resolvers.find((v) => !v.done)?.reject(e);
  }
}

それらを使った iterater() はこうなります。

/**
 * @param {{
 *   signal?:AbortSignal
 * }}
 */
async function* iterator({ signal: parentSignal } = {}) {
  const controller = new AbortController();
  const signal = AbortSignal.any([
    controller.signal,
    ...(parentSignal ? [parentSignal] : [])
  ]);
  try {
    /** @type {ReturnType<typeof makeResolvers[]} */
    const resolvers = [];
    iterate(() => loop1({ signal }), "loop1", resolvers);
    iterate(() => loop2({ signal }), "loop2", resolvers);
    while (resolvers.length) {
      yield await resolvers[0].promise;
      resolvers.shift();
    }
  } finally {
    controller.abort();
  }
}

動作サンプル

以上。

Discussion