yohhoyの日記

技術的メモをしていきたい日記

C++26 Executionライブラリ(Senderアルゴリズム編)

C++2c(C++26)標準ライブラリに追加される実行制御ライブラリ(execution control library)についてメモ。別名:std::execution, Senders/Receivers(S/R)

2025年5月現在、ベースライン提案文書P2300R10までを採択済み。
2025-07-20追記:2025年6月会合にて関連する一連の提案文書P2079R10,
P3149R11, P3284R4, (PDF)P3433R1, P3481R5, P3552R3, P3557R3, P3570R2, (PDF)P3682R0が採択された。

Senderアルゴリズム

Senderアルゴリズム(sender algorithm)はSenderを引数/戻り値とする関数であり、 下記3つのカテゴリに区分される。C++2c標準ライブラリでは、Senderアルゴリズムは全てCPO(Customization Point Object)として定義される。

区分 引数 戻り値
Senderファクトリ
(sender factory)
任意個の引数
(非Sender)
Sender execution::
just
Senderアダプタ
(sender adaptor)
1個以上のSender
+任意個の引数
Sender execution::
then
Senderコンシューマ
(sender consumer)
1個以上のSender
+任意個の引数
任意戻り値
(非Sender)
this_thread::
sync_wait
パイプライン記法

一部のSenderアダプタは|演算子によるパイプライン記法をサポートする。(標準RangesライブラリのRangeアダプタと同様)

#include <execution>
namespace exec = std::execution;

auto calc(int, int) -> int { /*...*/ }

// 関数呼び出し
exec::sender auto snd0 = exec::just(2, 3);
exec::sender auto snd1 = exec::then(snd0, calc);
// パイプライン記法
exec::sender auto sndr = exec::just(2, 3) | exec::then(calc);

パイプライン記法をサポートするSenderアダプタ(例:then)はパイプ可能Senderアダプタオブジェクト(pipeable sender adaptor object)と定義され、第1引数(Sender)以外を束縛したオブジェクト(例:then(calc))はパイプ可能Senderアダプタクロージャオブジェクト(pipeable sender adaptor closure object)となる。後者は|演算子の右オペランドや、Senderアダプタexecution::onの引数*1としてSenderチェイン構築時に利用される。

非同期処理フレームワークを拡張するライブラリ実装者向けに、パイプ可能Senderアダプタクロージャオブジェクト型Dの実装ヘルパとして基底クラスexecution::sender_adaptor_closure<D>が提供され、独自Senderアダプタのパイプライン記法対応がサポートされる。*2

Senderチェイン

SenderアダプタやSenderコンシューマに指定した入力Senderは生成Senderオブジェクトのメンバ変数として保持されるため、ライブラリ仕様では子(child)Senderと呼ばれる。同様に、Senderアルゴリズムが生成するSenderオブジェクトは親(parent)Senderと呼ばれる。

  • 子Sender = Senderチェインの上流側
  • 親Sender = Senderチェインの下流
snd1() | snd2() | snd3()  // パイプライン記法
snd3(snd2(snd1()))        // 関数呼び出し
// 生成Senderオブジェクトは下記の入れ子構造
snd3{ snd2 { snd1{} } }

Senderアルゴリズムが生成する親Senderは子Senderの接続先となるため、Senderオブジェクト内部ではReceiverインタフェースを定義する。Sender-Receiver間の接続(connect)はSenderコンシューマ適用時にSenderチェイン上で親→子の順序で実行され、Sender同様に入れ子構造をとるOperation Stateを生成する。親Operation Stateに対して開始(start)操作を行うと子Operation State=Senderチェイン上流側の処理が開始(start)され、以降は子Operation Stateが親SenderのReceiver完了操作を呼び出すことで上流→下流の順に非同期操作全体が実行される。

std::this_thread::sync_wait( exec::just(2, 3) | exec::then(calc) );
// Sender構造: sync_wait{ then{ just{ {}, {2, 3} }, calc } }
  • 1) sync_waitコンシューマ内部で、子Sender-親SenderのReceiver間を接続(connect)する。
  • 2) sync_waitコンシューマ内部で、Operation Stateを開始(start)する。
  • 3) justのOperation Stateが、送信値2, 3thenのReceiverハンドラを呼び出す。
  • 4) thenのReceiverハンドラ受信値2, 3calcに適用し、結果値でsync_waitのReceiverハンドラを呼び出す。
  • 5) sync_waitのReceiver受信値を戻り値として返す。

Senderファクトリ

名前空間std::executionにて下記Senderファクトリが提供される。

ファクトリ 呼び出し形式 概要
just (values...) 値完了を送信
just_error (err) エラー完了を送信
just_stopped () 停止完了を送信
schedule (sch) Scheduler上で非同期処理を開始
read_env (q) 接続先Receiver環境への問合せ

ノート:

  • schedule:指定Schedulerschを値完了Scheduler(value completion scheduler)として、空の値を送信する。
  • read_env:指定クエリオブジェクトqを用いて、非同期操作の開始(start)時に後続Receiver環境へ問い合わせて得られた結果を値送信する。Senderオブジェクト構築時やReceiver接続(connect)時には何もしない。

