【C#】ものすごくかんたんなReactiveExtensions入門

公開:2020/09/28
更新:2020/09/28
25 min読了の目安(約15500字TECH技術記事

はじめに

ReactiveExtensions(以下Rx)は使い出すと本当に便利で、これなしでは生きていけないレベルなのですが、何やら敷居が高くて導入しづらいという意見が散見されます。

たしかに、Rxのすべてを完全に理解するのは敷居が高いと思います。
僕自身も、例えばSchedulerに関してはまったく理解していません。

しかし、Rxの導入部分を少し理解して使ってみるだけで、Rxの便利さは十分実感できますし、その利点も十分すぎるほど享受することができます。

Rxを使うと何が便利なの?

Rxを導入すると何ができるのかと言うと、「クラス間の値のやり取りが非常に簡潔、柔軟、かつ安全にできるようになります」。

例えば、「データをまとめて管理するDataServerクラス」と、そのデータを表示するDataViewクラスがあったとします。
DataViewクラスは、DataServerクラスの内部データをそのまま表示したいため、DataServerクラスのデータに変更があったら、その変更値を受け取りたいです。

//データを格納する
class DataServer 
{
    public int Data1 { get; set; }
    public int Data2 { get; set; }    
}

//サーバーのデータを表示する
class DataView
{
    public void DataChanged(int data1, int data2)
    {
        //表示内容を更新する処理
    }
}

このとき、DataViewはどのようにデータの更新を知るのでしょうか?
DataServerはどのようにデータが更新されたことを伝えればよいのでしょうか?

こうですか?

class DataServer 
{
    private DataView m_dataView;

    private int _data1;
    public int Data1 { 
        get => _data1;
        set {
            _data1 = value;
            m_dataView.DataChanged(this.Data1, this.Data2);
        }
    }

    private int _data2;
    public int Data2 {
        get => _data2;
        set {
            _data2 = value;
            m_dataView.DataChanged(this.Data1, this.Data2);
        }
    }   
}

class DataView
{
    public void DataChanged(int data1, int data2)
    {
        //表示内容を更新する処理
    }
}

こうすれば確かにDataServerのデータ更新通知を的確に伝えられるし、DataViewはデータの更新通知を受け取れます。

では、データの更新通知が欲しい人が増えたときは?

class DataServer 
{
    private DataView m_dataView;
    private DataView2 m_dataView2;

    private int _data1;
    public int Data1 { 
        get => _data1;
        set {
            _data1 = value;
            m_dataView.DataChanged(this.Data1, this.Data2);
            m_dataView2.DataChanged(this.Data1, this.Data2);
        }
    }

    private int _data2;
    public int Data2 {
        get => _data2;
        set {
            _data2 = value;
            m_dataView.DataChanged(this.Data1, this.Data2);
            m_dataView2.DataChanged(this.Data1, this.Data2);
        }
    }   
}

class DataView
{
    public void DataChanged(int data1, int data2)
    {
        //表示内容を更新する処理
    }
}
class DataView2
{
    public void DataChanged(int data1, int data2)
    {
        //表示内容を更新する処理
    }
}

もっと増えたら?

データ更新通知が欲しい人が増えるたびに、DataServer側で毎回値の通知先を増やす必要があります。
単純な例なので、データ更新通知先が増えたところで大したことはないと思うかもしれませんが、複雑になってくると「いつの間にかデータの通知先が増えていたけど、通知する処理を入れ忘れてバグになった!」ということが起きやすくなります。

いやいや、それイベントで良くない?

イベントを使えば解決だよね?


class DataServer 
{
    public event Action<int> Data1Changed;
    public event Action<int> Data2Changed;

    private int _data1;
    public int Data1 { 
        get => _data1;
        set {
            _data1 = value;
            Data1Changed?.Invoke(value);
        }
    }

    private int _data2;
    public int Data2 {
        get => _data2;
        set {
            _data2 = value;
            Data2Changed?.Invoke(value);
        }
    }   
}

class DataView
{
    public DataView(DataServer server)
    {
        server.Data1Changed += this.Data1Changed;
        server.Data2Changed += this.Data2Changed;
    }

    private void Data1Changed(int data1)
    {
        //表示内容を更新する処理
    }
    private void Data2Changed(int data2)
    {
        //表示内容を更新する処理
    }
}

こうすればいくら通知先が増えたところで、自分で値を取りに行ってくれるから、DataServer側は変更いらないし、何も問題ないよね?

そのとおりです。

しかし、Rxを使えばもっと便利になります。
なぜなら、Rxはイベントの完全上位互換だからです。

Rxはイベントの完全上位互換

Rxを使うと、イベントに比べて次のような利点があります。

  • 発行された値を柔軟に加工して、ほしい値を取得できる
  • イベントの購読と解除がとても柔軟にできる
  • イベントをオブジェクトとして扱える
  • イベントの合成とかもできる
  • エラー処理も簡単
  • 表記が直感的でわかりやすい

多分他にももっと色々あると思いますが、ここに列挙してもあまり意味がないのでこれくらいにしておきます。
とにかく!Rxはものすごく便利で、使わない手はもはやありません!
Rxがあればイベントを使う意味はほとんどないでしょう。

Rxが便利ということは分かっていただけましたか?
分かっても実感はしてないと思いますので次項から使い方と使用例を見ていきます。

事前知識

その前に最低限必要な知識を整理しておきます。

Rxは オブザーバーパターン が基礎になっています。

実際にRxを使う上では、オブザーバーパターンのことを意識する必要はあまりないのですが、どうしてもRxを使う上で必要となる次の3つのクラスとインターフェイスについて簡単に説明していきます。

  • IObserver<T>インターフェイス
  • IObservable<T>インターフェイス
  • Subject<T>クラス

IObserver<T>インターフェイス

Observerは、「観察者」とか「観測者」といった意味の単語です。
つまり、IObserver<T>インターフェイスは「値を受け取る側」が実装するインターフェイスですね。

定義は次のようになっています。


public interface IObserver<T>
{
    /// <summary>
    /// 値を通知する
    /// </summary>
    /// <param name="value"></param>
    void OnNext(T value);    
    /// <summary>
    /// 例外が発生したことを通知する
    /// </summary>
    /// <param name="e"></param>
    void OnError(Exception e);
    /// <summary>
    /// 値の発行がすべて完了したことを通知する
    /// </summary>
    void OnCompleted();
}

値を通知したいときは、IObserverさんのOnNextの引数に値をセットして呼んであげれば良いわけですね。
あとはエラーの発生や、値の発行完了通知などもOnError, OnCompletedなどで受け取ることができるようになっています。

IObservable<T>インターフェイス

IObservable<T>インターフェイスは「観察される人」[1]を表します。
つまり、値を発行する側が実装するインターフェイスですね。

ちょうど、IObserver<T>IObservable<T>は対のような関係になっています。

定義は次のようになっています。


public interface IObservable<T>
{
    /// <summary>
    /// 値を購読する
    /// </summary>
    /// <param name="observer">値の発行先</param>
    /// <returns></returns>
    IDisposable Subscribe(IObserver<T> observer);    
}

値の発行を購読したいときは、Subscribeメソッドを呼び出して、引数に発行先(IObserverさん)を指定すれば良いですね。

オブザーバーパターンをそのまま適用するときは、このように値を受け取る側(IObserver<T>側)が、値を発行する側(IObservable<T>側)に直接自分自身のインスンタンスを教えて、そこに値を流し込んでもらう感じになります。

しかし、この方式だと次のような問題があります。

  • 複数の種類の値を発行・購読できない
  • 受け取り側のインスンタンスを直接渡す違和感

これを解決するために、Rxには次に紹介するSubject<T>クラスが用意されています。

Subject<T>クラス

Subject<T>クラスは、 IObservable<T>インターフェイスとIObserver<T>インターフェイスを両方実装したクラス です。

つまり、Subject<T>は、値の監視者でもありながら、監視される人でもあるという変わったクラスで、ちょうど 値の発行側と受け取り側の仲介役 のような役割を果たします。

この仲介役がいるおかげで、値の発行側と受け取り側で互いを直接知ることを防ぎ、また仲介役を複数用意することで、何種類でも好きなだけ値を発行することができるようになります。

以上IObserver<T>インターフェイス, IObservable<T>インターフェイス, Subject<T>クラスについて理解したところで、実際にRxをの使い方を説明します。

Rxの使い方

前述したとおり、内部の仕組みやオブザーバーパターンなどを理解していなくても、Rxは簡単に使用することができます。
では、実際にどのように使用するかを、序盤で使用したDataServerクラスとDataViewクラスになぞらえて説明していきます。

準備:Rxを使う環境を用意する

Rxを使う前に、Rxを使えるように開発環境を用意する必要があります。
とはいっても、大したことは有りません。

Windowsアプリケーション開発の場合

VisualStudioによるWindowsアプリケーション開発の場合、VisualStudioのNuGetを使ってSystem.Reactive.Linqをインストールすれば完了です。

Unity開発の場合

Unity開発の場合はAssetStoreからUniRxをインストールすれば完了です。

値を発行する方法

Rxを使う準備が完了したら、いよいよRxを使っていきます。

まずは、値を発行する方法を紹介します。
値を発行する側のクラスが値を発行するには、次のようにします。

  1. 仲介役となるSubject<T>インスタンスをprivateフィールドで内部に持ちます
  2. そのSubject<T>IObservable<T>で公開します
  3. 値を発行するときはSubject<T>OnNextします

具体的には以下のようなコードになります。


class DataServer 
{
    //1. 仲介役となる`Subject<T>`インスタンスをprivateフィールドで内部に持ちます
    private Subject<int> m_data1Subject = new Subject<int>();
    private Subject<int> m_data2Subject = new Subject<int>();

    //2. その`Subject<T>`を`IObservable<T>`で公開します
    public IObservable<int> ObservableData1Changed => m_data1Subject.AsObservable();
    public IObservable<int> ObservableData2Changed => m_data2Subject.AsObservable();

    private int _data1;
    public int Data1 { 
        get => _data1;
        set {
            _data1 = value;
            //3. 値を発行するときは`Subject<T>`に`OnNext`します
            m_data1Subject.OnNext(value);
        }
    }

    private int _data2;
    public int Data2 {
        get => _data2;
        set {
            _data2 = value;
            //3. 値を発行するときは`Subject<T>`に`OnNext`します
            m_data2Subject.OnNext(value);
        }
    }   
}

AsObservable()は別になくても動くのですが、付けておくとSubject<T>にキャストされるのを防ぐことができます。

値を受け取る方法

値を受け取るには、公開されたIObservable<T>Subscribeするだけです。

class DataView
{
    public DataView(DataServer server)
    {
        //公開されたIObservableを購読
        server.ObservableData1Changed.Subscribe(x => Data1Changed(x));
        server.ObservableData2Changed.Subscribe(x => Data2Changed(x));
    }

    private void Data1Changed(int data1)
    {
        //表示内容を更新する処理
    }
    private void Data2Changed(int data2)
    {
        //表示内容を更新する処理
    }
}

ここで注目すべきは、Subscribeの引数にラムダ式が入っている点です。
本来、ここにはIObserver<T>(値を受け取るクラスのインスタンス)が入るはずでした。

もちろん、DataViewクラスが受け取る本人なので、次のようにDataViewIObserver<T>を実装し、

class DataServer : IObserver<int>
{
    //..省略.. 

    public void OnNext(int value)
    {

    }

    public void OnError(Exception error)
    {

    }

    public void OnCompleted()
    {
        
    }
}

自分自身をSubscribeで渡しても構わないのですが、

server.ObservableData1Changed.Subscribe(this);
server.ObservableData2Changed.Subscribe(this);

こうすると購読したすべての値がひとつのOnNextメソッドに流れてきてしまい、使い勝手が悪いです。

そこで、Rxでは、OnNextしたときの処理をラムダ式で直接指定できるようなSubscribeのオーバーロードが用意されています。

//OnNextだけ登録する
public IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext);
//OnNextとOnErrorだけ登録する
public IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError);
//OnNextとOnCompletedだけ登録する
public IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted);
//OnNextとOnErrorとOnCompletedをすべて登録する
public IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted);

