WG21/P2300 std::execution
P2300R10 std::execution 🎉[Approved C++26]
- https://github.com/cplusplus/papers/issues/1054
- https://github.com/cplusplus/sender-receiver
- https://github.com/NVIDIA/stdexec
- https://github.com/intel/cpp-baremetal-senders-and-receivers
- https://github.com/bemanproject/execution
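Commentary by onihusube (blog posts in Japanese)
- [C++]WG21月次提案文書を眺める(2021年06月) (A look at the WG21 monthly proposal papers, June 2021)
- [C++]WG21月次提案文書を眺める(2021年07月) (July 2021)
- [C++]WG21月次提案文書を眺める(2021年10月) (October 2021)
- [C++] ExecutorとNetworking TSで起きていたこと (What happened with Executors and the Networking TS)
- [C++]WG21月次提案文書を眺める(2021年12月) (December 2021)
- [C++]WG21月次提案文書を眺める(2022年01月) (January 2022)
- [C++]WG21月次提案文書を眺める(2022年04月) (April 2022)
- [C++]WG21月次提案文書を眺める(2023年02月) (February 2023)
- [C++]WG21月次提案文書を眺める(2023年05月) (May 2023)
- [C++]WG21月次提案文書を眺める(2024年04月) (April 2024)
- [C++]WG21月次提案文書を眺める(2024年07月) (July 2024) [Approved C++26]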
Approved C++26🎉
- P3325R5 A Utility for Creating Execution Environments [2024-11 Wrocław]
- P3396R1 std::execution wording fixes [2024-11 Wrocław]
- P2079R10 Parallel scheduler
- P3149R11 async_scope -- Creating scopes for non-sequential concurrency
- P3284R4 `write_env` and `unstoppable` Sender Adaptors
- P3433R1 Allocator Support for Operation States (PDF)
- P3481R4 `std::execution::bulk()` issues
- P3552R3 Add a Coroutine Task Type (PDF)
- P3557R3 High-Quality Sender Diagnostics with Constexpr Exceptions
- P3570R2 optional variants in sender/receiver
- P3682R0 Remove `std::execution::split` (PDF)
Others
- P2403R0 Presentation on P2300 - std::execution (PDF) [info]
- P2428R0 Issues and questions with P2300 (PDF) [info]
- P2430R0 Partial success scenarios with P2300 (PDF) [info]
- P2431R0 Plans for P2300 Revision 2 (PDF) [info]
- P2444R0 The Asio asynchronous model (PDF) [No consensus]
- P2463R0 The Asio asynchronous model, Slides for P2444r0 (PDF)
- P2464R0 Ruminations on networking and executors [No consensus]
- P2469R0 Response to P2464: The Networking TS is baked, P2300 Sender/Receiver is not. (PDF) [No consensus]
- P2470R0 Slides for presentation of P2300R2: std::execution (sender/receiver) (PDF) [info]
- P2471R1 NetTS, ASIO and Sender Library Design Comparison (PDF) [info]
- P2479R0 Slides for P2464 - Senders and Receivers, Composition, for real (PDF)
- P2480R0 Response to P2471: "NetTS, Asio, and Sender library design comparison" - corrected and expanded (PDF) [info]
- P2500R2 C++ parallel algorithms and P2300
- P2504R0 Computations as a global solution to concurrency [closed]
- P2532R0 Removing exception_ptr from the Receiver Concepts [Merged into P2300]
- P2555R1 Naming improvements for std::execution [Reject]
- P2690R1 Presentation for C++17 parallel algorithms and P2300 (PDF) [Superseded P2500]
- P2855R1 Member customization points for Senders and Receivers [Merged into P2300]
- P2999R3 Sender Algorithm Customization [Merged into P2300]
- P3090R0 std::execution Introduction (PDF) [info]
- P3143R0 An in-depth walk-through of the example in P3090R0 (PDF) [info]
- P3175R3 Reconsidering the std::execution::on algorithm [Merged into P2300]
- P3187R1 remove ensure_started and start_detached from P2300 [Merged into P2300]
- P3409R1 Enabling more efficient stop-token based cancellation of senders [closed]
- P3456R0 system_scheduler on Win32, Darwin and Linux [closed]
- P3685R0 Rename async_scope_token (PDF) [Merged into P3149]
- P3706R0 Rename join and nest in async_scope proposal [Merged into P3149]
Misc.
Notes for reading P2300
Receiver
- completion function = `execution::set_{value,error,stopped}` (CPO)
- `complete(index, state, rcvr, tag, args...)`
  - `Index::value` = normally `0` / index of the child Sender (e.g. `when_all_t`)
  - `Tag` type = `set_{value,error,stopped}_t` (completion tag)
  - `args...` = arguments of the completion function (value = N arguments / error = 1 / stopped = 0)
Sender
- Sender object = equivalent to a tuple [ `sender-cpo-t`, CPO arguments, child Senders... ]
  - `sender-cpo-t` = `tag_of_t<Sndr>`: the type of the Sender CPO (e.g. `just_t`, `then_t`, ...)
  - `product-type<Tag, Data, Child...>`
- Static dispatch through specializations of `impls-for<sender-cpo-t>`
  - `get-attrs(data, child...)` = body of the `Sender::get_env` member function
  - `get-env(index, state, rcvr)` = body of the `Receiver::get_env` member function
  - `get-state(sndr, rcvr)` = returns the state object stored in the OperationState
  - `start(state, rcvr, ops...)` = body of the `OperationState::start` member function
  - `complete(index, state, rcvr, tag, args...)` = body of the `Receiver::set_{value,error,stopped}` member functions
- `completion_signatures<Fns...>` = set of function signatures of the completion operations
  - `Fns` = `set_value_t(Vs...)` | `set_error_t(Err)` | `set_stopped_t()`
- `get_completion_scheduler<completion-tag>(get_env(sndr))` -> scheduler
  - one completion scheduler per `set_{value,error,stopped}_t`
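The idea that `completion_signatures<Fns...>` is just a set of function types can be sketched in plain C++17. This is a toy model under stated assumptions, not the library definition: everything lives in a hypothetical `toy` namespace, the tag types are empty stand-ins, and `has_signature` and `example_sigs` are names invented for illustration.

```cpp
#include <exception>
#include <type_traits>

namespace toy {

// Completion tags (empty stand-ins for execution::set_value_t and friends).
struct set_value_t {};
struct set_error_t {};
struct set_stopped_t {};

// A plain type list; each Fn is a function type such as set_value_t(int).
template <class... Fns>
struct completion_signatures {};

// Hypothetical helper: true when Sig is one of the listed signatures.
template <class Sig, class Sigs>
struct has_signature;

template <class Sig, class... Fns>
struct has_signature<Sig, completion_signatures<Fns...>>
    : std::bool_constant<(std::is_same_v<Sig, Fns> || ...)> {};

// Signatures that some hypothetical sender of one int might advertise.
using example_sigs = completion_signatures<
    set_value_t(int),
    set_error_t(std::exception_ptr),
    set_stopped_t()>;

constexpr bool sends_int    = has_signature<set_value_t(int), example_sigs>::value;
constexpr bool can_stop     = has_signature<set_stopped_t(), example_sigs>::value;
constexpr bool sends_double = has_signature<set_value_t(double), example_sigs>::value;

static_assert(sends_int && can_stop && !sends_double);

}  // namespace toy
```

Because the signatures are ordinary function types, a receiver-side check reduces to type-list membership at compile time.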
Queries
- `execution::get_env(sndr/rcvr)` CPO
  - environment == queryable object of a Receiver
  - attributes = queryable object of a Sender
- domain = identifies the completion Scheduler of the child Sender; customization point for `transform_{sender,env}` + `apply_sender`
  - `get-domain-early(sndr)`: used in the definitions of the various Sender CPOs
  - `get-domain-late(sndr, env)`: used by `connect` and `get_completion_signatures`
- `default_domain` class
  - `transform_sender(sndr, env?)`: `sender-cpo-t().transform_sender(sndr, env...)` or `sndr`
  - `transform_env(sndr, env)`: `sender-cpo-t().transform_env(sndr, env)` or `env`
  - `apply_sender(sender-cpo-t, sndr, args...)`: `sender-cpo-t().apply_sender(sndr, args...)`
  - Note: the `execution::` CPOs of the same names take an additional Domain as their first argument
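The "call the tag's `transform_sender` if that is well-formed, otherwise return the sender unchanged" rule of `default_domain` can be sketched with the C++17 detection idiom. This is a simplified illustration only: the `toy` namespace, the `sender<Tag>` struct, and the `offload_t` / `offloaded_t` tags are hypothetical, and the environment argument is left out.

```cpp
#include <type_traits>
#include <utility>

namespace toy {

// Detects whether Tag{}.transform_sender(sndr) is a well-formed expression.
template <class Tag, class Sndr, class = void>
struct has_transform_sender : std::false_type {};

template <class Tag, class Sndr>
struct has_transform_sender<
    Tag, Sndr,
    std::void_t<decltype(std::declval<Tag>().transform_sender(std::declval<Sndr>()))>>
    : std::true_type {};

// Toy sender: a tag type plus some data (not the real basic-sender).
template <class Tag>
struct sender {
    using tag_type = Tag;
    int data;
};

struct then_t {};       // no transform_sender member: senders pass through
struct offloaded_t {};  // hypothetical tag of the rewritten sender

struct offload_t {      // hypothetical tag that customizes transform_sender
    sender<offloaded_t> transform_sender(sender<offload_t> s) const {
        return {s.data};
    }
};

struct default_domain {
    template <class Tag>
    auto transform_sender(sender<Tag> sndr) const {
        if constexpr (has_transform_sender<Tag, sender<Tag>>::value)
            return Tag{}.transform_sender(std::move(sndr));  // tag customizes
        else
            return sndr;                                     // fall back to sndr
    }
};

}  // namespace toy
```

With these definitions, `default_domain{}.transform_sender(sender<then_t>{1})` keeps the type `sender<then_t>`, while a `sender<offload_t>` is rewritten to `sender<offloaded_t>` with the same `data`.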
C++ coroutine integration
Coroutine ⇄ Sender
- Coroutine awaitable types: can be connected to a Receiver as a Sender
  - `sender` concept = `is-sender` | `is-awaitable`
  - completion signatures = `set_value_t(T)` + `set_error_t(exception_ptr)` + `set_stopped_t()`
- Coroutines whose Promise type meets the requirements: can await a Sender in a `co_await` expression
  - `awaitable-sender` concept = `requires(Promise& p) { p.unhandled_stopped() -> coroutine_handle; }`
  - CRTP-derive from `execution::with_awaitable_senders<Promise>`
    - `P::await_transform(v)` -> `execution::as_awaitable(v, p)` CPO
    - `P::unhandled_stopped` -> `std::terminate` (program termination)
      - handle stopped completion explicitly with the `execution::stopped_as_{optional,error}` CPOs
    - `P::set_continuation(coroutine_handle)` = sets the continuation coroutine handle used for `set_stopped`
P3570 optional variants in sender/receiver (Approved)
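- Defines a customization point for the value completion signatures used when an awaitable Sender is the operand of a `co_await` expression
  - the `execution::get_await_completion_adapter` query object returns the Sender adaptor used for the conversion
  - the `execution::as_awaitable` CPO transforms the Sender (applies the Sender adaptor) when the `co_await` operator is applied
    - Example: `set_value()` + `set_value(T)` -> `set_value(optional<T>)`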
P3552R3 std::execution::task<T> (Approved)
- `execution::task<T, Environment>` class template
  - both a Sender (`task::sender_concept`) and a coroutine return type (`task::promise_type`)
  - `co_return value` -> `set_value(value)`
  - `throw ex` -> `set_error(ex)`
  - `co_yield execution::with_error{err}` -> `set_error(err)`
  - `co_await execution::just_stopped()` -> `set_stopped()`
  - `co_await execution::change_coroutine_scheduler{sch}` = changes the Scheduler
  - `Environment::allocator_type` or `std::allocator<std::byte>`
  - `Environment::scheduler_type` or `execution::task_scheduler`
  - `Environment::stop_source_type` or `std::inplace_stop_source`
  - `Environment::error_types` or `completion_signatures<set_error_t(exception_ptr)>`
- `execution::task_scheduler` class
  - type-erased holder of a Scheduler
- `execution::inline_scheduler` class
  - `start(op)` -> calls `set_value`
- `execution::affine_on(sndr, sch)` CPO [pipeable]
  - makes the Sender complete on the specified Scheduler
Papers
P2999R3 Sender Algorithm Customization (Merged)
For every invocation of a sender algorithm, the implementation looks for a customization twice: once immediately while the algorithm is constructing a sender to return, and once later when the resulting sender is `connect`-ed with a receiver.
- `get-domain-early(sndr)`: customization at Sender construction time
- `get-domain-late(sndr, get_env(rcvr))`: customization at `connect` time
- Dispatching via execution domain tags
- Late (sender/receiver connection-time) customization
- Early (sender construction-time) customization
- Decomposable senders
P3175R3 Reconsidering the std::execution::on algorithm (Merged)
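| P2999R3 | P2300R10 | Arguments | Behavior |
|---|---|---|---|
| `on` | `starts_on` | `(sch, sndr)` | starts on the specified Scheduler |
| `transfer` | `continues_on` | `(sndr, sch)` | continues on the specified Scheduler |
| N/A | `on` | `(sch, sndr)`, `(sndr, sch, closure)` | runs on the specified Scheduler, then returns to the original one |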
Sender algorithms for task-parallel execution
P3481R4 std::execution::bulk() issues (Approved)
| Sender | P2300R10 | P3481R4 | Behavior |
|---|---|---|---|
| `bulk` | `(sndr, shape, f)` | `(sndr, policy, shape, f)` | delegates to `bulk_chunked` |
| `bulk_chunked` | N/A | `(sndr, policy, shape, f2)` | parallel execution permitted per sub-range |
| `bulk_unchunked` | N/A | `(sndr, shape, f)` | parallel execution permitted per index |
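- `shape`: index range `[0, shape)`; an integer type satisfying the `std::integral` concept
- `f`: `(index, values...) -> R` (the return value is discarded)
- `f2`: `(begin, end, values...) -> R` (the return value is discarded)
- `policy`: execution requirements for `f` and `f2`; `par` = may run in parallel / `seq` = sequential only
  - actual sequential/parallel control requires a customized implementation provided by the Scheduler
  - `execution::get_parallel_scheduler()`: implementation delegated to the parallel scheduler (P2079R10)
- Default implementations of the Sender adaptors
  - `bulk`: `bulk_chunked(sndr, policy, shape, [](b,e,vs...){ while(b!=e){f(b++, vs...);}})`
  - `bulk_chunked`: calls `f2(0, shape, values...)` on the Scheduler
  - `bulk_unchunked`: calls `f(i, values...)` on the Scheduler (`i` in `[0, shape)`)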
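The way the default `bulk` wraps a per-index function `f` into a per-range function `f2` for `bulk_chunked` can be sketched without any sender machinery. This is a minimal sequential model under stated assumptions: the `toy` namespace and the names `run_chunked`, `chunked_from_indexed` and `squares` are hypothetical, the `values...` arguments are omitted, and `run_chunked` stands in for a scheduler that is free to split `[0, shape)` into sub-ranges (here fixed-size chunks, with `chunk > 0` assumed).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

namespace toy {

// Stand-in for a scheduler's bulk_chunked: calls f2(begin, end) once per
// sub-range of [0, shape). Precondition: chunk > 0.
template <class F2>
void run_chunked(std::size_t shape, std::size_t chunk, F2 f2) {
    for (std::size_t b = 0; b < shape; b += chunk) {
        const std::size_t e = std::min(b + chunk, shape);
        f2(b, e);
    }
}

// The adaptation performed by the default bulk: turn a per-index f into a
// per-range f2 that loops over its sub-range.
template <class F>
auto chunked_from_indexed(F f) {
    return [f](std::size_t b, std::size_t e) mutable {
        while (b != e) { f(b++); }
    };
}

// Usage: fill out[i] = i * i through the chunked path.
inline std::vector<int> squares(std::size_t shape, std::size_t chunk) {
    std::vector<int> out(shape, 0);
    run_chunked(shape, chunk, chunked_from_indexed([&out](std::size_t i) {
        out[i] = static_cast<int>(i * i);
    }));
    return out;
}

}  // namespace toy
```

Whatever chunking the stand-in scheduler picks, every index in `[0, shape)` is visited exactly once, which is why `bulk` can be expressed through `bulk_chunked` without changing results.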
P2079R10 Parallel scheduler (Approved)
- `execution::get_parallel_scheduler` function
  - a Scheduler guaranteeing `forward_progress_guarantee::parallel`
- `execution::schedule(sch)` schedule sender
  - customizes `execution::bulk_chunked` and `bulk_unchunked` (via the domain)
  - stop request detected on the Receiver -> `set_stopped()`
  - internal error -> `set_error(exception_ptr)`
- `execution::system_context_replaceability` namespace (SCR)
  - customization point for the behavior of the system-provided parallel scheduler
  - frontend = implementation of the `bulk` family Sender API + backend = implementation of the Scheduler API
  - `SCR::query_parallel_scheduler_backend` function
    - defined as a function that the user can replace at link time (replaceable)
    - returns the backend implementation (parallel scheduler) instance
  - `SCR::parallel_scheduler_backend` class
    - base class of the parallel scheduler API
    - `schedule`: backend implementation of `sch.schedule()`
    - `schedule_bulk_(un)chunked`: backend implementation invoked when `bulk_(un)chunked` starts
  - `SCR::receiver_proxy` > `SCR::bulk_item_receiver_proxy` classes
    - base classes of the frontend API accessed from the backend implementation
    - `set_value()`, `set_error(exception_ptr)`, `set_stopped()`: completion handler implementations
    - `try_query<P>(q) -> optional<P>`: answers the query `q` = `get_stop_token`
    - `execute(begin, end)`: calls `f(i)` / `f2(i, j)` of the `bulk_(un)chunked` operation
P3284R4 write_env and unstoppable Sender Adaptors (Approved)
- `write_env(sndr, env)`: overrides the Receiver environment
- `unstoppable(sndr)`: `write_env(sndr, prop(get_stop_token, never_stop_token{}))`
P3682R0 Remove std::execution::split (Approved)
Deficiencies of `std::execution::split`
- Dynamic Allocation
- Shared Ownership
- Eagerness
- Naming
Async scopes
P3149R11 async_scope - Creating scopes for non-sequential concurrency (Approved)
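- Structured concurrency: wait (blocking) until a whole group of asynchronous operations has completed
- Async scope: represents the range (join point) to which multiple asynchronous Sender operations are tied
  - `simple_counting_scope` class: count-based async scope type
  - `counting_scope` class: `simple_counting_scope` with support for asynchronous cancellation
  - `scope_token` concept: async scope token with which a Sender is associated
    - `try_associate() -> bool`
    - `disassociate() noexcept -> void`
    - `wrap(sndr) -> sender`
- `associate(sndr, token) -> sender`
  - returns a Sender that ties the input `sndr` to the async scope (`token.try_associate`)
  - if the association fails, starting the returned Sender == `set_stopped()`
  - the completion signatures of the returned Sender are identical to those of the input `sndr`
- Starting a Sender immediately
  - `spawn(sndr, token, env?) -> void`: completion signatures `set_value_t()` | `set_stopped_t()`
  - `spawn_future(sndr, token, env?) -> sender`: completion signatures `set_value_t(Vs...)` | `set_error_t(Err)` | `set_stopped_t()`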
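The counting protocol behind `try_associate()` / `disassociate()` can be sketched as a single-threaded toy. This is not the P3149 implementation: the `toy` namespace, `outstanding()`, `run_associated` and `demo_scope` are hypothetical names, there is no atomics or asynchronous `join`, and `wrap` is omitted; only the "count up on association, refuse once closed" behavior is modelled.

```cpp
#include <cassert>
#include <cstddef>

namespace toy {

class counting_scope {
    std::size_t count_ = 0;  // number of currently associated operations
    bool closed_ = false;    // once closed, new associations are refused

public:
    class token {
        counting_scope* scope_;

    public:
        explicit token(counting_scope* scope) noexcept : scope_(scope) {}

        bool try_associate() const noexcept {
            if (scope_->closed_) return false;
            ++scope_->count_;
            return true;
        }
        void disassociate() const noexcept { --scope_->count_; }
    };

    token get_token() noexcept { return token{this}; }
    void close() noexcept { closed_ = true; }
    std::size_t outstanding() const noexcept { return count_; }  // hypothetical
};

// Hypothetical helper in the spirit of associate(): run the work only if the
// association succeeds; a refusal plays the role of completing with set_stopped().
template <class Work>
bool run_associated(counting_scope::token tok, Work work) {
    if (!tok.try_associate()) return false;
    work();
    tok.disassociate();
    return true;
}

// Usage: the first piece of work is accepted, the one after close() is refused.
inline int demo_scope() {
    counting_scope scope;
    int ran = 0;
    run_associated(scope.get_token(), [&] { ++ran; });
    scope.close();
    run_associated(scope.get_token(), [&] { ++ran; });
    assert(scope.outstanding() == 0);
    return ran;
}

}  // namespace toy
```

A real scope additionally needs thread-safe counting and a join operation that completes when the count drops to zero; the toy only shows why association can fail and what the counter tracks.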
P3296R4 let_async_scope (WIP)
- Async scope Sender adaptors
  - `let_async_scope(sndr, f)`: `let_async_scope_with_error<exception_ptr>(sndr, f)`
  - `let_async_scope_with_error<Errs...>(sndr, f)`: calls `f(token, values...)`
Papers
- Executors: a Change of Perspective, Lucian Radu Teodorescu, 2021/10
- P2504R0 Computations as a global solution to concurrency, Lucian Radu Teodorescu, 2021/12
- P3090R0 `std::execution` Introduction, Inbal Levi, Eric Niebler, 2024/2
- What are Senders Good For, Anyway?, Eric Niebler, 2024/2
- Senders/receivers in C++, Lucian Radu Teodorescu, 2024/8
- Using Sender/Receiver to Implement Control Flow for Async Processing, Steve Downey, CppNow2023
- Extending std::execution - Implementing Custom Algorithms with Senders & Receivers, Robert Leahy, CppNow2025
Video: Working with Asynchrony Generally: From Zero to Sender/Receiver in ~60 minutes, Eric Niebler, 2022/6
Watch Eric Niebler do a live coding session on implementation of async patterns.
The code is available here: https://godbolt.org/z/dnrabsbdq
slides
Executors == A standard async programming model
- Sender: "lazy value"
- Receiver: "continuation", or "callback"
- Scheduler: "handle to a compute resource"
concept
- scheduler
  - `schedule(scheduler) -> sender`
- sender
  - `connect(sender, receiver) -> operation_state`
- receiver
  - `set_value(receiver, values...) -> void`
  - `set_error(receiver, error) -> void`
  - `set_stopped(receiver) -> void`
- operation_state
  - `start(operation_state) -> void`
- Step1: `just(T) -> S` sender factory
  - { `just_sender<T>`, `just_operation<R, T>` }
  - `cout_receiver`
- Step2: `then(S, F) -> S` sender adaptor
  - { `then_sender<S, F>`, `then_receiver<S, F>`, `then_operation<S, R, F>` }
- Step3: `sync_wait(S) -> R` sender consumer
  - `sync_wait_receiver<T>`
  - `S::result_t`
- Step4: `run_loop` execution context
  - { `run_loop::sender`, `run_loop::operation` }
  - `run_loop::scheduler`
- Step5: `thread_context` execution context
  - `thread_context : run_loop`
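Steps 1 to 3 can be condensed into a toy model in plain C++17. This is a minimal sketch and not the code from the talk: the type names mirror the list above, but they live in a hypothetical `toy` namespace, `then_operation` is folded into the child operation, only the value channel is modelled (no `set_error` / `set_stopped`), and everything completes synchronously, so no `run_loop` is needed. `demo` is a helper added for illustration.

```cpp
#include <cassert>
#include <optional>
#include <type_traits>
#include <utility>

namespace toy {

// Step 1: just(T) sender factory.
template <class R, class T>
struct just_operation {
    R rcvr;
    T value;
    void start() { rcvr.set_value(std::move(value)); }
};

template <class T>
struct just_sender {
    using result_t = T;
    T value;
    template <class R>
    just_operation<R, T> connect(R rcvr) const { return {std::move(rcvr), value}; }
};

template <class T>
just_sender<T> just(T value) { return {std::move(value)}; }

// Step 2: then(S, F) sender adaptor. The receiver applies fn before forwarding.
template <class R, class F>
struct then_receiver {
    R rcvr;
    F fn;
    template <class V>
    void set_value(V&& v) { rcvr.set_value(fn(std::forward<V>(v))); }
};

template <class S, class F>
struct then_sender {
    using result_t = std::invoke_result_t<F, typename S::result_t>;
    S sndr;
    F fn;
    template <class R>
    auto connect(R rcvr) const {
        // Simplification: reuse the child's operation state directly.
        return sndr.connect(then_receiver<R, F>{std::move(rcvr), fn});
    }
};

template <class S, class F>
then_sender<S, F> then(S sndr, F fn) { return {std::move(sndr), std::move(fn)}; }

// Step 3: sync_wait(S) sender consumer. Stores the value for the caller.
template <class T>
struct sync_wait_receiver {
    std::optional<T>* result;
    void set_value(T v) { result->emplace(std::move(v)); }
};

template <class S>
typename S::result_t sync_wait(S sndr) {
    using T = typename S::result_t;
    std::optional<T> result;
    auto op = sndr.connect(sync_wait_receiver<T>{&result});
    op.start();  // completes inline in this toy model
    assert(result.has_value());
    return std::move(*result);
}

// Usage: the same pipeline as just(21) | then(n * 2), written with function calls.
inline int demo() {
    return sync_wait(then(just(21), [](int n) { return n * 2; }));
}

}  // namespace toy
```

Calling `toy::demo()` walks the same connect/start/set_value chain as the trace of the real library shown in the rest of this page, and yields 42.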
#include <execution>
namespace stdexec = std::execution;
int main()
{
auto snd0 = stdexec::just(21);
// tag_of_t<decltype(snd0)> == just_t;
auto snd1 = snd0 | stdexec::then([](int n){ return n*2; });
// tag_of_t<decltype(snd1)> == then_t;
auto result = std::this_thread::sync_wait(snd1);
// decltype(result) == optional<tuple<int>>;
auto [value] = result.value();
// value == 42
}
stdexec::just(21);
// ↓ execution::just CPO
make-sender(just, product-type{21});
// ↓
basic-sender<just_t, int>{21};
snd0 | stdexec::then([](int n){ return n*2; });
// ↓
stdexec::then(snd0, [](int n){ return n*2; });
// ↓ execution::then CPO
auto fn = [](int n){ return n*2; };
transform_sender(get-domain-early(snd0), make-sender(then, fn, snd0));
// ↓
using Snd0 = basic-sender<just_t, int>;
using Fn = decltype(fn);
transform_sender(default_domain{}, basic-sender<then_t, Fn, Snd0>{fn, snd0});
// ↓ execution::transform_sender CPO
// then.transform_sender(sndr); is ill-formed
basic-sender<then_t, Fn, Snd0>{fn, snd0};
std::this_thread::sync_wait(snd1);
// ↓ this_thread::sync_wait CPO
apply_sender(get-domain-early(snd1), sync_wait, snd1);
// ↓
apply_sender(default_domain{}, sync_wait, snd1);
// ↓ execution::apply_sender CPO
default_domain{}.apply_sender(sync_wait, snd1);
// ↓
sync_wait.apply_sender(snd1);
// ↓
{
using Snd1 = basic-sender<then_t, Fn, basic-sender<just_t, int>>;
sync-wait-state<Snd1> state;
auto op = connect(snd1, sync-wait-receiver<Snd1>{&state});
start(op);
state.loop.run();
if (state.error) { rethrow_exception(std::move(state.error)); }
return std::move(state.result);
}
connect(snd1, sync-wait-receiver<Snd1>{&state});
// ↓ execution::connect CPO
auto rcvr = sync-wait-receiver<Snd1>{&state};
auto new_sndr = transform_sender(decltype(get-domain-late(snd1, get_env(rcvr))){}, snd1, get_env(rcvr));
new_sndr.connect(rcvr);
// ↓
{ // new_sndr
// ↓ execution::get_env CPO
// ↓ sync-wait-receiver::get_env
auto rcvr_env = sync-wait-env{&state->loop};
auto new_sndr = transform_sender(decltype(get-domain-late(snd1, rcvr_env)){}, snd1, rcvr_env);
// ↓
auto new_sndr = transform_sender(default_domain{}, snd1, rcvr_env);
// ↓ execution::transform_sender CPO
// then.transform_sender(snd1, rcvr_env) is ill-formed
auto new_sndr = snd1;
}
// ↓
snd1.connect(rcvr);
// ↓ basic-sender::connect
basic-operation<Snd1, sync-wait-receiver<Snd1>>{snd1, rcvr};
{ // state
impls-for<then>::get-state(snd1, rcvr);
// ↓ default-impls::get-state
fn // type Fn
}
{ // inner-ops
connect-all(op, snd1, indices-for<Snd1>());
// ↓
using Rcvr = sync-wait-receiver<Snd1>;
auto& [_, fn, snd0] = snd1;
product-type{connect(snd0, basic-receiver<Snd1, Rcvr, 0>{op})};
// ↓
product-type{snd0.connect(basic-receiver<Snd1, Rcvr, 0>{op})};
// ↓ execution::connect CPO
using Rcv1 = basic-receiver<Snd1, Rcvr, 0>;
product-type{basic-operation<Snd0, Rcv1>{snd0, Rcv1{op}}};
}
// ↓
op = basic-operation<Snd1, sync-wait-receiver<Snd1>>
  rcvr = sync-wait-receiver<Snd1>{&state}
  state = fn
  inner-ops[0] = basic-operation<Snd0, Rcv1>
    rcvr = Rcv1{op}
    state = 21
using Snd0 = basic-sender<just_t, int>;
using Snd1 = basic-sender<then_t, Fn, Snd0>;
auto op = basic-operation<Snd1, sync-wait-receiver<Snd1>>{snd1, rcvr};
using Rcvr = sync-wait-receiver<Snd1>;
using Rcv1 = basic-receiver<Snd1, Rcvr, 0>;
start(op);
// ↓ execution::start CPO
op.start();
// ↓ basic-operation::start
impls-for<then>::start(op.state, rcvr, op.inner-ops);
// ↓ default-impls::start
execution::start(op.inner-ops[0]);
// ↓
impls-for<just>::start(op.inner-ops[0].state, Rcv1{op});
// ↓ impls-for<just>::start
auto [ts] = op.inner-ops[0].state; // 21
set_value(Rcv1{op}, ts);
// ↓ execution::set_value CPO
Rcv1{op}.set_value(21);
// ↓ basic-receiver::set_value
auto& complete = impls-for<then>::complete;
complete(0, fn, rcvr, set_value_t(), 21);
// ↓ impls-for<then>::complete
set_value(rcvr, invoke(fn, 21));
// ↓
set_value(rcvr, 42);
// ↓ execution::set_value CPO
rcvr.set_value(42);
// ↓ sync-wait-receiver::set_value
sync-wait-state<Snd1> state;
try {
state.result.emplace(42);
} catch (...) {
state.error = current_exception();
}
state.loop.finish();