yohhoyの日記

技術的メモをしていきたい日記

C++26 Executionライブラリ:Cancellable Sender

C++2c(C++26)実行制御ライブラリ(execution control library)(→id:yohhoy:20250609)において、キャンセル可能なSenderの実装方法。

実行制御ライブラリにおける非同期操作のキャンセルは、Receiver側で発行された停止要求(stop request)をSender側が明示的に確認する協調的キャンセル(cooperative cancellation)によって実現する。Senderにおいて明示的に対応しない限りは、同Senderによる非同期操作は停止要求を無視して継続処理される。停止要求の発行/確認を行うクラス群は<stop_token>ヘッダで提供される。*1

関数オブジェクトfを引数にとり、下記動作を行うキャンセル可能なSenderMySender{f}の実装例:

  • 下流側で停止要求が行われていた場合、何もせず停止完了を送信する。
  • 関数呼び出し式f()が正常終了(return)したとき、戻り値を値送信する。
  • 関数呼び出し式f()が例外終了(throw)したとき、例外をエラー送信する。
// C++2c
#include <exception>
#include <stop_token>
#include <type_traits>
#include <execution>
namespace exec = std::execution;

// MySender接続結果Operation State
template<typename Rcvr, typename F>
struct OpState {
  using operation_state_concept = exec::operation_state_t;

  // exec::start(op)カスタマイズ
  void start() & noexcept {
    // 停止トークンをReceiver環境から取得
    auto st = std::get_stop_token(exec::get_env(rcvr_));
    if (st. stop_requested()) {
      // 停止要求が存在する場合は停止完了を送信
      exec::set_stopped(std::move(rcvr_));
      return;
    }
    try {
      // 関数呼び出しf()の戻り値で値完了
      exec::set_value(std::move(rcvr_), f_());
    } catch (...) {
      // 送出された例外でエラー完了
      exec::set_error(std::move(rcvr_), std::current_exception());
    }
  }

  Rcvr rcvr_;
  F f_;
};

template<typename F>
struct MySender {
  using sender_concept = exec::sender_t;
  using result_type = std::invoke_result_t<F>;

  // MySenderの完了シグネチャ集合を宣言
  using completion_signatures = exec::completion_signatures<
    exec::set_value_t(result_type),
    exec::set_error_t(std::exception_ptr),
    exec::set_stopped_t()
  >;

  template <class Self>  // P3557R3
  static consteval auto get_completion_signatures()
    { return completion_signatures{}; }

  // exec::connect(sndr, rcvr)カスタマイズ
  template<typename Rcvr>
  auto connect(Rcvr rcvr) {
    return OpState{std::move(rcvr), std::move(f_)};
  }

  F f_;
};

2025-08-20追記:P3557R3採択によってSender完了シグネチャ集合のカスタマイズ方式が当初提案P2300R10から変更され、メンバ型completion_signaturesではなく静的メンバ関数テンプレートget_completion_signaturesを定義する必要がある。→ id:yohhoy:20250823

C++標準ライブラリ提供のSenderアダプタ(→id:yohhoy:20250612)から生成されるSenderは、Receiver側での停止要求を無視するため協調的キャンセルを行わない。ただし、上流側Senderからの停止トークン取得問い合わせstd::get_stop_token(→id:yohhoy:20250609)を下流側Receiverへと転送する動作仕様となっており、タスクチェイン全体での非同期操作キャンセルを阻害しないよう設計されている。*2

例外として、全入力Sender完了を待機するexecution::when_allSenderアダプタでは、下流側Receiverでの停止要求検知もしくは入力Senderからのエラー完了受信をトリガとして停止要求を発行し、他の入力Senderに対して協調的キャンセルを促す。*3

// タスクチェインを構築
exec::sender auto snd1 = MySender{[] -> int { return 1; }};
exec::sender auto snd2 = MySender{[] -> int { throw 42; }};
exec::sender auto snd3 = MySender{[] -> int { return 3; }};
exec::sender auto sndr = exec::when_all(snd1, snd2, snd3);

try {
  auto tup = std::this_thread::sync_wait(sndr).value();
  // 1) snd1は戻り値1を値送信
  // 2) snd2は例外42をエラー送信
  // 3) when_all内部で停止要求を発行
  // 4) snd3で停止要求を検知し停止完了を送信
  // 5) sndr自体は例外42でエラー完了
  std::println("val={}", tup);
} catch (int e) {
  std::println("catch {}", e);
}
// "catch 42"

2025-07-22追記:P3284R4にてSenderアダプタexecution::unstoppableが追加される。上流側Senderからのstd::get_stop_token問い合わせに対してstd::never_stop_tokenを返すようReceiver環境を書き換えることで、タスクチェイン下流側からの非同期操作キャンセル伝搬を遮断する。上流側からの停止完了送信には影響しない。

自作Senderアダプタにおいて独自基準で非同期操作キャンセルを追加サポートする場合、下流側Receiver環境から取得した停止トークン(stop token)を確認する以外に下記対応が必要となる。

  • Operation Stateがstd::inplace_stop_source型の停止ソース(stop source)を内包する。*4
  • 下流側Receiver環境から取得した停止トークンに対して、自身の停止ソースへ停止要求を発行(request_stop)する停止コールバック(stop callback)を登録する。
  • 上流側Senderと接続されるReceiverの環境は、std::get_stop_token問い合わせに対して停止ソースから取得した停止トークンを返す。

複雑なキャンセルサポートの実装例として、提案文書P2300R10, §4.1 Asynchronous Windows socket recvなども参照のこと。

関連URL

*1:<stop_token>ヘッダはC++20標準ライブラリで協調的な中断処理をサポートする std::jthread クラスと同時に導入された。実行制御ライブラリの提案文書 P2300R10 では、同ヘッダに std::inplace_stop_{token,source} クラスが追加される。

*2:Senderアダプタ間で対象クエリオブジェクトによる問い合わせを転送するか否かは、別のクエリオブジェクト std::forwarding_query への問い合わせ結果(bool値)によって制御される。

*3:厳密には、下流側Receiverでの停止要求発行もしくは上流側Senderからのエラー完了/停止完了いずれかの初回受信タイミングで停止要求を発行する。いずれかの入力Senderがエラー完了した場合、初回受信したエラー完了値が when_all のエラー完了値となる。入力Senderが値完了/停止完了のいずれかであれば、when_all は停止完了を送信する。

*4:C++20から存在する std::stop_source でも問題ないが、動的メモリ確保が発生しない std::inplace_stop_source の方が好ましい。C++標準ライブラリ execution::when_all Senderアダプタの仕様定義でも std::inplace_stop_source が利用されている。