DOM Observable は Push ストリームのデファクトスタンダードになるか?
変更情報
【2025/02/25】
- 将来的に
ReadableStreamでも Async Iterator Helpers が使えるようになることを追記 - 誤っていたコードを修正
【2025/02/24】
-
Subscriberがアクティブな状態で、かつ1つ以上のオブザーバーによって購読されている場合、購読してもObservableのコンストラクタに渡した(プロデューサー)函数を呼ばず、Subscriberを再利用する仕様変更に追随し更新 -
SubscriptionObserverCallbackがObservableSubscriptionCallbackにリネームされたのに追随し更新 - WinterCG を WinterTC (TC55) へ変更
- Chrome 135 に搭載されることを追記
【2024/08/13】
【2024/01/25】
- 記事の構成を変更
-
ObserverがSubscriptionObserverにリネームされたのに追随し更新 - Firefox のプロトタイプ実装が始まったことに言及
【2023/12/01】
-
Observable#subscribeにそのまま函数を渡せるようになったのに追随し更新 -
subscriber.addTeardownについての記述を追記
【2023/11/21】
RxJS を使ったことがある方は Observable という概念を知っているかと思います。この Observable を Web 標準に入れる提案が進行しています。
この記事では JavaScript におけるストリーム機能をまとめ、そして提案されている Observable について触れたいと思います。
Pull ストリームと Push ストリーム
ストリームを扱うプロトコル(インターフェース)やクラスは Pull ストリームと Push ストリームに大別されます。データを使う側から取得の要請するのが Pull ストリームで、データを送る側から任意のタイミングで一方的に送信するのが Push ストリームです。
Pull: ES2018 Async Iteration
ES2018 から Pull ストリームである非同期反復可能プロトコル(インターフェース)が扱えます。
for-await-of 文を使うと内部で Symbol.asyncIterator メソッドで取得した非同期イテレーターの next メソッドを実行し、次の値を取得するようになっています。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
(async () => {
for await (const value of asyncIterable) {
console.log(value); // "hello", "async", "iteration!"
}
})();
詳しくは過去の記事を参考にしてください。
Stage 2 Async Iterator Helpers (TC39 Proposal)
非同期イテレーターはあくまでプロトコル(インターフェース)という扱いでしたが、グローバルに AsyncIterator クラスを追加する Async Iterator Helpers という提案があります。
AsyncIterator クラスを継承することによって next メソッドのみを実装するだけで map や filter などのメソッドを扱えるようになります。また Async Generator Functions を使った場合にも同様のメソッドが使えます。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
(async () => {
await asyncIterable[Symbol.asyncIterator]()
.filter((value) => value.length === 5)
.toArray(); // => ["hello", "async"]
})();
以下は同期的な Iterator Helpers についての記事になりますが、参考になるかと思います。
Pull: ReadableStream (Streams Standard)
高機能な Pull ストリームとして ReadableStream クラスがあります。これは Fetch API の Response インスタンスの body プロパティなどで使われています。
Web のための仕様ですが WinterTC (TC55) という非ブラウザランタイムグループが策定する Minimum Common Web Platform API に含まれており、Node.js や Deno などほとんどの JavaScript ランタイムで扱うことができます。
この ReadableStream 自体は Pull ストリームですが、内部にデータを保持するキューを持っており、Pull と Push いずれのデータソースであっても入力できるようになっています。
// Pull Source から ReadableStream を作る例
new ReadableStream({
async pull(controller) {
// source.pull でデータを持つ Promise を返すことを想定
const { done, value } = await source.pull();
if (done) {
controller.close();
}
controller.enqueue(data);
},
});
// Push Source から ReadableStream を作る例
new ReadableStream({
start(controller) {
// source.subscribe でコールバック函数にデータを渡すことを想定
source.subscribe(({ done, value }) => {
if (done) {
controller.close();
}
controller.enqueue(value);
});
},
});
リーダーを取得し read メソッドを使ってデータを受け取ることができます。
const reader = readableStream.getReader();
(async () => {
for (;;) {
const { done, value } = await reader.read();
if (done) {
console.log("complete");
break;
}
console.log(value);
}
})();
ストリームをキャンセルし、キューに入っているデータを開放するには cancel メソッドを呼び出します。
他にもパイプチェーン機能、内部キューの状態に基づいてデータソースからの取得を一時停止する機能、そしてデータを受け取る側から提供した ArrayBufferView に(内部キューが空で可能な場合に)ゼロコピーで直接データを書き込む機能などがあります。
ECMAScript との関連
まだ実験的な機能で実装されているブラウザは少ないですが ReadableStream.from で(非同期)反復可能プロトコルから直接 ReadableStream を生成できたり、ReadableStream に対して for-await-of 文が使えたりします。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
const readableStream = ReadableStream.from(asyncIterable);
(async () => {
for await (const data of readableStream) {
console.log(data); // "hello", "async", "iteration!"
}
console.log("complete");
})();
for-await-of 文を break して非同期イテレータを閉じる際に元の ReadableStream の cancel メソッドが実行されたくない場合は、明示的に values メソッドに preventCancel: true を指定します。
(async () => {
for await (const data of readableStream.values({ preventCancel: true })) {
if (data === "suspend") {
break;
}
console.log(data);
}
})();
また将来的に values メソッドを使うことで Stage 2 Async Iterator Helpers の map や filter メソッドなどが扱えるようになるでしょう。
async () => {
await readableStream.values()
.filter((value) => value.length === 5)
.toArray();
})();
Push: EventTarget (DOM Standard)
よく使われる Push ストリームの例として EventTarget クラスがあります。addEventListener メソッドでイベントが発火されるたびに実行されるコールバックを登録できます。DOM の Node がこの EventTarget を継承しており、広く使われています。
const eventTarget = new EventTarget();
eventTarget.addEventListener("data", (e) => {
console.log(e.detail); // "hello", "event", "target!"
});
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "hello" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "event" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "target!" }));
EventTarget の問題点
EventTarget はあくまでイベントを扱います。データストリームとして扱うとなると冗長です。
また Iterator Helpers のように map や filter などのメソッドを追加したくても EventTarget にその機能を入れるのは容易ではなさそうです。
Push: Observable (WICG Proposal)
記事冒頭で述べた通り、新たな Push ストリームとして Web 標準に Observable を入れる提案が進行中です。
const observable = new Observable(async (subscriber) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("hello");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("observable!");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.complete();
});
observable.subscribe({
next(deta) {
console.log(data); // "hello", "observable!"
},
complete() {
console.log("complete");
},
});
Observable の subscribe メソッドの第一引数には SubscriptionObserver インターフェースもしくは単なるコールバック函数を渡すことが出来ます。単に函数を渡したときは SubscriptionObserver インターフェースの next だけを渡したことと等価になります。
type ObservableSubscriptionCallback<T> = (value: T) => void;
interface SubscriptionObserver<T> {
next(value: T): void;
error(err: any): void;
complete(): void;
}
type ObserverUnion<T> =
| ObservableSubscriptionCallback<T>
| SubscriptionObserver<T>;
もちろん map や filter などのメソッドが備わっています。
const observable = new Observable(async (subscriber) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("hello");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next("observable!");
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.complete();
});
(async () => {
await observable
.filter((value) => value.length === 5)
.toArray(); // => ["hello"]
})();
Observable の購読を辞める
Observable の購読を辞める必要がある場合は subscribe メソッドの第二引数に AbortSignal を渡し、abort メソッドを実行します。後処理が必要な場合は RxJS とは異なり、Observable のコンストラクタに渡したプロデューサー函数内で subscriber.addTeardown にコールバック函数を登録します。
const observable = new Observable((subscriber) => {
subscriber.addTeardown(() => {
// 後処理
});
});
const controller = new AbortController();
observable.subscribe((next) => {
console.log(data);
}, { signal: controller.signal });
// 購読を辞める
controller.abort();
Observable のコンストラクタに渡すプロデューサー函数の特徴
Observable は Push ストリームですが、Pull と Push のいずれのデータソースでも入力できます。
ただし ReadableStream とは異なり subscribe メソッドが呼ばれてからコンストラクタに渡したプロデューサー函数が実行される仕様なため、Push データソースから Observable を作る場合は意図せずデータが欠損してしまうおそれがあることに注意が必要です。
これは Push ストリーム一般の特徴で、EventTarget が addEventListener されてない間は dispatchEvent で渡されたイベントを使わずに捨てることと同義です。
const observable = new Observable((subscriber) => {
// source.subscribe でコールバック函数にデータを渡すことを想定
source.subscribe(({ done, value }) => {
if (done) {
subscriber.complete();
}
subscriber.next(value);
});
});
// ソースに同期的にデータをプッシュする
source.push(1);
// ここでコンストラクタに渡したプロデューサー函数が実行される
observable.subscribe((data) => {
console.log(data); // 2, 3
});
// 以降は購読される
source.push(2);
source.push(3);
なお Observable が複数のオブサーバーから購読される場合プロデューサー函数は複数回実行されず Subscriber を再利用します。Subscriber の complete や error が呼ばれ非アクティブな状態になっているか、購読しているオブサーバーの数が0になった後に購読すると再度プロデューサー函数が実行されます。
let count = 0;
const observable = new Observable(async (subscriber) => {
const value = ++count;
// 同期実行の場合1つ目のオブザーバーにのみデータが渡るため、1秒後にデータを渡す
await new Promise((resolve) => setTimeout(resolve, 1000));
subscriber.next(value);
});
(async () => {
// 複数購読する
const controller = new AbortController();
observable.subscribe((data) => {
console.log(data); // 1
}, { signal: controller.signal });
observable.subscribe((data) => {
console.log(data); // 1
}, { signal: controller.signal });
// 2秒後に購読を辞める
await new Promise((resolve) => setTimeout(resolve, 2000));
controller.abort();
// 再購読する
observable.subscribe((data) => {
console.log(data); // 2
});
})();
ECMAScript との関連
Observable.from を使うことで(非同期)反復可能プロトコルや Promise から直接 Observable を生成することができます。
const asyncIterable = {
async *[Symbol.asyncIterator]() {
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "hello";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "async";
await new Promise((resolve) => setTimeout(resolve, 1000));
yield "iteration!";
},
};
const observable = Observable.from(asyncIterable);
observable.subscribe({
next(deta) {
console.log(data); // "hello", "async", "iteration!"
},
complete() {
console.log("complete");
},
});
EventTarget からのインテグレーション
EventTarget に when メソッドが追加され、Observable が簡単に作れるようになっています。
element.when("click")
.filter((e) => e.target.matches(".foo"))
.map((e) => ({ x: e.clientX, y: e.clientY }))
.subscribe(handleClickAtPoint);
提案の背景
実は Observable の提案はもともと TC39 の方で進行していましたが Stage 1 のままでなかなか進んでいませんでした。
AbortController やインテグレーションする EventTarget の仕様が DOM Standard にある関係もあり、RxJS のコアチームリードである Ben Lesh 氏から WICG の方で改めて進行しないかと提案がありました。
その後 TPAC 2023 で議題に上がり、2023年9月から Chrome で、2024年1月から Firefox でプロトタイプ実装が始まりました。
そして TPAC 2024 からの議論によってオブザーバーを弱参照で保持することで Subscriber を再利用することが決まり、WebKit から提案をサポートすると表明されたことによって Chrome 135 に搭載されることが決まりました。
締め
Observable の議論の流れを見てまとめてみました。
個人的に EventTarget から Observable が作れるのは便利かなと思うものの、一般的なストリームとしては高機能な ReadableStream が使われていくことになるのではないかと考えています。
また Preact, Vue そして Svelte のような UI ライブラリに対しては TC39 で進んでいる Stage 1 Signals が使われることになると思うので、そういった面でも Observable が活用されるのは限定的になる気がします。
とはいえ今後どうなっていくのか注視していこうと思います。
Discussion