🎃

Obervable の実行タイミングを理解する

2022/03/08に公開

Observable の動作

Observable は subscribe されるまで内部の処理が実行されません。
以下のコードでは、subscriber fired! のログは出力されません。

import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

const obs$ = new Observable(subscriber => {
    console.log("subscriber fired!")
});

obs$.pipe(
    tap( /* ... */)
)

また、Observable は 複数の subscribe があったときに、
subscribe それぞれに対して内部処理を実行します。
以下のコードでは、subscriber fired! のログは2回出力されます。

import { Observable } from 'rxjs';

const obs$ = new Observable(subscriber => {
    console.log("subscriber fired!")
});

obs$.subscribe(console.log)
obs$.subscribe(console.log)

Shared Observable

Observable に share オペレータを施すことで、複数の subscribe に対して処理を1回に共通化することができます。

https://rxjs.dev/api/operators/share

以下のような形で、share パイプを 施すことで、
subscriber fired のログは1回に制限されます。

import { Observable } from 'rxjs';
import { share, map } from 'rxjs/operators';

const obs$ = new Observable((subscriber) => {
  setTimeout(() => {
    console.log('subscriber fired!');
    subscriber.next('hello');
  }, 1000);
}).pipe(share());

obs$.subscribe(console.log);
obs$.subscribe(console.log);

が、shared Observable においても、
Observable が 同期的に complete を発行するようなケースでは、注意が必要です。

以下の例では、subscriber fired は2回発行されることになります。

import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

const obs$ = new Observable((subscriber) => {
  console.log('subscriber fired!');
  subscriber.next('hello');
  subscriber.complete();
}).pipe(share());

obs$.subscribe(console.log);
obs$.subscribe(console.log);

これは2回目の subscribe の前に Observable が complete していることが原因です。

さらに、上記のコードで complete をコメントアウトした場合、
二回目の subscribe は動作すらしなくなります!

通常 Observable を利用する際において、
内部の処理が同期的か非同期化を考慮するのは流石に困難です。

同期的に Complete する Observable においても、
shared を有効にするには、delay(0)を pipe に追加すると良いでしょう。

import { Observable } from 'rxjs';
import { share, delay } from 'rxjs/operators';

const obs$ = new Observable((subscriber) => {
  console.log('subscriber fired!');
  subscriber.next('hello');
  subscriber.complete();
}).pipe(delay(0),share());

obs$.subscribe(console.log);
obs$.subscribe(console.log);

Discussion