🫗

追伸: AsyncIterator.withResolvers は ReadableStream 内のメソッドを外部化した方が作りやすい?

2025/02/28に公開

前回 あんなに長いコードを書いていたのに、よくよく考えると ReadableStream も asyncIterator が実装されていることを思い出したのでもっと簡単に書けることに気が付きました。

https://streams.spec.whatwg.org/#rs-asynciterator

具体的には ReadableStream の内部で使われている ReadableStreamDefaultController を使うとですね。

https://developer.mozilla.org/ja/docs/Web/API/ReadableStreamDefaultController

Promise.withResolvers の作り方と同じ感じに作ると次の様な感じで作れます
( ※ReadableStream の start は 同期的に実行されます。

(globalThis.AsyncIterator ??= {}).withResolvers ??= ({ cancel } = {}) => {
  let controller;
  const stream = new ReadableStream({
    start(controller_) {
      controller = controller_;
    },
    cancel() {
      if (typeof cancel === "function") cancel();
    },
  });
  return {
    values: stream.values(),
    resolve: controller.enqueue.bind(controller),
    complete: controller.close.bind(controller),
    reject: controller.error.bind(controller),
  };
};

※ 直接 ReadableStrem を values として返していますが、 ReadbleStream は cancel できるので withResolvers の引数として cancel コールバックを追加しています。 ニュアンス signal を返してもよいかもしれません

以上。

playground

追伸の追伸

ReadableStream.prototype.values() ってよくよく思ったら Stream の transferable と同様に Safari ではまだ実装できていないやんけ!!! (2025/2/28 現在)

https://caniuse.com/mdn-api_readablestream_values

なので Gemini さんに次の様なお願いすると次の様なコードになりました。
(cancel ってこの動きであっているっけ?

次のコードを ReadableStream を使わない形に修正していただけませんでしょうか?
(※コードは先述のものなので省略)
(globalThis.AsyncIterator ??= {}).withResolvers ??= ({ cancel } = {}) => {
  const queue = [];
  let closed = false;
  let error = null;

  const asyncIterator = {
    [Symbol.asyncIterator]() {
      return this;
    },
    async next() {
      if (error) {
        throw error;
      }
      if (queue.length === 0 && closed) {
        return { done: true };
      }
      if (queue.length === 0) {
        await new Promise(resolve => {
          const check = () => {
            if (queue.length > 0 || closed || error) {
              resolve();
            } else {
              setTimeout(check, 0);
            }
          };
          check();
        });
      }
      return { value: queue.shift(), done: false };
    },
    return() {
      closed = true;
      if (typeof cancel === "function") cancel(); // キャンセル処理
      return Promise.resolve({ done: true });
    },
    throw(err) {
      error = err;
      if (typeof cancel === "function") cancel(); // キャンセル処理
      return Promise.reject(err);
    }
  };

  return {
    values: asyncIterator,
    resolve: (value) => { queue.push(value); },
    complete: () => {
      closed = true;
      if (typeof cancel === "function") cancel(); // キャンセル処理
    },
    reject: (err) => {
      error = err;
      if (typeof cancel === "function") cancel(); //キャンセル処理
    },
  };
};

Discussion