🚡

DOM Observable は Push ストリームのデファクトスタンダードになるか?

2023/10/01に公開
変更情報

【2024/01/25】

【2023/12/01】

【2023/11/21】

RxJS を使ったことがある方は Observable という概念を知っているかと思います。この Observable を Web 標準に入れる提案が進行しています。

https://wicg.github.io/observable/

この記事では JavaScript におけるストリーム機能をまとめ、そして提案されている Observable について触れたいと思います。

Pull ストリームと Push ストリーム

ストリームを扱うプロトコル(インターフェース)やクラスは Pull ストリームと Push ストリームに大別されます。データを使う側から取得の要請するのが Pull ストリームで、データを送る側から任意のタイミングで一方的に送信するのが Push ストリームです。

Pull: ES2018 Async Iteration

ES2018 から Pull ストリームである非同期反復可能プロトコル(インターフェース)が扱えます。

for-await-of 文を使うと内部で Symbol.asyncIterator メソッドで取得した非同期イテレーターの next メソッドを実行し、次の値を取得するようになっています。

const asyncIterable = {
  async *[Symbol.asyncIterator]() {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "hello";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "async";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "iteration!";
  },
};

(async () => {
  for await (const value of asyncIterable) {
    console.log(value); // "hello", "async", "iteration!"
  }
})();

詳しくは過去の記事を参考にしてください。

https://zenn.dev/pixiv/articles/d1650ae332798c

Stage 2 Async Iterator Helpers (TC39 Proposal)

非同期イテレーターはあくまでプロトコル(インターフェース)という扱いでしたが、グローバルに AsyncIterator クラスを追加する Async Iterator Helpers という提案があります。

https://github.com/tc39/proposal-async-iterator-helpers

AsyncIterator クラスを継承することによって next メソッドのみを実装するだけで mapfilter などのメソッドを扱えるようになります。また Async Generator Functions を使った場合にも同様のメソッドが使えます。

const asyncIterable = {
  async *[Symbol.asyncIterator]() {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "hello";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "async";
    await new Promise((resolve) => setTimeout(resolve, 1000));
    yield "iteration!";
  },
};

(async () => {
  await asyncIterable[Symbol.asyncIterator]()
    .filter((value) => value.length === 5)
    .toArray(); // => ["hello", "async"]
})();

以下は同期的な Iterator Helpers についての記事になりますが、参考になるかと思います。

https://zenn.dev/pixiv/articles/062461b79e0d8f

Pull: ReadableStream (Streams Standard)

高機能な Pull ストリームとして ReadableStream クラスがあります。これは Fetch API の Response インスタンスの body プロパティなどで使われています。

https://streams.spec.whatwg.org/#rs

Web のための仕様ですが WinterCG という非ブラウザランタイムのコミュニティグループが策定する Minimum Common Web Platform API に含まれており、Node.js や Deno などほとんどの JavaScript ランタイムで扱うことができます。

この ReadableStream 自体は Pull ストリームですが、内部にキューを持っており、Pull と Push いずれのデータソースであっても入力できるようになっています。

// Pull Source から ReadableStream を作る例
new ReadableStream({
  async pull(controller) {
    // source.pull でデータを持つ Promise を返すことを想定
    const { done, value } = await source.pull();
    if (done) {
      controller.close();
    }
    controller.enqueue(data);
  },
});
// Push Source から ReadableStream を作る例
new ReadableStream({
  start(controller) {
    // source.subscribe でコールバック函数にデータを渡すことを想定
    source.subscribe(({ done, value }) => {
      if (done) {
        controller.close();
      }
      controller.enqueue(value);
    });
  },
});

リーダーを取得し read メソッドを使ってデータを受け取ることができます。

const reader = readableStream.getReader();
(async () => {
  for (;;) {
    const { done, value } = await reader.read();
    if (done) {
      console.log("complete");
      break;
    }
    console.log(value);
  }
})();

ストリームをキャンセルし、キューに入っているデータを開放するには cancel メソッドを呼び出します。

他にもパイプチェーン機能、内部キューの状態に基づいてデータソースを一時停止する機能、そして受け取る側から提供した ArrayBufferView に(内部キューが空で可能な場合に)ゼロコピーで直接データを書き込む機能などがあります。

https://developer.mozilla.org/ja/docs/Web/API/Streams_API/Concepts

ES2018 Async Iteration との関連仕様

まだ実験的な機能で実装されているブラウザは少ないですが ReadableStream.from で非同期反復可能プロトコル(インターフェース)から直接 ReadableStream を生成できたり、ReadableStream に対して for-await-of 文が使えたりします。

const readableStream = ReadableStream.from(asyncIterable);

(async () => {
  for await (const data of readableStream) {
    console.log(data);
  }
  console.log("complete");
})();

for-await-of 文を break した時に cancel されたくない場合は明示的に preventCancel: true を指定します。

(async () => {
  for await (const data of readableStream.values({ preventCancel: true })) {
    if (data === "suspend") {
      break;
    }
    console.log(data);
  }
})();

