😊

【翻訳】Using AsyncSequence in Swift

2023/07/16に公開

WWDC2021でSwiftに導入された新しい同時実行APIと並んで、AsyncSequenceがあります。
AsyncSequenceはコレクション・プロトコルで、ループの中でデータを受け取ったり、
filter、map、reduceといった高次の関数を非同期で実行し、
新しいデータが利用可能になるのを待つことができます。

AsyncSequenceの紹介

シーケンスとして、他のシーケンスでできることは何でもできます。
高階関数を適用する以外にも、シーケンスの中を検索したり、要素の数を数えたりすることができます。

理解しなければならないのは、シーケンスの基本的な動作です。

awaitがサスペンドを意味することを思い出してほしい。コードが実行されるとき、
await呼び出しに遭遇すると、
await呼び出しは別の場所で待機中の処理を開始し、コードの実行は停止します。
非同期タスクのダウンロードが終わると、コンパイラーはある時点で、
await呼び出し以下のすべての実行を開始します。

AsyncSequenceも基本的に同じ動作をするが、重要な違いがあります。

リモート・サーバーに次のようなファイルがあるとします。

// videogames.csv
The Legend of Zelda: Ocarina of Time|1998|10
The Legend of Zelda: Majora's Mask|2000|10
The Legend of Zelda: The Wind Waker|2003|10
Tales of Vesperia|2008|8
Tales of Graces|2011|9
Tales of the Abyss|2006|10
Tales of Xillia|2013|10

ご参考までに、このファイルはこちらでご覧いただけます。

AsyncSequenceの使用

このファイルを一行ずつ消費するのはとても簡単です。

struct Videogame {
    let title: String
    let year: Int?
    let score: Int?
    
    init(rawLine: String) {
        let splat = rawLine.split(separator: "|")
        self.title = String(splat[0])
        self.year = Int(splat[1])
        self.score = Int(splat[2])
    }
}

//...

func loadVideogames() async {
    let url = URL(string: "https://www.andyibanez.com/fairesepages.github.io/tutorials/async-await/part11/videogames.csv")!
    
    var videogames: [Videogame] = []
    
    do {
        for try await rawVg in url.lines {
            if rawVg.contains("|") {
                // Valid videogame
                videogames += [Videogame(rawLine: rawVg)]
            }
        }
    } catch {
        // Handle the error
    }
}

linesはAsyncSequenceです - URLがファイルから新しい行を取得すると、1行ずつ処理されます。
配列やその他の特定のコレクションと言うのは正確ではありません。これは単なる抽象化であり、
時間をかけて私たちに値を提供するものです。独自のAsyncSequencesを作成することもできます。

しかし、AsyncSequenceは、もし私たちが等しく意味のある、
より賢明なものにコードをリファクタリングできなければ、半分も面白くないでしょう。

func loadVideogames() async {
    let url = URL(string: "https://www.andyibanez.com/fairesepages.github.io/tutorials/async-await/part11/videogames.csv")!
    
    let videogames =
        url
        .lines
        .filter { $0.contains("|") }
        .map { Videogame(rawLine: $0) }
    
    do {
        for try await videogame in videogames {
            print("\(videogame.title) (\(videogame.year ?? 0))")
        }
    } catch {
        
    }
}

この時点で、特筆すべきことが1つある:AsyncSequenceをこのように使用する場合、
つまり「コレクション」を変換するために複数の呼び出しを連鎖させる場合、
シーケンスが自動的に「開始」しないことに気づくでしょう。
forループを追加しなければ、シーケンスは始まらず、何も表示されません。
これは、ビデオゲームで.countを呼び出して要素数を取得できないなど、
いくつかの制限があることを意味します。また、dropLast()のように、
他で見たことがあるようなメソッドがいくつか欠けていることにも気づきました。

つまり、この例では、ビデオゲームの改行があるたびにawaitが発生します。
新しい値が生成されるたびに、コードは中断され、スレッドは新しい値が生成されるか、終了するか、
エラーが投げられるまで、別の処理を行います。

これは通常の繰り返しなので、ループ内でbreakやcontinueを使うことができます。

for try await videogame in videogames {
    if videogame.score == 10 {
        continue
    }
    print("\(videogame.title) (\(videogame.year ?? 0))")
}

この例では、continue文を追加して、満点のゲームをすべて出力しないようにしています。
もちろん、代わりにvideogamesにこの制約を加えるフィルタを追加して、
スコアが10のゲームはすべて出力されないようにすることもできます。

let videogames =
    url
    .lines
    .filter { $0.contains("|") }
    .map { Videogame(rawLine: $0) }
    .filter { $0.score != 10 } // Apply the filter here

do {
    for try await videogame in videogames {
        print("\(videogame.title) (\(videogame.year ?? 0))")
    }
} catch {
    
}

もうひとつ興味深いのは、この特定のケースでは、
ネットワーク経由でデータを配信するAsyncSequenceを使っていることです。
ローカル・ファイルで使用することも可能です。

AppleはSDK全体でAsyncSequenceを使用する複数のAPIを追加しました。

・ FileHandle.standardInput.bytes.linesは、
コマンドラインやその他のソースから入力を受け取るために使用できます。

・ URLのresourceBytesプロパティを呼び出すことで、
URLは行とバイトの両方にアクセスすることができます。

・ URLSessionにはbytes(from:)メソッドがあり、
これを使用してネットワークからデータをバイト単位でダウンロードできます。

