Open7

RxJS再入門

ピン留めされたアイテム
peko858peko858

RxJSを理解するための基本

  • Observerパターンを基本に据えている。
  • Observer(監視者)とObservable(監視対象)がある。監視対象の状態が変化した際に、監視者に通知され監視者の処理が実行される。
  • 通常のObservableはObservableオブジェクトを購読する(subscribeを呼び出す)と、処理が開始される。subscribeにはobserverを渡す。
  • Observableオブジェクトには map filter などの処理を付け加えることができる。

参考

https://rxjs.dev/
https://blog.recruit.co.jp/rmp/front-end/rxjs-intro/
https://blog.recruit.co.jp/rmp/front-end/post-11511/

peko858peko858

Observableをsubscribeするいくつかの例

  • Observable は、将来の値またはイベントの呼び出し可能なコレクションのアイデアを表す。複数の値のコレクションを遅延してプッシュする。
  • Observer は、Observable によって配信される値の利用者。Observable によって配信される通知ごとのコールバック。
// observableを作る
const observable = new Observable(subscriber => {
  subscriber.next({ value: 'foo', delay: 2000 })
  subscriber.next({ value: 'bar', delay: 1000 })
  setTimeout(() => {
    subscriber.next({ value: 'hoge', delay: 1000 });
    subscriber.complete();
  }, 3000);
});

// 例1. subscribeにobserverである関数を渡す
observable
  .subscribe(
  obj => { setTimeout(() => { console.log(obj.value); }, obj.delay); },
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
)

// 例2. subscribeにobserverである関数を渡す(next関数のみ)
observable
  .subscribe(
  obj => { setTimeout(() => { console.log(obj.value); }, obj.delay); }
)

// 例3. subscribeにobserverをオブジェクトで渡す
const observer = {
  next: obj => setTimeout(() => { console.log(obj.value); }, obj.delay),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);

https://jsbin.com/becekudade/edit?js,console

Subscriptionを扱う

  • Subscriptionは Observable の実行を表す使い捨てのオブジェクト。主に実行をキャンセルするのに役立つ。
const observable = new Observable(subscriber => {
  subscriber.next({ value: 'foo', delay: 2000 })
  subscriber.next({ value: 'bar', delay: 1000 })
   setTimeout(() => {
    subscriber.next({ value: 'hoge', delay: 1000 });
    subscriber.complete();
  }, 2000);
});
const observer = {
  next: obj => setTimeout(() => { console.log(obj.value); }, obj.delay),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

const subscription1 = observable.subscribe(observer);
const subscription2 = observable.subscribe(observer);
const subscription3 = observable.subscribe(observer);

subscription1.add(subscription2);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription1.unsubscribe();
}, 1000);

https://jsbin.com/fuqodigewu/1/edit?js,console

peko858peko858

Observableを作るいくつかの例

  • Subjectは値を多くのオブザーバーにマルチキャストできるようにする特別なタイプのObservable。Observableは1人のオブザーバーにしか通知できないが、Subjectは複数のオブザーバーに通知できる。
  • Creation OperatorsはOperatorsの種類であり、新しい Observable を作成するためにスタンドアロン関数として呼び出すことができる。
