📡

AsyncStream を複数購読したいときの解決策

に公開

🙌 はじめに

どうも tattsun です。

本記事では、AsyncStream でマルチブロードキャストする方法 についてまとめています!

AsyncStream は単一の購読しか行えず、複数箇所で購読するには Combine などを利用する必要がありました。Concurrency に移行を進める中で可能であれば、Combine は利用したくないので AsyncSream を使って解決できないかと考えたのが今回の記事になります。

📢 お知らせ(余談)

「クラシルリワード」の名称が 「レシチャレ」 に変わりました 🎉
これからより一層、ユーザー価値を届けていけるように開発に取り組んでいこうと思います!

https://prtimes.jp/main/html/rd/p/000000413.000019382.html

✨ 実際のコード

はじめに実際に完成したコードを添付しておきます。

import Foundation

/// AsyncStream を複数の対象から監視するためのヘルパー
public actor AsyncBroadcastStream<Element: Sendable> {
    public typealias Stream = AsyncStream<Element>
    public typealias Continuation = Stream.Continuation

    private var continuations: [UUID: Continuation]

    public init(continuations: [UUID: Continuation] = [:]) {
        self.continuations = continuations
    }

    deinit {
        // 念のため、所有者解放時も全購読者を終了
        for continuation in continuations.values {
            continuation.finish()
        }

        continuations.removeAll()
    }
}

// MARK: - Public Function

extension AsyncBroadcastStream {
    public func stream() -> AsyncStream<Element> {
        let uuid = UUID()
        return AsyncStream { continuation in
            continuations[uuid] = continuation

            continuation.onTermination = { [weak self] _ in
                Task { [weak self] in
                    await self?.removeContinuation(for: uuid)
                }
            }
        }
    }

    public func yield(_ value: Element) {
        for continuation in continuations.values {
            continuation.yield(value)
        }
    }

    public func finish() {
        for continuation in continuations.values {
            continuation.finish()
        }

        continuations.removeAll()
    }
}

// MARK: - Private Function

extension AsyncBroadcastStream {
    private func removeContinuation(for uuid: UUID) {
        continuations[uuid] = nil
    }
}

🐛 AsyncStream の課題

単一の AsyncStream の値を複数で購読すると値を奪い合ってしまいます。A, B と複数の対象から同じ AsyncStream を購読すると以下のように A/B で受け取る値が分散してしまいます。

AsyncStream Demo

動作確認用の簡易実装
import _Concurrency
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true

// MARK: - サンプル: AsyncStream を2つの購読者で読んでみる

/// 0..<20 を高速で流す AsyncStream を作る
func makeNumberStream() -> AsyncStream<Int> {
    AsyncStream { continuation in
        // 別タスクで値を発行
        Task {
            for i in 0..<20 {
                continuation.yield(i)
                // 見やすいように少しだけ待つ
                try? await Task.sleep(nanoseconds: 50_000_000) // 0.05 秒
            }
            continuation.finish()
        }
    }
}

func demoMultiConsumer() {
    let stream = makeNumberStream()

    // 同じ AsyncStream インスタンスを 2 つの Task から for-await する
    let task1 = Task {
        var received: [Int] = []
        for await value in stream {
            print("[A] received \(value)")
            received.append(value)
        }
        print("[A] finished. values =", received.sorted())
        return received
    }

    let task2 = Task {
        var received: [Int] = []
        for await value in stream {
            print("[B] received \(value)")
            received.append(value)
        }
        print("[B] finished. values =", received.sorted())
        return received
    }

    Task {
        let (valuesA, valuesB) = await (task1.value, task2.value)

        let setA = Set(valuesA)
        let setB = Set(valuesB)
        let union = setA.union(setB)

        print("----------- summary -----------")
        print("A count:", setA.count, "B count:", setB.count, "union:", union.count)
        print("A ∩ B is empty? ->", setA.isDisjoint(with: setB))
        print("union == 0..<20 ? ->", union == Set(0..<20))
        print("--------------------------------")
    }
}

demoMultiConsumer()

https://developer.apple.com/documentation/swift/asyncstream?utm_source=chatgpt.com

👀 AsyncBroadcastStream の動作を見る

AsyncBroadcastStream では、複数で購読しても値を奪い合わずに A, B の両方で全ての値を受け取れています。また、途中で B をキャンセルしても A はキャンセルされずに最後まで値を受け取れています。

AsyncBroadcastStream Demo

動作確認用の簡易実装
import Foundation
import PlaygroundSupport
import _Concurrency

PlaygroundPage.current.needsIndefiniteExecution = true

// MARK: - AsyncBroadcastStream の実装

@available(iOS 13.0, macOS 10.15, *)
public actor AsyncBroadcastStream<Element: Sendable> {
    public typealias Stream = AsyncStream<Element>
    public typealias Continuation = Stream.Continuation

    private var continuations: [UUID: Continuation]

    public init() {
        self.continuations = [:]
    }

    public func stream() -> AsyncStream<Element> {
        let id = UUID()
        return AsyncStream { continuation in
            continuations[id] = continuation

            continuation.onTermination = { [weak self] _ in
                Task { [weak self] in
                    await self?.removeContinuation(id)
                }
            }
        }
    }

    public func yield(_ value: Element) {
        for c in continuations.values {
            c.yield(value)
        }
    }

    public func finish() {
        for c in continuations.values {
            c.finish()
        }
        continuations.removeAll()
    }

    private func removeContinuation(_ id: UUID) {
        continuations[id] = nil
    }
}

// MARK: - Playground Demo

func demo() {
    let broadcaster = AsyncBroadcastStream<Int>()

    // 購読者 A
    let taskA = Task {
        print("A START")
        let stream = await broadcaster.stream()
        var received: [Int] = []
        for await value in stream {
            print("[A] recv:", value)
            received.append(value)
        }
        print("A END:", received.sorted())
        return received
    }

    // 購読者 B
    let taskB = Task {
        print("B START")
        let stream = await broadcaster.stream()
        var received: [Int] = []
        for await value in stream {
            print("[B] recv:", value)
            received.append(value)
        }
        print("B END:", received.sorted())
        return received
    }

    // 発行側
    Task {
        for i in 1...20 {
            await broadcaster.yield(i)
            try? await Task.sleep(nanoseconds: 100_000_000)  // 0.1秒

            if i == 10 {
                print(">>> CANCEL B (after receiving 1〜10)")
                taskB.cancel()
            }
        }

        print(">>> FINISH BROADCAST")
        await broadcaster.finish()

        let a = await taskA.value
        let b = await taskB.value

        print("===== SUMMARY =====")
        print("A:", a.sorted())
        print("B:", b.sorted())
        print("====================")
    }
}

demo()

🤐 おわりに

AsyncStream が抱える複数の購読で値を奪い合ってしまう問題を AsyncStream だけで解消することが出来ました!大規模なアプリでは、複数の画面などで値を共有したいケースはたくさんあると思うので実装に悩んでいる方の参考になれたら幸いです。

Kurashiru Tech Blog

Discussion