・ NotificationCenterには、指定したタイプの新しい通知を待つためのAPIが追加されました。
これについては、いずれ記事を書くかもしれません。

AsyncStreamの使用

コールバックやデリゲートによって、
特定のイベントの更新を継続的に配信するコードを既に持っている可能性があります。
例えば、CoreLocationを使ってリアルタイムにユーザーの位置情報を受け取っている場合、
新しい位置情報が利用可能になるとそれを受け取るコードがあるとします。

このようなコードは、AsyncStreamを使って効率化することができます。
Converting closure-based code into async/await in Swift
(https://www.andyibanez.com/posts/converting-closure-based-code-into-async-await-in-swift/)と同様に、
リアルタイム "や "ストリーミング "コードを賢明な非同期シーケンスに変換することができます。

これをお見せするために、まず、イベントを受け取るCoreLocationデリゲートメソッドの小さなラッパーを
作成します。これは美しい例です。なぜなら認可ステータスの継続を作成し、
ロケーションイベントのストリームをセットアップするからです。

@MainActor
class LocationUpdater: NSObject, CLLocationManagerDelegate {
    private(set) var authorizationStatus: CLAuthorizationStatus
    
    private let locationManager: CLLocationManager
    
    // The continuation we will use to asynchronously ask the user permission to track their location.
    private var permissionContinuation: CheckedContinuation<CLAuthorizationStatus, Never>?
    
    var locationHandler: ([CLLocation]) -> Void = { _ in }
    
    override init() {
        locationManager = CLLocationManager()
        authorizationStatus = locationManager.authorizationStatus
        
        super.init()
        locationManager.delegate = self
        locationManager.desiredAccuracy = kCLLocationAccuracyBest
    }
    
    func start() {
        locationManager.startUpdatingLocation()
    }
    
    func stop() {
        locationManager.stopUpdatingLocation()
    }
    
    func requestPermission() async -> CLAuthorizationStatus {
        locationManager.requestWhenInUseAuthorization()
        return await withCheckedContinuation { continuation in
            permissionContinuation = continuation
        }
    }
    
    // MARK: - Location Delegate
    
    func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
        locationHandler(locations)
    }

    func locationManagerDidChangeAuthorization(_ manager: CLLocationManager) {
        authorizationStatus = manager.authorizationStatus
        permissionContinuation?.resume(returning: authorizationStatus)
    }
}

このLocationUpdaterクラスは、permissionContinuationの継続のおかげで、
async awaitでユーザーに認可を求めることができます。
開発者はこのコードを以下のように呼び出すことができます。

let authorizationsStatus = await updater.requestPermission()

このコードは、内部的には結果を得るために2つの異なるメソッドをジャンプしているにもかかわらず、
1行でステータスを返します。もし継続がどのように機能するか知らない場合は、
Converting closure-based code into async/await in Swift
(https://www.andyibanez.com/posts/converting-closure-based-code-into-async-await-in-swift/)
をチェックしてください。

The var locationHandler: ([CLLocation]) -> Void = { _ in }

var locationHandler: ([CLLocation]) -> Void = { _ in } プロパティは、
こちら側でデリゲートを実装することなく、ロケーションイベントを受け取ることができるクロージャです。
この配列をAsyncStreamでラップし、ロケーションイベントが発生したときに受信を開始し、ループで受信し、シーケンス関数を使って後でこの配列を変更することもできます。

func beginTracking() async {
    await requestPermission()
    if authorizationsStatus == .authorizedWhenInUse {
        for await location in locationEvents() {
            print(location.speed)
        }
    }
}

func locationEvents() -> AsyncStream<CLLocation> {
    let locations = AsyncStream(CLLocation.self) { continuation in
        updater.locationHandler = { locations in
            locations.forEach {
                continuation.yield($0)
            }
        }
        updater.start()
    }
    return locations
}

locationEventsはAsyncSequenceです。

ここでの重要な注意点は、継続を聴くことで、
その継続がいつ停止したかを知ることができるということです。
手動で停止させる必要があるシーケンスがある場合や、
イベントを受け取った後に何らかのクリーンアップを行う必要がある場合に、実装しておくと便利です。
そのメソッドは

continuation.onTermination = { _ in}

残念ながら、このメソッドを実装するには、
ストリーミング型(この場合はCLLocation)が@Sendableである必要があります。
CLLocationはSendableではないので、ここでは使えません。
Sendableについて学ぶには、
Understanding Actors in the New Concurrency Model in Swift
(https://www.andyibanez.com/posts/understanding-actors-in-the-new-concurrency-model-in-swift/)
をチェックしてください。
ロケーション・プロパティを1つ持つラッパー型を作ることでこれを回避しようとしましたが、
上手くいきませんでした。
現時点では、CLLocation と同じプロパティを持つ構造体を作成する以外に、
CoreLocation で AsyncStream を使用する最良の方法が分かりません。

まとめ

AsyncSequenceを使うと、リアルタイムで発生するイベントを待つことができます。
ネットワーク・イベントであれ、その他のシステム・イベントであれ、
AsyncSequenceはコードの読み書きをより簡単にするための合理化に役立ちます。
AsyncStreamを使用すると、連続的なイベントエミッターをAsyncSequenceにラップすることができ、
そのイベントをループで受け取ることができます。

【翻訳元の記事】

Using AsyncSequence in Swift
https://www.andyibanez.com/posts/using-asyncsequence-in-swift/

Discussion