Open5

AsyncStreamとは?

muranakarmuranakar

公式をDeepLで翻訳

AsyncSequenceはSequence型に似ており、一度に1つずつステップスルーできる値のリストを提供し、非同期性を追加します。AsyncSequenceは、最初に使用するときに、その値のすべて、一部、またはまったく利用できない場合があります。その代わりに、awaitを使用して、値が利用可能になったときに値を受け取ります。

Sequenceと同様に、通常はfor await-inループでAsyncSequenceを繰り返し処理します。ただし、呼び出し側は値を待つ可能性があるため、await キーワードを使用します。次の例では、1からhowHigh値までのInt値を生成するカスタムAsyncSequenceであるCounterを繰り返し処理する方法を示します:

for await number in Counter(howHigh: 10) {
    print(number, terminator: " ")
}
// Prints "1 2 3 4 5 6 7 8 9 10 "

AsyncSequence は、値を生成したり格納したりするのではなく、値へのアクセス方法を定義するだけです。値の型を Element と呼ばれる関連型として定義すると同時に、AsyncSequence は makeAsyncIterator() メソッドを定義します。これは AsyncIterator 型のインスタンスを返します。標準の IteratorProtocol と同様に、AsyncIteratorProtocol は、要素を生成するための 1 つの next() メソッドを定義します。違いは、AsyncIterator はその next() メソッドを async として定義していることで、呼び出し元は await キーワードで次の値を待機する必要があります。

AsyncSequence は、標準ライブラリの基本 Sequence が提供する操作をモデルとして、受け取った要素を処理するメソッドも定義します。メソッドには、単一の値を返すものと、別のAsyncSequenceを返すものの2つのカテゴリがあります。

単一の値を返すメソッドでは、for await-inループが不要になり、代わりに1回のawait呼び出しで済むようになります。例えば、contains(_:)メソッドは、与えられた値がAsyncSequence内に存在するかどうかを示すブール値を返します。前の例の Counter シーケンスがあれば、1 行の呼び出しでシーケンスメンバの存在をテストできます:

let found = await Counter(howHigh: 10).contains(5) // true

別の AsyncSequence を返すメソッドは、そのメソッドのセマンティクスに固有の型を返します。例えば、.map(:) メソッドは AsyncMapSequence(または、map(:) メソッドに提供するクロージャがエラーをスローできる場合は AsyncThrowingMapSequence)を返します。これらの返されたシーケンスは、シーケンスの次のメンバを待ち望むことはありません。一般的には、これらのシーケンスに対してfor await-inで繰り返し処理を行います。次の例では、map(_:) メソッドが Counter シーケンスから受け取った Int を String に変換しています:

let stream = Counter(howHigh: 10)
    .map { $0 % 2 == 0 ? "Even" : "Odd" }
for await s in stream {
    print(s, terminator: " ")
}
// Prints "Odd Even Odd Even Odd Even Odd Even Odd Even "
muranakarmuranakar

公式をDeepLで翻訳

AsyncStream

新しい要素を生成するために継続を呼び出すクロージャから生成される非同期シーケンス。

概要

AsyncStream は AsyncSequence に準拠し、非同期イテレータを手動で実装することなく非同期シーケンスを作成する便利な方法を提供する。特に、非同期ストリームは、コールバックまたはデリゲーションベースの API を async-await に適合させるのに適している。

AsyncStream.Continuationを受け取るクロージャでAsyncStreamを初期化します。このクロージャで要素を生成し、継続の yield(_:) メソッドを呼び出してストリームに提供します。生成する要素がなくなったら、継続の finish() メソッドを呼び出します。これにより、シーケンスのイテレータはnilを生成し、シーケンスが終了します。継続は Sendable に準拠しており、AsyncStream の反復処理の外部コンテキストから呼び出すことができます。

任意の要素のソースは、呼び出し元が要素を反復処理するよりも速く要素を生成することができます。このため、AsyncStreamはバッファリング動作を定義し、ストリームが特定の数の最も古い要素または最も新しい要素をバッファリングできるようにしています。デフォルトでは、バッファーの上限はInt.maxであり、これは値が無制限であることを意味します。

ストリームを使用するための既存のコードの適応

既存のコールバックコードをasync-awaitを使用するように適応させるには、継続のyield(_:)メソッドを使用して、ストリームに値を提供するためにコールバックを使用します。

地震を検出するたびにQuakeインスタンスを呼び出し元に提供する、仮想的なQuakeMonitor型を考えてみましょう。コールバックを受け取るために、呼び出し元はモニターの quakeHandler プロパティの値としてカスタムのクロージャを設定し、モニターは必要に応じてコールバックを呼び出します。

class QuakeMonitor {
    var quakeHandler: ((Quake) -> Void)?


    func startMonitoring() {}
    func stopMonitoring() {}
}

これをasync-awaitを使用するように適応させるには、QuakeMonitorを拡張してasyncStream<Quake>型のquakesプロパティを追加します。このプロパティのゲッターでは、AsyncStreamを返し、そのビルドクロージャ(ストリームを作成するために実行時に呼び出される)は、次のステップを実行するために継続を使用します:

  1. QuakeMonitorインスタンスを作成します。
  2. モニターの quakeHandler プロパティを、各 Quake インスタンスを受け取り、継続の yield(_:) メソッドを呼び出してストリームに転送するクロージャに設定します。
  3. 継続の onTermination プロパティを、モニターの stopMonitoring() を呼び出すクロージャに設定します。
  4. QuakeMonitorのstartMonitoringを呼び出します。
extension QuakeMonitor {


    static var quakes: AsyncStream<Quake> {
        AsyncStream { continuation in
            let monitor = QuakeMonitor()
            monitor.quakeHandler = { quake in
                continuation.yield(quake)
            }
            continuation.onTermination = { @Sendable _ in
                 monitor.stopMonitoring()
            }
            monitor.startMonitoring()
        }
    }
}

ストリームはAsyncSequenceであるため、コールポイントはfor-await-in構文を使用して、ストリームが生成する各Quakeインスタンスを処理することができます:

for await quake in QuakeMonitor.quakes {
    print("Quake: \(quake.date)")
}
print("Stream finished.")
muranakarmuranakar

workoutのマルチデバイスでの非同期通信を実装しているサンプルアプリより

  1. SessionSateChange型の値を最新の値だけをバッファに持たせるAsyncStreamを作成する。
let asynStreamTuple = AsyncStream.makeStream(of: SessionSateChange.self, bufferingPolicy: .bufferingNewest(1))
  1. SessionSateChange型の値が渡される。
asynStreamTuple.continuation.yield(sessionSateChange)
  1. 上記のAsyncStreamを用いた処理を実装しており、.yield()メソッドで値が渡されるたびに処理が走る。
Task {
            for await value in asynStreamTuple.stream {
                await consumeSessionStateChange(value)
            }
        }

finish()メソッドを用いて、AsyncStreamを終了できる。

onTerminationを用いて、finishメソッドを行った後に最後の処理を走らせることができる。