
WG21/P2300 std::execution

yohhoy

Approved C++26🎉

https://github.com/cplusplus/papers/issues?q=label%3Aplenary-approved label%3Asenders%2Freceivers


Notes for deciphering P2300

Receiver

  • completion function = execution::set_{value,error,stopped} (CPOs)
  • complete(index, state, rcvr, tag, args...)
    • Index::value = normally 0 / the child Sender's index (e.g. when_all_t)
    • Tag type = set_{value,error,stopped}_t (completion tag)
    • args... = completion function arguments (value: N / error: 1 / stopped: 0)

Sender

  • Sender object ≈ tuple [ sender-cpo-t, CPO arguments, child Senders... ]
    • sender-cpo-t = tag_of_t<Sndr>, the type of the Sender CPO (e.g. just_t, then_t, ...)
    • product-type<Tag, Data, Child...>
  • Static dispatch through specializations of impls-for<sender-cpo-t>
    • get-attrs(data, child...) = what the Sender::get_env member function does
    • get-env(index, state, rcvr) = what the Receiver::get_env member function does
    • get-state(sndr, rcvr) = returns the state object stored in the OperationState
    • start(state, rcvr, ops...) = what the OperationState::start member function does
    • complete(index, state, rcvr, tag, args...) = what the Receiver::set_{value,error,stopped} member functions do
  • completion_signatures<Fns...> = the set of function signatures of the completion operations
    • Fns = set_value_t(Vs...) | set_error_t(Err) | set_stopped_t()
  • get_completion_scheduler<completion-tag>(get_env(sndr)) -> scheduler
    • one completion scheduler per set_{value,error,stopped}_t
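
The "tuple plus static dispatch through impls-for" shape above can be modeled in a few dozen lines. This is a simplified, hypothetical sketch: basic_sender, impls_for, basic_receiver, leaf_op, parent_op, connect and store_receiver are illustrative stand-ins for the exposition-only basic-sender / impls-for / basic-receiver / basic-operation, with no environments, no error/stopped channels and a plain function object instead of an Index type. It only reproduces the just(21) | then(n*2) chain that is traced step by step further down.

```cpp
#include <cassert>
#include <functional>
#include <tuple>
#include <utility>

// Hypothetical, simplified stand-ins for the exposition-only machinery.
struct set_value_t {};
struct just_t {};
struct then_t {};

// A sender is morally the tuple [tag, data, child...].
template <class Tag, class Data, class... Child>
struct basic_sender {
  Data data;
  std::tuple<Child...> children;
};

// Static dispatch table: one specialization per sender CPO tag.
template <class Tag> struct impls_for;

template <> struct impls_for<just_t> {
  // start: complete immediately with the stored value
  template <class State, class Rcvr>
  static void start(State& state, Rcvr& rcvr) { rcvr.set_value(state); }
};

template <> struct impls_for<then_t> {
  // complete: invoke the stored function and forward its result downstream
  template <class Fn, class Rcvr, class... Args>
  static void complete(Fn& fn, Rcvr& rcvr, set_value_t, Args&&... args) {
    rcvr.set_value(std::invoke(fn, std::forward<Args>(args)...));
  }
};

// Receiver handed to a child: routes its completion to impls_for<Tag>::complete.
template <class Tag, class State, class Rcvr>
struct basic_receiver {
  State* state;
  Rcvr* rcvr;
  template <class... Args>
  void set_value(Args&&... args) {
    impls_for<Tag>::complete(*state, *rcvr, set_value_t{}, std::forward<Args>(args)...);
  }
};

// Operation state for a leaf sender (no children): state = data.
template <class Tag, class State, class Rcvr>
struct leaf_op {
  State state;
  Rcvr rcvr;
  void start() { impls_for<Tag>::start(state, rcvr); }
};

template <class Data, class Rcvr>
leaf_op<just_t, Data, Rcvr> connect(basic_sender<just_t, Data> s, Rcvr r) {
  return {std::move(s.data), std::move(r)};
}

// Operation state for a one-child sender: owns state, receiver and child operation.
template <class Tag, class State, class Child, class Rcvr>
struct parent_op {
  using child_rcvr = basic_receiver<Tag, State, Rcvr>;
  State state;
  Rcvr rcvr;
  decltype(connect(std::declval<Child>(), std::declval<child_rcvr>())) child_op;

  parent_op(State st, Child c, Rcvr r)
      : state(std::move(st)), rcvr(std::move(r)),
        child_op(connect(std::move(c), child_rcvr{&state, &rcvr})) {}
  parent_op(parent_op&&) = delete;  // the child receiver points into *this
  void start() { child_op.start(); }
};

template <class Data, class Child, class Rcvr>
parent_op<then_t, Data, Child, Rcvr> connect(basic_sender<then_t, Data, Child> s, Rcvr r) {
  return {std::move(s.data), std::get<0>(std::move(s.children)), std::move(r)};
}

struct store_receiver {
  int* out;
  void set_value(int v) { *out = v; }
};

// Mirrors just(21) | then([](int n){ return n*2; }) followed by connect + start.
int run_example() {
  auto fn = [](int n) { return n * 2; };
  using Snd0 = basic_sender<just_t, int>;
  using Snd1 = basic_sender<then_t, decltype(fn), Snd0>;
  Snd1 snd1{fn, std::tuple<Snd0>{Snd0{21, {}}}};
  int result = 0;
  auto op = connect(snd1, store_receiver{&result});
  op.start();
  return result;
}
```

The operation state owning the child's operation, with the child receiver pointing back into it, is why parent_op is made non-movable.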

Queries

  • execution::get_env(sndr/rcvr) CPO
    • environment == the Receiver's queryable object
    • attributes = the Sender's queryable object
  • domain = identifies the completion Scheduler of child Senders; customization point for transform_{sender,env} + apply_sender
    • get-domain-early(sndr): used in the definitions of the various Sender CPOs
    • get-domain-late(sndr, env): used in connect and get_completion_signatures
  • default_domain class
    • transform_sender(sndr, env?): sender-cpo-t().transform_sender(sndr, env...) or sndr
    • transform_env(sndr, env): sender-cpo-t().transform_env(sndr, env) or env
    • apply_sender(sender-cpo-t, sndr, args...): sender-cpo-t().apply_sender(sndr, args...)
    • Note: the execution:: CPOs of the same name take a Domain as an additional first argument

C++ coroutine integration

Coroutine ⇄ Sender

  • Coroutine Awaitable types: can be connected to a Receiver as a Sender
    • sender concept = is-sender | is-awaitable
    • completion signatures = set_value_t(T) + set_error_t(exception_ptr) + set_stopped_t()
  • Coroutines whose Promise type meets the requirements: can await a Sender in a co_await expression
    • awaitable-sender concept = requires(Promise& p) { { p.unhandled_stopped() } -> convertible_to<coroutine_handle<>>; }
    • CRTP-derive the promise from execution::with_awaitable_senders<Promise>
      • P::await_transform(v) -> execution::as_awaitable(v, p) CPO
      • P::unhandled_stopped -> terminates the program via std::terminate (by default)
      • use the execution::stopped_as_{optional,error} CPOs to handle a stopped completion explicitly
      • P::set_continuation(coroutine_handle) = sets the continuation coroutine handle used on set_stopped
  • P3570 optional variants in sender/receiver (Approved)
    • Defines a customization point for the value completion signatures when an awaitable Sender is the operand of a co_await expression
    • The execution::get_await_completion_adapter query object returns the Sender adaptor used for the conversion
    • The execution::as_awaitable CPO transforms the Sender (applies the Sender adaptor) when co_await is applied
      • e.g. set_value() + set_value(T) -> set_value(optional<T>)

P3552R3 std::execution::task<T> (Approved)

  • execution::task<T, Environment> class template
    • Both a Sender (task::sender_concept) and a coroutine return type (task::promise_type)
    • co_return value -> set_value(value)
    • throw ex -> set_error(exception_ptr)
    • co_yield execution::with_error{err} -> set_error(err)
    • co_await execution::just_stopped() -> set_stopped()
    • co_await execution::change_coroutine_scheduler{sch} = change the Scheduler
    • Environment::allocator_type, defaulting to std::allocator<std::byte>
    • Environment::scheduler_type, defaulting to execution::task_scheduler
    • Environment::stop_source_type, defaulting to std::inplace_stop_source
    • Environment::error_types, defaulting to completion_signatures<set_error_t(exception_ptr)>
  • execution::task_scheduler class
    • holds a Scheduler in type-erased form
  • execution::inline_scheduler class
    • start(op) -> calls set_value immediately (inline)
  • execution::affine_on(sndr, sch) CPO [pipeable]
    • completes the Sender on the specified Scheduler


P2999R3 Sender Algorithm Customization (Merged)

For every invocation of a sender algorithm, the implementation looks for a customization twice: once immediately while the algorithm is constructing a sender to return, and once later when the resulting sender is connect-ed with a receiver.

  • get-domain-early(sndr): customization at Sender construction time
  • get-domain-late(sndr, get_env(rcvr)): customization at connect time
  • Dispatching via execution domain tags
  • Late (sender/receiver connection-time) customization
  • Early (sender construction-time) customization
  • Decomposable senders

P3175R3 Reconsidering the std::execution::on algorithm (Merged)

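P2999R3  | P2300R10     | Arguments                         | Behavior
on       | starts_on    | (sch, sndr)                       | starts on the specified Scheduler
transfer | continues_on | (sndr, sch)                       | continues on the specified Scheduler
N/A      | on           | (sch, sndr), (sndr, sch, closure) | runs on the specified Scheduler (then transitions back)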

Task-parallel Sender algorithms

P3481R4 std::execution::bulk() issues (Approved)

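Sender         | P2300R10         | P3481R4                   | Behavior
bulk           | (sndr, shape, f) | (sndr, policy, shape, f)  | delegates to bulk_chunked
bulk_chunked   | N/A              | (sndr, policy, shape, f2) | allows parallel execution per subrange
bulk_unchunked | N/A              | (sndr, shape, f)          | allows parallel execution per index
  • shape: index range [0, shape); an integer type satisfying the std::integral concept
  • f: (index, values...) -> R (return value discarded)
  • f2: (begin, end, values...) -> R (return value discarded)
  • policy: execution policy for f, f2; par = may run in parallel / seq = sequential only
    • actual sequential/parallel execution control requires a customized implementation supplied by the Scheduler
    • execution::get_parallel_scheduler(): implementation delegated to the parallel scheduler (P2079R10)
  • Default implementations of the Sender adaptors
    • bulk: bulk_chunked(sndr, policy, shape, [](b,e,vs...){ while(b!=e){f(b++, vs...);}})
    • bulk_chunked: calls f2(0, shape, values...) on the Scheduler
    • bulk_unchunked: calls f(i, values...) on the Scheduler for each i in [0, shape)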
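
A sequential-only sketch of the default implementations listed above, in particular how bulk is expressed through bulk_chunked by wrapping f in a per-chunk loop. bulk_seq, bulk_chunked_seq and bulk_unchunked_seq are hypothetical plain functions: there is no Sender, no Scheduler and no policy parameter, and the values are passed in directly instead of arriving from a predecessor Sender.

```cpp
#include <cassert>
#include <vector>

// bulk_chunked default: a single chunk covering [0, shape)
template <class Shape, class F2, class... Vs>
void bulk_chunked_seq(Shape shape, F2 f2, Vs&... vs) {
  f2(Shape(0), shape, vs...);
}

// bulk_unchunked default: one call per index i in [0, shape)
template <class Shape, class F, class... Vs>
void bulk_unchunked_seq(Shape shape, F f, Vs&... vs) {
  for (Shape i = 0; i != shape; ++i) f(i, vs...);
}

// bulk delegates to bulk_chunked, turning f(i, vs...) into a loop over each chunk
template <class Shape, class F, class... Vs>
void bulk_seq(Shape shape, F f, Vs&... vs) {
  bulk_chunked_seq(shape, [&f](Shape b, Shape e, auto&... xs) {
    while (b != e) { f(b++, xs...); }
  }, vs...);
}
```

A parallel Scheduler's customization would be free to split [0, shape) into several chunks; the default shown here simply uses one.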

P2079R10 Parallel scheduler (Approved)

  • execution::get_parallel_scheduler function
    • a Scheduler guaranteeing forward_progress_guarantee::parallel
  • execution::schedule(sch): schedule Sender
    • customizes execution::bulk_chunked and bulk_unchunked (via its domain)
    • stop request detected from the Receiver -> set_stopped()
    • internal error -> set_error(exception_ptr)
  • execution::system_context_replaceability namespace (SCR)
    • customization point for the behavior of the system-provided parallel scheduler
    • frontend = bulk-family Sender API implementation + backend = Scheduler API implementation
    • SCR::query_parallel_scheduler_backend function
      • defined as a function the user can replace at link time (replaceable)
      • returns the backend implementation (parallel scheduler) instance
    • SCR::parallel_scheduler_backend class
      • base class of the parallel scheduler API
      • schedule: backend implementation of sch.schedule()
      • schedule_bulk_(un)chunked: backend implementation invoked when bulk_(un)chunked starts
    • SCR::receiver_proxy > SCR::bulk_item_receiver_proxy classes (base > derived)
      • base classes of the frontend API accessed from the backend implementation
      • set_value(), set_error(exception_ptr), set_stopped(): completion handler implementations
      • try_query<P>(q) -> optional<P>: answers queries such as q = get_stop_token
      • execute(begin, end): invokes f(i) or f2(i, j) of the bulk_(un)chunked operation
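
The frontend/backend split above can be pictured as a pair of abstract interfaces. Everything in the scr_sketch namespace is a hypothetical, simplified stand-in modeled on the names listed above; the real declarations differ in detail and carry extra parameters that are omitted here. inline_backend and squares_proxy are illustrative only.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <memory>
#include <vector>

namespace scr_sketch {  // simplified stand-ins, NOT the real std:: declarations

struct receiver_proxy {  // frontend side: completion handlers
  virtual ~receiver_proxy() = default;
  virtual void set_value() noexcept = 0;
  virtual void set_error(std::exception_ptr) noexcept = 0;
  virtual void set_stopped() noexcept = 0;
};

struct bulk_item_receiver_proxy : receiver_proxy {
  // runs the bulk work (f or f2) for the index range [begin, end)
  virtual void execute(std::size_t begin, std::size_t end) noexcept = 0;
};

struct parallel_scheduler_backend {  // backend side
  virtual ~parallel_scheduler_backend() = default;
  virtual void schedule(receiver_proxy&) noexcept = 0;
  virtual void schedule_bulk_chunked(std::size_t shape, bulk_item_receiver_proxy&) noexcept = 0;
};

// A trivial backend that runs everything inline on the calling thread.
struct inline_backend : parallel_scheduler_backend {
  void schedule(receiver_proxy& r) noexcept override { r.set_value(); }
  void schedule_bulk_chunked(std::size_t shape, bulk_item_receiver_proxy& r) noexcept override {
    r.execute(0, shape);  // a single chunk
    r.set_value();
  }
};

// Stand-in for the link-time replaceable query_parallel_scheduler_backend().
std::shared_ptr<parallel_scheduler_backend> query_parallel_scheduler_backend() {
  return std::make_shared<inline_backend>();
}

}  // namespace scr_sketch

// Frontend-side proxy for a bulk_chunked operation that squares each index.
struct squares_proxy : scr_sketch::bulk_item_receiver_proxy {
  std::vector<int>& out;
  bool done = false;
  explicit squares_proxy(std::vector<int>& o) : out(o) {}
  void execute(std::size_t b, std::size_t e) noexcept override {
    for (; b != e; ++b) out[b] = static_cast<int>(b * b);
  }
  void set_value() noexcept override { done = true; }
  void set_error(std::exception_ptr) noexcept override {}
  void set_stopped() noexcept override {}
};
```

A real backend would hand the range to a thread pool in several chunks; the frontend only sees execute() calls followed by exactly one completion.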

Async scopes

P3149R11 async_scope - Creating scopes for non-sequential concurrency (Approved)

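  • structured concurrency: wait (blocking) until a series of asynchronous operations has completed
  • async scope: represents the region (join point) that multiple asynchronous Sender operations are tied to
    • simple_counting_scope class: async scope type based on a count
    • counting_scope class: simple_counting_scope with support for asynchronous cancellation
    • async_scope_token concept: async scope token that associates Senders with a scope
  • Sender consumer
    • spawn(sndr, token, env?) -> void: starts a Sender tied to the async scope
  • Sender adaptors
    • nest(sndr, token)
    • spawn_future(sndr, token, env?)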

P3296R4 let_async_scope (WIP)

  • Async scope Sender adaptors
    • let_async_scope(sndr, f) = let_async_scope_with_error<exception_ptr>(sndr, f)
    • let_async_scope_with_error<Errs...>(sndr, f): calls f(token, values...)

Video: Working with Asynchrony Generally: From Zero to Sender/Receiver in ~60 minutes, Eric Niebler, 2022/6

Watch Eric Niebler do a live coding session on implementation of async patterns.
The code is available here: https://godbolt.org/z/dnrabsbdq

slides

Executors == A standard async programming model

  • Sender: "lazy value"
  • Receiver: "continuation", or "callback"
  • Scheduler: "handle to a compute resource"

concept

  • scheduler
    • schedule(scheduler) -> sender
  • sender
    • connect(sender, receiver) -> operation_state
  • receiver
    • set_value(receiver, values...) -> void
    • set_error(receiver, error) -> void
    • set_stopped(receiver) -> void
  • operation_state
    • start(operation_state) -> void
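
The sender / receiver / operation_state protocol above can be exercised with a minimal just / then / sync_wait chain, roughly along the lines of Steps 1 to 3 that follow. All names are hypothetical and member functions stand in for the CPOs; result_t stands in for completion signatures (as in Step 3), f is assumed to return a non-void value, and sync_wait works without a run_loop here only because just and then complete inline.

```cpp
#include <cassert>
#include <exception>
#include <optional>
#include <type_traits>
#include <utility>

// Step1: just(T) sender factory
template <class R, class T>
struct just_operation {
  R rcvr;
  T value;
  void start() noexcept { std::move(rcvr).set_value(std::move(value)); }
};

template <class T>
struct just_sender {
  using result_t = T;  // stands in for completion signatures
  T value;
  template <class R>
  just_operation<R, T> connect(R rcvr) && { return {std::move(rcvr), std::move(value)}; }
};

template <class T>
just_sender<T> just(T t) { return {std::move(t)}; }

// Step2: then(S, F) sender adaptor
template <class R, class F>
struct then_receiver {
  R rcvr;
  F f;
  template <class... Vs>
  void set_value(Vs&&... vs) && noexcept {
    try {
      std::move(rcvr).set_value(f(std::forward<Vs>(vs)...));
    } catch (...) {  // an exception from f becomes an error completion
      std::move(rcvr).set_error(std::current_exception());
    }
  }
  void set_error(std::exception_ptr e) && noexcept { std::move(rcvr).set_error(std::move(e)); }
  void set_stopped() && noexcept { std::move(rcvr).set_stopped(); }
};

template <class S, class F>
struct then_sender {
  using result_t = std::invoke_result_t<F&, typename S::result_t>;
  S sndr;
  F f;
  template <class R>
  auto connect(R rcvr) && {
    return std::move(sndr).connect(then_receiver<R, F>{std::move(rcvr), std::move(f)});
  }
};

template <class S, class F>
then_sender<S, F> then(S s, F f) { return {std::move(s), std::move(f)}; }

// Step3: sync_wait(S) sender consumer (synchronous senders only)
template <class T>
struct sync_wait_receiver {
  std::optional<T>* result;
  std::exception_ptr* error;
  void set_value(T v) && noexcept { result->emplace(std::move(v)); }
  void set_error(std::exception_ptr e) && noexcept { *error = std::move(e); }
  void set_stopped() && noexcept {}  // leaves the result empty
};

template <class S>
std::optional<typename S::result_t> sync_wait(S sndr) {
  using T = typename S::result_t;
  std::optional<T> result;
  std::exception_ptr error;
  auto op = std::move(sndr).connect(sync_wait_receiver<T>{&result, &error});
  op.start();  // just/then complete inline, so the result is ready here
  if (error) std::rethrow_exception(error);
  return result;
}
```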
  • Step1: just(T) -> S sender factory
    • {just_sender<T>, just_operation<R, T>}
    • cout_receiver
  • Step2: then(S, F) -> S sender adaptor
    • {then_sender<S, F>, then_receiver<S, F>, then_operation<S, R, F>}
  • Step3: sync_wait(S) -> R sender consumer
    • sync_wait_receiver<T>
    • S::result_t
  • Step4: run_loop execution context
    • {run_loop::sender, run_loop::operation}
    • run_loop::scheduler
  • Step5: thread_context execution context
    • thread_context : run_loop
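
Step 4 (run_loop) and the scheduler concept can be sketched as a work queue whose schedule() Sender completes by running set_value() from inside run(); driving run() from a separate thread gives roughly the Step 5 thread_context picture. This is a hypothetical simplification: run_loop, schedule_operation, schedule_sender, thread_id_receiver and runs_on_loop_thread are illustrative names, work items are std::function values rather than intrusive operation states, and only the value channel is modeled.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

class run_loop {
 public:
  struct scheduler;
  scheduler get_scheduler();

  void push(std::function<void()> work) {
    {
      std::lock_guard lk(m_);
      q_.push_back(std::move(work));
    }
    cv_.notify_one();
  }
  // Drain work items until finish() has been called and the queue is empty.
  void run() {
    for (;;) {
      std::unique_lock lk(m_);
      cv_.wait(lk, [&] { return finishing_ || !q_.empty(); });
      if (q_.empty()) return;
      auto work = std::move(q_.front());
      q_.pop_front();
      lk.unlock();
      work();  // the completion runs on the thread that called run()
    }
  }
  void finish() {
    {
      std::lock_guard lk(m_);
      finishing_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> q_;
  bool finishing_ = false;
};

template <class R>
struct schedule_operation {
  run_loop* loop;
  R rcvr;
  // start() only enqueues; set_value() happens later inside run()
  void start() noexcept { loop->push([this] { std::move(rcvr).set_value(); }); }
};

struct schedule_sender {
  run_loop* loop;
  template <class R>
  schedule_operation<R> connect(R rcvr) && { return {loop, std::move(rcvr)}; }
};

struct run_loop::scheduler {
  run_loop* loop;
  schedule_sender schedule() const { return {loop}; }
};

inline run_loop::scheduler run_loop::get_scheduler() { return {this}; }

// Usage: record which thread the completion ran on.
struct thread_id_receiver {
  std::thread::id* where;
  void set_value() && noexcept { *where = std::this_thread::get_id(); }
};

bool runs_on_loop_thread() {
  run_loop loop;
  std::thread::id where{};
  auto op = loop.get_scheduler().schedule().connect(thread_id_receiver{&where});
  op.start();
  bool nothing_ran_yet = (where == std::thread::id{});
  std::thread driver([&loop] { loop.run(); });  // Step5-like: a dedicated thread
  std::thread::id driver_id = driver.get_id();
  loop.finish();
  driver.join();
  return nothing_ran_yet && where == driver_id;
}
```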
#include <execution>
namespace stdexec = std::execution;  // namespace alias (not a using-directive)

int main()
{
  auto snd0 = stdexec::just(21);
  // tag_of_t<decltype(snd0)> == just_t;

  auto snd1 = snd0 | stdexec::then([](int n){ return n*2; });
  // tag_of_t<decltype(snd1)> == then_t;

  auto result = std::this_thread::sync_wait(snd1);
  // decltype(result) == optional<tuple<int>>;
  auto [value] = result.value();
  // value == 42
}
stdexec::just(21);
// ↓ execution::just CPO
make-sender(just, product-type{21});
// ↓
basic-sender<just_t, int>{21};
snd0 | stdexec::then([](int n){ return n*2; });
// ↓
stdexec::then(snd0, [](int n){ return n*2; });
// ↓ execution::then CPO
auto fn = [](int n){ return n*2; };
transform_sender(get-domain-early(snd0), make-sender(then, fn, snd0));
// ↓
using Snd0 = basic-sender<just_t, int>;
using Fn = decltype(fn);
transform_sender(default_domain{}, basic-sender<then_t, Fn, Snd0>{fn, snd0});
// ↓ execution::transform_sender CPO
// then.transform_sender(sndr); is ill-formed
basic-sender<then_t, Fn, Snd0>{fn, snd0};
std::this_thread::sync_wait(snd1);
// ↓ this_thread::sync_wait CPO
apply_sender(get-domain-early(snd1), sync_wait, snd1);
// ↓
apply_sender(default_domain{}, sync_wait, snd1);
// ↓ execution::apply_sender CPO
default_domain{}.apply_sender(sync_wait, snd1);
// ↓
sync_wait.apply_sender(snd1);
// ↓
{
  using Snd1 = basic-sender<then_t, Fn, basic-sender<just_t, int>>;
  sync-wait-state<Snd1> state;
  auto op = connect(snd1, sync-wait-receiver<Snd1>{&state});
  start(op);

  state.loop.run();
  if (state.error) { rethrow_exception(std::move(state.error)); }
  return std::move(state.result);
}
connect(snd1, sync-wait-receiver<Snd1>{&state});
// ↓ execution::connect CPO
auto rcvr = sync-wait-receiver<Snd1>{&state};
auto new_sndr = transform_sender(decltype(get-domain-late(snd1, get_env(rcvr))){}, snd1, get_env(rcvr));
new_sndr.connect(rcvr);
// ↓
{ // new_sndr
  // ↓ execution::get_env CPO
  // ↓ sync-wait-receiver::get_env
  auto rcvr_env = sync-wait-env{&state->loop};
  auto new_sndr = transform_sender(decltype(get-domain-late(snd1, rcvr_env)){}, snd1, rcvr_env);
  // ↓
  auto new_sndr = transform_sender(default_domain{}, snd1, rcvr_env);
  // ↓ execution::transform_sender CPO
  // then.transform_sender(snd1, rcvr_env) is ill-formed
  auto new_sndr = snd1;
}
// ↓
snd1.connect(rcvr);
// ↓ basic-sender::connect
basic-operation<Snd1, sync-wait-receiver<Snd1>>{snd1, rcvr};
{ // state
  impls-for<then_t>::get-state(snd1, rcvr);
  // ↓ default-impls::get-state
  fn // type Fn
}
{ // inner-ops
  connect-all(op, snd1, indices-for<Snd1>());
  // ↓
  using Rcvr = sync-wait-receiver<Snd1>;
  auto& [_, fn, snd0] = snd1;
  product-type{connect(snd0, basic-receiver<Snd1, Rcvr, 0>{op})};
  // ↓ execution::connect CPO
  product-type{snd0.connect(basic-receiver<Snd1, Rcvr, 0>{op})};
  // ↓ basic-sender::connect
  using Rcv1 = basic-receiver<Snd1, Rcvr, 0>;
  product-type{basic-operation<Snd0, Rcv1>{snd0, Rcv1{op}}};
}
// ↓
op = basic-operation<Snd1, sync-wait-receiver<Snd1>>
  rcvr = sync-wait-receiver<Snd1>{&state}
  state = fn
  inner-ops[0] = basic-operation<Snd0, Rcv1>
    rcvr = Rcv1{op}
    state = 21
using Snd0 = basic-sender<just_t, int>;
using Snd1 = basic-sender<then_t, Fn, Snd0>;
auto op = basic-operation<Snd1, sync-wait-receiver<Snd1>>{snd1, rcvr};
using Rcvr = sync-wait-receiver<Snd1>;
using Rcv1 = basic-receiver<Snd1, Rcvr, 0>;

start(op);
// ↓ execution::start CPO
op.start();
// ↓ basic-operation::start
impls-for<then_t>::start(op.state, rcvr, op.inner-ops);
// ↓ default-impls::start
execution::start(op.inner-ops[0]);
// ↓
impls-for<just_t>::start(op.inner-ops[0].state, Rcv1{op});
// ↓ impls-for<just_t>::start
auto [ts] = op.inner-ops[0].state;  // 21
set_value(Rcv1{op}, ts);
// ↓ execution::set_value CPO
Rcv1{op}.set_value(21);
// ↓ basic-receiver::set_value
auto& complete = impls-for<then_t>::complete;
complete(0, fn, rcvr, set_value_t(), 21);
// ↓ impls-for<then_t>::complete
set_value(rcvr, invoke(fn, 21));
// ↓
set_value(rcvr, 42);
// ↓ execution::set_value CPO
rcvr.set_value(42);
// ↓ sync-wait-receiver::set_value
// state: the sync-wait-state<Snd1>* held by sync-wait-receiver
try {
  state->result.emplace(42);
} catch (...) {
  state->error = current_exception();
}
state->loop.finish();