🔀

【Swift Concurrency】TaskGroup で動的な回数のループ処理を並列実行し、高速化する

2023/05/20に公開

こんにちは。kamimiです。🌞

最近 API を Swift で実装したのですが、そのレスポンス速度が遅かったので Swift Concurrency を使って改善しました。(API の技術的詳細はこちらをご覧ください)

先にまとめを言うと、ループする回数が動的な場合に、Swift Concurrency の TaskGroup を使用して同時並行で処理を実行することで、API のレスポンス速度がより高速になるよう改善しました。🎉

完全に n 番煎じの内容であることは理解しつつ、自分の勉強記録用に改善プロセスを書きます。先人の記事に大感謝です。

改善前

改善方法の説明をする前に、改善前したいコードの処理や課題を整理します。

改善前の処理の流れ

API にはあるメッセージを生成する関数があり、改善前は以下のように実装されていました。

func generateMessage(appIDs: [String]) async throws -> [Message] {
	var messages = [Message]()

	// 1. 引数で渡された appIDs の配列の要素分、ループ処理を行う
	for appID in appIDs {
		// 2. ネットワーク通信が発生する関数2つを並列実行する
		async let app = try fetchAppName(id: appID)
		async let appStoreVersion = try fetchAppStoreVersion(id: appID)
		// 3. 2つの関数の結果を待って、message に値を格納する
		let message = try await generateMessageForEachApp(app: app, appStoreVersion: appVersion)
		messages.append(message)
	}
	return messages
}

1. 引数で渡された appIDs の配列の要素分、ループ処理を行う

要素の数は引数で渡されるので、動的な数になります。

2. ネットワーク通信が発生する関数2つを並列実行する

fetchAppName(id:)fetchAppStoreVersion(id:) というネットワーク通信が発生する非同期処理を行う関数を呼び出しています。またこの2つの関数は例外を投げる可能性があるので、tryがついています。

本題とは異なりますが、ここで async let に注目します。
async let をつけることで、この2つの関数の処理は、同時並行的に実行されます。

もう少し詳しく見てみます。async let を使わず、以下のように書いていた場合を考えます。

let app = try await fetchAppName(id: appID)
let appStoreVersion = try await fetchAppStoreVersion(id: appID)

この場合、それぞれの関数の前に await がついているので、fetchAppName(id:) の処理が終わった後に fetchAppStoreVersion(id:) が実行されます。
しかし、今回の API の実装においては、この2つの関数の実行順序は重要ではありませんでした。つまり fetchAppName(id:) の結果が fetchAppStoreVersion(id:) で必要なわけではありませんでした。
ということはわざわざ await でそれぞれの結果を待つ必要はありません。

3. 2つの関数の結果を待って、message に値を格納する

ここで3つ目のコメントの処理を見てみます。generateMessageForEachApp(app:appStoreVersion:)という関数を実行しており、引数には先ほどの2つの関数の結果を渡す必要があります。

先ほど fetchAppName(id:)fetchAppStoreVersion(id:) の2つの関数の実行順序は重要ではありません、と書きました。
確かに実行順序は重要ではありませんが、generateMessageForEachApp(app:appStoreVersion:)という関数を実行する前までには、2つの関数の結果が取得できている必要があります。

そこで使用するのが、async let です。これをつけると、子タスクが作成され、2つの関数の処理はその子タスクで実行されます。先ほど書いたように2つの関数の実行順序は重要ではないので、await をつける必要はありません。

ですが、generateMessageForEachApp(app:appStoreVersion:)という関数を実行する前までには結果が必要なので、この関数の前に await をつけます。

こうすることで、一部で並行処理を行いながら、必要な箇所ではその結果を待つということが実現できます。

課題

ここまで処理の流れについて説明してきました。ですがまだ課題があることに気づきます。

確かに asycn let を使用することで一部の処理を並行で実行できています。ですがこのままだと引数の appIDs の要素数が増えれば増えるほど、この関数の実行完了が遅くなります。

改善プロセス

TaskGroup を使用して改善します。
ドキュメントに記載の通り、「動的に作成された子タスクを含むグループ」ということで、今回の課題解決に良さそうです。

1. withThrowingTaskGroup を使用する

まず、withThrowingTaskGroup(of:returning:body:) を使用して、並行で実行したいループ処理を囲います。

func generateMessage(appIDs: [String]) async throws -> [Message] {  // ここを追加
	var messages = [Message]()
	
	try await withThrowingTaskGroup(of: Void.self) { group in
		for appID in appIDs {
			async let app = try fetchAppName(id: appID)
			async let appStoreVersion = try fetchAppStoreVersion(id: appID)

			let message = try await generateMessageForEachApp(app: app, appStoreVersion: appVersion)
			messages.append(message)
		}
	}
	return messages
}

2. 子タスクを作成する

