🚡

DOM Observable は Push ストリームのデファクトスタンダードになるか?

2023/10/01に公開
変更情報

【2025/02/25】

  • 将来的に ReadableStream でも Async Iterator Helpers が使えるようになることを追記
  • 誤っていたコードを修正

【2025/02/24】

【2024/08/13】

【2024/01/25】

【2023/12/01】

【2023/11/21】

RxJS を使ったことがある方は Observable という概念を知っているかと思います。この Observable を Web 標準に入れる提案が進行しています。

https://wicg.github.io/observable/

この記事では JavaScript におけるストリーム機能をまとめ、そして提案されている Observable について触れたいと思います。

Pull ストリームと Push ストリーム

ストリームを扱うプロトコル(インターフェース)やクラスは Pull ストリームと Push ストリームに大別されます。データを使う側から取得の要請するのが Pull ストリームで、データを送る側から任意のタイミングで一方的に送信するのが Push ストリームです。

Pull: ES2018 Async Iteration

ES2018 から Pull ストリームである非同期反復可能プロトコル(インターフェース)が扱えます。

for-await-of 文を使うと内部で Symbol.asyncIterator メソッドで取得した非同期イテレーターの next メソッドを実行し、次の値を取得するようになっています。

const asyncIterable = {
  async *[Symbol.asyncIterator]() {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "hello";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "async";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "iteration!";
  },
};

(async () => {
  for await (const value of asyncIterable) {
    console.log(value); // "hello", "async", "iteration!"
  }
})();

詳しくは過去の記事を参考にしてください。

https://zenn.dev/pixiv/articles/d1650ae332798c

Stage 2 Async Iterator Helpers (TC39 Proposal)

非同期イテレーターはあくまでプロトコル(インターフェース)という扱いでしたが、グローバルに AsyncIterator クラスを追加する Async Iterator Helpers という提案があります。

https://github.com/tc39/proposal-async-iterator-helpers

AsyncIterator クラスを継承することによって next メソッドのみを実装するだけで mapfilter などのメソッドを扱えるようになります。また Async Generator Functions を使った場合にも同様のメソッドが使えます。

const asyncIterable = {
  async *[Symbol.asyncIterator]() {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "hello";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "async";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "iteration!";
  },
};

(async () => {
  await asyncIterable[Symbol.asyncIterator]()
    .filter((value) => value.length === 5)
    .toArray(); // => ["hello", "async"]
})();

以下は同期的な Iterator Helpers についての記事になりますが、参考になるかと思います。

https://zenn.dev/pixiv/articles/062461b79e0d8f

Pull: ReadableStream (Streams Standard)

高機能な Pull ストリームとして ReadableStream クラスがあります。これは Fetch API の Response インスタンスの body プロパティなどで使われています。

https://streams.spec.whatwg.org/#rs

Web のための仕様ですが WinterTC (TC55) という非ブラウザランタイムグループが策定する Minimum Common Web Platform API に含まれており、Node.js や Deno などほとんどの JavaScript ランタイムで扱うことができます。

この ReadableStream 自体は Pull ストリームですが、内部にデータを保持するキューを持っており、Pull と Push いずれのデータソースであっても入力できるようになっています。

// Pull Source から ReadableStream を作る例
new ReadableStream({
  async pull(controller) {
    // source.pull でデータを持つ Promise を返すことを想定
    const { done, value } = await source.pull();
    if (done) {
      controller.close();
    }
    controller.enqueue(data);
  },
});
// Push Source から ReadableStream を作る例
new ReadableStream({
  start(controller) {
    // source.subscribe でコールバック函数にデータを渡すことを想定
    source.subscribe(({ done, value }) => {
      if (done) {
        controller.close();
      }
      controller.enqueue(value);
    });
  },
});

リーダーを取得し read メソッドを使ってデータを受け取ることができます。

const reader = readableStream.getReader();

(async () => {
  for (;;) {
    const { done, value } = await reader.read();
    if (done) {
      console.log("complete");
      break;
    }
    console.log(value);
  }
})();

ストリームをキャンセルし、キューに入っているデータを開放するには cancel メソッドを呼び出します。

他にもパイプチェーン機能、内部キューの状態に基づいてデータソースからの取得を一時停止する機能、そしてデータを受け取る側から提供した ArrayBufferView に(内部キューが空で可能な場合に)ゼロコピーで直接データを書き込む機能などがあります。

