RxJS Observableのライフサイクル完全ガイド(TypeScript対応)
RxJSでは、Observableがどのように作成・実行・完了・解除されるのかという「ライフサイクル」の理解がとても重要です。
ここでは、RxJSにおけるObservableのライフサイクルについて、作成から購読、データの発行、完了・エラー通知、購読解除、そしてリソース管理に至るまでの一連の流れを段階的に解説します。
このライフサイクルを理解することは、RxJSを効果的に使用するための基礎となります。
Observableのライフサイクルの概要
Observableのライフサイクルとは、Observableがどのようにして生成され、どのタイミングでデータを発行し、どのように終了やエラー通知が行われ、最終的にリソースが解放されるかという一連の流れを指します。これを理解することで、RxJSの挙動を正しく把握し、意図したタイミングでの購読解除や副作用の制御、エラーハンドリングが可能になります。
🔄 Observableのライフサイクル図(状態遷移)
Observableのライフサイクルの構成
Observableのライフサイクルは、以下のフェーズで構成されています。
順番 | フェーズ | 内容 |
---|---|---|
1 | 作成(Creation) | Observableインスタンスの作成 |
2 | サブスクリプション(Subscription) |
subscribe() メソッドによる購読開始 |
3 | 実行(Execution) | - next() : データの発行- error() : エラー通知- complete() : 完了通知 |
4 | 解除(Disposal) |
unsubscribe() メソッドによる購読解除 |
Observableは「遅延実行(lazy)」であり、subscribe()
を呼び出すまでストリームは発行されません。また、complete()
または error()
が呼ばれるとストリームは終了し、それ以上の next()
呼び出しは無視されます。
Observableのライフサイクルの例
import { Observable } from 'rxjs';
// 1. Observable作成
const observable$ = new Observable<number>(subscriber => {
console.log('Observable実行開始');
// 3. 実行:データ発行
subscriber.next(1);
subscriber.next(2);
// タイマーのセットアップ
const timerId = setTimeout(() => {
subscriber.next(3);
subscriber.complete(); // 3. 実行:完了通知
console.log('Observable完了');
}, 1000);
// クリーンアップ関数を返す(unsubscribe時に呼ばれる)
return () => {
console.log('クリーンアップ実行');
clearTimeout(timerId);
};
});
// 2. サブスクリプション
const subscription = observable$.subscribe({
next: value => console.log('次の値:', value),
error: err => console.error('エラー:', err),
complete: () => console.log('完了通知を受け取りました')
});
// 4. 購読解除(手動または完了時)
setTimeout(() => {
console.log('手動で購読解除');
subscription.unsubscribe();
}, 500); // 500msで解除(完了通知の前)
// (出力)
// Observable実行開始
// 次の値: 1
// 次の値: 2
// 手動で購読解除
// クリーンアップ実行
Observer(オブザーバー)
より簡潔で用途に特化したObservable作成には、RxJSが提供する「作成操作子(creation operator)」が便利です。繰り返し使われるユースケースにはこれらを使うことでコードが簡素化されます。
Observerが持つ、3つのコールバック関数
Observerは、Observableから通知を受け取るためのインターフェースです。
3つのコールバック関数を持ちます。
-
next
: データの発行 -
error
: エラー通知 -
complete
: 完了通知
import { Observer } from 'rxjs';
// 完全なObserverオブジェクト
const observer: Observer<number> = {
next: value => console.log('値:', value),// データの発行
error: err => console.error('エラー:', err), // エラー通知
complete: () => console.log('完了') // 完了通知
};
const observable$ = of(1, 2, 3); // 簡易的にObervableを作成
// 部分的なObserverも可能
observable$.subscribe({
next: value => console.log('値のみ処理:', value)
});
// 簡略記法
observable$.subscribe(
value => console.log('値:', value),
err => console.error('エラー:', err),
() => console.log('完了')
);
// (出力)
// 値のみ処理: 1
// 値のみ処理: 2
// 値のみ処理: 3
// 値: 1
// 値: 2
// 値: 3
// 完了
Subscription(サブスクリプション)
サブスクリプションはObservableの実行を表し、主に購読解除unsubscribe()
のために使用されます。
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
const numbers$ = interval(1000).pipe(take(5));
// サブスクリプションを保持
const subscription = numbers$.subscribe({
next: value => console.log('値:', value),
complete: () => console.log('完了')
});
// 3秒後に手動で購読解除
setTimeout(() => {
subscription.unsubscribe(); // 購読解除
console.log('購読解除済み');
}, 3000);
// (出力)
// 値: 0
// 値: 1
// 値: 2
// 購読解除済み
エラー処理
Observableのライフサイクルでは、 error()
を呼び出すと直ちにストリームが終了し、complete()
は呼び出されません。このため、catchError
の使用や retry
の設計は重要です。
import { Observable, throwError, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';
// エラーを発生させるObservable
const failingObservable$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.error(new Error('意図的なエラー'));
// エラー後はcompleteが呼ばれないことに注意
});
// エラー処理の例
failingObservable$.pipe(
// エラー発生時、3回リトライ
retry(3),
// それでもエラーになったら、代替Observableに切り替え
catchError(error => {
console.error('エラーをキャッチ:', error.message);
return of('エラー後の代替値');
})
).subscribe({
next: value => console.log('値:', value),
error: err => console.error('ハンドリングされなかったエラー:', err),
complete: () => console.log('完了')
});
// (出力)
// 値: 1
// 値: 2
// 値: 1
// 値: 2
// 値: 1
// 値: 2
// 値: 1
// 値: 2
// エラーをキャッチ: 意図的なエラー
// 値: エラー後の代替値
// 完了
完了のライフサイクル
Observableの完了は、明示的にcomplete()
が呼ばれるか、有限のストリームが終了した場合に発生します。
import { of, interval, Observable } from 'rxjs';
import { take } from 'rxjs/operators';
// 有限のObservable(自動的に完了)
const finite$ = of(1, 2, 3);
finite$.subscribe({
next: value => console.log('有限値:', value),
complete: () => console.log('有限Observable完了')
});
// 無限のObservableを有限に変換
const limited$ = interval(1000).pipe(take(3));
limited$.subscribe({
next: value => console.log('制限付き値:', value),
complete: () => console.log('制限付きObservable完了')
});
// 手動で完了させるObservable
const manual$ = new Observable<number>(subscriber => {
subscriber.next(1);
setTimeout(() => {
subscriber.next(2);
subscriber.complete(); // 明示的に完了
}, 2000);
});
manual$.subscribe({
next: value => console.log('手動値:', value),
complete: () => console.log('手動Observable完了')
});
// (出力)ts
// 有限値: 1
// 有限値: 2
// 有限値: 3
// 有限Observable完了
// 手動値: 1
// 制限付き値: 0
// 制限付き値: 1
// 手動値: 2
// 手動Observable完了
// 制限付き値: 2
// 制限付きObservable完了
リソース管理とメモリリーク防止
適切なタイミングでのサブスクリプション解除は、メモリリークを防ぐために重要です。
import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
// コンポーネントのライフサイクルを模倣
class Component {
private destroy$ = new Subject<void>();
constructor() {
// 1秒ごとのインターバル(潜在的なメモリリークの原因)
interval(1000).pipe(
// コンポーネント破棄時に自動的に購読解除
takeUntil(this.destroy$)
).subscribe(value => {
console.log('コンポーネント内の値:', value);
});
}
// コンポーネントの破棄
ngOnDestroy() {
console.log('コンポーネント破棄');
this.destroy$.next();
this.destroy$.complete();
}
}
// 使用例
const component = new Component();
// 5秒後にコンポーネントを破棄
setTimeout(() => {
(component as any).ngOnDestroy();
}, 5000);
// (出力)
// コンポーネント内の値: 0
// コンポーネント内の値: 1
// コンポーネント内の値: 2
// コンポーネント内の値: 3
// コンポーネント内の値: 4
// コンポーネント破棄
まとめ
Observableのライフサイクルを理解することで、以下のことが可能になります。
- 適切なタイミングでのリソース解放
- エラー処理と回復戦略の実装
- コールドObservableとホットObservableの使い分け
- 副作用の管理
特に、AngularやReactなどのコンポーネントベースのフレームワークでは、takeUntil
, unsubscribe
, finalize
などを活用して、ライフサイクルに沿った購読管理が必要です。
📘 本記事は以下の学習用サイト「RxJS with TypeScript」をもとに構成されています。
Discussion