これらのオーバーロードを呼び出すと、内部でIObserver<T>を実装したクラスを生成して、代わりに値を受け取ったときに登録したアクションを実行してくれるようになります。

このおかげで、

server.ObservableData1Changed.Subscribe(x => Data1Changed(x));
server.ObservableData2Changed.Subscribe(x => Data2Changed(x));

このように値を受け取ったときに実行する処理を直接インラインで書くことができ、見た目的にも美しく、直感的な記述が可能となるわけですね。

OnErrorOnCompletedの通知も受け取りたい場合は、他のオーバーロードを使用して、

server.ObservableData1Changed.Subscribe(x => Data1Changed(x),
					e => Console.WriteLine(e.Message),
					() => Console.WriteLine("OnCompleted"));

などのように記述することができます。

データを加工する

ここからがRxの本領発揮です。
今までの「値の発行・受け取り」は、イベントでも全く同じことができます。
しかし、Rxを使うと、 受け取ったデータを自分の都合の良いように自由に加工することができます

具体的には、コレクションでおなじみのLINQのオペレータをそのまま使うことができるのです。

Where:値をフィルタリングする

Whereオペレータを使って、発行された値をフィルタリングして、ほしい値だけ購読できます。


//発行された値から、偶数のものだけフィルタリングして購読する
server.ObservableData1Changed
      .Where(x => x % 2 == 0) //偶数のものだけに絞り込む
      .Subscribe(x => Data1Changed(x));
