【Angular】【RxJS】押さえておきたいRxJSオペレータ
皆さんこんにちは、エムアイ・ラボ です!
今回は、RxJSの数多くあるオペレータの中で、複数のObservableを処理する基本的なオペレータについてまとめてみたいと思います。
この記事でまとめるオペレータは以下5つです。
それぞれサンプルコードと合わせて見ていきます。
- merge
- concat
- zip
- race
- forkJoin
以前のブログでまとめたMap系のオペレータについてはこちら
■ merge
複数のObservableを一つにまとめて新しいObservableを生成します。
mergeの引数の順番に関係なく、処理が終わった順番で発行されてcompleteに流れます。
import { of, merge } from 'rxjs';
const observableA$ = of(1, 2, 3);
const observableB$ = of(10, 20, 30);
merge(observableA$, observableB$).subscribe({
next: res => console.log(res),
error: err => console.log(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// 1
// 2
// 3
// 10
// 20
// 30
// complete!!
import { of, merge, delay } from 'rxjs';
const observableA$ = of(1, 2, 3).pipe(delay(3000)); // 3000ミリ秒遅らせる
const observableB$ = of(10, 20, 30).pipe(delay(1000)); // 1000ミリ秒遅らせる
const observableC$ = of(100, 200, 300);
merge(observableA$, observableB$, observableC$).subscribe({
next: res => console.log(res),
error: err => console.log(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// 100
// 200
// 300
// 10
// 20
// 30
// 1
// 2
// 3
// complete!!
以下のサンプルのように
エラーが発生した場合は、errorに流れて以後のnextとcompleteは発火しません。
import { merge, of, Observable } from 'rxjs';
const observableA$ = of(1, 2, 3);
const observableB$ = new Observable(sub => sub.error('エラー発生'));
const observableC$ = of(100, 200, 300);
merge(observableA$, observableB$, observableC$).subscribe({
next: res => console.log(res),
error: err => console.log(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// 1
// 2
// 3
// Error: エラー発生
■ concat
複数のObservableを一つにまとめて新しいObservableを生成します。
mergeとの違いは、concatは引数の順番通り発行していくので前のobservableが完了してから、次のobservableに流れます。そして、全て完了したらcompleteに流れます。
import { of, concat, delay } from 'rxjs';
const observableA$ = of(1, 2, 3).pipe(delay(3000));
const observableB$ = of(10, 20, 30).pipe(delay(1000));
const observableC$ = of(100, 200, 300);
concat(observableA$, observableB$, observableC$ ).subscribe({
next: res => console.log(res),
error: err => console.log(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// 1
// 2
// 3
// 10
// 20
// 30
// 100
// 200
// 300
// complete!!
■ zip
複数のObservableの値を受け取り、その処理結果を組み合わせた配列を生成します。
引数の順番は保持されます。
import { of, zip, delay } from 'rxjs';
const observableA$ = of(1, 2, 3).pipe(delay(1000));
const observableB$ = of(10, 20, 30).pipe(delay(3000));
const observableC$ = of(100, 200, 300);
zip(observableA$, observableB$, observableC$).subscribe({
next: res => console.log(res),
error: err => console.error(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// [1, 10, 10]
// [2, 20, 200]
// [3, 30, 300]
// complete!!
また、複数のObservableのうち一つでも処理が完了したら、completeに流れます。
import { of, zip, delay } from 'rxjs';
const observableA$ = of(1, 2, 3, 4).pipe(delay(1000)); // 4は無視される
const observableB$ = of(10, 20, 30).pipe(delay(2000));
const observableC$ = of(100, 200, 300, 400, 500, 600).pipe(delay(3000)); //400以降は無視される
zip(observableA$, observableB$, observableC$).subscribe({
next: res => console.log(res),
error: err => console.error(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// [1, 10, 100]
// [2, 20, 200]
// [3, 30, 300]
// complete!!
■ race
複数のObservableのうち、最初に処理が完了したObservableの値を発行してcompleteに流れます。
Promiseのraceと同じような使い方ができます。
import { of, race, delay } from 'rxjs';
const observableA$ = of(1, 2, 3).pipe(delay(3000));
const observableB$ = of(10, 20, 30).pipe(delay(1000));
const observableC$ = of(100, 200, 300).pipe(delay(6000));
race(observableA$, observableB$, observableC$).subscribe({
next: res => console.log(res),
error: err => console.log(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// 10
// 20
// 30
// complete!!
エラーが一番早い場合は、errorに流れて終了します。
import { of, race, delay, Observable } from 'rxjs';
const observableA$ = of(1, 2, 3).pipe(delay(1000));
const observableB$: Observable<number> = new Observable(subscriber => {
subscriber.next(10);
subscriber.error(20);
subscriber.next(30);
});
const observableC$ = of(100, 200, 300).pipe(delay(6000));
race(observableA$, observableB$, observableC$).subscribe({
next: res => console.log(res),
error: err => console.log(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// 10
// Error: 20
■ forkJoin
全てのObservableの完了を待って、各Observableの最後の値の処理結果の配列を生成します。
引数の順番は保持されます。
const observableA$ = of(1, 2, 3, 4).pipe(delay(1000));
const observableB$ = of(10, 20, 30).pipe(delay(2000));
const observableC$ = of(100, 200, 300, 400, 500, 600).pipe(delay(3000));
forkJoin([observableA$, observableB$, observableC$]).subscribe({
next: res => console.log(res),
error: err => console.log(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// [4, 30, 600]
// complete!!
全てのObservableの完了を待つので、一つでもエラーがあった場合はerrorに流れます。
import { of, forkJoin, delay, Observable } from 'rxjs';
const observableA$ = of(1, 2, 3).pipe(delay(1000));
const observableB$: Observable<number> = new Observable(subscriber => {
subscriber.next(10);
subscriber.error(20);
subscriber.next(30);
});
const observableC$ = of(100, 200, 300).pipe(delay(3000));
forkJoin([observableA$, observableB$, observableC$]).subscribe({
next: res => console.log(res),
error: err => console.log(`Error: ${err}`),
complete: () => console.log('complete!!')
});
// 結果
// Error: 20
■ まとめ
merge
- 複数のObservableを1つにまとめて生成する
- 処理が完了した順番に発行する
concat
- 複数のObservableを1つにまとめて生成する
- 引数の順番に発行する
zip
- 複数のObservableの値を受け取り、その処理結果を組み合わせた配列を生成する
- 引数の順番に発行する
- Observableの1つが完了になったらcompleteに流れる
race
- 一番早く完了Observableの値を発行する
- エラーが一番早い場合はerrorに流れる
forkJoin
- 全てのObservableの完了を待った上で各Observableの最後の値の処理結果の配列を生成する
- 引数の順番に発行する
- 一つでもエラーがあった場合はerrorに流れる
■ 参考文献
■ 採用情報
エムアイ・ラボでは一緒に働くメンバーを募集しています。
まずは気軽にオンラインでお話ししてみませんか?
Wantedlyアカウントをお持ちでない方はTwitterのDMからでも大丈夫です。
お待ちしております。
Discussion