Closed7
RxJS再入門
ピン留めされたアイテム

RxJSを理解するための基本
- Observerパターンを基本に据えている。
- Observer(監視者)とObservable(監視対象)がある。監視対象の状態が変化した際に、監視者に通知され監視者の処理が実行される。
- 通常のObservableはObservableオブジェクトを購読する(subscribeを呼び出す)と、処理が開始される。subscribeにはobserverを渡す。
- Observableオブジェクトには
map
filter
などの処理を付け加えることができる。
参考

Observableをsubscribeするいくつかの例
- Observable は、将来の値またはイベントの呼び出し可能なコレクションのアイデアを表す。複数の値のコレクションを遅延してプッシュする。
- Observer は、Observable によって配信される値の利用者。Observable によって配信される通知ごとのコールバック。
// observableを作る
const observable = new Observable(subscriber => {
subscriber.next({ value: 'foo', delay: 2000 })
subscriber.next({ value: 'bar', delay: 1000 })
setTimeout(() => {
subscriber.next({ value: 'hoge', delay: 1000 });
subscriber.complete();
}, 3000);
});
// 例1. subscribeにobserverである関数を渡す
observable
.subscribe(
obj => { setTimeout(() => { console.log(obj.value); }, obj.delay); },
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
)
// 例2. subscribeにobserverである関数を渡す(next関数のみ)
observable
.subscribe(
obj => { setTimeout(() => { console.log(obj.value); }, obj.delay); }
)
// 例3. subscribeにobserverをオブジェクトで渡す
const observer = {
next: obj => setTimeout(() => { console.log(obj.value); }, obj.delay),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);
Subscriptionを扱う
- Subscriptionは Observable の実行を表す使い捨てのオブジェクト。主に実行をキャンセルするのに役立つ。
const observable = new Observable(subscriber => {
subscriber.next({ value: 'foo', delay: 2000 })
subscriber.next({ value: 'bar', delay: 1000 })
setTimeout(() => {
subscriber.next({ value: 'hoge', delay: 1000 });
subscriber.complete();
}, 2000);
});
const observer = {
next: obj => setTimeout(() => { console.log(obj.value); }, obj.delay),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
const subscription1 = observable.subscribe(observer);
const subscription2 = observable.subscribe(observer);
const subscription3 = observable.subscribe(observer);
subscription1.add(subscription2);
setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription1.unsubscribe();
}, 1000);