//発行された値から、6以上のものだけフィルタリングして購読する
server.ObservableData2Changed
      .Where(x => x > 5) //6以上のものだけに絞り込む
      .Subscribe(x => Data2Changed(x));

これをイベントでやろうと思ったら、、、

server.Data1Changed += x => 
{
    if(x % 2 == 0)
    {
        Data1Changed(x)
    }
};

こんな感じになってスマートじゃないです。見づらいです。
RxならWhereオペレータ挟むだけで実現できます。

Select:値を変換する

Selectオペレータを使って、発行された値を変換することができます。


//発行された値を表示形式に変換して購読する
server.ObservableData1Changed
      .Select(x => $"Data1が更新されました!:{x}") //表示する形式に変換する
      .Subscribe(str => Data1View(str));
server.ObservableData2Changed
      .Select(x => $"Data2が更新されました!:{x}") //表示する形式に変換する
      .Subscribe(str => Data2View(str));

これもイベントで同じことをしようとしたら、、、

server.Data1Changed += x => 
{
    Data1View($"Data1が更新されました!:{x}");
};

まぁこれは大したことなかったですね。笑
でもSelectオペレータでデータを加工して次に流すという流れのほうが、直感的に理解しやすいかと思います。

DistinctUntilChanged:値が変化したときだけ次に流す

