未来への約束(C++ の future と promise に関する備忘録)
マルチスレッドが必要になった時は、本当にそれが必要なのかどうかを考えたほうが良い。
使わなくて済むならそれに越したことはないし、現にそうやって過ごしてきたので、今さらになって10年以上前に標準ライブラリに追加されたスレッドサポートライブラリについて調べている。
正直 thread と mutex さえ使えれば何とかなるだろうと思っていたが、ライブラリの充実っぷりからして、そんなに甘い世界ではなさそうな気がしてきた。とりあえず std::future と std::promise(とその周辺)についてまとめておく。
future と promise の基本
future を一言で表現すると「そのうち値が現れる転送ボックス」といった感じになる。同様に promise を一言で表現するなら「値を入れると future に転送される箱」となるだろう。
作業スレッドからの結果を呼び出し側のスレッドで取得したい場合、呼び出し側は future の get を使って値を取りに行くだけでよい。ただし値が得られるまで処理はブロックされる。この値は作業スレッド側が promise に対して set_value を呼び出すことで future に転送される。
future と promise はどちらも <future> ヘッダにて定義されており、promise の get_future を使って future インスタンスを生成することで互いの通信チャネルを確立する。
簡単な例を見てみよう。
#include <iostream>
#include <future>
int main()
{
std::promise<int> pr; // int を受け渡すための promise を作成
std::future<int> ft = pr.get_future(); // これで pr と ft が繋がった
pr.set_value(100); // pr に 100 を入れると…
int i = ft.get(); // ft でその 100 を受け取れる(スレッド関係ないやん…)
std::cout << i << std::endl;
}
簡単すぎた……当然だがスレッドをまたいだ値の受け渡しも可能。(というかそのためのライブラリなので。)
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
int main()
{
std::promise<int> pr; // int を受け渡すための promise を作成
std::future<int> ft = pr.get_future(); // これで pr と ft が繋がった
std::thread th([&pr]() { // 作業スレッドを作成(pr の寿命に注意)
std::this_thread::sleep_for(std::chrono::seconds(3)); // すごい計算の結果、
pr.set_value(100); // 100 が得られたので、それを future に転送
});
// こっちはこっちでそれなりの処理をしつつ…
std::this_thread::sleep_for(std::chrono::seconds(2));
int i = ft.get(); // ft 経由で作業スレッド側の pr.set_value を待つ
std::cout << i << std::endl;
th.join(); // pr の生存をこの行に頼っているが、マジメにやるなら move を使うべき
}
送信側が promise、受信側が future。「そのうち値を渡すから待ってて」と約束(promise)し、将来(future)その値が得られると信じて待つ。このワードセンスよ。
ちなみにこれは余談だが、pr の寿命についてはこちらの記事がとても参考になる[1]。
future と promise の色々な使い方
延々と待ち続けたくない場合
future オブジェクトに対して get を呼び出すと、promise 側から値が得られるまで延々と待ち続けるわけだが、待たずに値があるかどうかだけを確認したいこともあるだろう。そんな時は get ではなく wait_for を使えばよい。
// 待ち時間ゼロで wait_for を呼ぶと、値の有無を即座に返す
while (ft.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
// ready が返るまで何か別のことをし続ける
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << '.';
}
int i = ft.get(); // wait_for が ready を返しているので、ここでは待たない
もちろん wait_for を使えば「500 ms だけ待ってみる」といったことも可能。上の例の 500 ms 待つ部分は wait_for に移動できる。
複数のスレッドで同じ future から値を取り出したい場合
実は future の get は1回しか使えない。値の取得が終わると valid が false を返すようになり、2回目以降の呼び出しでは std::future_error 例外が送出される。
そのため、複数のスレッドが1つの promise からの値を待ち望んでいるといったようなケースに future オブジェクトは使えない。2つ目以降のスレッドが値を取得できないという問題が発生するからだ[2]。
そういったケースでは、future オブジェクトを各スレッドで共有するのではなく、future オブジェクトを共有専用の std::shared_future に変換して、そのコピー[3]を各スレッドに分配するという方法をとらなければならない。(今回の例はこれまでとは逆に作業スレッド側で待っている点に注意。)
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <cassert>
int main()
{
std::promise<int> pr;
std::future<int> ft = pr.get_future(); // これで pr と ft が繋がった
std::shared_future sft = ft.share(); // future から shared_future に変換
assert(!ft.valid()); // この時点で ft は invalid になっている
std::thread th1([sft]() { // 作業スレッド1を作成(sft はコピーで渡す)
std::this_thread::sleep_for(std::chrono::seconds(2)); // 何か処理をして、
int i = sft.get(); // promise からの値を待つ
std::cout << "th1: " << i << std::endl;
});
std::thread th2([sft]() { // 作業スレッド2を作成(sft はコピーで渡す)
std::this_thread::sleep_for(std::chrono::seconds(1)); // 何か処理をして、
int i = sft.get(); // promise からの値を待つ
std::cout << "th2: " << i << std::endl;
});
// こっちはこっちでそれなりの処理をしつつ…
std::this_thread::sleep_for(std::chrono::seconds(3));
pr.set_value(100); // th1 と th2 に値を送信
th1.join();
th2.join();
}
クソデカオブジェクトを受け渡すとどうなるのか
たぶんコレと、
void promise::set_value(R&& r);
→ N4861 32.9.6/16
コレで move が効くはず。(手抜き)
future::get() returns the value v stored in the object's shared state as std::move(v).
→ N4861 32.9.7/16.1
例外処理を追加したい
future/promise の仕組みを使えば、future に対する get で作業スレッドの結果を取得できる。例えば巨大なファイルから何らかの特徴量を抽出するといったような時間のかかる処理を裏で走らせつつ、その特徴量を使った別の処理のための準備をした後で、おもむろに get を使って特徴量を取り出すようなケースを考えてみよう。この時、ファイル破損によるエラーはどこでどのように受け取れば良いのだろう?
get でエラーを意味する特徴量を返す? nullptr を返す? このようなケースでは一般に例外が使われるように、future/promise でも例外を扱うことができる。
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <stdexcept>
int main()
{
std::promise<int> pr;
std::future<int> ft = pr.get_future();
std::thread th([&pr]() {
try {
std::this_thread::sleep_for(std::chrono::seconds(2));
throw std::runtime_error("file is broken"); // エラー発生!
pr.set_value(100); // 結果を ft に転送…できない
}
catch (...) {
// 飛んできた例外を promise に渡す
pr.set_exception(std::current_exception());
}
});
// こっちはこっちでそれなりの処理をしつつ…
std::this_thread::sleep_for(std::chrono::seconds(1));
try {
int i = ft.get(); // 作業スレッドから結果を取得...できない
std::cout << i << std::endl;
}
catch (const std::exception& err) { // promise に渡された例外は ft.get で送出
std::cout << err.what() << std::endl;
}
th.join();
}
このように、set_value の代わりに set_exception で例外を転送すればよい。
packaged_task と async
ついでなので、future/promise と同じ <future> ヘッダに含まれている packaged_task と async にも触れておこう。
packaged_task
前述の例外処理の例を見ると、int i = ft.get();
の部分はint i = timeConsumingFunc();
と置き換えても良さそうなくらいに関数呼び出しに似ている。何か時間のかかる処理をする関数を呼び出したようなイメージだ。例外も普通に飛んでくる(ように見える)し。
そして、関数の戻り値を promise に放り込んで future で取得するという一連の流れが見えてくると、この promise の部分はユーザから隠蔽できるような気がしてくる。
これを1つにまとめたものが packaged_task だ。
#include <iostream>
#include <future>
int main()
{
// std::function の要領で関数を登録する
std::packaged_task<int(int, int)> task([](int a, int b) { return a + b; });
std::future<int> ft = task.get_future(); // 関数の戻り値を受け取る future
task(3, 5); // 関数のように呼び出せるが、戻り値は void
int i = ft.get(); // task の戻り値は future を経由して取得できる
std::cout << i << std::endl;
}
まだスレッドを使っていない点に注意。このプログラムはシングルスレッドで動く。関数の戻り値が(隠蔽された)promise に放り込まれ、それを future で取得するという流れが見えてくると思う。promise に放り込む値を関数の戻り値に限定することで、promise に対する操作どころか存在そのものをユーザから隠蔽しているのだ。
そして、future/promise の機構はスレッドをまたいだ時に本領を発揮する[4]。
#include <iostream>
#include <future>
#include <thread>
#include <stdexcept>
int main()
{
// std::function の要領で関数を登録する
std::packaged_task<int(int, int)> task([](int a, int b) {
if (a < 0 || b < 0) // 例外も飛ばせる
throw std::invalid_argument("positive only");
return a + b;
});
std::future<int> ft = task.get_future(); // 関数の戻り値を受け取る future
std::thread th(std::move(task), 3, 5); // task(3, 5) を別スレッドで実行
try {
int i = ft.get(); // task の戻り値は future を経由して取得できる
std::cout << i << std::endl;
}
catch (const std::exception& err) { // 例外も普通に飛んでくる
std::cout << err.what() << std::endl;
}
th.join();
}
ここで promise に対する set_value が隠蔽されているのと同様に、set_exception も隠蔽されている点にも注目したい。どちらを呼び出すかといった部分も含めて、全て packaged_task 内で処理されているため、ユーザは単に値を返すか例外を throw するだけでよい。
async
ここまでくると、もう thread の作成すら面倒に思えてくる。
Yes! そこまで面倒を見てくれるのが async である。
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
int main()
{
std::future<int> ft = std::async(
[](int a, int b) {
// 分かりやすいように待ちを入れておく
std::this_thread::sleep_for(std::chrono::seconds(2));
return a + b;
},
3, 5);
std::cout << "waiting..." << std::endl;
int i = ft.get(); // 戻り値は future を経由して取得できる
std::cout << i << std::endl;
}
thread オブジェクトが見事に隠蔽されている。時間のかかる処理を async で開始すると同時に future を取得、あとは get するだけというコードだ。非常に分かりやすい。
そして勘の良い方なら、これまでずっと最後に置いてあった th.join();
の1行が消えていることにお気付きであろう。そう、join も自動で行ってくれるのである。
なお、お手軽さを強調するために async の第1引数を省略するバージョンを利用したが、厳密には上記のコードは実装依存であり、実装によっては非同期処理にならない可能性もある。
別スレッドによる非同期処理を明示したバージョンがこちら。上のコードに async の第1引数を追加しただけである。
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
int main()
{
std::future<int> ft = std::async(
std::launch::async, // 非同期処理であることを明示する
[](int a, int b) {
// 分かりやすいように待ちを入れておく
std::this_thread::sleep_for(std::chrono::seconds(2));
return a + b;
},
3, 5);
std::cout << "waiting..." << std::endl;
int i = ft.get(); // 戻り値は future を経由して取得できる
std::cout << i << std::endl;
}
第1引数に std::launch::async と指定することで非同期処理を明示できる。これを std::launch::deferred に変えると遅延モードとなり、get の呼び出し時に関数が実行されるようになる。そして、第1引数を省略した場合はこの遅延モードか非同期モードかの2択が実装依存となる[5]。
ところで、async で隠蔽された thread オブジェクトに対する join はどのタイミングで呼ばれるのだろうか? それは N4861 32.9.9/6.4 で以下のように分かりにくく言及されている。
If the implementation chooses the launch::async policy,
the associated thread completion synchronizes with the return from the first function that successfully detects the ready status of the shared state or with the return from the last function that releases the shared state, whichever happens first.
(以下、筆者による超訳)
std::launch::async が指定された場合、以下のどちらかの(最初に起きた方の)タイミングでスレッドの終了を待機する。
- (get や wait 等で)shared state が ready 状態になった後
- shared state を最後に手放す関数(つまり ~future)が呼ばれた後
1つ目は、async で登録した関数が別スレッドで実行され、戻り値(あるいは例外)が promise に set された後で join するという自然な流れを想定しており、2つ目は、async で登録した関数が別スレッドで実行中に future のデストラクタが呼ばれるパターンを想定している。
この2つ目のパターンが曲者で、以下のような直感に反する挙動の原因となる。
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
int task(int id)
{
std::cout << "task " << id << ": started" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "task " << id << ": finished" << std::endl;
return 100;
}
int main()
{
{
// ft1 と ft2 はスコープの終わりで破棄される
auto ft1 = std::async(std::launch::async, task, 1); // (a)
auto ft2 = std::async(std::launch::async, task, 2); // (b)
}
{
// async が生成する future は即座に破棄される
std::async(std::launch::async, task, 1); // (c)
std::async(std::launch::async, task, 2); // (d)
}
}
task 1: started
task 2: started
task 1task 2: finished
: finished
task 1: started
task 1: finished
task 2: started
task 2: finished
(a) で作られた future と (b) で作られた future はスコープの終わりまで(まぁ一瞬ではあるが)生存し続け、スコープの終わりで順番に破棄されると同時に join が呼ばれ、各スレッドの終了を待つ。この時、それぞれの処理はそれぞれのスレッドで実行しているため、おおよそ2秒ほどで両方とも処理を終える。また、面白いことに、2つのスレッドがほぼ同時に終わる場合もあり、この例では finished のメッセージが完全に競合状態を引き起こしている[6]。
一方、(c) では async が生成した future オブジェクトを誰も受け取らないため、セミコロンの時点で破棄されると同時に join で待機。この時点ではまだ (d) の async が呼ばれていないため、ここでは (c) の処理を2秒ほど待つだけとなる。その後 (d) が呼ばれて同じことが起き、さらに2秒ほど待った後で終了となる。
このように async の戻り値を無視するかどうかで挙動が変わるのはおかしいから修正すべきという提案(N3451 async and ~future)も提出されたが、これは C++14 の時点で却下。C++20 では std::async の一部に C++17 で追加された [[nodiscard]]
を付けることでお茶を濁している。
つまり、C++20 で上記のコードをコンパイルすると、(c) と (d) のところで「戻り値を無視したらマズイよ」と警告が出るようになる[7]。
用途に応じた使い分け
これで future と promise まわりのライブラリは一通り網羅したはず。
future/promise + thread
< future/packaged_task + thread
< future/async
の順に抽象度が上がっていくが、ベースとなる future/promise が「同期機構を備えた通信チャネル」であるという点を踏まえると、用途に応じて使い分けるというよりは、可能な限り抽象度の高いライブラリを選択するという方針で良いのではないだろうか。
また、生の std::thread を作って join で終了を待つだけでも充分にお手軽な気もするが、例外をうまく扱えるという点で future を利用する価値はあると思う。
future/async
遅延モードはマルチスレッドではないので除外。非同期モードのみと考えてよい。
以下の制限をすべて許せるなら async が楽。
- thread が勝手に始まる
- 関数が値を返すと thread も終了する
- future のデストラクタで(まだスレッドが動作中ならば)スレッドの終了(=関数の終了)を待つ
future/packaged_task + thread
自前で thread を用意しなければならないが、その分だけ自由度が上がる。
関数の終了が promise への値のセットという分かりやすさを維持したまま、thread の細かい制御が可能という特徴は、packaged_task を登録する形の thread pool と相性が良さそう。
future/promise + thread
自由度は高いが、これらすべてのコンポーネントを生で使うのは少しやりすぎな印象。thread 単体や thread + mutex と比べると、例外処理が楽になるというメリットは残る。
あとがき
future と promise まわりに限定したにも関わらず結構な分量になった。やはりマルチスレッド関連は魔境である。
加筆・修正は随時行う予定。
-
私は例に挙げられた「不具合のあるコード」の不具合を見破れなかったし、「誤った解釈」の部分は私の思考そのものだった。ずぼらな私は確実にハマる。本当に怖い。 ↩︎
-
そもそも future は複数のスレッドから同時にアクセスされることを想定していないので、get 以外の部分でも競合が発生する可能性が高い。N4861 32.9.7/2 の "Member functions of future do not synchronize with themselves or with member functions of shared_future." がこれに該当するのだと思う。 ↩︎
-
N4861 32.9.8/2 に "Member functions of shared_future do not synchronize with themselves, but they synchronize with the shared state." とあるので、shared_future 自体は future と同様に複数のスレッドから同時に操作されることを想定していないと思われる。 ↩︎
-
というよりも、そのためのライブラリなので。ちなみに task を実行する前に get を呼び出してしまうと、誰も set_value してくれないので簡単にデッドロックする。 ↩︎
-
これはマルチスレッドに対応していない処理系を考慮したもの。そのような処理系ではデフォルトの動作が遅延モードになり、std::launch::async を指定すると future_error 例外が送出される。 ↩︎
-
std::osyncstream を使えば回避可能 ↩︎
-
個人的には、まぁこれくらいが落としどころとしては妥当かなという印象。 ↩︎
Discussion