📌

【翻訳】Task-based concurrency in Swift

2023/07/20に公開

逐次コードと同じように、並行コードにもさまざまな形があります。データの一部を非同期で取って来るのか、ディスクから重いファイルをロードするのか、関連する操作のグループを実行するのか、何を達成しようとしているのかによって、最適な抽象化はユースケースによって大きく異なります。

そのような並行プログラミングの抽象化の1つがタスクです。タスクは一見、 Futures & Promises とFoundation の Operation API に非常に似ているように見えますが、その動作や API ユーザーに与える制御のレベルには明確な違いがあります。

今週は、それらの違いのいくつかと、タスクが本当に便利になるシナリオを見てみましょう。

この記事は、Swiftの並行システムがSwift 5.5で導入されるずっと前に書かれたことに注意してください。その新しい同時実行システムの詳細については、このDiscoverページをチェックしてください。
https://www.swiftbysundell.com/discover/concurrency/

手元のタスク

私たちがソーシャルネットワーキングアプリを構築していて、投稿を公開するときに写真と動画の両方を添付するオプションをユーザーに提供しているとしましょう。現在、ユーザーが "Publish "を押すたびに、以下のような関数を呼び出して、添付されたメディアを全てアップロードし、投稿自体のデータもアップロードしています。

func publish(_ post: Post) {
    for photo in post.photos {
        upload(photo)
    }

    for video in post.videos {
        upload(video)
    }

    upload(post)
}

上記のコードは非常にシンプルですが、いくつかの問題があります。まず、投稿の公開が完了したときに通知を受け取る方法がありません。また、エラー処理も行っていないため、写真や動画のアップロードに失敗した場合は、投稿データのアップロードを続行することになります。

上記の問題を解決する方法はいくつかあります。1つのアイデアとしては、組み込みの OperationOperationQueue 型を使って、全てのオペレーションを順次実行することができるかもしれません。これらの選択肢はどちらも有効ですが、元のコードにかなり大きな変更を加える必要があるため、少し「強引」だと感じるかもしれません。

幸運なことに、必要な制御を得るために必要なことは、スタックに一段階深く入り、 Operation と OperationQueue がベースにしているフレームワーク、 Grand Central Dispatch を使うことだと分かりました。