DistinctUntilChangedオペレータを使うと、前回に流れてきた値と異なる値が来たときだけ、次に流すといったことができます。

//Data1の値が変化したときだけ購読する
server.ObservableData1Changed
      .DistinctUntilChanged() //値が変化したときだけ次に流す
      .Subscribe(x => Data1Changed(x));

これはイベントで同じことをしようとすると大変です。


int _beforeValue;
bool _isFirst = true;
server.Data1Changed += x =>
{
    if(_isFirst || x != _beforeValue)
    {
        Data1Changed(x);
    }
    _isFirst = false;
    _beforeValue = x;
}

こんなふうに外部変数とか使わなきゃ実現できません。
これがRxのDistinctUntilChangedオペレータを使うと、ただ挟むだけで簡単に実現可能です。
DistinctUntilChangedオペレータの内部に、このような汚い部分を隠蔽してくれているからですね。

Merge:合成する

Mergeオペレータを使うと、複数のIObservable<T>を合成して、一つのIObservable<T>として扱うことができます。

//Data1とData2の変更通知を合成して購読する
server.ObservableData1Changed
      .Merge(server.ObservableData2Changed)
      .Subscribe(x => AnyDataChanged(x));

これをイベントでやるならこんな感じですかね。

server.Data1Changed += AnyDataChanged;
server.Data2Changed += AnyDataChanged;

ただ合成するだけなら思ったより簡潔でしたが、もっといろいろなオペレータを組み合わせて合成するなど、複雑なこともRxなら簡単に実現できてしまいます。

例えば、次のような感じ。

//Data1と、Data2の偶数部分だけの変更通知を合成して購読する
server.ObservableData1Changed
      .Merge(server.ObservableData2Changed.Where(x => x % 2 == 0))
      .Subscribe(x => AnyDataChanged(x));

こんなこともRxなら簡単・簡潔ですが、イベントだと一気に面倒になることが容易に想像つくでしょう。

他にもいろんなオペレータ

他にもいろいろなオペレータが用意されています。
ここでは紹介しきれないので次のような記事を参照ください。
https://qiita.com/toRisouP/items/3cf1c9be3c37e7609a2f
この記事で紹介されているのはUniRxという、Unityに特化したRxライブラリのオペレータなので、若干違うところもあるかもしれませんが、基本的にはほぼ同じです。

