📡
AsyncStream を複数購読したいときの解決策
🙌 はじめに
どうも tattsun です。
本記事では、AsyncStream でマルチブロードキャストする方法 についてまとめています!
AsyncStream は単一の購読しか行えず、複数箇所で購読するには Combine などを利用する必要がありました。Concurrency に移行を進める中で可能であれば、Combine は利用したくないので AsyncSream を使って解決できないかと考えたのが今回の記事になります。
📢 お知らせ(余談)
「クラシルリワード」の名称が 「レシチャレ」 に変わりました 🎉
これからより一層、ユーザー価値を届けていけるように開発に取り組んでいこうと思います!
✨ 実際のコード
はじめに実際に完成したコードを添付しておきます。
import Foundation
/// AsyncStream を複数の対象から監視するためのヘルパー
public actor AsyncBroadcastStream<Element: Sendable> {
public typealias Stream = AsyncStream<Element>
public typealias Continuation = Stream.Continuation
private var continuations: [UUID: Continuation]
public init(continuations: [UUID: Continuation] = [:]) {
self.continuations = continuations
}
deinit {
// 念のため、所有者解放時も全購読者を終了
for continuation in continuations.values {
continuation.finish()
}
continuations.removeAll()
}
}
// MARK: - Public Function
extension AsyncBroadcastStream {
public func stream() -> AsyncStream<Element> {
let uuid = UUID()
return AsyncStream { continuation in
continuations[uuid] = continuation
continuation.onTermination = { [weak self] _ in
Task { [weak self] in
await self?.removeContinuation(for: uuid)
}
}
}
}
public func yield(_ value: Element) {
for continuation in continuations.values {
continuation.yield(value)
}
}
public func finish() {
for continuation in continuations.values {
continuation.finish()
}
continuations.removeAll()
}
}
// MARK: - Private Function
extension AsyncBroadcastStream {
private func removeContinuation(for uuid: UUID) {
continuations[uuid] = nil
}
}
🐛 AsyncStream の課題
単一の AsyncStream の値を複数で購読すると値を奪い合ってしまいます。A, B と複数の対象から同じ AsyncStream を購読すると以下のように A/B で受け取る値が分散してしまいます。

動作確認用の簡易実装
import _Concurrency
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
// MARK: - サンプル: AsyncStream を2つの購読者で読んでみる
/// 0..<20 を高速で流す AsyncStream を作る
func makeNumberStream() -> AsyncStream<Int> {
AsyncStream { continuation in
// 別タスクで値を発行
Task {
for i in 0..<20 {
continuation.yield(i)
// 見やすいように少しだけ待つ
try? await Task.sleep(nanoseconds: 50_000_000) // 0.05 秒
}
continuation.finish()
}
}
}
func demoMultiConsumer() {
let stream = makeNumberStream()
// 同じ AsyncStream インスタンスを 2 つの Task から for-await する
let task1 = Task {
var received: [Int] = []
for await value in stream {
print("[A] received \(value)")
received.append(value)
}
print("[A] finished. values =", received.sorted())
return received
}
let task2 = Task {
var received: [Int] = []
for await value in stream {
print("[B] received \(value)")
received.append(value)
}
print("[B] finished. values =", received.sorted())
return received
}
Task {
let (valuesA, valuesB) = await (task1.value, task2.value)
let setA = Set(valuesA)
let setB = Set(valuesB)
let union = setA.union(setB)
print("----------- summary -----------")
print("A count:", setA.count, "B count:", setB.count, "union:", union.count)
print("A ∩ B is empty? ->", setA.isDisjoint(with: setB))
print("union == 0..<20 ? ->", union == Set(0..<20))
print("--------------------------------")
}
}
demoMultiConsumer()
👀 AsyncBroadcastStream の動作を見る
AsyncBroadcastStream では、複数で購読しても値を奪い合わずに A, B の両方で全ての値を受け取れています。また、途中で B をキャンセルしても A はキャンセルされずに最後まで値を受け取れています。

動作確認用の簡易実装
import Foundation
import PlaygroundSupport
import _Concurrency
PlaygroundPage.current.needsIndefiniteExecution = true
// MARK: - AsyncBroadcastStream の実装
@available(iOS 13.0, macOS 10.15, *)
public actor AsyncBroadcastStream<Element: Sendable> {
public typealias Stream = AsyncStream<Element>
public typealias Continuation = Stream.Continuation
private var continuations: [UUID: Continuation]
public init() {
self.continuations = [:]
}
public func stream() -> AsyncStream<Element> {
let id = UUID()
return AsyncStream { continuation in
continuations[id] = continuation
continuation.onTermination = { [weak self] _ in
Task { [weak self] in
await self?.removeContinuation(id)
}
}
}
}
public func yield(_ value: Element) {
for c in continuations.values {
c.yield(value)
}
}
public func finish() {
for c in continuations.values {
c.finish()
}
continuations.removeAll()
}
private func removeContinuation(_ id: UUID) {
continuations[id] = nil
}
}
// MARK: - Playground Demo
func demo() {
let broadcaster = AsyncBroadcastStream<Int>()
// 購読者 A
let taskA = Task {
print("A START")
let stream = await broadcaster.stream()
var received: [Int] = []
for await value in stream {
print("[A] recv:", value)
received.append(value)
}
print("A END:", received.sorted())
return received
}
// 購読者 B
let taskB = Task {
print("B START")
let stream = await broadcaster.stream()
var received: [Int] = []
for await value in stream {
print("[B] recv:", value)
received.append(value)
}
print("B END:", received.sorted())
return received
}
// 発行側
Task {
for i in 1...20 {
await broadcaster.yield(i)
try? await Task.sleep(nanoseconds: 100_000_000) // 0.1秒
if i == 10 {
print(">>> CANCEL B (after receiving 1〜10)")
taskB.cancel()
}
}
print(">>> FINISH BROADCAST")
await broadcaster.finish()
let a = await taskA.value
let b = await taskB.value
print("===== SUMMARY =====")
print("A:", a.sorted())
print("B:", b.sorted())
print("====================")
}
}
demo()
🤐 おわりに
AsyncStream が抱える複数の購読で値を奪い合ってしまう問題を AsyncStream だけで解消することが出来ました!大規模なアプリでは、複数の画面などで値を共有したいケースはたくさんあると思うので実装に悩んでいる方の参考になれたら幸いです。
Discussion