🗺️

Swift concurrenyたまに使う高度な制御

2024/04/08に公開

Swift concurrencyを使う際、タイマー制御や並列実行など、まれに高度なイベント処理が必要なことがあります。
標準ライブラリはシンプルな制御関数しか提供していませんが、それでも十分な表現力を持っています。
複雑な状態管理クラスを導入しなくても大抵のことができるのです。

とはいえパッと思いつくことが難しい場合も多いです。
本記事ではSwift concurrencyを使う上で少し複雑な制御を行うためのいくつかのスニペットを掲載します。

紹介するものはすべて標準ライブラリ提供のAPIのみ使用します。
swift-async-algorithmsを使えばより高度な処理を行うこともできますが、今回はスコープ外にします。

複数処理を同時並列処理しつつ、並列数にCapを設ける

複数ファイルを同時にダウンロードしたいけど、同時に通信する数は4つまでにしたい、みたいなパターン。
TaskGroupは実はAsyncSequenceであり、子タスクの結果を1つずつ取り出すことができます。結果が返るごとに新しいタスクを追加すれば同時実行数を制御できます。

func runParallel(
    _ operations: [@Sendable () async throws -> ()],
    parallelCount: Int
) async throws {
    return try await withThrowingTaskGroup(of: Void.self) { g in
        var operations = operations
        for _ in 0..<parallelCount {
            if let operation = operations.popLast() {
                g.addTask(operation: operation)
            }
        }
        for try await _ in g {
            if let operation = operations.popLast() {
                g.addTask(operation: operation)
            }
        }
    }
}
使用例
var operations = (0..<10).map { i in
    return { @Sendable in
        print("Task \(i) begin")
        try await Task.sleep(for: .seconds(1))
        print("Task \(i) end")
    }
}
try await runParallel(operations, parallelCount: 4)
// Task 9 begin
// Task 8 begin
// Task 7 begin
// Task 6 begin
// Task 8 end
// Task 7 end
// Task 6 end
// Task 5 begin
// Task 4 begin
// Task 3 begin
// Task 9 end
// Task 2 begin
// Task 5 end
// Task 4 end
// Task 1 begin
// Task 0 begin
// Task 2 end
// Task 3 end
// Task 0 end
// Task 1 end

タイムアウトを設ける

一定時間経過しても処理が終わらなかった場合キャンセルする処理です。
これは色々なやり方があると思います。一例を紹介します。

struct TimeoutError: Error {}

func runWithTimeout<T>(
    timeout: Duration,
    _ operation: @Sendable () async throws -> T
) async throws -> T {
    return try await withoutActuallyEscaping(operation) { operation in
        return try await withThrowingTaskGroup(of: T?.self) { g in
            g.addTask(operation: operation)
            g.addTask {
                try await Task.sleep(for: timeout)
                return nil
            }
            if let result = try await g.next(), let result {
                return result
            }
            throw TimeoutError() // タイムアウトした際のエラー
        }
    }
}

他に、単にTaskを発火して、そのTaskを一定時間経過後にキャンセルする方法でも十分だと思います。
TaskGroupを使う方法の場合、キャンセルの理由がタイムアウトであることを自明にできる点、withoutActuallyEscapingの中にすべての処理が閉じていることがわかりやすい点が良いと思ってます。

使用例
try await runWithTimeout(timeout: .milliseconds(500)) {
    print("Task begin")
    try await Task.sleep(for: .seconds(1))
    print("Task end")
}
// Task begin
// Swift/ErrorType.swift:200: Fatal error: Error raised at top level: test.TimeoutError()

複数処理を同時に実行しつつ、1つでもエラーが起きたら他をキャンセルして離脱する

TaskGroupは、1つが失敗たら即座に全体がキャンセルされるようなことはありません。
withThrowingTaskGroupの中で例外を投げて初めて全体がキャンセルされます。waitForAll()は子タスク全員が終了するのを待つので、エラー時に他タスクをキャンセルしたい場合は不向きです。
全体を即座にキャンセルするには以下のように記述します。

func runAllOrFail(
    _ operations: [@Sendable () async throws -> ()]
) async throws {
    try await withThrowingTaskGroup(of: Void.self) { g in
        for operation in operations {
            g.addTask(operation: operation)
        }
        while let _ = try await g.next() {}
    }
}
使用例
enum MyError: Error {
    case foo
}

try await runAllOrFail([
    {
        print("service A begin")
        throw MyError.foo
    },
    {
        try await withTaskCancellationHandler {
            print("service B begin")
            try await Task.sleep(for: .seconds(1))
            print("service B end")
        } onCancel: {
            print("service B cancelled")
        }
    },
])
// service A begin
// service B begin
// service B cancelled
// Swift/ErrorType.swift:200: Fatal error: Error raised at top level: test.MyError.foo

一定時間ごとに処理を行うが、前が詰まってたらその分遅らす

定期的に処理を実行したいけど、もし前の処理が遅れていた場合にその完了まで待ってあげたい場合。
async letsleepを組み合わせることで遅い方に律速させています。タイムアウトの例と逆のことをしているとも言えますね。

func runTimerLoop(
    interval: Duration,
    duration: Duration,
    _ operation: @Sendable () async throws -> ()
) async throws {
    let clock = ContinuousClock()

    let timeoutAt = clock.now.advanced(by: duration)
    while clock.now < timeoutAt {
        try Task.checkCancellation()
        async let task: () = operation()
        try await Task.sleep(for: interval, clock: clock)
        try await task
    }
}
使用例
try await runTimerLoop(interval: .milliseconds(500), duration: .seconds(3)) {
    print("task begin")
    if Bool.random() {
        try await Task.sleep(for: .seconds(1))
    }
    print("task end")
}

おわり

Discussion