🐙

RxJS の基本理解

2022/03/07に公開

RxJS の基本理解

RxJS は Observable と呼ばれるデータ構造を提供します。

Observable は複数回のデータ送信を実装した Promise のようなものです。

Promise.resolve(1) が 1を発行する Promise を生成するのと同様に、
Observable では of(1) が 1を発行する Observable を生成します。

同様に of(1,2,3) は 1,2,3 という3つの値を発行する Observable を生成します。

import { of } from 'rxjs';

const obs = of(1,2,3)
obs.subscribe(console.log) // 1,2,3 (3 times)

Promise が then でデータの受け取りを行っていたのと同様に、
Observable では、subscribe でデータを受け取り、
引数で渡した関数は Observable が発行するデータの個数に応じて、その都度繰り返し実行されます。

基本的な Subscribe の仕組み

Promise では then の戻り地は Promise であったため、
then をメソドチェーンでつなぐ記法が可能でした。

Observable における subscribe の戻り値は Observable ではなく、
Subscription と呼ばれるオブジェクトで、メソドチェーンで subscribe をつなぐことができません。

Subscription は subscribe で登録した関数を、停止させるためのメソドなどを提供します。

import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe(console.log);
setTimeout(()=>{
    subscription.unsubscribe();
},5000)

上記の例で interval は一定間隔ごとに値を送信し続ける Observable を生成する関数です。
console.log を subscribe すると 1000 ms ごとに ログが実行されますが、
setTimeout で 5000 ms 後に unsubscribe を実行しているため、処理は 5000ms 後にストップされます。

Pipe を使ったデータ加工

Promise では、then によるメソドチェーンで Promise の結果の加工を行っていましたが、
Observable おけるデータ加工では、pipe を用いた記法が用いられています。

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const observable = of(1, 2, 3)
  .pipe(map((x) => x * x))

observable.subscribe((v) => console.log(`value: ${v}`)); // 1,4,9

pipe は observable で実行可能な関数で、引数に各種操作命令を登録して、
Observable に流れるデータの加工や制御を行います。

pipe 関数の引数で利用する様々な関数を オペレータといい、 上記の例では map がオペレータです。
オペレータはすべて、rxjs/operators から import して利用する事ができます。

RxJS には 様々なオペレータが用意されており、
各種オペレータの役割と使い方を理解することが、RxJS 習得の中で一番困難と言えるかもしれません。

Observable の生成

RxJS には of などの便利な Observable 生成関数が用意されていますが、
Observable コンストラクタを利用して Observable を生成することも可能です。

of(1,2,3) の Observable は以下のようにして実装できます。

import { Observable } from 'rxjs';

const obs = new Observable((sub) => {
  sub.next(1);
  sub.next(2);
  sub.next(3);
  sub.complete();
});

sub.next は Promise の resolve に近い処理で、
next をコールして値を送信することができます。

Observable は複数の値を送信できるため、 complete をコールして、
明示的に Observable の終了を宣言することもできます。

Promise の reject に相当する、エラー処理には、sub.error() を用いることができます。

import { Observable } from 'rxjs';

const obs = new Observable((sub) => {
  sub.next(1);
  sub.next(2);
  sub.error(new Error("something wrong!"))
  sub.next(3);
  sub.complete();
});

obs.subscribe(console.log) // 1,2 

注意が必要なのは、sub.error は関数内の処理を中断しない、ということです。
sub.error 実行後も関数内で記述された処理は継続して行われますが、
後続の sub.next sub.complete で実装される処理は subscribe の引数である関数に渡されることはありません。

Observable のエラー処理

ここまでの subscribe では、引数にただ一つの関数を用いて来ましたが、
もう少し複雑な例としてオブジェクトを用いた subscribe の実装を確認しておきましょう。

import { of, map, Observable } from 'rxjs';

const obs = new Observable((sub) => {
  sub.next(1);
  sub.next(2);
  sub.error(new Error('hoge'));
  sub.next(3);
  sub.complete();
});

obs.subscribe({
  next(r) {
    console.log('next', r);
  },
  error(e) {
    console.error(e);
  },
  complete() {
    console.log('complete');
  },
});

subscribe の引数を関数ではなくオブジェクトで実装する場合、
next, error, complete の3つの要素を持つオブジェクトを利用します。

next は通常のデータの受信で利用される関数で、
error は Observable の内部でエラーが発生した際にコールされる関数です。

エラーが発生しうる Observable を subscribe する場合、
適切に error の処理を定義しない限り、ルートレベルでエラーがレポートされてしまいます。

RxJS では subscribe で エラー処理の関数を実装する他にも、
catchError オペレータを用いて、Observable 内部からエラーを除去するような書き方も用いられることがあります。

RxJS の用語理解

RxJS のドキュメントを理解する上で、 RxJS にまつわる様々な用語の理解は必要な知識です。

new Observableof 関数などで生成される値を observable といいますが、
subscribe 関数に引数で渡される関数は、一般的に observer と呼ばれます。

よって以下のような記述が成り立ちます。

observable.subscribe(observer);

また、Observable の処理を受け取るのは、subscribe 関数ですが、
Observable コンストラクタで定義する関数は subscriber と呼ばれることがあります。
このため、Observable の生成は以下のようなコードで書かれることがあります。

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

その他の重要な RxJSの概念は以下の資料から確認することも可能です。

https://rxjs.dev/guide/glossary-and-semantics

Discussion