Push: EventTarget (DOM Standard)

よく使われる Push ストリームの例として EventTarget クラスがあります。addEventListener メソッドでイベントが発火されるたびに実行されるコールバックを登録できます。DOM の Node がこの EventTarget を継承しており、広く使われています。

const eventTarget = new EventTarget();

eventTarget.addEventListener("data", (e) => {
  console.log(e.detail); // "hello", "event", "target!"
});

eventTarget.dispatchEvent(new CustomEvent("data", { detail: "hello" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "event" }));
eventTarget.dispatchEvent(new CustomEvent("data", { detail: "target!" }));

EventTarget の問題点

EventTarget はあくまでイベントを扱います。データストリームとして扱うとなると冗長です。

また Iterator Helpers の提案のように mapfilter などのメソッドを追加したくても EventTarget にその機能を入れるのは容易ではなさそうです。

Push: Observable (WICG Proposal)

記事冒頭で述べた通り、新たな Push ストリームとして Web 標準に Observable を入れる提案が進行中です。

const observable = new Observable(async (subscriber) => {
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.next("hello");
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.next("observable!");
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.complete();
});

observable.subscribe({
  next(deta) {
    console.log(data); // "hello", "observable!"
  },
  complete() {
    console.log("complete");
  },
});

この Observable#subscribe の第一引数には SubscriptionObserver インターフェースもしくは単なるコールバック函数を渡すことが出来ます。単に函数を渡したときは next だけ登録したことと等価になります。

interface SubscriptionObserver {
    next(value: any): void;
    error(err: any): void;
    complete(): void;
}

type SubscriptionObserverCallback = (value: any) => void;

もちろん mapfilter などのメソッドが備わっています。

const observable = new Observable(async (subscriber) => {
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.next("hello");
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.next("observable!");
  await new Promise((resolve) => setTimeout(resolve, 1000));
  subscriber.complete();
});

(async () => {
  await observable
    .filter((value) => value.length === 5)
    .toArray(); // => ["hello"]
})();

Observable は Push ストリームですが、Pull と Push のいずれのデータソースでも入力できます。

ただし ReadableStream とは異なり subscribe メソッドが呼ばれる度にコンストラクタのコールバックが実行される仕様なため、Push データソースから Observable を作る場合は意図せずデータが欠損してしまうおそれがあることに注意が必要です。

これは Push ストリームの特徴で、EventTargetaddEventListener されてない間は dispatchEvent のイベントを使わずに捨てることと同義です。RxJS の Hot/Cold の概念に近いかなと思います。

const observable = new Observable((subscriber) => {
  // source.subscribe でコールバック函数にデータを渡すことを想定
  source.subscribe(({ done, value }) => {
    if (done) {
      subscriber.complete();
    }
    subscriber.next(value);
  });
});

// ソースに同期的にデータをプッシュする
source.push(1);

// ここでコンストラクタのコールバックが実行される
observable.subscribe((data) => {
  console.log(data); // 2, 3
});

// 以降はサブスクライブされる
source.push(2);
source.push(3);

そしてサブスクライブを辞めるには第二引数に AbortSignal を渡します。後処理が必要な場合は RxJS とは異なり、Observable のコンストラクタ内で subscriber.addTeardown にコールバック函数を登録します。

const observable((subscriber) => {
  subscriber.addTeardown(() => {
    // 後処理
  });
});

const controller = new AbortController();
observable.subscribe((next) => {
  console.log(data);
}, { signal: controller.signal });

// サブスクライブを辞める
controller.abort();

EventTarget からのインテグレーション

EventTargeton メソッドが追加され、Observable を簡単に作ることができるようになっています。

element.on("click")
  .filter((e) => e.target.matches(".foo"))
  .map((e) => ({ x: e.clientX, y: e.clientY }))
  .subscribe(handleClickAtPoint);

提案の背景

実は Observable の提案はもともと TC39 の方で進行していましたが Stage 1 のままでなかなか進んでいませんでした。

AbortController やインテグレーションする EventTarget の仕様が DOM Standard にある関係もあり、RxJS のコアチームリードである Ben Lesh 氏から WICG の方で改めて進行しないかと提案がありました。

https://github.com/whatwg/dom/issues/544

その後 TPAC 2023 で議題に上がり、2023年9月から Chrome で2024年1月から Firefox でプロトタイプ実装が始まりました。

https://docs.google.com/presentation/d/1lPLUcm_yqR5couGwouETTkblhqgHfpgUBkm2mnmxegU/edit

https://docs.google.com/document/d/1NEobxgiQO-fTSocxJBqcOOOVZRmXcTFg9Iqrhebb7bg/edit

締め

Observable の議論の流れを見て勢いでまとめてみました。

個人的に EventTarget から Observable を作るのは便利かなと思うものの、一般的なストリームとしては高機能な ReadableStream が使われていくことになるのではないかと考えています。

今後どうなっていくのか注視していこうと思います。

Discussion