Open14

AsyncStream で multi broadcast

yamada-k-25yamada-k-25

AsyncStreamfor await sent in stream { ... } といった感じで書けるが、multi broadcast には対応していない。つまり、stream, consumer1, consumer2 がある場合、stream.send(value) によって値を consumer1, consumer2 で送信した場合に、consumer1consumer2 のどちらかでしか value を受け取ることができず奪い合いになる。また、AsyncStream は Consumer 側が受け取って処理が終わったかどうかは関係なく次の値を送信してしまうため、Consumer 側の処理が追いつかないために値が失われてしまう可能性がある。これを Back pressure と呼ぶ。

Multi broadcast に対応していないことを確認する実験コードは AsyncStreamを使った実装 の2番目を参照。

yamada-k-25yamada-k-25

Multi broadcast 対応案 1 : AsyncStream + CurrentValueSubject

(Swift 6 以降では、non-sendable な型である cancellable が Sendable なクロージャである onTermination 内でキャプチャされているせいでコンパイルエラーとなっており、今のところ良い手立てを考えられてないため、良い方法とは言えない -> https://zenn.dev/link/comments/d079f22a9259b5)

  1. CurrentValueSubject に broadcast したい値を保持させる
  2. CurrentValueSubject を AsyncStream で subscribe し、値が流れてくるたびに continuation.yield(value)
  3. 値の更新は、直接 AsyncStream.send(value) するのではなく、 CurrentValueSubjectsend(value) する
  4. AsyncStream が終了するタイミングで CurrentValueSubject の subscribe を終了。つまりonTermination { cancellable.cancel() }

AsyncStreamとCombineを組み合わせた実装を参考。

ただし、AsyncStream を使っているので、Back pressure の問題は解決されていない。解決したい場合は、AsyncChannel をベースにする。

yamada-k-25yamada-k-25

しかし上記の https://zenn.dev/link/comments/9d95efe09f5352 で実装してしまうと、onTerminationSendable なクロージャなため、 キャプチャリストの cancellableSendable に適合している必要があるが、AnyCancellableSendable に適合おらず、Swift 6 ではエラーとなる。なので、そのまま使うのはまずそう。

yamada-k-25yamada-k-25

@preconcurrency import Combine とすれば、このエラーを黙らせることはできるが、CombineAnyCancellable の実装は公開されておらず、Sendable Classes に書いてある class の型が Sendable に適合できる条件を満たしているか、あるいはスレッドセーフであるかどうかも確認する手立てがないため、これは危険な方法になると思うので良くなさそう。

yamada-k-25yamada-k-25

AsyncChannel であれば、consumer 側である for await sent in stream { ... } 側の Task がキャンセルすれば、自動的に for await sent in stream { ... } が終了するはずなので、この方法であれば、https://zenn.dev/link/comments/d079f22a9259b5 のような問題は起こさずに済みそう。(ついでに back pressure の問題も解決できるし)

yamada-k-25yamada-k-25

AsyncChannel の微妙なところはいくつかありそうだ...(整理したいが)

無理矢理 Combine ベースの実装を Swift Concurrency ベースな実装に当てはめるのではなく Combine のいいところと Swift Concurrency のいいところを使い分けていくのが現段階のベストなのかなぁ。

yamada-k-25yamada-k-25

https://forums.swift.org/t/se-0406-backpressure-support-for-asyncstream/66771/19

  • 効率的なブロードキャスト・アルゴリズムを実装するのは難しく、AsyncStreamの実装が著しく複雑になる。
  • ルートとなる非同期シーケンス型が常にブロードキャストである場合、ブロードキャストされたシーケンスを必要としないシナリオでの使い勝手が制限される。

ブロードキャストを必要としないユースケースでの使い勝手ってどうゆうふうに制限される?

ざっくり読む感じでは、AsyncSequence にブロードキャストは必要であることに賛成だが、それをデフォルトで組み込むことには懸念がある感じ?

yamada-k-25yamada-k-25

AsyncSequence を拡張して Combine にあるようなオペレータやオブジェクトを追加したパッケージ。

これらを使えば、一応 AsyncSequence でもブロードキャストをすることはできる。