https://developer.mozilla.org/ja/docs/Web/API/Streams_API/Concepts

ECMAScript との関連

まだ実験的な機能で実装されているブラウザは少ないですが ReadableStream.from で(非同期)反復可能プロトコルから直接 ReadableStream を生成できたり、ReadableStream に対して for-await-of 文が使えたりします。

const asyncIterable = {
  async *[Symbol.asyncIterator]() {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "hello";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "async";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "iteration!";
  },
};

const readableStream = ReadableStream.from(asyncIterable);

(async () => {
  for await (const data of readableStream) {
    console.log(data); // "hello", "async", "iteration!"
  }
  console.log("complete");
})();

for-await-of 文を break して非同期イテレータを閉じる際に元の ReadableStreamcancel メソッドが実行されたくない場合は、明示的に values メソッドに preventCancel: true を指定します。

(async () => {
  for await (const data of readableStream.values({ preventCancel: true })) {
    if (data === "suspend") {
      break;
    }
    console.log(data);
  }
})();

また将来的に values メソッドを使うことで Stage 2 Async Iterator Helpers の mapfilter メソッドなどが扱えるようになるでしょう。

async () => {
  await readableStream.values()
    .filter((value) => value.length === 5)
    .toArray();
})();

Push: EventTarget (DOM Standard)

よく使われる Push ストリームの例として EventTarget クラスがあります。addEventListener メソッドでイベントが発火されるたびに実行されるコールバックを登録できます。DOM の Node がこの EventTarget を継承しており、広く使われています。

const eventTarget = new EventTarget();

eventTarget.addEventListener("data", (e) => {
  console.log(e.detail); // "hello", "event", "target!"
});

eventTarget.dispatchEvent(new CustomEvent("data", { detail: "hello" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "event" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "target!" }));

EventTarget の問題点

EventTarget はあくまでイベントを扱います。データストリームとして扱うとなると冗長です。

また Iterator Helpers のように mapfilter などのメソッドを追加したくても EventTarget にその機能を入れるのは容易ではなさそうです。

Push: Observable (WICG Proposal)

記事冒頭で述べた通り、新たな Push ストリームとして Web 標準に Observable を入れる提案が進行中です。

https://wicg.github.io/observable/

const observable = new Observable(async (subscriber) => {
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.next("hello");
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.next("observable!");
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.complete();
});

observable.subscribe({
  next(deta) {
    console.log(data); // "hello", "observable!"
  },
  complete() {
    console.log("complete");
  },
});

Observablesubscribe メソッドの第一引数には SubscriptionObserver インターフェースもしくは単なるコールバック函数を渡すことが出来ます。単に函数を渡したときは SubscriptionObserver インターフェースの next だけを渡したことと等価になります。

type ObservableSubscriptionCallback<T> = (value: T) => void;

interface SubscriptionObserver<T> {
    next(value: T): void;
    error(err: any): void;
    complete(): void;
}

type ObserverUnion<T> =
  | ObservableSubscriptionCallback<T>
  | SubscriptionObserver<T>;

もちろん mapfilter などのメソッドが備わっています。

const observable = new Observable(async (subscriber) => {
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.next("hello");
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.next("observable!");
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.complete();
});

(async () => {
  await observable
    .filter((value) => value.length === 5)
    .toArray(); // => ["hello"]
})();

Observable の購読を辞める

Observable の購読を辞める必要がある場合は subscribe メソッドの第二引数に AbortSignal を渡し、abort メソッドを実行します。後処理が必要な場合は RxJS とは異なり、Observable のコンストラクタに渡したプロデューサー函数内で subscriber.addTeardown にコールバック函数を登録します。

const observable = new Observable((subscriber) => {
  subscriber.addTeardown(() => {
    // 後処理
  });
});

const controller = new AbortController();
observable.subscribe((next) => {
  console.log(data);
}, { signal: controller.signal });

// 購読を辞める
controller.abort();

Observable のコンストラクタに渡すプロデューサー函数の特徴

Observable は Push ストリームですが、Pull と Push のいずれのデータソースでも入力できます。

