Dartのisolateを使った並列処理による効率化
Dartのマルチスレッド処理を使った効率化: 素数判定を例に
Dartはシングルスレッドベースの言語ですが、isolate
という仕組みを使って並列処理をサポートしています。この並列処理を使ってパフォーマンスを大幅に向上させることができます。今回は、整数のリストに対して素数判定を行う例を通して、シングルスレッドとマルチスレッドの処理の実行時間の比較をして、並列処理の効果を見ていきます。
素数判定のコード
まず、素数判定を行うための基本的な関数を定義します。(この記事で使う素数判定のアルゴリズムはシンプルなもので効率的でないですが、並列処理の例としては十分だと思います)
List<int> divisors(int n) {
return List.generate(n, (index) => index + 1)
.where((m) => (n % m) == 0).toList();
}
(int, bool) isPrime(int n) {
return (n, divisors(n).length == 2);
}
Future<(int,bool)> isPrimeAsync(int n) async {
return (n, divisors(n).length == 2);
}
Future<List<(int, bool)>> isPrimeList(List<int> ns) async {
return Future.wait(ns.map((n) => isPrimeAsync(n)));
}
このコードでは、与えられた整数が素数であるかどうかを判定するisPrime関数を定義しています。素数であるかどうかは、その数の正の約数の個数が2つのみであるかどうかで判断しています。isPrimeAsyncはisPrimeの非同期のバージョンで、isPrimeListは受け取った整数のリストの各要素をFuture.waitを使って、(int,bool)に置き換える関数です。
使う整数のリストとユーティリティ
// 使う整数のリスト
const numberCount = 100;
final numbers = List.generate(numberCount, (index) => index+1000000).toList();
// リストの連結関数
List<T> concat<T>(List<List<T>> list){
return list.reduce((l1, l2) => l1+l2);
}
// 受け取った非同期処理の完了時間を返す関数
Future<String> timer(Future<void> Function() process,)async{
final start = DateTime.now();
await process();
final end = DateTime.now();
return end.difference(start).inMilliseconds.toString();
}
処理の効率化: 直列処理 vs Future.wait vs マルチスレッド
次に、直列処理、Future.waitを使った並列処理(厳密には並行処理)、そしてisolateを使ったマルチスレッド処理を比較します。
直列処理
シングルスレッドで直列処理によって、整数のリストに対して素数判定を行う場合、次のようなコードになります。
Future<String> serialProcess() async {
final time = await timer(() async {
final result = numbers.map((n) => isPrime(n)).toList();
print(result.toString());
});
return 'serialProcess completed in $time milliseconds';
}
Future.waitを使った処理
次に、DartのFuture.waitを使って複数の非同期処理を実行するための関数です.
Future<String> futureWaitProcess(int maxWorkerCount) async {
final time = await timer(() async {
assert(numberCount % maxWorkerCount == 0);
final result = concat(await Future.wait([
for (int i = 0; i < maxWorkerCount; i++)
isPrimeList(numbers.getRange(i * numberCount ~/ maxWorkerCount, (i + 1) * numberCount ~/ maxWorkerCount).toList()),
]));
print(result.toString());
});
return 'future wait process with ${maxWorkerCount} worker completed in $time milliseconds';
}
computeを使ったマルチスレッド処理
最後に、computeを使って複数のisolateを作成し、並列処理を行う方法です。
Future<String> multiThreadProcess(int maxWorkerCount) async {
final time = await timer(() async {
assert(numberCount % maxWorkerCount == 0);
final result = await compute((workerCount) async {
return concat(await Future.wait([
for (int i = 0; i < maxWorkerCount; i++)
compute((_) {
return isPrimeList(numbers
.getRange(
i * numberCount ~/ maxWorkerCount, (i + 1) * numberCount ~/ maxWorkerCount)
.toList());
}, null),
]));
}, maxWorkerCount);
print(result.toString());
});
return 'multithread process with ${maxWorkerCount} worker completed in $time milliseconds';
}
パフォーマンス結果
それぞれの手法で、100個の大きな整数のリストに対して素数判定を行った結果は次のとおりです。
[log] serialProcess completed in 9785 milliseconds
[log] future wait process with 2 worker completed in 8196 milliseconds
[log] future wait process with 5 worker completed in 7302 milliseconds
[log] future wait process with 10 worker completed in 6687 milliseconds
[log] multi thread process with 2 worker completed in 3529 milliseconds
[log] multi thread process with 10 worker completed in 2549 milliseconds
[log] multi thread process with 20 worker completed in 2625 milliseconds
[log] multi thread process with 25 worker completed in 2640 milliseconds
[log] multi thread process with 50 worker completed in 3730 milliseconds
結論
シングルスレッド処理では約9.8秒かかった素数判定処理が、computeを使ってisolateを活用することで、2.5秒程度まで短縮できました。ただし、isolateの数を増やしすぎるとisolate作成、クローズ、メッセージ送信などの負荷が増え、逆に処理が遅くなる場合があります。
参考文献
concurrency-vs-parallelism
isolates
Future.wait
compute
Build a word puzzle with Flutter
Discussion