RxJS の基本理解
RxJS の基本理解
RxJS は Observable と呼ばれるデータ構造を提供します。
Observable は複数回のデータ送信を実装した Promise のようなものです。
Promise.resolve(1) が 1を発行する Promise を生成するのと同様に、
Observable では of(1) が 1を発行する Observable を生成します。
同様に of(1,2,3) は 1,2,3 という3つの値を発行する Observable を生成します。
import { of } from 'rxjs';
const obs = of(1,2,3)
obs.subscribe(console.log) // 1,2,3 (3 times)
Promise が then でデータの受け取りを行っていたのと同様に、
Observable では、subscribe でデータを受け取り、
引数で渡した関数は Observable が発行するデータの個数に応じて、その都度繰り返し実行されます。
基本的な Subscribe の仕組み
Promise では then の戻り地は Promise であったため、
then をメソドチェーンでつなぐ記法が可能でした。
Observable における subscribe の戻り値は Observable ではなく、
Subscription と呼ばれるオブジェクトで、メソドチェーンで subscribe をつなぐことができません。
Subscription は subscribe で登録した関数を、停止させるためのメソドなどを提供します。
import { interval } from 'rxjs';
const observable = interval(1000);
const subscription = observable.subscribe(console.log);
setTimeout(()=>{
subscription.unsubscribe();
},5000)
上記の例で interval は一定間隔ごとに値を送信し続ける Observable を生成する関数です。
console.log を subscribe すると 1000 ms ごとに ログが実行されますが、
setTimeout で 5000 ms 後に unsubscribe を実行しているため、処理は 5000ms 後にストップされます。
Pipe を使ったデータ加工
Promise では、then によるメソドチェーンで Promise の結果の加工を行っていましたが、
Observable おけるデータ加工では、pipe を用いた記法が用いられています。
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const observable = of(1, 2, 3)
.pipe(map((x) => x * x))
observable.subscribe((v) => console.log(`value: ${v}`)); // 1,4,9
pipe は observable で実行可能な関数で、引数に各種操作命令を登録して、
Observable に流れるデータの加工や制御を行います。
pipe 関数の引数で利用する様々な関数を オペレータといい、 上記の例では map がオペレータです。
オペレータはすべて、rxjs/operators
から import して利用する事ができます。
RxJS には 様々なオペレータが用意されており、
各種オペレータの役割と使い方を理解することが、RxJS 習得の中で一番困難と言えるかもしれません。
Observable の生成
RxJS には of などの便利な Observable 生成関数が用意されていますが、
Observable コンストラクタを利用して Observable を生成することも可能です。
of(1,2,3)
の Observable は以下のようにして実装できます。
import { Observable } from 'rxjs';
const obs = new Observable((sub) => {
sub.next(1);
sub.next(2);
sub.next(3);
sub.complete();
});
sub.next
は Promise の resolve に近い処理で、
next をコールして値を送信することができます。
Observable は複数の値を送信できるため、 complete をコールして、
明示的に Observable の終了を宣言することもできます。
Promise の reject に相当する、エラー処理には、sub.error()
を用いることができます。
import { Observable } from 'rxjs';
const obs = new Observable((sub) => {
sub.next(1);
sub.next(2);
sub.error(new Error("something wrong!"))
sub.next(3);
sub.complete();
});
obs.subscribe(console.log) // 1,2
注意が必要なのは、sub.error は関数内の処理を中断しない、ということです。
sub.error 実行後も関数内で記述された処理は継続して行われますが、
後続の sub.next
sub.complete
で実装される処理は subscribe の引数である関数に渡されることはありません。
Observable のエラー処理
ここまでの subscribe では、引数にただ一つの関数を用いて来ましたが、
もう少し複雑な例としてオブジェクトを用いた subscribe の実装を確認しておきましょう。
import { of, map, Observable } from 'rxjs';
const obs = new Observable((sub) => {
sub.next(1);
sub.next(2);
sub.error(new Error('hoge'));
sub.next(3);
sub.complete();
});
obs.subscribe({
next(r) {
console.log('next', r);
},
error(e) {
console.error(e);
},
complete() {
console.log('complete');
},
});
subscribe の引数を関数ではなくオブジェクトで実装する場合、
next, error, complete の3つの要素を持つオブジェクトを利用します。
next は通常のデータの受信で利用される関数で、
error は Observable の内部でエラーが発生した際にコールされる関数です。
エラーが発生しうる Observable を subscribe する場合、
適切に error の処理を定義しない限り、ルートレベルでエラーがレポートされてしまいます。
RxJS では subscribe で エラー処理の関数を実装する他にも、
catchError オペレータを用いて、Observable 内部からエラーを除去するような書き方も用いられることがあります。
RxJS の用語理解
RxJS のドキュメントを理解する上で、 RxJS にまつわる様々な用語の理解は必要な知識です。
new Observable
や of
関数などで生成される値を observable といいますが、
subscribe 関数に引数で渡される関数は、一般的に observer と呼ばれます。
よって以下のような記述が成り立ちます。
observable.subscribe(observer);
また、Observable の処理を受け取るのは、subscribe 関数ですが、
Observable コンストラクタで定義する関数は subscriber と呼ばれることがあります。
このため、Observable の生成は以下のようなコードで書かれることがあります。
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
その他の重要な RxJSの概念は以下の資料から確認することも可能です。
Discussion