Senderアダプタ

名前空間std::executionにて下記Senderアダプタが提供される。※列はパイプライン記法の対応可否を表す。

アダプタ 呼び出し形式 概要
write_env (sndr, env) Receiver環境を上書き[P3284R4]
unstoppable (sndr) 下流側の停止要求を無視[P3284R4]
start_on (sch, sndr) Scheduler上でSender開始
continues_on (sndr, sch) Scheduler上で後続継続
on (sch, sndr) 指定Senderのみ別Scheduler上で実行
on (sndr, sch, closure) 指定非同期操作のみ別Scheduler上で実行
schedule_from (sch, sndr) continues_on実装補助*3
then (sndr, f) 値完了の後続処理アタッチ
upon_error (sndr, f) エラー完了の後続処理アタッチ
upon_stopped (sndr, f) 正常完了の後続処理アタッチ
let_value (sndr, fn) 値完了をネスト非同期操作へ変換
let_error (sndr, fn) エラー完了をネスト非同期操作へ変換
let_stopped (sndr, fn) 停止完了をネスト非同期操作へ変換
bulk (sndr, policy, n, f) 指定関数を複数回実行*4
bulk
_chunked
(sndr, policy, n, f) 指定関数をグループ単位で複数回実行[P3481R5]
bulk
_unchunked
(sndr, policy, n, f) 指定関数を個別に複数回実行[P3481R5]
split (sndr) P3682R0で削除*5
when_all (sndrs...) 全Sender完了待機+値結合
when_all
_with_variant
(sndrs...) 全Sender完了待機+値結合
複数値完了シグネチャ対応版
into_variant (sndr) 複数値完了シグネチャvariant型に統合
stopped_as
_optional
(sndr) 停止完了をoptional値完了に変換
stopped_as
_error
(sndr, err) 停止完了をエラー完了に変換
associate (sndr, token) 非同期スコープに関連付け[P3149R11]
spawn_future (sndr, token, env?) 非同期スコープでSender早期開始[P3149R11]*6
affine_on (sndr, sch) コルーチンでのSchedulerアフィニティ指定[P3552R3]

ノート:

  • then/upon_{error,stopped}:受信値args...に対して、f(args...)戻り値を値完了で送信する。
  • let_{value,error,stopped}:受信値args...に対して、fn(args...)戻り値Senderに後続Receiverを接続する。
  • bulk:受信値values...に対して、範囲[0, n)の各idxに対してf(idx, values...)が逐次的に呼び出される。
    • 2025-07-20追記:bulk動作はbulk_chunkedに移譲される。既定動作はP2300R10定義と同様に逐次実行。[P3481R5]
  • bulk_chunked:既定動作はbulkと同様に逐次実行。並列Scheduler上かつ実行ポリシーpolicyで並列処理を許可(execution::par)した場合、範囲[0, n)を分割した重複しない部分区間[begin, end)に対して複数スレッド上でf(begin, end, values...)が並列実行される。[P2079R10, P3481R5]
  • bulk_unchunked:既定動作はbulkと同様に逐次実行。並列Scheduler上かつ実行ポリシーpolicyで並列処理を許可(execution::par)した場合、範囲[0, n)の各idxに対して複数スレッド上でf(idx, values...)が並列実行される。[P2079R10, P3481R5]
  • when_all:入力Sender#nからの受信値vs_n...に対して、tuple<Vs_0..., ..., Vs_k...>型に統合して送信する。いずれかの入力Senderからエラー完了/停止完了を受信した場合、他の入力Senderに対して停止要求を行う。*7
  • when_all_with_variantwhen_all(into_variant(sndrs)...)相当。
  • into_variant:入力Senderから受信可能性のある値vs_m...に対して、variant<tuple<Vs_0...>, ..., tuple<Vs_j...>>型に統合して送信する。
  • spawn_future:任意の値完了シグネチャset_value_t(Vs...)をもつ入力Senderを非同期スコープトークtokenに関連付けて、非同期操作を即座に開始(start)する。[P3149R11] → id:yohhoy:20251002

Senderコンシューマ

名前空間std::this_threadにて下記Senderコンシューマが提供される。

コンシューマ 呼び出し形式 概要
sync_wait (sndr) Sender完了待機し結果取得
sync_wait
_with_variant
(sndr) Sender完了待機し結果取得
複数値完了シグネチャ対応版

名前空間std::executionにて下記Senderコンシューマが提供される。

コンシューマ 呼び出し形式 概要
spawn (sndr, token, env?) 非同期スコープでSender早期開始[P3149R11]

ノート:

  • sync_wait:値完了のときoptional<tuple<Vs...>>値を、停止完了のとき無効値(nullopt)を返す。エラー完了のときは例外送出する。
  • sync_wait_with_variant:ほぼsync_wait(into_variant(sndr))相当。*8
  • spawn:空の値完了シグネチャset_value_t()をもつ入力Senderを非同期スコープトークtokenに紐づけて、非同期操作を即座に開始(start)する。[P3149R11] → id:yohhoy:20251002
