各馬ゲートインから一斉にスタート(C++ の latch と barrier について)
待ち合わせの集合場所にたどり着いたら、みんなグーグー寝ていた。これがスレッドの待ち合わせである。「お、みんな揃ったから出発しよかー」とはならず、最後に到着した人が全員を片っ端から起こしていくわけだ。もし、自分が最後でなければ CPU の負荷を抑えるために眠りにつく。つまり、自分が最後かどうかを判定して、その結果に応じて処理の内容を変えなければならない。スレッドの待ち合わせというのはそこそこ面倒な処理なのだ。
これを楽にする仕組みが C++20 で標準ライブラリに追加された。latch と barrier である。
latch の基本
latch はスレッドの動作をコントロールする仕組みであり、以下のようなケースに適した構造となっている。
- 全員が揃うまで待つ
複数のスレッドが各自の担当分の処理を終えて、他のスレッドの終了を待つパターンがこれ。全員揃ったら、各自一斉に動き始める。 - 全部揃うまで待つ
例えば100個のオブジェクトを3つの作業スレッドで構築し、メインスレッドが構築完了を待つようなケース。作業スレッドはオブジェクトを1つ構築するたびにカウンタを1つずつ減らしていき、カウンタがゼロになったらメインスレッドが動き始める。
分かりやすいイメージとしては、カウンタ付きの錠前が付いたドアだ。カウンタがゼロになるまで解錠されないので先には進めない。
1つ目のケースでは各スレッドがカウンタを1つ減らしてから解錠を待つ(arrive_and_wait)。
2つ目のケースでは作業スレッドがカウンタをどんどん減らしていき(count_down)、メインスレッドは何もせずに解錠を待ち続ける(wait)。
主なメンバ関数は以下の3つ[1]。
メンバ関数 | 説明 |
---|---|
count_down | カウンタを1つ減らす |
wait | カウンタがゼロになるのを待ち続ける |
arrive_and_wait | カウンタを1つ減らしつつ待つ |
それでは具体例を見てみよう。
全員が揃うまで待つ
この condition_variable の解説記事では、2つのスレッドが互いの準備完了を待つというケースで 以下のような condition_variable を使った具体例をサンプルとして挙げていた。
#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <chrono>
int main()
{
std::mutex mtx; // isMainReady と isSubReady を保護するミューテックス
std::condition_variable cv; // これを介して睡眠をコントロールする
bool isMainReady = false; // メインスレッドの準備完了フラグ
bool isSubReady = false; // サブスレッドの準備完了フラグ
// サブスレッド
std::thread th([&]() {
std::this_thread::sleep_for(std::chrono::seconds(2)); // 諸々の準備
{
std::unique_lock<std::mutex> lck(mtx); // isMainReady, isSubReady を保護
isSubReady = true; // サブスレッド側、準備完了
if (isMainReady) { // メイン側が準備できているなら
std::cout << "[Sub] wakes main up" << std::endl;
cv.notify_one(); // 寝ているはずのメインを起こす
}
else { // メイン側が準備に手間取っているなら
std::cout << "[Sub] sleeps until main is ready" << std::endl;
while (!isMainReady) // メインスレッドの準備が整うまで
cv.wait(lck); // 寝る(寝てる間はロックが解除される)
}
}
std::cout << "[Sub] started" << std::endl;
});
// メインスレッド
std::this_thread::sleep_for(std::chrono::seconds(2)); // 諸々の準備
{
std::unique_lock<std::mutex> lck(mtx); // isMainReady, isSubReady を保護
isMainReady = true; // メインスレッド側、準備完了
if (isSubReady) { // サブ側が準備できているなら
std::cout << "[Main] wakes sub up" << std::endl;
cv.notify_one(); // 寝ているはずのサブを起こす
}
else { // サブ側が準備に手間取っているなら
std::cout << "[Main] sleeps until sub is ready" << std::endl;
while (!isSubReady) // サブスレッドの準備が整うまで
cv.wait(lck); // 寝る(寝てる間はロックが解除される)
}
}
std::cout << "[Main] started" << std::endl;
th.join();
}
このような「ゲートに全員入ったらスタート」といったケースは正に latch の得意分野だ。latch を使えば上のコードはここまでシンプルに書ける。
#include <iostream>
#include <thread>
#include <chrono>
#include <latch>
int main()
{
std::latch lch(2); // メインとサブの準備完了を待つ
// サブスレッド
std::thread th([&]() {
std::this_thread::sleep_for(std::chrono::seconds(2)); // 諸々の準備
std::cout << "[Sub] ready" << std::endl;
lch.arrive_and_wait(); // ラッチに到達。カウントダウンしつつ待つ
std::cout << "[Sub] started" << std::endl;
});
// メインスレッド
std::this_thread::sleep_for(std::chrono::seconds(2)); // 諸々の準備
std::cout << "[Main] ready" << std::endl;
lch.arrive_and_wait(); // ラッチに到達。カウントダウンしつつ待つ
std::cout << "[Main] started" << std::endl;
th.join();
}
[Sub] ready
[Main] ready
[Main] started
[Sub] started
[Main] ready
[Sub] ready
[Sub] started
[Main] started
mutex も condition_variable も状態変数すらも要らない。latch で待つスレッド数を指定して、準備ができたら arrive_and_wait を呼び出すだけだ。
もちろんスレッド数が増えてもコードは殆ど変わらない。全員が揃うまで待ち、揃ったら一斉に動き始める。
全部揃うまで待つ
次に、作業スレッドによる準備をメインスレッドが待つケース。これだと単純にスレッドの終了を待てば良いのでサンプルとしてはイマイチだが、スレッドプールによるスレッドの再利用を念頭に置いたケースと読み替えてほしい。
#include <iostream>
#include <thread>
#include <chrono>
#include <latch>
#include <vector>
int main()
{
std::latch lch(100); // 100個のオブジェクトの構築を待つ
// 作業スレッド群
constexpr int NUM_THREADS = 3;
constexpr int NUMS[NUM_THREADS] = { 33, 33, 34 }; // 各スレッドの構築数(手抜き)
std::vector<std::thread> ths; ths.reserve(NUM_THREADS);
for (int i = 0; i < NUM_THREADS; ++i) {
ths.emplace_back(std::thread([&](int num) {
for (int i = 0; i < num; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 構築
std::cout << ".";
lch.count_down(); // カウントダウン
}
}, NUMS[i]));
}
// メインスレッド
std::cout << "waiting" << std::endl;
lch.wait(); // 作業スレッド群による構築の完了を待つ
std::cout << "done" << std::endl;
for (auto& th : ths)
th.join();
}
このように、latch を使えばカウントダウンでゼロになるような何かを簡単に待つことができる。
ちょっと複雑な barrier
実はカウンタがゼロになった latch は再利用できない。コンストラクタで設定した値は再設定できず、count_down で減らすことしかできないのだ。これがゼロになったら終了である。
そこで、再利用可能な latch 機構として barrier も用意されている。
barrier の主なメンバ関数は以下の通り。
メンバ関数 | 説明 |
---|---|
arrive | カウンタを1つ減らす(count_down ではない) |
wait | カウンタがゼロになるのを待ち続ける |
arrive_and_wait | カウンタを1つ減らしつつ待つ |
arrive_and_drop | カウンタを1つ減らし、リセット後の再設定値も1つ減らす(待たない) |
latch と比べると、count_down が arrive に変化し(後述)、arrive_and_drop が追加された。
C++ には condition_variable と condition_variable_any とか、lock_guard と unique_lock のように、機能に制限のある簡易版や高速版が用意されているものもあり、latch と barrier もその一環と思われる[2]。
arrive と wait
barrier のカウンタは自動的にリセットされる。つまり、カウンタがゼロになった瞬間に初期値に戻るという仕様なのだが、これが wait 関数を少しだけ複雑にしてしまう。
これはどういうことかというと、例えば残りカウントが1の状態でほぼ同時に2つのスレッドがカウントダウン処理をしたと仮定しよう。
std::barrier barrier(3);
barrier.arrive(); // スレッドX
barrier.arrive(); // スレッドY
// 残りカウント1で、以下の2つがほぼ同時に発生
barrier.arrive(); // スレッドA
barrier.arrive(); // スレッドB
// 同じ流れで以下の2つもほぼ同時に発生
barrier.wait(); // スレッドA
barrier.wait(); // スレッドB
この擬似コードは、初期カウント3の barrier が残りカウント1の状態でスレッドAとスレッドBからほぼ同時にカウントダウンされた様子を表している。その後スレッドAとスレッドBが wait を呼び出して待ちに入るわけだが、この時に実際に待ち状態になるのはAとBのどちらか片方だけである。先に arrive を呼び出したスレッドはその時点でカウンタがゼロになるので、wait で待つ必要がない。逆に後から arrive を呼び出したスレッドは、カウンタが3に戻った状態での arrive なので wait で待つべきスレッドとなる。
さて、barrier 君は最後の2行のどちらを待ち状態にすれぼ良いのだろう? この問題に対処するために、arrive は arrival_token を返し、wait はこの arrival_token を引数として要求する。つまり先ほどの擬似コードはこう書き換えられる。
// 残りカウント1で、以下の2つがほぼ同時に発生
auto tokenA = barrier.arrive(); // スレッドA
auto tokenB = barrier.arrive(); // スレッドB
// 同じ流れで以下の2つもほぼ同時に発生
barrier.wait(std::move(tokenA)); // スレッドA
barrier.wait(std::move(tokenB)); // スレッドB
barrier 君は tokenA と tokenB のどちらが先に到着したかを知っているので、wait に渡された token を見て待ち状態をコントロールできる。
latch の count_down が barrier では arrive に変化しているのもこの辺りが理由と思われる。barrier で適切に待つためには「カウントダウン (count_down) 処理をどのタイミングで呼び出したのか」という情報も必要で、それは「待ち合わせ場所にいつ到着 (arrive) したのか」というメタファーの方が理解しやすい。
ちなみにこの arrive には [[nodiscard]]
属性が付いており、arrival_token を無視するとコンパイラに小言を言われる。
また、wait に渡す arrival_token は省略できない。これは barrier が latch の完全な上位互換ではないことを意味する。例えば上で挙げた latch の2つ目の使用例「全部揃うまで待つ」の latch を barrier に置き換えると次のようになる。
#include <iostream>
#include <thread>
#include <chrono>
#include <barrier>
#include <vector>
int main()
{
// 100個のオブジェクトの構築を待つが、メインスレッドで待つために1つ追加
std::barrier brr(100 + 1); // *** (A)
// 作業スレッド群
constexpr int NUM_THREADS = 3;
constexpr int NUMS[NUM_THREADS] = { 33, 33, 34 }; // 各スレッドの構築数(手抜き)
std::vector<std::thread> ths; ths.reserve(NUM_THREADS);
for (int i = 0; i < NUM_THREADS; ++i) {
ths.emplace_back(std::thread([&](int num) {
for (int i = 0; i < num; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 構築
std::cout << ".";
auto token = brr.arrive(); // カウントダウン *** (B)
}
}, NUMS[i]));
}
// メインスレッド
std::cout << "waiting" << std::endl;
brr.arrive_and_wait(); // 作業スレッド群による構築の完了を待つ *** (C)
std::cout << "done" << std::endl;
for (auto& th : ths)
th.join();
}
変更箇所は (A), (B), (C) の3か所。
(A). カウントダウン無しでは待てないので、メインスレッドで待つためにカウントを1つ追加
(B). count_down を arrive に変更(コンパイラがうるさいので戻り値を受け取っておく[3])
(C). wait を arrive_and_wait に変更
CompletionFunction
barrier は fork-join model のタスク並列機構を実現するのに適しており、各フェーズの境目で決まった処理を実行できると都合が良い。
そのための仕組みとして、コンストラクタで CompletionFunction を登録できるようになっている。カウンタがゼロになった段階で、待ち状態になっているスレッドのうちのどれか1つがこの関数を呼び出し、それが終わると残りのスレッドも動き始める。
#include <iostream>
#include <thread>
#include <barrier>
#include <vector>
int main()
{
constexpr int NUM_THREADS = 3;
std::barrier brr(NUM_THREADS,
[&]() noexcept { // CompletionFunciton
// どのスレッドが使われているのかな?
std::cout << std::this_thread::get_id() << std::endl;
});
// スレッド群
std::vector<std::thread> ths; ths.reserve(NUM_THREADS);
for (int i = 0; i < NUM_THREADS; ++i) {
ths.emplace_back(std::thread([&]() {
for (int i = 0; i < 10; ++i) { // 10回繰り返して終了
std::cout << ".";
brr.arrive_and_wait();
}
}));
}
// メインスレッドは待つだけで何もしない
for (auto& th : ths)
th.join();
}
3つのスレッドがそれぞれドットを出力して相手を待つだけの簡単なサンプル。全員揃ったらコンストラクタで登録した CompletionFunction が呼び出され、この関数は現在のスレッドのIDを出力する。
実行結果は以下の通り。
...23988
...4520
...23988
...4520
...23988
...4520
...23988
...4520
...4864
...4520
見事にバラバラ。各スレッドの実行時間に差をつけると偏るので到着順に依存していると思われるが、仕様で「どれか」と決められているので、どのスレッドが使われていても仕様通りとなる。
参考
- N4861 32.8.1 Latches [thread.latch]
- N4861 32.8.2 Barriers [thread.barrier]
- std::latch (cpprefjp)
- std::barrier (cpprefjp)
Discussion