🏞️

RxSwift 6.5のPublishSubjectのコードを見ながら、イベントが流れる仕組みについて触れてみる。

2022/07/12に公開

はじめに

数年前まで非同期プログラミングはRxSwift一強という時代でしたが、Swiftによって提供された async / await により非同期プログラミングは言語標準機能を利用する傾向に変わっているように思います。

実際に使用するかどうか知っていてメリットが有るか?などの点はさておき、Swiftアプリにおいて一時期非同期処理のデファクトスタンダードであった(と勝手に思い込んでいる)RxSwiftがどのようにイベントストリームを実装しているのかを読み解いていくことは技術的にオモシロイと思っています。
今回はその1例として、PublishSubjectを、生成・subscribe・イベントを流してDisposeするまでを、イベントの流れと実際のコードを見ながら辿ってみようと思います。

1. PublishSubject の処理の詳細を追ってみよう!

PublishSubjectはEventを任意に流せる川であり、subscribeすることによって、流れてきたイベントをキャッチすることができます。

1.1 まずはPublishSubjectを生成しよう

let intSubject = PublishSubject<Int>()

一旦上記のように、PublishSubjectを初期化しましょう。

initializerが呼び出され、PublishSubjectが初期化されます。

1.2 subscribeしてみよう

intSubject.subscribe(onNext: { value in
    print("intSubject.onNext Called: \(value)")
}, onError: { error in
    print("intSubject.onError Called: \(error)")
}, onCompleted: {
    print("intSubject.onCompleted Called.")
}, onDisposed: {
    print("intSubject.onDisposed Called.")
}).disposed(by: disposeBag)

PublishSubjectに流れてきたEventを受け取るため、PublishSubjectのsubscribe関数を呼びます。

ここでは大まかに、引数で受け取った各closureを1つの AnonymousObserver<Element>に包み直して自身のsubscribe関数を呼び出しています。

この subscribe関数は Observableクラスの宣言上はabstractとして実装されているため、各Subject側でoverrideされますが、PublishSubjectの場合はこのようになっています

そのまま synchronized_subscribe関数が呼び出され、自身のobservers引数のobserverを格納後SubscriptionDisposableを作って呼び出し側に返却します。

1.3 Eventを流そう!

intSubject.onNext(1)
intSubject.onNext(2)
intSubject.onNext(3)
intSubject.onNext(4)

ここまでで、PublishSubjectの初期化とSubscribeが完了しています。
ではいよいよ、Eventを流していきましょう!

PublishSubjectに対してonNextすると、この関数が呼び出されます。

その際synchronized_on関数を利用して自身が格納しているobserversのArrayが返却されます。
そのArrayと一緒に、関数に引き渡されたEventが dispatch関数へと引き渡され、そちらの関数内でclosureにイベントが渡され、各Eventが非同期に渡されていきます。

ここ以降の処理は subscribeしたコード側での責任になります。

2. 購読解除してみよう!

2.1 SubscriptionDisposablesとは

前述の通り、PublishSubjectを subscribeすると Disposableプロトコルに準拠したSubscriptionDisposableが返却されます
この構造体は、DisposeKeyOwnerを持っています。

2.2 DisposeBagに格納しよう!

通常Subscriptionの結果は下記コードのように、DisposeBagに格納します

intSubject.subscribe(onNext: { value in
    print("intSubject.onNext Called: \(value)")
}, onError: { error in
    print("intSubject.onError Called: \(error)")
}, onCompleted: {
    print("intSubject.onCompleted Called.")
}, onDisposed: {
    print("intSubject.onDisposed Called.")
}).disposed(by: disposeBag)

このとき、最後の disposed(by: disposeBag)の定義はこの様になっていて、DisposeBagのinsert関数が呼ばれるようになっています
その後さらに別のinsert関数が呼び出され、DisposeBagのdisposablesに引数のDisposablesがappendされます。

このとき isDisposedtrue の場合、disposablesに格納することはせず、そのまま引数のDisposableを呼び出し側に返却します。
その場合、そのまま引数のDisposableのdisposeが呼び出されます。
(Disposeが呼び出されたときの挙動については後述します)

2.3 Disposeしよう!

DisposeBagがdeinitされる際、自身のdispose関数を呼び出します。

DisposeBagのdispose関数はこの様になっており自身に登録されている Disposableの配列に対して個別のdispose関数を呼び出していきます

PublishSubjectに対してSubscribeした結果をDisposeBagに登録している場合SubscriptionDisposableのdispose関数が呼び出され、 ownerの synchronizedUnsubscription関数を呼び出します。

PublishSubjectの場合このように実装されていて、 synchronized_unsbscribe関数を最終的に呼び出します
ここでは、自身が持っている observersremoveKey関数を呼び出します。

このときObserversは AnyObserver<Element>.s のTypealiasなので、 removeKey関数の実体は Bag構造体に記述されているここになります

この関数はdictionaryか、pairsのどちらかから BagKeyに一致するEntryを削除して、呼び出し側に返却します。
このときこの関数は mutating がついているため自分自身が変更されます。

PublishSubject側での synchronized_unsubscribeでは、返却された結果を _ として捨てているため、ここまで完了した時点で closureが廃棄され、購読解除が完了します。

3. 終わりに

インターネット上の説明ではObservableをどう扱うか、またObservableとはどのようなものなのかを説明する記事は多くあるように思いますが、ObservableでどのようにEventが伝播してくるのかを説明した記事はあまり見つからなかったように思います。
もちろん、それらは素晴らしい記事ではありますが、RxSwiftのSwiftの部分について着目しないのは、なんだかもったないよなーという気持ちがありました
そのため、今回は、PublishSubjectを生成して、subscribeして、イベントを流してdisposeするまで、というPublishSubjectではよくあると思われるユースケースの際、RxSwiftがどのようにイベントを処理しているのかをコードベースで見てみました。

正直わかりやすいとは言えない説明になってしまったと思っていますが、コードの順番ではなく実際に起きている順番をイメージしながらコードへのリンクと説明を入り混じらせることで、コードではなくプログラム上で有機的に動くさまをイメージしてもらえるように心がけたつもりです。

Swiftの async / awaitで十分代用できるユースケースもある一方、コードもふるまいと実体がきちんと分けられており洗練されていて、それでいてかなりの高機能であり、さすが複数言語に移植されるだけはあるなと思わせる素晴らしいライブラリでした。

また、人生で初めてきちんと(?)読み込めたOSSのソースコードでしたので嬉しくて記事にしました。

4. 参考リンク

Discussion