🤹
【Swift】TaskGroupで並列に処理するタスクの数を制限したい
はじめに
以下のコードは、TaskGroup
にaddTask(priority:operation:)
でタスクを追加し、並列に処理されたタスクをfor await
で取り出しています。
TaskGroup
を使った基本的な実装です。
Task {
let numbers = try await echoNumbers(Array(0..<100))
// 結果: [0, 1, ... 98, 99]
print(numbers)
}
func echoNumbers(_ numbers: [Int]) async throws -> [Int] {
try await withThrowingTaskGroup(of: Int.self) { group in
for num in numbers {
group.addTask { try await echoNumber(num) }
}
var result: [Int] = []
for try await num in group {
print(num)
result.append(num)
}
return result.sorted()
}
}
func echoNumber(_ number: Int) async throws -> Int {
try await Task.sleep(for: .milliseconds(Int.random(in: 500...1000)))
return number
}
この例では100個のタスクを並列に処理しますが、サーバーなどのリソースの使用を一定の範囲内に保ちたい場合など、並列に処理するタスクの数を制限したいケースを考えます。
例えば、並列に処理するタスクを20個に制限したいとします。呼び出し元で値を20個ずつ渡して、5回分のechoNumbersを実行することが考えられますが、効率が良くありません。
そうではなくて、最初に20個のタスクを追加して、ひとつタスクが終わって19個になったらひとつタスクを追加して、またひとつ終わったらひとつ追加して〜ということをひとつのTaskGroup
で行い、常に20個のタスクが処理されていることが理想です。
実装
最初に追加するタスクの数を制限し、for await
でTaskGroup
から値を取り出すたびに、処理したいタスクが無くなるまでTaskGroup
にタスクを追加するだけです。
func echoNumbers(_ numbers: [Int]) async throws -> [Int] {
try await withThrowingTaskGroup(of: Int.self) { group in
let limitTaskCount = min(20, numbers.count)
for index in 0..<limitTaskCount {
group.addTask { try await echoNumber(numbers[index]) }
}
var result: [Int] = []
var nextIndex = limitTaskCount
for try await num in group {
if nextIndex < numbers.count {
let index = nextIndex
group.addTask { try await echoNumber(numbers[index]) }
nextIndex += 1
}
print(num)
result.append(num)
}
return result.sorted()
}
}
ループでTaskGroup
から値を取り出しながら、さらにそのスコープ内でTaskGroup
にタスクを追加するという実装は、違和感があるかもしれません。通常のfor文では、反復処理中にイテレーション対象を変更することはできないからです。
しかし、TaskGroup
は、for await
構文を使っていることから分かる通りAsyncSequence
であり、タスクを動的に追加・処理できるため、このようなコードを書くことができます。
おわりに
TaskGroup
を使って、効率的で柔軟な非同期処理を、シンプルに実装することができました。
Discussion