💫

Promise.withResolvers みたいな AsyncIterator.withResolvers がほしい

2025/02/27に公開7

皆さまは Promise.withResolvers() は使っておられますでしょうか?

https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers

new Promise() と違ってネストを浅くでき、また、 wrap しなくていいので こじんまりと使えるのが利点です。

で、で、でです。

Promise.withResolvers みたいな AsyncIterator.withResolvers がほしい

本題 です。

Promise.withResolvers() だと resolve / reject / promise を生やしていましたが
AsyncIterator.withResolvers() だと resolve / reject / values / complete を生やして 外から操作できるようにするというニュアンスです。

つまりこんな感じのメソッドが欲しいという話です。

(globalThis.AsyncIterator ??= {}).withResolvers ??=
/**
 * @template T
 */
function withResolvers() {
  let completed = false;
  const DONE = {};
  const resolvers = [withResolvers()];
  return { resolve, reject, complete, values: values() };
  /**
   * @param {T} result
   */
  function resolve(result) {
    const resolver = resolvers.at(-1);
    resolver?.resolve(result);
    if (result !== DONE)
      resolvers.push(withResolvers());
  }
  /**
   * @param {never} reason
   */
  function reject(reason) {
    const resolver = resolvers.at(-1);
    if (resolver) {
      resolver.reject(reason);
      complete();
    }
  }
  function complete() {
    resolve(DONE);
  }
  async function* values() {
    do {
      const resolver = resolvers[0];
      if (!resolver.status) await resolver.promise;
      if (resolver.status === "rejected") resolver.rethrow();
      if (resolver.status === "fulfilled") resolvers.shift();
      if (resolver.result === DONE) break;
      yield resolver.result;
    } while (resolvers.length > 0);
  }
  /**
   * @template T
   */
  function withResolvers() {
    const {
      promise,
      resolve: resolve_,
      reject: reject_,
    } = Promise.withResolvers();
    let status = undefined;
    let result = undefined;
    let reason = undefined;
    /**
     * @param {T} r 
     */
    function resolve(r) {
      if (status) return;
      status = "fulfilled";
      result = r;
      resolve_(r);
    }
    /**
     * @param {never} r 
     */
    function reject(r) {
      if (status) return;
      status = "rejected";
      reason = r;
      reject_(r);
    }
    return {
      /** @returns {"fulfilled"|"rejected"|undefined} */
      get status() {
        return status;
      },
      /** @returns {T} */
      get result() {
        return result;
      },
      rethrow() {
        throw reason;
      },
      promise,
      resolve,
      reject,
    };
  }
};

使い方としては次の様な感じです。

const {resolve, values, complete} = AsyncIterator.withResolvers();
element1.addEventListener("click", resolve, {signal});
element2.addEventListener("click", resolve, {signal});
signal.addEventListener("abort", complete);

for await (const event of values) {
  console.log(event);
}

複数 もしくは 単数 の 連続したイベントを ループで処理したい という需要はあるのでは?と思います。(無い?

例えば window を別途開いてそちらの通信を連続して処理するとき

const win = window.open(..., '_blank');
const {resolve, values, complete} = AsyncIterator.withResolvers();
win.addEventListener('message', ({data}) => resolve(data));

// #region window が閉じるのを監視
(async ()=> {
    while (!win.closed) {
        await new Promise((resolve) => setTimeout(resolve, 1000));
    }
    complete();
})();
// #dendregion

// #region entry point
for await (const data of values) {
    // doing... 
}
// #endregion

以上。

参考

playground

Discussion

rithmetyrithmety

complete の後に resolve か reject を呼ばないと for を抜けてくれなさそうです?
修正しつつもっと簡潔に書けないか試してみました

;(globalThis.AsyncIterator ??= {}).withResolvers ??= () => {
  const list = [Promise.withResolvers()]
  const last = () => {
    const last = list[list.length - 1]
    list.push(Promise.withResolvers())
    return last
  }
  const resolve = v => last().resolve(v)
  const reject = e => last().reject(e)
  const DONE = { is: 'done' }
  const complete = () => last().resolve(DONE)
  async function* iter() {
    for (let i = 0; i < list.length; i++) {
      let item = await list[i].promise
      if (DONE === item) return
      yield item
    }
  }
  const values = { [Symbol.asyncIterator]: iter }
  return { resolve, reject, complete, values }
}
junerjuner

あー、それはそうだ。確かに。むしろその場合は 内部 signal用意して abort するなりした方がいいかもしれないですね。(Symbol的に使うオブジェクトの方がシンプルか。 → こっちのソースにも反映しました。

ただ、その ループだと無限実行だめなやつでは……?(リストが膨れ続ける?

rithmetyrithmety

values が複数個所で使われたときに予期せぬ挙動になりそうなのでリストをそのままにしてます
長期の実行が予想されるなら、それを禁止して shift する方が安心ですね

let used = false
async function* iter() {
  if (used) throw new Error("values can't use twice")
  used = true
  for (let r; r = list[0]; list.shift()) {
    const item = await r.promise
    if (DONE === item) return
    yield item
  }
}
rithmetyrithmety

下記のような購読前から値が投げられている場合や複数の購読者が居る場合にどうするかは仕様によるのかなと思います

const { resolve, values, complete } = AsyncIterator.withResolvers()
resolve(1) // 購読前の resolve
;(async () => {
  await sleep(3, 'second') // 3 秒待つ
  resolve(2)
  complete()
})()
await Promise.all(
  ['a', 'b'].map(async id => {
    for await (const v of values) // 購読開始
      console.log(id, v)
    console.log(id, 'done')
  }),
)

私の最初のコードでは下記のようになります

a 1
b 1
a 2
b 2
a done
b done

変更版では下記のようになると思います

// b で エラー
a 1
a 2
a done

実用面では下記のようになるのが一番良いかもしれません

a 2
b 2
a done
b done
;(globalThis.AsyncIterator ??= {}).withResolvers ??= () => {
  const set = new Set()
  const each = (method, arg) => {
    for (const list of set) {
      const last = list[list.length - 1]
      list.push(Promise.withResolvers())
      last[method](arg)
    }
  }
  const resolve = v => each('resolve', v)
  const reject = e => each('reject', e)
  const DONE = { is: 'done' }
  const complete = () => resolve(DONE)
  async function* iter() {
    const list = [Promise.withResolvers()]
    try {
      set.add(list)
      for (let r; (r = list[0]); list.shift()) {
        let item = await r.promise
        if (DONE === item) return
        yield item
      }
    } finally {
      set.delete(list)
    }
  }
  const values = { [Symbol.asyncIterator]: iter }
  return { resolve, reject, complete, values }
}
junerjuner

もしかして、最適解ってこれでは……?

https://zenn.dev/kojiroueda/articles/e5a18b2c0dc3d4

(※意図して同じIFで書きます

(globalThis.AsyncIterator ??= {}).withResolvers ??= ({ cancel } = {}) => {
  let controller;
  const stream = new ReadableStream({
    start(controller_) {
      controller = controller_;
    },
    cancel() {
      if (typeof cancel === "function") cancel();
    },
  });
  return {
    values: stream.values(),
    resolve: controller.enqueue.bind(controller),
    complete: controller.close.bind(controller),
    reject: controller.error.bind(controller),
  };
};

playground

rithmetyrithmety

確かに
でもそれは ReadableStream.withResolvers のような正しい名前や IF がありそうですね

仕様の違いは用途の違いですし message やファイルの転送は ReadableStream が使われて click 系のイベントの検知は Observable の方が適正が高そうなことを考えると AsyncIterator.withResolvers の目指すべき仕様を考えるのは難しそうに思います…