🎉

RxJS における主要なオペレータ

2022/03/07に公開

RxJS で用いられる主要なオペレータの紹介です。
数が多いため、一つ一つの利用方法は URL記載の公式ドキュメントを確認してください。

https://rxjs.dev/api/operators

なお、解説の中で pipe 関数を実行する起点の Observable を
Source Observable と表記しています。

Combination

combineLatestWith

combineLatestWith は、Observable に別の Observable を結合させるオペレータです。
combineLatestWith を 用いることで、observer は配列で2つの値を受け取ることになります。

生成された Observable は、2つの observable が値を発行するたびに、
それぞれ値を受け取ります。

https://rxjs.dev/api/operators/combineLatestWith

withLatestFrom

withLatestFrom は、Observable に別の Observable を結合させるオペレータです。
withLatestFrom を 用いることで、observer は配列で2つの値を受け取ることになります。

observer が初めて値を受け取るのは、結合された 2つの observable が 初めて値を送出したタイミングからとなります。
それ以降は Source Observable の値発行タイミングに基づいて、observer が動作し、
observer が受け取る配列には、それぞれの observable の最新の値が保持されます。

https://rxjs.dev/api/operators/withLatestFrom

concatWith

concatWith は、Observable の末尾に、別の Observable を付与します。

concatWith の 引数に渡された Observable は、
Source Observable が Complete するまで subscribe すらされません。

生成された Observable は、Source Observableが complete しても、
complete することなく concatWith で紐付けられた Observable の 値の受け取りをスタートします。

https://rxjs.dev/api/operators/concatWith

mergeWith

mergeWith は、Observable に、別の Observable を付与します。

concatWith の 引数に渡された Observable は、
Source Observable とともに subscribe され値が送出されます。

生成された Observable は、merge されたそれぞれの Observable の値を
区別することなく一つの Observable の中で受け取ります。
merge された すべての Observable が complete すると、
生成された Observable は complete を発行します。

https://rxjs.dev/api/operators/mergeWith

startWith

startWith は Observable の先頭に、引数で渡した任意の値を挿入します。

https://rxjs.dev/api/operators/startWith

zipWith

https://rxjs.dev/api/operators/zipWith

filtering

take

複数回発行される subscribe の 指定分のみの イベントを取得します。

import { of } from 'rxjs';
import { take } from 'rxjs/operators';

const intervalCount = of(...[1, 2, 3, 4, 5, 6, 7, 8, 9]);
const takeFive = intervalCount.pipe(take(3));
takeFive.subscribe((x) => console.log(x)); // 1,2,3

https://rxjs.dev/api/operators/take

debounceTime

https://rxjs.dev/api/operators/debounceTime

delay

delay は Observable の値の送出を、単純に指定ミリ秒遅らせます。

https://rxjs.dev/api/operators/delay

distinctUntilChanged

distinctUntilChanged は、Source Observable から
連続して同じ値が送出された際に、値の発行を行わないようにします。

あくまで連続して同じ値が送出された際に、フィルタリング処理が行われるだけなので、
結果として observer がユニークな値のみを受け取る、というわけではありません。

import './style.css';

import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

const obs = of(1, 2, 2, 2, 1, 3, 3, 4).pipe(distinctUntilChanged());
obs.subscribe(console.log); // 1,2,1,3,4

https://rxjs.dev/api/operators/distinctUntilChanged

filter

filter は、関数式を利用してObservable の値を制御します。

Source Observable から発行される値のうち、
関数式の中で true を返す値のみが、生成される Observable に渡されます。

https://rxjs.dev/api/operators/filter

takeUntil

https://rxjs.dev/api/operators/takeUntil

Transformation

bufferTime

https://rxjs.dev/api/operators/bufferTime

concatMap

https://rxjs.dev/api/operators/concatMap

map

map は、関数式を利用してObservable の値を変更します。
Array.map に近いイメージです。

https://rxjs.dev/api/operators/map

mergeMap

https://rxjs.dev/api/operators/mergeMap

scan

scan は、Observable の値それぞれに特定の関数処理を施して、
それぞれの経過を出力します。
Array.reduce に近いイメージです。

https://rxjs.dev/api/operators/scan

switchMap

https://rxjs.dev/api/operators/switchMap

others

catchError

Promise の中でエラーが発生した場合の処理を定義することができます。
catchError の関数内で別の Observable を返すことで、

https://rxjs.dev/api/operators/catchError

retry

https://rxjs.dev/api/operators/retry

share

https://rxjs.dev/api/operators/share

tap

tap は pipe の中で値を取得して処理を追加することができます。
tap は pipe の中で記述しても後続の処理に影響を与えません。

tap の挙動は subscribe と似ていますが、
subscribe が値の変更に反応する 主作用として利用されるのに対して、
tap の利用場面はデバッグや副作用の記述目的と限定的です。

https://rxjs.dev/api/operators/tap

switchMap

https://rxjs.dev/api/operators/switchMap

この記事内での選定基準について

Angular の下記ドキュメント内 中程に記載のあるテーブルでの記述を元に作成しています。

https://angular.jp/guide/rx-library

別途、ドキュメント後半で紹介される catchError , delay も追加しています。

その他以下の観点でオペレータを追加/変更しています。

  • combineLatest : v8 で deprecated となったため、combineLatestWith に変更
  • concat : v8 で deprecated となったため、concatWith に変更
  • merge : v8 で deprecated となったため、mergeWith に変更
  • zip : v8 で deprecated となったため、zipWith に変更
  • retry : 実装上で頻繁に利用される & 他のオペレータでの代替が困難であるため

Discussion