#include <execution>
namespace exec = std::execution;

// 値(123, 'X')を送信
exec::sender auto sndr = exec::just(123, 'X');
try {
  auto result = std::this_thread::sync_wait(sndr);
  if (result) {
    // 値完了
    auto [n, c] = *result;
    // n := int型/123, c := char型/'X'
  } else {
    // 停止完了
  }
} catch (...) {
  // エラー完了
}

実行ドメインとカスタマイゼーションポイント

サードパーティ・ライブラリによる柔軟な機能拡張を可能とするため、実行ドメイン(execution domain)を介したSenderアルゴリズムのカスタマイゼーションポイントを定義する。独自実行ドメインに関連付けた独自Senderと標準Senderアルゴリズムを組み合わせるとき、標準Senderアルゴリズムの動作セマンティクスを実現する独自のSender型へと変換することができる。標準ライブラリではデフォルト実行ドメイン(execution::default_domain)のみ定義し、標準Senderアルゴリズムは暗黙に同ドメインと関連付けられる。

カスタマイゼーションポイント実装側でSenderアルゴリズムを識別するため、大半の標準Senderアルゴリズムは同名タグ型(例:justアルゴリズムはSenderタグ型just_t)を持つ*9。また標準Senderアルゴリズムで作成されるSenderオブジェクトは構造化束縛(structured binding)に対応しており、カスタマイゼーションポイント実装側でSenderタグ型、オブジェクト構築時引数、子Senderを容易に取り出せる仕様となっている。

標準Senderアルゴリズムは下記CPO呼び出しを介したカスタマイゼーションポイントを規定する。一部の標準Senderアルゴリズム仕様では、これらのカスタマイゼーションポイントとデフォルト実行ドメイン動作に依存した仕様記述を行う。(動作を追うのが、とてもつらい。)

  • execution::transform_sender:Senderアルゴリズム呼び出しによるSenderオブジェクト構築時、および後続Receiverへの接続(connect)時のSender型変換*10。既定動作は無変換、一部SenderアルゴリズムではSenderタグ型へ処理ディスパッチ。
  • execution::transform_env:後続Receiver接続(connect)時の環境(environment)変換。既定動作は無変換。
  • execution::apply_sender:Senderコンシューマ適用時の動作。既定動作はSenderタグ型へ処理ディスパッチ。

例:starts_onアルゴリズムは後続Receiver接続(connect)時にschedulelet_valueの組合せへと変換される。以下は簡略化したstarts_on.transform_senderカスタマイゼーションポイント実装。

// 呼び出し式 execution::starts_on(sch, sndr)

// out_sndr := starts_onアルゴリズムで生成されるSenderオブジェクト
// OutSndr  := 上記Senderの型
auto&& [_, sch, sndr] = out_sndr;
// sch  := 非同期処理を実行するScheduler
// sndr := 実行させる非同期処理(Sender)
return let_value(
  schedule(sch),
  [sndr = std::forward_like<OutSndr>(sndr)]() mutable {
    return std::move(sndr);
  });


関連URL

*1:Senderアダプタ execution::on は2種類の呼び出し形式をサポートしており、パイプ可能Senderアダプタクロージャオブジェクトは on(sndr, sch, closure) 形式に対して指定する。

*2:C++23標準Rangesライブラリでも類似の ranges::range_adaptor_closure<D> が提供され、独自Rangeアダプタ実装者向けのパイプライン記法サポートが存在する。

*3:P2300R10: "schedule_from is not meant to be used in user code; it is used in the implementation of continues_on."

*4:2025-07-20追記:P3481R5にてbulkアルゴリズムCPO引数リストが (sndr, n, f) から (sndr, policy, n, f) へと変更された。P2079R10で導入される並列Schedulerと組み合わせてスレッド並列を実現する。

*5:P3682R0: "Deficiencies of std::execution::split, Dynamic Allocation, Shared Ownership, Eagerness, Naming"

*6:P3149R11, §6.4: "To keep consistency with spawn() this paper doesn’t support pipe operator for spawn_future()."

*7:各Senderが停止要求を認識し、早期に停止完了を送信するか否かはSenderの実装に依存する。

*8:sync_wait_with_variantアルゴリズムの戻り値型は optional<variant<tuple<...>...>> となる。単純に sync_wait( into_variant(sndr) ) とすると、冗長なtuple層が追加された optional<tuple<variant<tuple<...>...>>> 型となってしまう。

*9:read_envアルゴリズムのみ例外的にSenderタグ型を規定しない(unspecified)。2025-07-20追記:write_env, unstoppableも同様にSenderタグ型を規定しない[P3284R4]。

*10:同一関数 transform_sender の呼び出し引数リストの差異により、2つのタイミングのカスタマイゼーションポイントを定義する。Senderオブジェクト構築時はSenderのみが、Receiver接続時はSenderと環境が実引数に渡される。