🤹

【Swift】TaskGroupで並列に処理するタスクの数を制限したい

2024/03/22に公開

はじめに

以下のコードは、TaskGroupaddTask(priority:operation:)でタスクを追加し、並列に処理されたタスクをfor awaitで取り出しています。
TaskGroupを使った基本的な実装です。

Task {
    let numbers = try await echoNumbers(Array(0..<100))

    // 結果: [0, 1, ... 98, 99]
    print(numbers)
}

func echoNumbers(_ numbers: [Int]) async throws -> [Int] {
    
    try await withThrowingTaskGroup(of: Int.self) { group in
        for num in numbers {
            group.addTask { try await echoNumber(num) }
        }
    
        var result: [Int] = []
        for try await num in group {
            print(num)
            result.append(num)
        }
        
        return result.sorted()
    }
}

func echoNumber(_ number: Int) async throws -> Int {
    
    try await Task.sleep(for: .milliseconds(Int.random(in: 500...1000)))
    return number
}

この例では100個のタスクを並列に処理しますが、サーバーなどのリソースの使用を一定の範囲内に保ちたい場合など、並列に処理するタスクの数を制限したいケースを考えます。

例えば、並列に処理するタスクを20個に制限したいとします。呼び出し元で値を20個ずつ渡して、5回分のechoNumbersを実行することが考えられますが、効率が良くありません。

そうではなくて、最初に20個のタスクを追加して、ひとつタスクが終わって19個になったらひとつタスクを追加して、またひとつ終わったらひとつ追加して〜ということをひとつのTaskGroupで行い、常に20個のタスクが処理されていることが理想です。

実装

最初に追加するタスクの数を制限し、for awaitTaskGroupから値を取り出すたびに、処理したいタスクが無くなるまでTaskGroupにタスクを追加するだけです。

func echoNumbers(_ numbers: [Int]) async throws -> [Int] {
        
    try await withThrowingTaskGroup(of: Int.self) { group in
        let limitTaskCount = min(20, numbers.count)
        for index in 0..<limitTaskCount {
            group.addTask { try await echoNumber(numbers[index]) }
        }
    
        var result: [Int] = []
        var nextIndex = limitTaskCount
        for try await num in group {
            if nextIndex < numbers.count {
                let index = nextIndex
                group.addTask { try await echoNumber(numbers[index]) }
                nextIndex += 1
            }
            print(num)
            result.append(num)
        }

        return result.sorted()
    }
}

ループでTaskGroupから値を取り出しながら、さらにそのスコープ内でTaskGroupにタスクを追加するという実装は、違和感があるかもしれません。通常のfor文では、反復処理中にイテレーション対象を変更することはできないからです。

しかし、TaskGroupは、for await構文を使っていることから分かる通りAsyncSequenceであり、タスクを動的に追加・処理できるため、このようなコードを書くことができます。

おわりに

TaskGroupを使って、効率的で柔軟な非同期処理を、シンプルに実装することができました。

参照

株式会社Never

Discussion