RxをPub/Subパターンで捉えてみる
Rxは Observerパターン を基にしていると説明されているものが多いですが、 Publisher-Subscriber(Pub/Sub)パターン で解釈した方が整理しやすい場面もあるのかなというお話です。
Rx全般に適用可能だとは思いますが、ここではUniRxのみを対象にして書きます。
Observerパターン
まず、Observerパターンの購読と通知部分を抜き出すと、以下のようになっています。
クラス構成
処理の流れ
-
Subjcet.AddObserver()
を呼び出してSubject
にObserver
を登録する。 - 通知すべき情報が発生すると
Subject.NotifyObservers()
が呼び出される。 - 上記の中で
Observer.Update()
が呼び出され、Observer
へ通知が行われる。
Rxをこれで整理しよう考えたときに、ここには登場しない Observable
をどう扱うべきか戸惑ってしまいました。
そこで、 Publisher-Subscriber(Pub/Sub)パターン をベースに考えた方が整理しやすいかもしれないと考えました。
Pub/Subパターン
Pub/Subパターンの購読と通知部分を抜き出すと、以下のようになっています。メソッド名は一例。
クラス構成
処理の流れ
-
Subscriber
側からBroker.Subscribe()
を呼び出し、Broker
に対してPublisher
の購読と、発行時にはSubscriber
への通知を依頼する。 -
Broker
側からPublisher.AddBroker()
を呼び出して、Publisher
にBroker
を登録する。 -
Publisher
は発行タイミングが来たら、Broker.Publish()
を呼び出してBroker
に発行を依頼する。 -
Broker
はそれを受けて、Subscriber.OnNext()
を呼び出してSubscriber
に発行を通知する。
Subscriber
が Broker
に対して「あの雑誌の定期購読に申し込んでおいて。私やり方知らないし。」と頼んでいる構図を思い浮かべましょう。
RxをPub/Subパターンに当て嵌めてみる
それではRxをPub/Subパターンに当て嵌めてみます。
Pub/Subパターンで考えると、各クラスの役割は以下のようになります。
- Publisher - Subject
- Broker - Observable
- Subscriber - Observer
Observable
の役割が定義されたので整理しやすくなりました。
Observerとは?
UniRxでのみRxに触れている場合は「 Observer
なんて出てこないけど?」と思うかもしれません。
実は、頻繁に使っています。
以下のコードの Subscribe()
の引数から Observer<T>
が生成されるのです。
SomeObservable.Subscribe(_ => DoSomethingOnNext());
上記のコードは下記と同じになります。
SomeObservable.Subscribe(Observer.Create<Unit>(onNext: _ => DoSomethingOnNext());
詳細は以下の記事を参考にして下さい。
【連載:Reactive Extensions(Rx)入門】第2回 イベント・プログラミングとRx
Observableの正体
Subjcet
は new Subject<T>()
で自分でインスタンスを生成するし、Observer
も上記の通りインスタンスを生成していることがわかりました。
では、以下の形でよく登場する Observable
の実体は何者なのでしょうか。
private readonly ISubject<Unit> someSubject = new Subject<Unit>();
public IObservable<Unit> someObservable => this.someSubject;
これは、「Rxの Subject
は Observer
と Observable
を内包している」という定義がヒントになります。
Subject
の購読、通知部分のみを簡易実装してみます。
Observer
と Observable
があると見間違えやすいので Broker
にしておきます。
// ※UniRxの実装とは異なります
public class PartialSubject<T>
{
private readonly InnerBroker innerBroker = new();
private readonly InnerObserver innerObserver;
// コンストラクタ
public PartialSubject() => this.innerObserver = new(this.innerBroker);
// Broker.Subscribe()のラッパー
public IDisposable Subscribe(IObserver<T> observer)
{
// 自身の持つ内部Brokerに購読依頼をそのままスルー
return this.innerBroker.Subscribe(observer);
}
// Observer.OnNext()のラッパー
public void OnNext(T nextItem)
{
// イベント発火タイミングが来た!
// 内部Observerに通知をスルー
this.innerObserver.OnNext(nextItem);
}
// 親クラスのBrokerとなる内部クラス
private class InnerBroker : IBroker<T>
{
private readonly List<IObserver<T>> observers = new();
public IDisposable Subscribe(IObserver<T> observer)
{
this.observers.Add(observer);
// Dispose時の処理は解説範囲外なので省略
return Disposable.Empty;
}
public void Publish(T value)
{
// 購読依頼を受けていたObserver達に発行を知らせる
this.observers.ForEach(x => x.OnNext(value));
}
}
// Subject利用クラスを購読するObserverとなる内部クラス
private class InnerObserver : IObserver<T>
{
private readonly IObservable<T> broker;
// コンストラクタ
public InnerObserver(IBroker<T> broker) => this.broker = broker;
public void OnNext(T value)
{
// 内部Observerの通知受信時の処理は、内部Brokerに発行を伝えること
this.broker.Publish(value);
}
}
}
// IBrokerは IObservable + Publish()
public interface IBroker<T> : IObservable<T> {
void Publish(T value);
}
Subject
は内部に Broker
(つまり Observable
)を抱えているのですね。
そして、内部に Observer
も抱えています。
IObservable<T> someObservable => this.someSubject
の形で登場する Observable
は、 Subjcet
の内部 Observable
のラッパーということになります。
実際には内部クラスである必要もないので、 Subjcet
が IObservable
と IObserver
を実装することになりますが、整理するときは内部クラスに分けて考えた方がわかりやすいと思います。
Observable.Subscribe(Observer)
は誰が誰に命令をしている?
Pub/Subパターンで考えると Observer = Subscriber
なので、やっぱり購読するのはObserverなのだと思います。
Observable
は中継役なので、こんな整理が良さそうです。
- 誰が -
Observable
利用クラスが - 誰に -
Observable
に - 何を - 購読手続きを行って
Observer
が購読できるようにしてと
命令している。
Subject.OnNext()
は誰が誰に何を命令している?
自作の Subject
からメッセージを通知させるときには Subject.OnNext()
を呼び出します。
これが、「 Observer.OnNext()
は『次の通知が来たから処理して』という命令であるのに対し、 Subject.OnNext()
はどういう意図の命名なのだろうか」と疑問でした。
これには以下のように処理を行っていると考えれば辻褄を合わせられそうです。
-
Subject
利用クラスからSubject
のObserver
ラッパーのOnNext()
を呼び出して発行通知を行う。 -
Subject
のObserver
ラッパーは、自身内部のObserver
のOnNext()
を呼び出して発行通知をスルー。 - 内部
Observer
は通知受信時の処理を行う。それはSubject
の内部Broker
にPublish()
を命令することである。 - 内部
Broker
は事前に購読依頼を受けていたObserver
達のOnNext()
を呼び出して発行通知を行う。
Subject.OnNext()
と考えるのではなく、「 Subject
の内部 Observer
が Subject
利用クラスを観測しているので、然るべき時が来たら利用クラスから Observer.OnNext()
を呼び出して発行通知を行う」と捉えます。
つまりこういうこと。
- 誰が -
Subscribe
利用クラスが - 誰に -
Subject
内部のObserver
に - 何を - 発行通知受信時の処理を行うようにと
命令している。
改めて整理
ここまでの話を踏まえ、購読から通知までの処理の流れを整理します。
// 1. Subjectを生成。
private readonly Subject<Unit> someSubject = new Subject<Unit>();
// 2. Subject生成時にSubjectの内部Observerは利用クラスの観測を開始したと見なす。
// こんなイメージ。
private IObserver<Unit> SomeObserver => this.someSubject;
// 3. Subjectの内部Observableを公開してこれを中継役とする。
public IObservable<Unit> SomeObservable => this.someSubject;
private void SubscribeEvent()
{
// 4. 3.のObservableに購読手続きを依頼。実際に購読するObserverを引数で渡す。
var observer = Observer.Create<Unit>(onNext: _ => DoSomethingOnNext());
this.SomeObservable.Subscribe(observer);
}
private void FireEvent()
{
// 5. イベント発火タイミングが来たので、当クラスを監視していることになっているObserverに
// 通知受信時の処理を行うように命令する。
this.SomeObserver.OnNext(Unit.Default);
// 6. 5.の命令を受けてSomeObserverはSomeObservable.Publish()を実行し、発行を依頼する。
// 7. 6.の命令を受けてSomeObservableは、4.で登録されていたObserverのOnNext()を呼び出し、
// 通知受信時の処理を行うように命令する。
}
-
Observer
は観測する人。 -
Observable
は観測可能な状態を作る中継役。 -
Subscribe()
はObservable
利用者からObservable
への購読手続きの依頼。購読するのはObserver
。 -
OnNext()
はObserver
に対しての、「次の号が発行されたから、ふさわしい処理を行ってね」という通知。
各クラスの役割や、メソッドの主語が明確になって理解しやすくなりました。
これはあくまで一つの解釈の仕方ですが、Rxの理解で混乱している方のヒントとなれば幸いです。
大阪で小規模なITエンジニアリングを行っています。 技術領域: C言語, C#, SQL, Windowsアプリ, Unity etc. YouTube配信始めました! youtube.com/@lilytech-lab
Discussion