ただし ReadableStream とは異なり subscribe メソッドが呼ばれてからコンストラクタに渡したプロデューサー函数が実行される仕様なため、Push データソースから Observable を作る場合は意図せずデータが欠損してしまうおそれがあることに注意が必要です。

これは Push ストリーム一般の特徴で、EventTargetaddEventListener されてない間は dispatchEvent で渡されたイベントを使わずに捨てることと同義です。

const observable = new Observable((subscriber) => {
  // source.subscribe でコールバック函数にデータを渡すことを想定
  source.subscribe(({ done, value }) => {
    if (done) {
      subscriber.complete();
    }
    subscriber.next(value);
  });
});

// ソースに同期的にデータをプッシュする
source.push(1);

// ここでコンストラクタに渡したプロデューサー函数が実行される
observable.subscribe((data) => {
  console.log(data); // 2, 3
});

// 以降は購読される
source.push(2);
source.push(3);

なお Observable が複数のオブサーバーから購読される場合プロデューサー函数は複数回実行されず Subscriber を再利用します。Subscribercompleteerror が呼ばれ非アクティブな状態になっているか、購読しているオブサーバーの数が0になった後に購読すると再度プロデューサー函数が実行されます。

let count = 0;

const observable = new Observable(async (subscriber) => {
  const value = ++count;
  // 同期実行の場合1つ目のオブザーバーにのみデータが渡るため、1秒後にデータを渡す
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.next(value);
});

(async () => {
  // 複数購読する
  const controller = new AbortController();
  observable.subscribe((data) => {
    console.log(data); // 1
  }, { signal: controller.signal });
  observable.subscribe((data) => {
    console.log(data); // 1
  }, { signal: controller.signal });

  // 2秒後に購読を辞める
  await new Promise((resolve) => setTimeout(resolve, 2000));
  controller.abort();

  // 再購読する
  observable.subscribe((data) => {
    console.log(data); // 2
  });
})();

ECMAScript との関連

Observable.from を使うことで(非同期)反復可能プロトコルや Promise から直接 Observable を生成することができます。

const asyncIterable = {
  async *[Symbol.asyncIterator]() {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "hello";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "async";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "iteration!";
  },
};

const observable = Observable.from(asyncIterable);

observable.subscribe({
  next(deta) {
    console.log(data); // "hello", "async", "iteration!"
  },
  complete() {
    console.log("complete");
  },
});

EventTarget からのインテグレーション

EventTargetwhen メソッドが追加され、Observable が簡単に作れるようになっています。

element.when("click")
  .filter((e) => e.target.matches(".foo"))
  .map((e) => ({ x: e.clientX, y: e.clientY }))
  .subscribe(handleClickAtPoint);

提案の背景

実は Observable の提案はもともと TC39 の方で進行していましたが Stage 1 のままでなかなか進んでいませんでした。

AbortController やインテグレーションする EventTarget の仕様が DOM Standard にある関係もあり、RxJS のコアチームリードである Ben Lesh 氏から WICG の方で改めて進行しないかと提案がありました。

https://github.com/whatwg/dom/issues/544

その後 TPAC 2023 で議題に上がり、2023年9月から Chrome で2024年1月から Firefox でプロトタイプ実装が始まりました。

https://docs.google.com/presentation/d/1lPLUcm_yqR5couGwouETTkblhqgHfpgUBkm2mnmxegU/edit

https://docs.google.com/document/d/1NEobxgiQO-fTSocxJBqcOOOVZRmXcTFg9Iqrhebb7bg/edit

そして TPAC 2024 からの議論によってオブザーバーを弱参照で保持することで Subscriber を再利用することが決まりWebKit から提案をサポートすると表明されたことによって Chrome 135 に搭載されることが決まりました。

https://groups.google.com/a/chromium.org/g/blink-dev/c/stxSgTgMHog

締め

Observable の議論の流れを見てまとめてみました。

個人的に EventTarget から Observable が作れるのは便利かなと思うものの、一般的なストリームとしては高機能な ReadableStream が使われていくことになるのではないかと考えています。

また Preact, Vue そして Svelte のような UI ライブラリに対しては TC39 で進んでいる Stage 1 Signals が使われることになると思うので、そういった面でも Observable が活用されるのは限定的になる気がします。

とはいえ今後どうなっていくのか注視していこうと思います。

Discussion