AsyncStream で multi broadcast
参考:
- https://qiita.com/takehilo/items/a25839256f8f6d9a94b2
- https://forums.swift.org/t/consuming-an-asyncstream-from-multiple-tasks/54453
- https://forums.swift.org/t/asyncsequence-stream-version-of-passthroughsubject-or-currentvaluesubject/60395
- https://github.com/apple/swift-async-algorithms/blob/main/Evolution/0016-share.md
- https://github.com/apple/swift-async-algorithms/blob/main/Evolution/0016-mutli-producer-single-consumer-channel.md
AsyncStream は for await sent in stream { ... } といった感じで書けるが、multi broadcast には対応していない。つまり、stream, consumer1, consumer2 がある場合、stream.send(value) によって値を consumer1, consumer2 で送信した場合に、consumer1 か consumer2 のどちらかでしか value を受け取ることができず奪い合いになる。また、AsyncStream は Consumer 側が受け取って処理が終わったかどうかは関係なく次の値を送信してしまうため、Consumer 側の処理が追いつかないために値が失われてしまう可能性がある。これを Back pressure と呼ぶ。
Multi broadcast に対応していないことを確認する実験コードは AsyncStreamを使った実装 の2番目を参照。
Multi broadcast 対応案 1 : AsyncStream + CurrentValueSubject
(Swift 6 以降では、non-sendable な型である cancellable が Sendable なクロージャである onTermination 内でキャプチャされているせいでコンパイルエラーとなっており、今のところ良い手立てを考えられてないため、良い方法とは言えない -> https://zenn.dev/link/comments/d079f22a9259b5)
-
CurrentValueSubjectに broadcast したい値を保持させる -
CurrentValueSubjectを AsyncStream で subscribe し、値が流れてくるたびにcontinuation.yield(value) - 値の更新は、直接
AsyncStream.send(value)するのではなく、CurrentValueSubjectにsend(value)する -
AsyncStreamが終了するタイミングでCurrentValueSubjectの subscribe を終了。つまりonTermination { cancellable.cancel() }
AsyncStreamとCombineを組み合わせた実装を参考。
ただし、AsyncStream を使っているので、Back pressure の問題は解決されていない。解決したい場合は、AsyncChannel をベースにする。
しかし上記の https://zenn.dev/link/comments/9d95efe09f5352 で実装してしまうと、onTermination は Sendable なクロージャなため、 キャプチャリストの cancellable は Sendable に適合している必要があるが、AnyCancellable は Sendable に適合おらず、Swift 6 ではエラーとなる。なので、そのまま使うのはまずそう。

@preconcurrency import Combine とすれば、このエラーを黙らせることはできるが、Combine の AnyCancellable の実装は公開されておらず、Sendable Classes に書いてある class の型が Sendable に適合できる条件を満たしているか、あるいはスレッドセーフであるかどうかも確認する手立てがないため、これは危険な方法になると思うので良くなさそう。
AsyncChannel であれば、consumer 側である for await sent in stream { ... } 側の Task がキャンセルすれば、自動的に for await sent in stream { ... } が終了するはずなので、この方法であれば、https://zenn.dev/link/comments/d079f22a9259b5 のような問題は起こさずに済みそう。(ついでに back pressure の問題も解決できるし)
AsyncChannel の微妙なところはいくつかありそうだ...(整理したいが)
無理矢理 Combine ベースの実装を Swift Concurrency ベースな実装に当てはめるのではなく Combine のいいところと Swift Concurrency のいいところを使い分けていくのが現段階のベストなのかなぁ。
AsyncStream の multi consumer 周りのスレッド
- 効率的なブロードキャスト・アルゴリズムを実装するのは難しく、AsyncStreamの実装が著しく複雑になる。
- ルートとなる非同期シーケンス型が常にブロードキャストである場合、ブロードキャストされたシーケンスを必要としないシナリオでの使い勝手が制限される。
ブロードキャストを必要としないユースケースでの使い勝手ってどうゆうふうに制限される?
ざっくり読む感じでは、AsyncSequence にブロードキャストは必要であることに賛成だが、それをデフォルトで組み込むことには懸念がある感じ?
AsyncSequence を拡張して Combine にあるようなオペレータやオブジェクトを追加したパッケージ。
これらを使えば、一応 AsyncSequence でもブロードキャストをすることはできる。
Swift Forums の Kickoff of a new season of development for AsyncAlgorithms; Share が公開されてから、マルチキャストな AsyncSequence の議論が活発化した。
.share() というオペレータを追加した PR が swift-async-algorighms パッケージで main にマージにされたようだ。
どうゆうオペレータなのか、追加されたドキュメントと実装を確認してみる。
Evolution/0016-share.md にドキュメントがある。
swift-async-algorithms の新しいバージョン 1.1 の pre-realse tag が出た。
問題が報告されなければ、12月の第1週にリリースされるようだ。