Observableを作るいくつかの例
- Subjectは値を多くのオブザーバーにマルチキャストできるようにする特別なタイプのObservable。Observableは1人のオブザーバーにしか通知できないが、Subjectは複数のオブザーバーに通知できる。
- Creation OperatorsはOperatorsの種類であり、新しい Observable を作成するためにスタンドアロン関数として呼び出すことができる。
const observer = {
next: obj => setTimeout(() => { console.log(obj.value); }, obj.delay),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// 例0. Observableクラスのコンストラクタを使う
const observable = new Observable((subscriber) => {
subscriber.next({ value: 'foo', delay: 2000 });
subscriber.next({ value: 'bar', delay: 1000 });
subscriber.complete();
});
observable.subscribe(observer)
// 例1. subjectを使った場合
const subject = new Subject()
subject.subscribe(observer)
subject.next({ value: 'foo', delay: 2000 });
subject.next({ value: 'bar', delay: 1000 });
subject.complete();
subject.next({ value: 'foo', delay: 3000 }); // complete後なので無視される
// 例2. Creation Operatorsであるofを使った場合
const observable2 = of(
{ value: 'foo', delay: 2000 },
{ value: 'bar', delay: 1000 }
)
observable2.subscribe(observer);
// 全てsubscribeされた段階で、completeする。
// 例3. Creation Operatorsであるfromを使った場合
const observable3 = from([
{ value: 'foo', delay: 2000 },
{ value: 'bar', delay: 1000 }
])
observable3.subscribe(observer);
// 全てsubscribeされた段階で、completeする。
const observer = {
next: v => console.log(v),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// 4. Creation OperatorsであるfromEventを使った場合
const clicks = fromEvent(document, 'click');
clicks.subscribe(observer);
// 5. Creation Operatorsであるintervalを使った場合
const numbers = interval(1000)
numbers.subscribe(observer);

Observableに処理を付け加える(pipeする)
- Operatorは、
map
filter
concat
reduce
などの操作でコレクションを処理する関数型プログラミング スタイルを可能にする純粋な関数。 - Pipeable Operators は、
obs.pipe(operator())
のシンタックスで Observableに繋げることができる。呼び出された場合、既存の Observable インスタンスは変更せず、代わりにサブスクリプション ロジックが最初の Observable に基づいている新しい Observable を返します。 - Pipeable Operators は関数であるため、通常の関数のように使用できる(
op4()(op3()(op2( )(op1()(obs))))
)。これでは()が深くなってしまうが、pipeメソッドを使うと同じことをより簡単に実行できる(obs.pipe(op1(), op2(), op3(), op4());
)
const observable = of(1, 2, 3, 4, 5)
const observer = {
next: v => console.log(v),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// pipeメソッドを使わない不便な例
delay(1000)(map((x) => x * x)(filter((x) => x <= 3)(observable)))
.subscribe(observer);
// pipeメソッドを使う例
observable
.pipe(
filter((x) => x <= 3), // Filtering Operators
map((x) => x * x), // Transformation Operators
delay(1000) // Utility Operators
)
.subscribe(observer);

エラーハンドリング
const observable = of(1,2,3,4)
.pipe(tap((v) => {
if (3 == v) {
throw new Error('errorだよ')
}
}))
const observer = {
next: v => console.log(v),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
console.log('例1. エラー終了')
observable.subscribe(observer);
console.log('例2. エラーキャッチする')
observable
.pipe(catchError(err => of(null)))
.subscribe(observer);
console.log('例3. エラーキャッチする')
observable
.pipe(catchError(err => of('I', 'II', 'III')))
.subscribe(observer);
console.log('例4. エラーキャッチする')
observable
.pipe(catchError(err => {
throw 'error in source. Details: ' + err;
}))
.subscribe(observer);
console.log('例5. retryする')
observable
.pipe(
catchError((err, caught) => caught),
take(7)
)
.subscribe(observer);
console.log('例6. retryする')
observable
.pipe(retry(2))
.subscribe(observer);

Observablesの中のObservablesをハンドリングする
(Higher-order Observables)
Join Operatorsを使って、observablesの中のobservablesを実行する
const observable = of(
{ url: 'http://bar', delay: 3000 },
{ url: 'http://foo', delay: 1000 },
{ url: 'http://hoge', delay: 2000 }
)
const observer = {
next: v => console.log(v),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
function httpGet(url, delay) {
return new Promise(resolve => {
setTimeout(() => {
const obj = { ResultData: url + ' -> resolved' }
resolve(JSON.stringify(obj))
}, delay)
})
}
console.log('例0. 新しいobservableを作るが実行されない')
observable
.pipe(map((obj) => httpGet(obj.url, obj.delay)))
.subscribe(observer);
console.log('例1. concatAll')
observable
.pipe(
map((obj) => httpGet(obj.url, obj.delay)),
concatAll()
)
.subscribe(observer);
console.log('例2. mergeAll')
// 並列実行なので、fooの出力が先にされる
observable
.pipe(
map((obj) => httpGet(obj.url, obj.delay)),
mergeAll()
)
.subscribe(observer);
console.log('例3. switchAll')
// 最も最近の Observable からのみ値を発行する
observable
.pipe(
map((obj) => httpGet(obj.url, obj.delay)),
switchAll()
)
.subscribe(observer);
console.log('例4. exhaustAll')
// 最初の Observable からのみ値を発行する
observable
.pipe(
map((obj) => httpGet(obj.url, obj.delay)),
exhaustAll()
)
.subscribe(observer);
Transformation Operatorsを使って、observablesの中のobservablesを実行する
const observable = of(
{ url: 'http://bar', delay: 2000 },
{ url: 'http://foo', delay: 1000 },
{ url: 'http://hoge', delay: 2000 }
)
const observer = {
next: v => console.log(v),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
function httpGet(url, delay) {
return new Promise(resolve => {
setTimeout(() => {
const obj = { ResultData: url + ' -> resolved' }
resolve(JSON.stringify(obj))
}, delay)
})
}
console.log('例1. concatMap')
// 順次実行
observable
.pipe(
concatMap((obj) => httpGet(obj.url, obj.delay))
)
.subscribe(observer);
console.log('例2. mergeMap')
// 並列実行なので、fooの出力が先にされる
observable
.pipe(
mergeMap((obj) => httpGet(obj.url, obj.delay))
)
.subscribe(observer);
console.log('例3. switchMap')
// 最も最近の Observable からのみ値を発行する
observable
.pipe(
switchMap((obj) => httpGet(obj.url, obj.delay))
)
.subscribe(observer);
console.log('例4. exhaustMap')
// // 最初の Observable からのみ値を発行する
observable
.pipe(
exhaustMap((obj) => httpGet(obj.url, obj.delay))
)
.subscribe(observer);
Join Creation Operators を使って、observablesの中のobservablesを実行する
const observable1 = new Observable(subscriber => {
setTimeout(() => {
subscriber.next('foo1');
subscriber.next('foo2');
subscriber.complete();
}, 3000);
});
const observable2 = new Observable(subscriber => {
setTimeout(() => {
subscriber.next('bar1');
subscriber.next('bar2');
subscriber.complete();
}, 1000);
});
const observer = {
next: v => console.log(v),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
console.log('例1. merge')
// 並列実行
merge(observable1, observable2)
.subscribe(observer);
console.log('例1. merge (配列をくっつける)')
// 並列実行
const array = [observable1, observable2]
array.reduce((obs, acc) => merge(obs, acc))
.subscribe(observer);
console.log('例2. concat')
// 逐次実行。mergeと違って順番が保たれる。
const array = [observable1, observable2]
array.reduce((obs, acc) => concat(obs, acc))
.subscribe(observer);
console.log('例3. zip')
zip(observable1, observable2).pipe(
map(([o1, o2]) => ({ o1: o1, o2: o2 }))
)
.subscribe(x => console.log(x));
console.log('例4. forkJoin')
forkJoin({
foo: observable1,
bar: observable2,
baz: timer(4000),
})
.subscribe(
obj => console.log(obj.foo + obj.bar)
);

Observablesの中のObservablesをハンドリングする②
いろいろ試してみる。
const observable1 = new Observable(subscriber => {
setTimeout(() => {
subscriber.next({value: 'foo', num: 200});
subscriber.next({value: 'foo', num: 100});
subscriber.complete();
}, 1000);
});
const observable2 = new Observable(subscriber => {
setTimeout(() => {
subscriber.next({value: 'bar', num: 200});
subscriber.next({value: 'bar', num: 100});
subscriber.complete();
}, 1000);
});
const observer = {
next: v => console.log('next: ' + v),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
const array = [observable1, observable2].map(
(observable) => (
observable.pipe(
mergeMap(
// concatMap(
(obj) => of({value: obj.value, num: obj.num * obj.num})
.pipe(
delay(obj.num),
tap((_) => { console.log(obj); }),
mergeMap((obj2) => of({value: obj.value, num: obj2.num + obj.num + 100})),
map((obj2) => obj.num + obj2.value + obj2.num)
)
),
mergeMap(
// concatMap(
(value) => of(value)
.pipe(
delay(500),
tap((_) => { console.log(value); }),
delay(500),
)
)
)
)
)
array.reduce((obs, acc) => merge(obs, acc))
// array.reduce((obs, acc) => concat(obs, acc))
.subscribe(observer);
このスクラップは4ヶ月前にクローズされました