C++2c(C++26)実行制御ライブラリ(execution control library)(→id:yohhoy:20250609)における非同期操作の早期開始と非同期スコープについて。
Sender型で表現されたタスクチェインに対して、非同期操作の開始要求と完了待機を一括実施する遅延開始(lazy start)と、非同期スコープ(async scope)との組み合わせによる開始要求と完了待機を分離した早期開始(eager start)を行うSenderアルゴリズム(→id:yohhoy:20250612)が提供される。
まとめ:
- 遅延開始:Senderアルゴリズム
this_thread::sync_wait(_with_variant)により、非同期操作を開始して完了まで待機する。処理結果は戻り値/例外送出によって取得される。
- 早期開始:Senderアルゴリズム
execution::spawn, execution::spawn_futureにより、非同期スコープと関連付けて非同期操作を早期開始する。完了待機には非同期スコープから生成した合流(join)Senderを利用し、非同期操作の結果はspawn_futureが返すSender経由で取得する。
spawnコンシューマ:空の値完了set_value_t()と停止完了set_stopped_t()のみ対応。エラー完了には対応しない。
spawn_futureアダプタ:任意の完了シグネチャ集合に対応。
- Senderアルゴリズム
execution::associate:Senderを非同期スコープと関連付け、非同期操作の生存期間(lifetime)をトラッキングする。任意の完了シグネチャ集合に対応。
spawn/spawn_futrueアルゴリズム
#include <execution>
namespace exec = std::execution;
void proc(int) noexcept;
int compute(int) noexcept;
void start_lazy()
{
exec::sender auto work = exec::on(
exec::get_parallel_scheduler(),
exec::just(1) | exec::then(proc));
proc(2);
std::this_thread::sync_wait(std::move(work));
proc(3);
}
void start_eager()
{
exec::sender auto work = exec::on(
exec::get_parallel_scheduler(),
exec::just(1) | exec::then(proc));
exec::counting_scope scope;
exec::spawn(std::move(work), scope.get_token());
proc(2);
std::this_thread::sync_wait(scope.join());
proc(3);
}
void start_eager_future()
{
exec::sender auto work = exec::on(
exec::get_parallel_scheduler(),
exec::just(1) | exec::then(compute));
exec::counting_scope scope;
exec::sender auto future =
exec::spawn_future(std::move(work), scope.get_token());
int r2 = compute(2);
auto result = std::this_thread::sync_wait(
exec::when_all(std::move(future), scope.join()));
auto [r1] = result.value();
}
Senderチェイン内で入れ子の非同期スコープを自動管理するSenderアダプタexecution::let_async_scopeは提案文書P3296にて検討中。*1
std::this_thread::sync_wait(
exec::just(1)
| exec::let_async_scope(
[] (auto token, int n) {
exec::spawn(exec::on(
exec::get_parallel_scheduler(),
exec::just(n) | exec::then(proc)
), token);
proc(2);
}
) | then([] {
proc(3);
}
));
非同期スコープ: Counting Scope
C++2c標準ライブラリは、カウンタ値に基づく非同期スコープ実装クラスを提供する。これらの非同期スコープ型はコピー/ムーブ不可。
execution::simple_counting_scope:非同期操作の早期開始でカウンタをインクリメントし、非同期操作の完了時にカウンタをデクリメントする。
get_token操作:非同期スコープトークン(scope_token)を取得する。
close操作:以降は非同期操作との関連付けを抑止し、早期開始操作(spawn, spawn_future)では非同期操作をスキップする。spawn_future戻り値Senderでは停止完了(set_stopped)が即時送信される。
join操作:カウンタ値が0になるまで合流待機(join)するSenderを生成する。合流Senderの完了シグネチャ集合=空の値完了(set_value_t())+任意のエラー完了(set_error_t(E))+停止完了(set_stopped_t()) *2
execution::counting_scope:simple_counting_scope+非同期キャンセル(request_stop)のサポート。
Counting Scopeクラスは内部状態を管理しており、各操作をトリガとして状態遷移が行われる。
- オブジェクト構築後の初期状態:unused
- オブジェクト破棄時の事前条件:unused/joined/unused-and-closedのいずれか
操作assocはSenderとの関連付け成功、操作disassocは非同期操作の完了によりカウンタ値が0となる最終関連付けの解除、操作start-joinは合流Senderの開始を表す。
| 状態\操作 |
assoc |
disassoc |
start-join |
close |
| unused |
open |
- |
joined |
unused-and-closed |
| open |
(+1) |
- |
open-and-joining |
closed |
| open-and-joining |
(+1) |
joined |
(open-and-joining) |
closed-and-joining |
| closed |
- |
- |
closed-and-joining |
- |
| unused-and-closed |
- |
- |
joined |
- |
| closed-and-joining |
- |
joined |
(closed-and-joining) |
- |
| joined |
- |
- |
(joined) |
- |
表中の下線付き状態はオブジェクト破棄可能であることを、括弧は内部状態が変化しない自己遷移を表す。"(+1)" はカウンタのインクリメントのみ行われ、状態遷移しない。
入力Senderに対して非同期スコープとの関連付けを行い、関連付けされた(associated)Senderを返す。同Senderがそのまま破棄されるか、開始(start)された非同期操作の完了までを非同期スコープを介して追跡する。
exec::counting_scope scope;
exec::sender auto work =
exec::just(1)
| exec::then(proc)
| exec::associate(scope.get_token());
async_start_or_discard_task(std::move(work));
std::this_thread::sync_wait(scope.join());
associateアルゴリズムが関連付けに失敗すると関連付けの無い(unassociated)Senderを返し、同Senderの開始操作は上流側Senderを開始せず即座に停止完了を送信する。Counting Scopeに対してclose操作を行うと以降の関連付けが失敗するようになり、別スレッドからの非同期タスクの開始をの受付停止機構を実現できる。
exec::scheduler auto sch = ;
exec::counting_scope scope;
void finish()
{
scope.close();
std::this_thread::sync_wait(scope.join());
}
exec::sender auto create_new_task(int args);
{
return exec::schedule(sch)
| exec::then([args](){ return ; })
| exec::associate(scope.get_token());
}
関連URL