また、LINQオペレータがIEnumerable<T>を受け取ってIEnumerable<T>を返すように、RxオペレータもIObservable<T>を受け取ってIObservable<T>を返すようになっているので、ほぼすべてのオペレータはメソッドチェーンのようにつなげて組み合わせることができますよ。

いろいろなIObservableファクトリメソッド

今までは、自作クラスの内部にSubject<T>を持って値を発行したいタイミングでOnNextを行うように、「値の発行側」を自前で作っていました。
しかしRxには、便利な「値の発行側」があらかじめ用意されています。これらは、IObservableを生成することから、「ファクトリメソッド」と呼ばれています。

ここでは、Rxに搭載されているいくつかのファクトリメソッドのうち、個人的にもっともよく使用するObservable.Intervalファクトリメソッドを紹介します。

Observable.Intervalファクトリメソッド

Observable.Intervalファクトリメソッドは、一定時間間隔で値を発行するファクトリメソッドです。
引数にTimeSpan構造体を入れるだけで、その指定した時間間隔で値を発行してくれるIObservableが手に入ります。
発行される値は、0から順にインクリメントされたものになります。

次のように使用します。

Observable.Interval(TimeSpan.FromSeconds(1)) //1秒間隔で値を発行
          .Take(5)                           //値を5回受け取る
          .Timestamp()                       //タイムスタンプを付加
          .Subscribe(t => Console.WriteLine($"Value:{t.Value} Time:{t.Timestamp}"),
                     () => Console.WriteLine("OnCompleted"));

このコードを順に追って説明します。

  1. Observable.Interval(TimeSpan.FromSeconds(1))で、1秒間隔で0,1,2,...と値を順番に発行するIObservableを作り出します。
  2. Take(5)で発行された値を5個だけ受け取り、その後OnCompletedするように設定します。
  3. Timestamp()で受け取ったときのタイムスタンプを付加します。[2]
  4. 流れてきた値を購読して出力します。

出力結果は次のようになります。

Value:0 Time:2020/09/28 13:27:06 +00:00
Value:1 Time:2020/09/28 13:27:07 +00:00
Value:2 Time:2020/09/28 13:27:08 +00:00
Value:3 Time:2020/09/28 13:27:09 +00:00
Value:4 Time:2020/09/28 13:27:10 +00:00
OnCompleted

このように、1秒間隔で値が発行されていることがわかります。

Observable.Intervalファクトリメソッドを使うと、このように簡単に一定時間間隔で値が発行されるIObservableを作り出すことができるので、一定周期で一位の処理を行いたい場合などに非常に便利です。

他にもいろいろなファクトリメソッド

RxにはObservable.Intervalの他にも多数のファクトリメソッドが用意されています。
例えば、使い方が複雑なのでここでは紹介しませんが、イベントからIObservableを生成できるObservable.FromEventなどはよく使用しますね。

気になる方は「observable ファクトリメソッド」などで検索すると簡単に情報がヒットするので調べてみてください。

おさらい

長くなってしまったので、最後におさらいします。

値を発行する方法

【方法1】自分で発行する

  1. Subject<T>インスタンスをprivateフィールドで内部にもつ
  2. そのSubject<T>IObservable<T>で公開
  3. 値を発行するときはSubject<T>OnNext

【方法2】ファクトリメソッドを使用する
Observableクラスの静的メソッドとしていくつかのIObservableファクトリメソッドが用意されている。

値を受け取る方法

  1. 発行元から公開されたIObservable<T>Subscribeする
  2. 必要に応じてLINQメソッドを挟んで値を加工することもできる

最後に

この記事ではReactiveExtensionの基礎部分を簡単に紹介しました。

基礎部分だけなら、学習コストの割にメリットが非常に大きいと思いますので、皆さんも是非導入を検討してみてください。

脚注
  1. -ableだから「観察できる」では?と思うかもしれませんが、-ableは「受身」の意味で使われることもあるようです。 ↩︎

  2. Timestamp()を通過すると、流れてきた値はTimestamped<T>という構造体にラップされます。この構造体はT型のValueプロパティとDateTimeOffset型のTimestampプロパティを持ち、それぞれラップされた値とタイムスタンプを取得できるようになっています。 ↩︎