DOM Observable は Push ストリームのデファクトスタンダードになるか?
変更情報
【2024/08/13】
【2024/01/25】
- 記事の構成を変更
-
Observer
がSubscriptionObserver
にリネームされたのに追随し更新 - Firefox のプロトタイプ実装が始まったことに言及
【2023/12/01】
-
Observable#subscribe
にそのまま函数を渡せるようになったのに追随し更新 -
subscriber.addTeardown
についての記述を追記
【2023/11/21】
RxJS を使ったことがある方は Observable
という概念を知っているかと思います。この Observable
を Web 標準に入れる提案が進行しています。
この記事では JavaScript におけるストリーム機能をまとめ、そして提案されている Observable
について触れたいと思います。
Pull ストリームと Push ストリーム
ストリームを扱うプロトコル(インターフェース)やクラスは Pull ストリームと Push ストリームに大別されます。データを使う側から取得の要請するのが Pull ストリームで、データを送る側から任意のタイミングで一方的に送信するのが Push ストリームです。
Pull: ES2018 Async Iteration
ES2018 から Pull ストリームである非同期反復可能プロトコル(インターフェース)が扱えます。
for-await-of 文を使うと内部で Symbol.asyncIterator
メソッドで取得した非同期イテレーターの next
メソッドを実行し、次の値を取得するようになっています。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
(async () => {
for await (const value of asyncIterable) {
console.log(value); // "hello", "async", "iteration!"
}
})();
詳しくは過去の記事を参考にしてください。
Stage 2 Async Iterator Helpers (TC39 Proposal)
非同期イテレーターはあくまでプロトコル(インターフェース)という扱いでしたが、グローバルに AsyncIterator
クラスを追加する Async Iterator Helpers という提案があります。
AsyncIterator
クラスを継承することによって next
メソッドのみを実装するだけで map
や filter
などのメソッドを扱えるようになります。また Async Generator Functions を使った場合にも同様のメソッドが使えます。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
(async () => {
await asyncIterable[Symbol.asyncIterator]()
.filter((value) => value.length === 5)
.toArray(); // => ["hello", "async"]
})();
以下は同期的な Iterator Helpers についての記事になりますが、参考になるかと思います。
ReadableStream
(Streams Standard)
Pull: 高機能な Pull ストリームとして ReadableStream
クラスがあります。これは Fetch API の Response
インスタンスの body
プロパティなどで使われています。
Web のための仕様ですが WinterCG という非ブラウザランタイムのコミュニティグループが策定する Minimum Common Web Platform API に含まれており、Node.js や Deno などほとんどの JavaScript ランタイムで扱うことができます。
この ReadableStream
自体は Pull ストリームですが、内部にキューを持っており、Pull と Push いずれのデータソースであっても入力できるようになっています。
// Pull Source から ReadableStream を作る例
new ReadableStream({
async pull(controller) {
// source.pull でデータを持つ Promise を返すことを想定
const { done, value } = await source.pull();
if (done) {
controller.close();
}
controller.enqueue(data);
},
});
// Push Source から ReadableStream を作る例
new ReadableStream({
start(controller) {
// source.subscribe でコールバック函数にデータを渡すことを想定
source.subscribe(({ done, value }) => {
if (done) {
controller.close();
}
controller.enqueue(value);
});
},
});
リーダーを取得し read
メソッドを使ってデータを受け取ることができます。
const reader = readableStream.getReader();
(async () => {
for (;;) {
const { done, value } = await reader.read();
if (done) {
console.log("complete");
break;
}
console.log(value);
}
})();
ストリームをキャンセルし、キューに入っているデータを開放するには cancel
メソッドを呼び出します。
他にもパイプチェーン機能、内部キューの状態に基づいてデータソースを一時停止する機能、そして受け取る側から提供した ArrayBufferView に(内部キューが空で可能な場合に)ゼロコピーで直接データを書き込む機能などがあります。
ES2018 Async Iteration との関連仕様
まだ実験的な機能で実装されているブラウザは少ないですが ReadableStream.from
で非同期反復可能プロトコル(インターフェース)から直接 ReadableStream
を生成できたり、ReadableStream
に対して for-await-of 文が使えたりします。
const readableStream = ReadableStream.from(asyncIterable);
(async () => {
for await (const data of readableStream) {
console.log(data);
}
console.log("complete");
})();
for-await-of 文を break した時に cancel
されたくない場合は明示的に preventCancel: true
を指定します。
(async () => {
for await (const data of readableStream.values({ preventCancel: true })) {
if (data === "suspend") {
break;
}
console.log(data);
}
})();
EventTarget
(DOM Standard)
Push: よく使われる Push ストリームの例として EventTarget
クラスがあります。addEventListener
メソッドでイベントが発火されるたびに実行されるコールバックを登録できます。DOM の Node
がこの EventTarget
を継承しており、広く使われています。
const eventTarget = new EventTarget();
eventTarget.addEventListener("data", (e) => {
console.log(e.detail); // "hello", "event", "target!"
});
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "hello" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "event" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "target!" }));
EventTarget
の問題点
EventTarget
はあくまでイベントを扱います。データストリームとして扱うとなると冗長です。
また Iterator Helpers の提案のように map
や filter
などのメソッドを追加したくても EventTarget
にその機能を入れるのは容易ではなさそうです。
Observable
(WICG Proposal)
Push: 記事冒頭で述べた通り、新たな Push ストリームとして Web 標準に Observable
を入れる提案が進行中です。
const observable = new Observable(async (subscriber) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("hello");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("observable!");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.complete();
});
observable.subscribe({
next(deta) {
console.log(data); // "hello", "observable!"
},
complete() {
console.log("complete");
},
});
この Observable#subscribe
の第一引数には SubscriptionObserver
インターフェースもしくは単なるコールバック函数を渡すことが出来ます。単に函数を渡したときは next
だけ登録したことと等価になります。
interface SubscriptionObserver {
next(value: any): void;
error(err: any): void;
complete(): void;
}
type SubscriptionObserverCallback = (value: any) => void;
もちろん map
や filter
などのメソッドが備わっています。
const observable = new Observable(async (subscriber) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("hello");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("observable!");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.complete();
});
(async () => {
await observable
.filter((value) => value.length === 5)
.toArray(); // => ["hello"]
})();
Observable
は Push ストリームですが、Pull と Push のいずれのデータソースでも入力できます。
ただし ReadableStream
とは異なり subscribe
メソッドが呼ばれる度にコンストラクタのコールバックが実行される仕様なため、Push データソースから Observable
を作る場合は意図せずデータが欠損してしまうおそれがあることに注意が必要です。
これは Push ストリームの特徴で、EventTarget
が addEventListener
されてない間は dispatchEvent
のイベントを使わずに捨てることと同義です。RxJS の Hot/Cold の概念に近いかなと思います。
const observable = new Observable((subscriber) => {
// source.subscribe でコールバック函数にデータを渡すことを想定
source.subscribe(({ done, value }) => {
if (done) {
subscriber.complete();
}
subscriber.next(value);
});
});
// ソースに同期的にデータをプッシュする
source.push(1);
// ここでコンストラクタのコールバックが実行される
observable.subscribe((data) => {
console.log(data); // 2, 3
});
// 以降はサブスクライブされる
source.push(2);
source.push(3);
そしてサブスクライブを辞めるには第二引数に AbortSignal
を渡します。後処理が必要な場合は RxJS とは異なり、Observable
のコンストラクタ内で subscriber.addTeardown
にコールバック函数を登録します。
const observable((subscriber) => {
subscriber.addTeardown(() => {
// 後処理
});
});
const controller = new AbortController();
observable.subscribe((next) => {
console.log(data);
}, { signal: controller.signal });
// サブスクライブを辞める
controller.abort();
EventTarget
からのインテグレーション
EventTarget
に when
メソッドが追加され、Observable
を簡単に作ることができるようになっています。
element.when("click")
.filter((e) => e.target.matches(".foo"))
.map((e) => ({ x: e.clientX, y: e.clientY }))
.subscribe(handleClickAtPoint);
提案の背景
実は Observable
の提案はもともと TC39 の方で進行していましたが Stage 1 のままでなかなか進んでいませんでした。
AbortController
やインテグレーションする EventTarget
の仕様が DOM Standard にある関係もあり、RxJS のコアチームリードである Ben Lesh 氏から WICG の方で改めて進行しないかと提案がありました。
その後 TPAC 2023 で議題に上がり、2023年9月から Chrome で、2024年1月から Firefox でプロトタイプ実装が始まりました。
締め
Observable
の議論の流れを見て勢いでまとめてみました。
個人的に EventTarget
から Observable
を作るのは便利かなと思うものの、一般的なストリームとしては高機能な ReadableStream
が使われていくことになるのではないかと考えています。
今後どうなっていくのか注視していこうと思います。
Discussion