次に addTask(priority:operation:) の関数を使って子タスクを作成しグループに追加することで、任意のタイミングで fetchAppName(id:)fetchAppStoreVersion(id:) の処理を実行できるようにします。

func generateMessage(appIDs: [String]) async throws -> [Message] {
	var messages = [Message]()
	
	try await withThrowingTaskGroup(of: Void.self) { group in
		for appID in appIDs {
			group.addTask {  // ここを追加
				async let app = try fetchAppName(id: appID)
				async let appStoreVersion = try fetchAppStoreVersion(id: appID)

				let message = try await generateMessageForEachApp(app: app, appStoreVersion: appVersion)
				messages.append(message)
			}
		}
	}
	return messages
}

3. 子タスクが完了した順番に結果を取得する

ここまでで良さそうに見えますが、この状態だと、Mutation of captured var 'messages' in concurrently-executing code というエラーが発生します。⚠️


(※キャプチャはこの記事のコードとは別のコードで実装している時のエラーなので、変数名が thumbnails となっています。)

generateMessageForEachApp(app:appStoreVersion:) の実行後に、messages という変数に要素を追加する処理があります。複数の子タスクが存在する状態で、それらが同時に messages に値を格納しようとするとクラッシュするなど予想外の動作を引き起こす可能性があります。つまりデータ競合が発生します。(データ競合についてはこちらの記事が分かりやすかったです)

そこで以下のように for-await-in のループを使用して、子タスクが完了した順番で messages に値を追加するように直します。そのループで処理するために addTask(priority:operation:) では、各子タスクに値を返すようにします。ここでは message を返します。また withThrowingTaskGroup(of:) の関数の引数には子タスクが返す型を指定する必要がありますので、 Message.self を指定します。

func generateMessage(appIDs: [String]) async throws -> [Message] {
	var messages = [Message]()
	
	try await withThrowingTaskGroup(of: Message.self) { group in  // ここを修正
		for appID in appIDs {
			group.addTask {
				async let app = try fetchAppName(id: appID)
				async let appStoreVersion = try fetchAppStoreVersion(id: appID)

				let message = try await generateMessageForEachApp(app: app, appStoreVersion: appVersion)
				return message  // ここを修正
			}
		}
		// ここ以下を追加
		for try await message in group {
			messages.append(message)
		}
	}
	return messages
}

これで改善は完了です!🎉

追記(2023/5/21):

Twitter で for-await-in のループの代わりに reduce(into:_:) メソッド を使用した実装も可能なことを教えていただきました。treastrain さんありがとうございます!!
以下修正版になります。 🙏🏻

func generateMessage(appIDs: [String]) async throws -> [Message] {	
	try await withThrowingTaskGroup(of: Message.self) { group in  // ここを修正
		for appID in appIDs {
			group.addTask {
				async let app = try fetchAppName(id: appID)
				async let appStoreVersion = try fetchAppStoreVersion(id: appID)

				let message = try await generateMessageForEachApp(app: app, appStoreVersion: appVersion)
				return message  // ここを修正
			}
		}
		// ここ以下を追加
		return try await group.reduce(into: [Message]()) { $0.append($1) }
	}
}

改善後

改めて、コード全体を載せます。

func generateMessage(appIDs: [String]) async throws -> [Message] {	
	try await withThrowingTaskGroup(of: Message.self) { group in
		for appID in appIDs {
			group.addTask {
				async let app = try fetchAppName(id: appID)
				async let appStoreVersion = try fetchAppStoreVersion(id: appID)

				let message = try await generateMessageForEachApp(app: app, appStoreVersion: appVersion)
				return message
			}
		}
		return try await group.reduce(into: [Message]()) { $0.append($1) }
	}
}

TaskGroup を使用することで、動的な数分のループ処理を並列で実行できるようになりました!🎉

ここでは withThrowingTaskGroup(of:returning:body:)を使用していますが、それは子タスクが例外を投げる可能性があるためです。例外を投げない場合は、 withTaskGroup(of:returning:body:) を使います。

参考

https://developer.apple.com/videos/play/wwdc2021/10134

13:30あたりからが、本記事の内容です。

https://zenn.dev/dena/articles/0c4f7a8232e64b

async letTaskGroup の使い分けを分かりやすい例で解説してくださっていました。

https://www.swiftbysundell.com/articles/swift-concurrency-multiple-tasks-in-parallel/

https://qiita.com/maiyama18/items/a1aed9501b8ebfb94d5b

https://qiita.com/yohhoy/items/00c6911aa045ef5729c6

データ競合や競合状態についての分かりやすい記事でした。

おわりに

この記事は基本的には「参考」に記載した WWDC22 の動画の内容です。それを自分のコードに当てはめて、他の人の記事も参考にしながら書いたのが本記事です。

その動画を見た方には不要な記事かとも思うのですが、自分の理解を深めるために調べながら記事を書いてみました。

どなたかのご参考になれば幸いです。🥰

Discussion