“A deep dive into Grand Central Dispatch in Swift”(https://www.swiftbysundell.com/articles/a-deep-dive-into-grand-central-dispatch-in-swift/)"で見たように、 GCD は非常に簡単に操作の束をグループ化し、それらの全てが完了したときに通知を受けることを可能にします。それを実現するために、以前のアップロード関数に少し手を加えます - クロージャを使用して完了を観察する方法を提供し、全てのメディアのアップロードが完了したときに通知を受けるために、 DispatchGroup に入ったり出たりするバランスの取れた呼び出しを行います。

func publish(_ post: Post) {
    let group = DispatchGroup()

    for photo in post.photos {
        group.enter()

        upload(photo) {
            group.leave()
        }
    }

    for video in post.videos {
        group.enter()

        upload(video) {
            group.leave()
        }
    }

    // Calling ‘notify’ allows us to observe whenever the whole
    // group was finished using a closure.
    group.notify(queue: .main) {
        upload(post)
    }
}

上記はとてもうまく機能し、とてもきれいなコードになりました。しかし、エラー処理の欠如にはまだ対処していません。メディアのアップロードが正常に完了したかどうかにかかわらず、投稿をやみくもにアップロードしてしまうからです。

これを解決するために、オプションの Error 変数を追加して、発生したエラーを記録するようにしましょう。アップロード関数にもうひとつ手を加えて、完了ハンドラにオプションの Error 引数を渡すようにします。

func publish(_ post: Post) {
    let group = DispatchGroup()
    var anyError: Error?

    for photo in post.photos {
        group.enter()

        upload(photo) { error in
            anyError = anyError ?? error
            group.leave()
        }
    }

    for video in post.videos {
        group.enter()

        upload(video) { error in
            anyError = anyError ?? error
            group.leave()
        }
    }

    group.notify(queue: .main) {
        // If an error was encountered while uploading the
        // post’s media, we’ll call an error handling function
        // instead of proceeding with the post data upload.
        if let error = anyError {
            return handle(error)
        }

        upload(post)
    }
}

これで元のコードの正しさの問題は全て修正されましたが、その過程で、より複雑で読みにくいコードになってしまいました。また、この新しいソリューションは、(ローカルエラー変数とディスパッチグループ呼び出しの追跡が必要な)ボイラープレート的な要素がかなり多くなっています。しかし、非同期コードの多くをこのパターンに移行し始めると、すぐにメンテナンスが難しくなります。

抽象化の時間です!

それでは、 Grand Central Dispatch の上に薄いタスクベースの抽象化レイヤーを導入することで、上記のような操作をより簡単に行えるようになるか試してみましょう。

まず、 Task という型を作ります。これは基本的に、Task のフローを制御する Controller にアクセスするためのクロージャのラッパーにすぎません。

struct Task {
    typealias Closure = (Controller) -> Void

    private let closure: Closure

    init(closure: @escaping Closure) {
        self.closure = closure
    }
}

Controller タイプは、関連する Task を終了させたり失敗させたりするためのメソッドを提供し、 Task の結果を報告するハンドラを呼び出します。

extension Task {
    struct Controller {
        fileprivate let queue: DispatchQueue
        fileprivate let handler: (Outcome) -> Void

        func finish() {
            handler(.success)
        }

        func fail(with error: Error) {
            handler(.failure(error))
        }
    }
}

ちょうど上のコードが示すように、 Outcome は2つのケースを持っています - .success と .failure - 汎用型 Void を持つ Result 型に非常に似ています。実際、 Outcome を独自の列挙型として実装するか、 "The power of type aliases in Swift" (https://www.swiftbysundell.com/articles/the-power-of-type-aliases-in-swift/) のテクニックの 1 つを使用して、 Result<Void> の一般的な省略形にするか、どちらかを選択することができます。

extension Task {
    enum Outcome {
        case success
        case failure(Error)
    }
}

extension Task {
    typealias Outcome = Result<Void>
}

最後に、これから定義する Task を実際に実行する方法が必要です。このメソッドでは、 Task を実行するための明示的なDispatchQueueと、 Task が実行し終わったら呼び出される handler を受け取ります。内部的には、与えられた DispatchQueue を使って Task を非同期に実行します。 controller を作って、それを Task のクロージャに渡します。

extension Task {
    func perform(on queue: DispatchQueue = .global(),
                 then handler: @escaping (Outcome) -> Void) {
        queue.async {
            let controller = Controller(
                queue: queue,
                handler: handler
            )

            self.closure(controller)
        }
    }
}

以上でタスクAPIの初期バージョンは完成です。先ほどの写真アップロード関数を置き換えるタスクを定義することから始めましょう。 - Uploader クラスを呼び出し、 Task の Controller を使って結果を通知します。

extension Task {
    static func uploading(_ photo: Photo,
                          using uploader: Uploader) -> Task {
        return Task { controller in
            uploader.upload(photo.data, to: photo.url) { error in
                if let error = error {
                    controller.fail(with: error)
                } else {
                    controller.finish()
                }
            }
        }
    }
}

最初のタスクが利用できるようになったので、それをコール現場でどのように使うか見てみましょう。

for photo in photos {
    let task = Task.uploading(photo, using: uploader)

    task.perform { outcome in
        // Handle outcome
    }
}

なかなかクールです! しかし、タスクのパワーが本当に輝き始めるのは、タスクをグループ化し、順番に並べるときです。では、続けていきましょう!

グループ化

先程は、 DispatchGroup を使って、いつオペレーションが終了したかを追跡していました。では、そのロジックを新しいタスクシステムに移植してみましょう。そのために、 Task の配列を受け取り、それらをグループ化する静的メソッドを Task に追加します。ボンネットの下では、以前とまったく同じディスパッチ・グループ・ロジックを使います。

extension Task {
    static func group(_ tasks: [Task]) -> Task {
        return Task { controller in
            let group = DispatchGroup()

            // To avoid race conditions with errors, we set up a private
            // queue to sync all assignments to our error variable
            let errorSyncQueue = DispatchQueue(label: "Task.ErrorSync")
            var anyError: Error?

            for task in tasks {
                group.enter()

                // It’s important to make the sub-tasks execute
                // on the same DispatchQueue as the group, since
                // we might cause unexpected threading issues otherwise.
                task.perform(on: controller.queue) { outcome in
                    switch outcome {
                    case .success:
                        break
                    case .failure(let error):
                        errorSyncQueue.sync {
                            anyError = anyError ?? error
                        }
                    }

                    group.leave()
                }
            }

            group.notify(queue: controller.queue) {
                if let error = anyError {
                    controller.fail(with: error)
                } else {
                    controller.finish()
                }
            }
        }
    }
}

以上のようにして、以前のメディアアップロードのコードを大幅に簡略化することができます。あとは、それぞれのメディアを Task にマッピングし、それらの Task を結合した配列を新しいグループAPIに渡すだけです。

let photoTasks = post.photos.map { photo in
    return Task.uploading(photo, using: uploader)
}

let videoTasks = post.videos.map { video in
    return Task.uploading(video, using: uploader)
}

let mediaGroup = Task.group(photoTasks + videoTasks)

グループは、グループ化されたタスクの完了順序に依存しない場合に最適ですが、常にそうとは限りません。全てのメディアが正常にアップロードされたことを確認するまで、投稿のデータをアップロードしないという最初の問題に戻りますが、これはそのようなケースの一つです。私たちが理想とするのは、メディアアップロード操作を、投稿アップロードを終了する操作と連鎖(シーケンス)させることができることです。

Task を拡張してそれをサポートする方法を見てみましょう。

シーケンス

DispatchGroup(操作の順序について何の意見も持たない)を使用するのではなく、現在の Task のインデックスを追跡し、前の Task が終了したら次の Task を継続的に実行することで、シーケンスを実装してみましょう。 Task のリストの最後に到達したら、シーケンスが完了したと見なします。

extension Task {
    static func sequence(_ tasks: [Task]) -> Task {
        var index = 0

        func performNext(using controller: Controller) {
            guard index < tasks.count else {
                // We’ve reached the end of our array of tasks,
                // time to finish the sequence.
                controller.finish()
                return
            }

            let task = tasks[index]
            index += 1

            task.perform(on: controller.queue) { outcome in
                switch outcome {
                case .success:
                    performNext(using: controller)
                case .failure(let error):
                    // As soon as an error was occurred, we’ll
                    // fail the entire sequence.
                    controller.fail(with: error)
                }
            }
        }

        return Task(closure: performNext)
    }
}

シーケンスの実装に単にシリアルのDispatchQueueを使用しない理由は、シーケンスが常にシリアルのキューでディスパッチされるとは想定できないからです。

上記では、Swift がファーストクラス関数とインライン関数定義の両方をサポートしているという事実を利用しています - シーケンスの Task を作成するために、クロージャとして performNext 関数を渡しているからです。

信じられないかもしれませんが、私たちは実際に完全なタスクベースの同時実行システムをゼロから構築したのです!

すべてのピースをまとめる

全てのピースが揃ったので、最後に元の投稿公開コードを更新して、新しいシステムが提供する全てのものを利用できるようにしましょう。エラーを追跡したり、ネットワークコールの予測不可能な性質によるバグに遭遇したりする代わりに、メディアアップロード操作のグループを投稿アップロードタスクと組み合わせることで、単純にシーケンスを形成できるようになりました。

func publish(_ post: Post,
             then handler: @escaping (Outcome) -> Void) {
    let photoTasks = post.photos.map { photo in
        return Task.uploading(photo, using: uploader)
    }

    let videoTasks = post.videos.map { video in
        return Task.uploading(video, using: uploader)
    }

    let sequence = Task.sequence([
        .group(photoTasks + videoTasks),
        .uploading(post, using: uploader)
    ])

    sequence.perform(then: handler)
}

上記のソリューションの優れている点は、ディスパッチ・キューを使用することで、全てが完全に同時並行的に実行されていることです。抽象度が非常に低いので、もし問題や予期せぬ動作に遭遇したら、デバッグのために1つ下のレベルにステップ・ダウンするだけでいいです。

結論

Task は、並列性の高いコードを抽象化するのに最適な方法です。並列性をできるだけ高めて実行したり、前の Task の完了を待ってから先に進んだりします。 Task は本質的に、Grand Central Dispatchの上にシンプルで薄いレイヤーを作る方法を提供します。

この記事で前述したように、もちろんSwiftで並行処理を実装したり使用したりすることができる他の多くの方法があります。RxSwiftのようなフレームワークは、実行のはるかに複雑なチェーンを構築することを可能にしますが、はるかに重い抽象化を使用しています。

私のアドバイスは、Swiftで多くの異なる種類の並行プログラミングを試してみることです。うまくいけば、この記事は、Swiftで Task を実装する1つの方法と、 Task がコミュニティで一般的に使用される他のソリューションとどのように比較されるかについて、あなたに少しの洞察を与えました。そして、もしそうなら - 私の使命は達成されたと思います。

Twitterで私に質問したり、この記事やタスク全般についてどう思ったか教えてください。

読んでくれてありがとう!🚀

【翻訳元の記事】

Task-based concurrency in Swift
https://www.swiftbysundell.com/articles/task-based-concurrency-in-swift/

Discussion