const observer = {
  next: obj => setTimeout(() => { console.log(obj.value); }, obj.delay),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

// 例0. Observableクラスのコンストラクタを使う
const observable = new Observable((subscriber) => {
  subscriber.next({ value: 'foo', delay: 2000 });
  subscriber.next({ value: 'bar', delay: 1000 });
  subscriber.complete();
});
observable.subscribe(observer)


// 例1. subjectを使った場合
const subject = new Subject()
subject.subscribe(observer)

subject.next({ value: 'foo', delay: 2000 });
subject.next({ value: 'bar', delay: 1000 });
subject.complete();
subject.next({ value: 'foo', delay: 3000 }); // complete後なので無視される

// 例2. Creation Operatorsであるofを使った場合
const observable2 = of(
  { value: 'foo', delay: 2000 },
  { value: 'bar', delay: 1000 }
)
observable2.subscribe(observer);
// 全てsubscribeされた段階で、completeする。

// 例3. Creation Operatorsであるfromを使った場合
const observable3 = from([
  { value: 'foo', delay: 2000 },
  { value: 'bar', delay: 1000 }
])
observable3.subscribe(observer);
// 全てsubscribeされた段階で、completeする。

https://jsbin.com/qemejoruba/edit?js,console

const observer = {
  next: v => console.log(v),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

// 4. Creation OperatorsであるfromEventを使った場合
const clicks = fromEvent(document, 'click');
clicks.subscribe(observer);

// 5. Creation Operatorsであるintervalを使った場合
const numbers = interval(1000)
numbers.subscribe(observer);

https://jsbin.com/nipogayada/1/edit?html,js,console,output

peko858peko858

Observableに処理を付け加える(pipeする)

  • Operatorは、map filter concat reduce などの操作でコレクションを処理する関数型プログラミング スタイルを可能にする純粋な関数。
  • Pipeable Operators は、obs.pipe(operator()) のシンタックスで Observableに繋げることができる。呼び出された場合、既存の Observable インスタンスは変更せず、代わりにサブスクリプション ロジックが最初の Observable に基づいている新しい Observable を返します。
  • Pipeable Operators は関数であるため、通常の関数のように使用できる( op4()(op3()(op2( )(op1()(obs)))) )。これでは()が深くなってしまうが、pipeメソッドを使うと同じことをより簡単に実行できる( obs.pipe(op1(), op2(), op3(), op4());
const observable = of(1, 2, 3, 4, 5)
const observer = {
  next: v => console.log(v),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

// pipeメソッドを使わない不便な例
delay(1000)(map((x) => x * x)(filter((x) => x <= 3)(observable)))
  .subscribe(observer);

// pipeメソッドを使う例
observable
  .pipe(
    filter((x) => x <= 3), // Filtering Operators
    map((x) => x * x), // Transformation Operators
    delay(1000) // Utility Operators
  )
  .subscribe(observer);

https://jsbin.com/cinenalixe/edit?js,console

peko858peko858

エラーハンドリング

const observable = of(1,2,3,4)
  .pipe(tap((v) => {
    if (3 == v) {
      throw new Error('errorだよ')
    }
  }))
const observer = {
  next: v => console.log(v),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

console.log('例1. エラー終了')
observable.subscribe(observer);

console.log('例2. エラーキャッチする')
observable
  .pipe(catchError(err => of(null)))
  .subscribe(observer);

console.log('例3. エラーキャッチする')
observable
  .pipe(catchError(err => of('I', 'II', 'III')))
  .subscribe(observer);

console.log('例4. エラーキャッチする')
observable
  .pipe(catchError(err => {
  throw 'error in source. Details: ' + err;
  }))
  .subscribe(observer);

console.log('例5. retryする')
observable
  .pipe(
    catchError((err, caught) => caught),
    take(7)
  )
  .subscribe(observer);

console.log('例6. retryする')
observable
  .pipe(retry(2))
  .subscribe(observer);

https://jsbin.com/pokuqovuli/1/edit?js,console

peko858peko858

Observablesの中のObservablesをハンドリングする

(Higher-order Observables)

Join Operatorsを使って、observablesの中のobservablesを実行する

const observable = of(
  { url: 'http://bar', delay: 3000 },
  { url: 'http://foo', delay: 1000 },
  { url: 'http://hoge', delay: 2000 }
)
const observer = {
  next: v => console.log(v),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

function httpGet(url, delay) {
  return new Promise(resolve => {
    setTimeout(() => {
      const obj = { ResultData: url + ' -> resolved' }
      resolve(JSON.stringify(obj))
    }, delay)
  })
}

console.log('例0. 新しいobservableを作るが実行されない')
observable
  .pipe(map((obj) => httpGet(obj.url, obj.delay)))
  .subscribe(observer);

console.log('例1. concatAll')
observable
  .pipe(
    map((obj) => httpGet(obj.url, obj.delay)),
    concatAll()
  )
  .subscribe(observer);

console.log('例2. mergeAll')
// 並列実行なので、fooの出力が先にされる
observable
  .pipe(
    map((obj) => httpGet(obj.url, obj.delay)),
    mergeAll()
  )
  .subscribe(observer);

console.log('例3. switchAll')
// 最も最近の Observable からのみ値を発行する
observable
  .pipe(
    map((obj) => httpGet(obj.url, obj.delay)),
    switchAll()
  )
  .subscribe(observer);

console.log('例4. exhaustAll')
// 最初の Observable からのみ値を発行する
observable
  .pipe(
    map((obj) => httpGet(obj.url, obj.delay)),
    exhaustAll()
  )
  .subscribe(observer);

https://jsbin.com/rafobedoqa/1/edit?js,console

Transformation Operatorsを使って、observablesの中のobservablesを実行する

const observable = of(
  { url: 'http://bar', delay: 2000 },
  { url: 'http://foo', delay: 1000 },
  { url: 'http://hoge', delay: 2000 }
)
const observer = {
  next: v => console.log(v),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

function httpGet(url, delay) {
  return new Promise(resolve => {
    setTimeout(() => {
      const obj = { ResultData: url + ' -> resolved' }
      resolve(JSON.stringify(obj))
    }, delay)
  })
}

console.log('例1. concatMap')
// 順次実行
observable
  .pipe(
    concatMap((obj) => httpGet(obj.url, obj.delay))
  )
  .subscribe(observer);

console.log('例2. mergeMap')
// 並列実行なので、fooの出力が先にされる
observable
  .pipe(
    mergeMap((obj) => httpGet(obj.url, obj.delay))
  )
  .subscribe(observer);

console.log('例3. switchMap')
// 最も最近の Observable からのみ値を発行する
observable
  .pipe(
    switchMap((obj) => httpGet(obj.url, obj.delay))
  )
  .subscribe(observer);

console.log('例4. exhaustMap')
// // 最初の Observable からのみ値を発行する
observable
  .pipe(
    exhaustMap((obj) => httpGet(obj.url, obj.delay))
  )
  .subscribe(observer);

https://jsbin.com/supohanefe/1/edit?js,console

Join Creation Operators を使って、observablesの中のobservablesを実行する

const observable1 = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.next('foo1');
    subscriber.next('foo2');
    subscriber.complete();
  }, 3000);
});
const observable2 = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.next('bar1');
    subscriber.next('bar2');
    subscriber.complete();
  }, 1000);
});

