yohhoyの日記

技術的メモをしていきたい日記

C++26 Executionライブラリ:非同期スコープ

C++2c(C++26)実行制御ライブラリ(execution control library)(→id:yohhoy:20250609)における非同期操作の早期開始と非同期スコープについて。

Sender型で表現されたタスクチェインに対して、非同期操作の開始要求と完了待機を一括実施する遅延開始(lazy start)と、非同期スコープ(async scope)との組み合わせによる開始要求と完了待機を分離した早期開始(eager start)を行うSenderアルゴリズム(→id:yohhoy:20250612)が提供される。

まとめ:

  • 遅延開始:Senderアルゴリズムthis_thread::sync_wait(_with_variant)により、非同期操作を開始して完了まで待機する。処理結果は戻り値/例外送出によって取得される。
  • 早期開始:Senderアルゴリズムexecution::spawn, execution::spawn_futureにより、非同期スコープと関連付けて非同期操作を早期開始する。完了待機には非同期スコープから生成した合流(join)Senderを利用し、非同期操作の結果はspawn_futureが返すSender経由で取得する。
    • spawnコンシューマ:空の値完了set_value_t()と停止完了set_stopped_t()のみ対応。エラー完了には対応しない。
    • spawn_futureアダプタ:任意の完了シグネチャ集合に対応。
  • Senderアルゴリズムexecution::associate:Senderを非同期スコープと関連付け、非同期操作の生存期間(lifetime)をトラッキングする。任意の完了シグネチャ集合に対応。

spawn/spawn_futrueアルゴリズム

// C++2c
#include <execution>
namespace exec = std::execution;

void proc(int) noexcept;  // 戻り値なし
int compute(int) noexcept; // 戻り値あり

void start_lazy()
{
  // タスクチェインを構成: システムスレッドプール上でproc(1)を呼び出す
  exec::sender auto work = exec::on(
    exec::get_parallel_scheduler(),
    exec::just(1) | exec::then(proc));

  proc(2);

  // 非同期操作を遅延開始し、処理完了まで待機する
  std::this_thread::sync_wait(std::move(work));

  proc(3);
  // proc(2)→proc(1)→proc(3)順に呼び出される
}

void start_eager()
{
  // タスクチェインを構成: システムスレッドプール上でproc(1)を呼び出す
  exec::sender auto work = exec::on(
    exec::get_parallel_scheduler(),
    exec::just(1) | exec::then(proc));

  // 非同期スコープに関連付けて非同期操作を早期開始する
  exec::counting_scope scope;
  exec::spawn(std::move(work), scope.get_token());

  proc(2);

  // 非同期スコープに関連付けられた全ての非同期操作完了を待機する
  std::this_thread::sync_wait(scope.join());

  proc(3);
  // {proc(1) | proc(2)}→proc(3)順に呼び出される
  // proc(1)とproc(2)は同時並行に実行されうる
}

void start_eager_future()
{
  // タスクチェインを構成: システムスレッドプール上でcompute(1)を呼び出す
  exec::sender auto work = exec::on(
    exec::get_parallel_scheduler(),
    exec::just(1) | exec::then(compute));

  // 非同期スコープに関連付けて非同期操作を早期開始する
  exec::counting_scope scope;
  exec::sender auto future =
    exec::spawn_future(std::move(work), scope.get_token());

  int r2 = compute(2);

  // 非同期スコープに関連付けられた全ての非同期操作完了を待機する
  auto result = std::this_thread::sync_wait(
                  exec::when_all(std::move(future), scope.join()));
  auto [r1] = result.value();

  // compute(1)とcompute(2)は同時並行に実行されうる
}

Senderチェイン内で入れ子の非同期スコープを自動管理するSenderアダプタexecution::let_async_scopeは提案文書P3296にて検討中。*1

// P3296
std::this_thread::sync_wait(
  exec::just(1)
  | exec::let_async_scope(
    [] (auto token, int n) {
      // 非同期スコープに関連付けて非同期操作を早期開始する
      exec::spawn(exec::on(
        exec::get_parallel_scheduler(),
        exec::just(n) | exec::then(proc)
      ), token);

      proc(2);
    }
  ) | then([] {
      proc(3);
    }
  ));
