DOM Observable は Push ストリームのデファクトスタンダードになるか?
変更情報
【2025/02/25】
- 将来的に
ReadableStream
でも Async Iterator Helpers が使えるようになることを追記 - 誤っていたコードを修正
【2025/02/24】
-
Subscriber
がアクティブな状態で、かつ1つ以上のオブザーバーによって購読されている場合、購読してもObservable
のコンストラクタに渡した(プロデューサー)函数を呼ばず、Subscriber
を再利用する仕様変更に追随し更新 -
SubscriptionObserverCallback
がObservableSubscriptionCallback
にリネームされたのに追随し更新 - WinterCG を WinterTC (TC55) へ変更
- Chrome 135 に搭載されることを追記
【2024/08/13】
【2024/01/25】
- 記事の構成を変更
-
Observer
がSubscriptionObserver
にリネームされたのに追随し更新 - Firefox のプロトタイプ実装が始まったことに言及
【2023/12/01】
-
Observable#subscribe
にそのまま函数を渡せるようになったのに追随し更新 -
subscriber.addTeardown
についての記述を追記
【2023/11/21】
RxJS を使ったことがある方は Observable
という概念を知っているかと思います。この Observable
を Web 標準に入れる提案が進行しています。
この記事では JavaScript におけるストリーム機能をまとめ、そして提案されている Observable
について触れたいと思います。
Pull ストリームと Push ストリーム
ストリームを扱うプロトコル(インターフェース)やクラスは Pull ストリームと Push ストリームに大別されます。データを使う側から取得の要請するのが Pull ストリームで、データを送る側から任意のタイミングで一方的に送信するのが Push ストリームです。
Pull: ES2018 Async Iteration
ES2018 から Pull ストリームである非同期反復可能プロトコル(インターフェース)が扱えます。
for-await-of 文を使うと内部で Symbol.asyncIterator
メソッドで取得した非同期イテレーターの next
メソッドを実行し、次の値を取得するようになっています。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
(async () => {
for await (const value of asyncIterable) {
console.log(value); // "hello", "async", "iteration!"
}
})();
詳しくは過去の記事を参考にしてください。
Stage 2 Async Iterator Helpers (TC39 Proposal)
非同期イテレーターはあくまでプロトコル(インターフェース)という扱いでしたが、グローバルに AsyncIterator
クラスを追加する Async Iterator Helpers という提案があります。
AsyncIterator
クラスを継承することによって next
メソッドのみを実装するだけで map
や filter
などのメソッドを扱えるようになります。また Async Generator Functions を使った場合にも同様のメソッドが使えます。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
(async () => {
await asyncIterable[Symbol.asyncIterator]()
.filter((value) => value.length === 5)
.toArray(); // => ["hello", "async"]
})();
以下は同期的な Iterator Helpers についての記事になりますが、参考になるかと思います。
ReadableStream
(Streams Standard)
Pull: 高機能な Pull ストリームとして ReadableStream
クラスがあります。これは Fetch API の Response
インスタンスの body
プロパティなどで使われています。
Web のための仕様ですが WinterTC (TC55) という非ブラウザランタイムグループが策定する Minimum Common Web Platform API に含まれており、Node.js や Deno などほとんどの JavaScript ランタイムで扱うことができます。
この ReadableStream
自体は Pull ストリームですが、内部にデータを保持するキューを持っており、Pull と Push いずれのデータソースであっても入力できるようになっています。
// Pull Source から ReadableStream を作る例
new ReadableStream({
async pull(controller) {
// source.pull でデータを持つ Promise を返すことを想定
const { done, value } = await source.pull();
if (done) {
controller.close();
}
controller.enqueue(data);
},
});
// Push Source から ReadableStream を作る例
new ReadableStream({
start(controller) {
// source.subscribe でコールバック函数にデータを渡すことを想定
source.subscribe(({ done, value }) => {
if (done) {
controller.close();
}
controller.enqueue(value);
});
},
});
リーダーを取得し read
メソッドを使ってデータを受け取ることができます。
const reader = readableStream.getReader();
(async () => {
for (;;) {
const { done, value } = await reader.read();
if (done) {
console.log("complete");
break;
}
console.log(value);
}
})();
ストリームをキャンセルし、キューに入っているデータを開放するには cancel
メソッドを呼び出します。
他にもパイプチェーン機能、内部キューの状態に基づいてデータソースからの取得を一時停止する機能、そしてデータを受け取る側から提供した ArrayBufferView に(内部キューが空で可能な場合に)ゼロコピーで直接データを書き込む機能などがあります。
ECMAScript との関連
まだ実験的な機能で実装されているブラウザは少ないですが ReadableStream.from
で(非同期)反復可能プロトコルから直接 ReadableStream
を生成できたり、ReadableStream
に対して for-await-of 文が使えたりします。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
const readableStream = ReadableStream.from(asyncIterable);
(async () => {
for await (const data of readableStream) {
console.log(data); // "hello", "async", "iteration!"
}
console.log("complete");
})();
for-await-of 文を break
して非同期イテレータを閉じる際に元の ReadableStream
の cancel
メソッドが実行されたくない場合は、明示的に values
メソッドに preventCancel: true
を指定します。
(async () => {
for await (const data of readableStream.values({ preventCancel: true })) {
if (data === "suspend") {
break;
}
console.log(data);
}
})();
また将来的に values
メソッドを使うことで Stage 2 Async Iterator Helpers の map
や filter
メソッドなどが扱えるようになるでしょう。
async () => {
await readableStream.values()
.filter((value) => value.length === 5)
.toArray();
})();
EventTarget
(DOM Standard)
Push: よく使われる Push ストリームの例として EventTarget
クラスがあります。addEventListener
メソッドでイベントが発火されるたびに実行されるコールバックを登録できます。DOM の Node
がこの EventTarget
を継承しており、広く使われています。
const eventTarget = new EventTarget();
eventTarget.addEventListener("data", (e) => {
console.log(e.detail); // "hello", "event", "target!"
});
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "hello" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "event" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "target!" }));
EventTarget
の問題点
EventTarget
はあくまでイベントを扱います。データストリームとして扱うとなると冗長です。
また Iterator Helpers のように map
や filter
などのメソッドを追加したくても EventTarget
にその機能を入れるのは容易ではなさそうです。
Observable
(WICG Proposal)
Push: 記事冒頭で述べた通り、新たな Push ストリームとして Web 標準に Observable
を入れる提案が進行中です。
const observable = new Observable(async (subscriber) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("hello");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("observable!");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.complete();
});
observable.subscribe({
next(deta) {
console.log(data); // "hello", "observable!"
},
complete() {
console.log("complete");
},
});
Observable
の subscribe
メソッドの第一引数には SubscriptionObserver
インターフェースもしくは単なるコールバック函数を渡すことが出来ます。単に函数を渡したときは SubscriptionObserver
インターフェースの next
だけを渡したことと等価になります。
type ObservableSubscriptionCallback<T> = (value: T) => void;
interface SubscriptionObserver<T> {
next(value: T): void;
error(err: any): void;
complete(): void;
}
type ObserverUnion<T> =
| ObservableSubscriptionCallback<T>
| SubscriptionObserver<T>;
もちろん map
や filter
などのメソッドが備わっています。
const observable = new Observable(async (subscriber) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("hello");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("observable!");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.complete();
});
(async () => {
await observable
.filter((value) => value.length === 5)
.toArray(); // => ["hello"]
})();
Observable
の購読を辞める
Observable
の購読を辞める必要がある場合は subscribe
メソッドの第二引数に AbortSignal
を渡し、abort
メソッドを実行します。後処理が必要な場合は RxJS とは異なり、Observable
のコンストラクタに渡したプロデューサー函数内で subscriber.addTeardown
にコールバック函数を登録します。
const observable = new Observable((subscriber) => {
subscriber.addTeardown(() => {
// 後処理
});
});
const controller = new AbortController();
observable.subscribe((next) => {
console.log(data);
}, { signal: controller.signal });
// 購読を辞める
controller.abort();
Observable
のコンストラクタに渡すプロデューサー函数の特徴
Observable
は Push ストリームですが、Pull と Push のいずれのデータソースでも入力できます。
ただし ReadableStream
とは異なり subscribe
メソッドが呼ばれてからコンストラクタに渡したプロデューサー函数が実行される仕様なため、Push データソースから Observable
を作る場合は意図せずデータが欠損してしまうおそれがあることに注意が必要です。
これは Push ストリーム一般の特徴で、EventTarget
が addEventListener
されてない間は dispatchEvent
で渡されたイベントを使わずに捨てることと同義です。
const observable = new Observable((subscriber) => {
// source.subscribe でコールバック函数にデータを渡すことを想定
source.subscribe(({ done, value }) => {
if (done) {
subscriber.complete();
}
subscriber.next(value);
});
});
// ソースに同期的にデータをプッシュする
source.push(1);
// ここでコンストラクタに渡したプロデューサー函数が実行される
observable.subscribe((data) => {
console.log(data); // 2, 3
});
// 以降は購読される
source.push(2);
source.push(3);
なお Observable
が複数のオブサーバーから購読される場合プロデューサー函数は複数回実行されず Subscriber
を再利用します。Subscriber
の complete
や error
が呼ばれ非アクティブな状態になっているか、購読しているオブサーバーの数が0になった後に購読すると再度プロデューサー函数が実行されます。
let count = 0;
const observable = new Observable(async (subscriber) => {
const value = ++count;
// 同期実行の場合1つ目のオブザーバーにのみデータが渡るため、1秒後にデータを渡す
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next(value);
});
(async () => {
// 複数購読する
const controller = new AbortController();
observable.subscribe((data) => {
console.log(data); // 1
}, { signal: controller.signal });
observable.subscribe((data) => {
console.log(data); // 1
}, { signal: controller.signal });
// 2秒後に購読を辞める
await new Promise((resolve) => setTimeout(resolve, 2000));
controller.abort();
// 再購読する
observable.subscribe((data) => {
console.log(data); // 2
});
})();
ECMAScript との関連
Observable.from
を使うことで(非同期)反復可能プロトコルや Promise
から直接 Observable
を生成することができます。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
const observable = Observable.from(asyncIterable);
observable.subscribe({
next(deta) {
console.log(data); // "hello", "async", "iteration!"
},
complete() {
console.log("complete");
},
});
EventTarget
からのインテグレーション
EventTarget
に when
メソッドが追加され、Observable
が簡単に作れるようになっています。
element.when("click")
.filter((e) => e.target.matches(".foo"))
.map((e) => ({ x: e.clientX, y: e.clientY }))
.subscribe(handleClickAtPoint);
提案の背景
実は Observable
の提案はもともと TC39 の方で進行していましたが Stage 1 のままでなかなか進んでいませんでした。
AbortController
やインテグレーションする EventTarget
の仕様が DOM Standard にある関係もあり、RxJS のコアチームリードである Ben Lesh 氏から WICG の方で改めて進行しないかと提案がありました。
その後 TPAC 2023 で議題に上がり、2023年9月から Chrome で、2024年1月から Firefox でプロトタイプ実装が始まりました。
そして TPAC 2024 からの議論によってオブザーバーを弱参照で保持することで Subscriber
を再利用することが決まり、WebKit から提案をサポートすると表明されたことによって Chrome 135 に搭載されることが決まりました。
締め
Observable
の議論の流れを見てまとめてみました。
個人的に EventTarget
から Observable
が作れるのは便利かなと思うものの、一般的なストリームとしては高機能な ReadableStream
が使われていくことになるのではないかと考えています。
また Preact, Vue そして Svelte のような UI ライブラリに対しては TC39 で進んでいる Stage 1 Signals が使われることになると思うので、そういった面でも Observable
が活用されるのは限定的になる気がします。
とはいえ今後どうなっていくのか注視していこうと思います。
Discussion