const observer = {
  next: v => console.log(v),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

console.log('例1. merge')
// 並列実行
merge(observable1, observable2)
  .subscribe(observer);

console.log('例1. merge (配列をくっつける)')
// 並列実行
const array = [observable1, observable2]
array.reduce((obs, acc) => merge(obs, acc))
  .subscribe(observer);

console.log('例2. concat')
// 逐次実行。mergeと違って順番が保たれる。
const array = [observable1, observable2]
array.reduce((obs, acc) => concat(obs, acc))
  .subscribe(observer);

console.log('例3. zip')
zip(observable1, observable2).pipe(
  map(([o1, o2]) => ({ o1: o1, o2: o2 }))
)
  .subscribe(x => console.log(x));

console.log('例4. forkJoin')
forkJoin({
  foo: observable1,
  bar: observable2,
  baz: timer(4000),
})
  .subscribe(
    obj => console.log(obj.foo + obj.bar)
  );

https://jsbin.com/yukimiwuho/edit?js,console

peko858peko858

Observablesの中のObservablesをハンドリングする②

いろいろ試してみる。

const observable1 = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.next({value: 'foo', num: 200});
    subscriber.next({value: 'foo', num: 100});
    subscriber.complete();
  }, 1000);
});
const observable2 = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.next({value: 'bar', num: 200});
    subscriber.next({value: 'bar', num: 100});
    subscriber.complete();
  }, 1000);
});

const observer = {
  next: v => console.log('next: ' + v),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

const array = [observable1, observable2].map(
  (observable) => (
    observable.pipe(
      mergeMap(
//       concatMap(
        (obj) => of({value: obj.value, num: obj.num * obj.num})
          .pipe(
            delay(obj.num),
            tap((_) => { console.log(obj); }),
            mergeMap((obj2) => of({value: obj.value, num: obj2.num + obj.num + 100})),
            map((obj2) => obj.num + obj2.value + obj2.num)
          )
      ),
      mergeMap(
//       concatMap(
        (value) => of(value)
            .pipe(
              delay(500),
              tap((_) => { console.log(value); }),
              delay(500),
            )
      )
    )
  )
)

array.reduce((obs, acc) => merge(obs, acc))
// array.reduce((obs, acc) => concat(obs, acc))
  .subscribe(observer);

https://jsbin.com/zidedipuca/edit?js,console