// {proc(1) | proc(2)}→proc(3)順に呼び出される
// proc(1)とproc(2)は同時並行に実行されうる

非同期スコープ: Counting Scope

C++2c標準ライブラリは、カウンタ値に基づく非同期スコープ実装クラスを提供する。これらの非同期スコープ型はコピー/ムーブ不可。

  • execution::simple_counting_scope:非同期操作の早期開始でカウンタをインクリメントし、非同期操作の完了時にカウンタをデクリメントする。
    • get_token操作:非同期スコープトークン(scope_token)を取得する。
    • close操作:以降は非同期操作との関連付けを抑止し、早期開始操作(spawn, spawn_future)では非同期操作をスキップする。spawn_future戻り値Senderでは停止完了(set_stopped)が即時送信される。
    • join操作:カウンタ値が0になるまで合流待機(join)するSenderを生成する。合流Senderの完了シグネチャ集合=空の値完了(set_value_t())+任意のエラー完了(set_error_t(E))+停止完了(set_stopped_t()) *2
  • execution::counting_scopesimple_counting_scope+非同期キャンセル(request_stop)のサポート。

Counting Scopeクラスは内部状態を管理しており、各操作をトリガとして状態遷移が行われる。

  • オブジェクト構築後の初期状態:unused
  • オブジェクト破棄時の事前条件:unused/joined/unused-and-closedのいずれか

操作assocはSenderとの関連付け成功、操作disassocは非同期操作の完了によりカウンタ値が0となる最終関連付けの解除、操作start-joinは合流Senderの開始を表す。

状態\操作 assoc disassoc start-join close
unused open - joined unused-and-closed
open (+1) - open-and-joining closed
open-and-joining (+1) joined (open-and-joining) closed-and-joining
closed - - closed-and-joining -
unused-and-closed - - joined -
closed-and-joining - joined (closed-and-joining) -
joined - - (joined) -

表中の下線付き状態はオブジェクト破棄可能であることを、括弧は内部状態が変化しない自己遷移を表す。"(+1)" はカウンタのインクリメントのみ行われ、状態遷移しない。

associateアルゴリズム

入力Senderに対して非同期スコープとの関連付けを行い、関連付けされた(associated)Senderを返す。同Senderがそのまま破棄されるか、開始(start)された非同期操作の完了までを非同期スコープを介して追跡する。

exec::counting_scope scope;

// タスクチェインを構成し、非同期スコープに関連付ける
exec::sender auto work =
  exec::just(1)
  | exec::then(proc)
  | exec::associate(scope.get_token());

// タスクを別スレッド上で開始 or そのまま破棄する操作へ委譲
async_start_or_discard_task(std::move(work));

// (自スレッドの処理)

// 非同期操作の完了 or タスク破棄を待機
std::this_thread::sync_wait(scope.join());

associateアルゴリズムが関連付けに失敗すると関連付けの無い(unassociated)Senderを返し、同Senderの開始操作は上流側Senderを開始せず即座に停止完了を送信する。Counting Scopeに対してclose操作を行うと以降の関連付けが失敗するようになり、別スレッドからの非同期タスクの開始をの受付停止機構を実現できる。

exec::scheduler auto sch = /*Scheduler*/;
exec::counting_scope scope;

// 非同期要求の受付停止とリソース解放
void finish()
{
  // 非同期スコープをcloseし、以降の非同期タスク開始を抑止する
  scope.close();
  // 関連付けが行われた全タスクの完了(or破棄)を待機する
  std::this_thread::sync_wait(scope.join());

  // (リソース解放処理)
}

// sch上で実行される非同期スコープに関連付けたタスクを返す
//   finish前: 処理Xが実行され、戻り値を値送信する
//   finish後: 停止完了が送信される(処理Xは実行されない)
exec::sender auto create_new_task(int args);
{
  return exec::schedule(sch)
       | exec::then([args](){ return /*処理X*/; })
       | exec::associate(scope.get_token());
}

関連URL

*1:https://github.com/cplusplus/papers/issues/1948

*2:Receiver環境から取得したSchedulerに対するスケジュールSender(schedule sender)の完了シグネチャ集合に相当する。

*3:https://github.com/cplusplus/papers/issues/2315

*4:https://github.com/cplusplus/papers/issues/2335