🍓

【Angular】【RxJS】押さえておきたいRxJSオペレータ

2023/04/12に公開

皆さんこんにちは、エムアイ・ラボ です!
今回は、RxJSの数多くあるオペレータの中で、複数のObservableを処理する基本的なオペレータについてまとめてみたいと思います。

この記事でまとめるオペレータは以下5つです。
それぞれサンプルコードと合わせて見ていきます。

  • merge
  • concat
  • zip
  • race
  • forkJoin

以前のブログでまとめたMap系のオペレータについてはこちら
https://zenn.dev/milab/articles/3f99cb3f08e60e

■ merge

複数のObservableを一つにまとめて新しいObservableを生成します。
mergeの引数の順番に関係なく、処理が終わった順番で発行されてcompleteに流れます。

import { of, merge } from 'rxjs';

const observableA$ = of(1, 2, 3);
const observableB$ = of(10, 20, 30);

merge(observableA$, observableB$).subscribe({
  next: res => console.log(res),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log('complete!!')
});

// 結果
// 1
// 2
// 3
// 10
// 20
// 30
// complete!!

import { of, merge, delay } from 'rxjs';

const observableA$ = of(1, 2, 3).pipe(delay(3000)); // 3000ミリ秒遅らせる
const observableB$ = of(10, 20, 30).pipe(delay(1000)); // 1000ミリ秒遅らせる
const observableC$ = of(100, 200, 300);

merge(observableA$, observableB$, observableC$).subscribe({
  next: res => console.log(res),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log('complete!!')
});

// 結果
// 100
// 200
// 300
// 10
// 20
// 30
// 1
// 2
// 3
// complete!!

以下のサンプルのように
エラーが発生した場合は、errorに流れて以後のnextとcompleteは発火しません。

import { merge, of, Observable } from 'rxjs';


const observableA$ = of(1, 2, 3);
const observableB$ = new Observable(sub => sub.error('エラー発生'));
const observableC$ = of(100, 200, 300);

merge(observableA$, observableB$, observableC$).subscribe({
  next: res => console.log(res),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log('complete!!')
});

// 結果
// 1
// 2
// 3
// Error: エラー発生

■ concat

複数のObservableを一つにまとめて新しいObservableを生成します。
mergeとの違いは、concatは引数の順番通り発行していくので前のobservableが完了してから、次のobservableに流れます。そして、全て完了したらcompleteに流れます。

import { of, concat, delay } from 'rxjs';

const observableA$ = of(1, 2, 3).pipe(delay(3000));
const observableB$ = of(10, 20, 30).pipe(delay(1000));
const observableC$ = of(100, 200, 300);

concat(observableA$, observableB$, observableC$ ).subscribe({
  next: res => console.log(res),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log('complete!!')
});

// 結果
// 1
// 2
// 3
// 10
// 20
// 30
// 100
// 200
// 300
// complete!!

■ zip

複数のObservableの値を受け取り、その処理結果を組み合わせた配列を生成します。
引数の順番は保持されます。

import { of, zip, delay } from 'rxjs';

const observableA$ = of(1, 2, 3).pipe(delay(1000));
const observableB$ = of(10, 20, 30).pipe(delay(3000)); 
const observableC$ = of(100, 200, 300);

zip(observableA$, observableB$, observableC$).subscribe({
  next: res => console.log(res),
  error: err => console.error(`Error: ${err}`),
  complete: () => console.log('complete!!')
});

// 結果
// [1, 10, 10]
// [2, 20, 200]
// [3, 30, 300]
// complete!!

また、複数のObservableのうち一つでも処理が完了したら、completeに流れます。

import { of, zip, delay } from 'rxjs';

const observableA$ = of(1, 2, 3, 4).pipe(delay(1000)); // 4は無視される
const observableB$ = of(10, 20, 30).pipe(delay(2000));
const observableC$ = of(100, 200, 300, 400, 500, 600).pipe(delay(3000)); //400以降は無視される

zip(observableA$, observableB$, observableC$).subscribe({
  next: res => console.log(res),
  error: err => console.error(`Error: ${err}`),
  complete: () => console.log('complete!!')
});

// 結果
// [1, 10, 100]
// [2, 20, 200]
// [3, 30, 300]
// complete!!

■ race

複数のObservableのうち、最初に処理が完了したObservableの値を発行してcompleteに流れます。
Promiseのraceと同じような使い方ができます。

import { of, race, delay } from 'rxjs';

const observableA$ = of(1, 2, 3).pipe(delay(3000));
const observableB$ = of(10, 20, 30).pipe(delay(1000));
const observableC$ = of(100, 200, 300).pipe(delay(6000));

race(observableA$, observableB$, observableC$).subscribe({
next: res => console.log(res),
error: err => console.log(`Error: ${err}`),
complete: () => console.log('complete!!')
});

// 結果
// 10
// 20
// 30
// complete!!

エラーが一番早い場合は、errorに流れて終了します。

import { of, race, delay, Observable } from 'rxjs';

const observableA$ = of(1, 2, 3).pipe(delay(1000));
const observableB$: Observable<number> = new Observable(subscriber => {
  subscriber.next(10);
  subscriber.error(20);
  subscriber.next(30);
});
const observableC$ = of(100, 200, 300).pipe(delay(6000));

race(observableA$, observableB$, observableC$).subscribe({
  next: res => console.log(res),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log('complete!!')
});

// 結果
// 10
// Error: 20

■ forkJoin

全てのObservableの完了を待って、各Observableの最後の値の処理結果の配列を生成します。
引数の順番は保持されます。

const observableA$ = of(1, 2, 3, 4).pipe(delay(1000));
const observableB$ = of(10, 20, 30).pipe(delay(2000));
const observableC$ = of(100, 200, 300, 400, 500, 600).pipe(delay(3000));


forkJoin([observableA$, observableB$, observableC$]).subscribe({
  next: res => console.log(res),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log('complete!!')
});

// 結果
// [4, 30, 600]
// complete!!

全てのObservableの完了を待つので、一つでもエラーがあった場合はerrorに流れます。

import { of, forkJoin, delay, Observable } from 'rxjs';

const observableA$ = of(1, 2, 3).pipe(delay(1000));
const observableB$: Observable<number> = new Observable(subscriber => {
  subscriber.next(10);
  subscriber.error(20);
  subscriber.next(30);
});
const observableC$ = of(100, 200, 300).pipe(delay(3000));

forkJoin([observableA$, observableB$, observableC$]).subscribe({
  next: res => console.log(res),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log('complete!!')
});

// 結果
// Error: 20

■ まとめ

merge

  • 複数のObservableを1つにまとめて生成する
  • 処理が完了した順番に発行する

concat

  • 複数のObservableを1つにまとめて生成する
  • 引数の順番に発行する

zip

  • 複数のObservableの値を受け取り、その処理結果を組み合わせた配列を生成する
  • 引数の順番に発行する
  • Observableの1つが完了になったらcompleteに流れる

race

  • 一番早く完了Observableの値を発行する
  • エラーが一番早い場合はerrorに流れる

forkJoin

  • 全てのObservableの完了を待った上で各Observableの最後の値の処理結果の配列を生成する
  • 引数の順番に発行する
  • 一つでもエラーがあった場合はerrorに流れる

■ 参考文献

https://rxjs.dev/guide/operators


■ 採用情報

エムアイ・ラボでは一緒に働くメンバーを募集しています。
まずは気軽にオンラインでお話ししてみませんか?

Wantedlyアカウントをお持ちでない方はTwitterのDMからでも大丈夫です。
お待ちしております。

https://www.wantedly.com/companies/milab-inc

Discussion