C++11標準ライブラリで新しく追加されたstd::async
関数についてメモ。
asyncの基本
asyncは関数オブジェクトの非同期処理機構*1を提供する関数テンプレート。async関数はstd::future
オブジェクトを返すため、同オブジェクトを介して処理完了待ちと処理結果の取り出しを行う。また、async関数に対しては起動ポリシー(launch policy)を指定でき、これに基づいて非同期処理の動作メカニズムが決定される。C++11標準規格では2つの起動ポリシーstd::launch::async
とstd::launch::deferred
を定義する。
- launch::asyncポリシー
- 新しいスレッドを作成し、その新スレッド上で非同期処理を実行する*2。
launch::async
ポリシーによる動作は「std::thread
によるスレッド起動と全引数のバインド+packaged_task/futureによる同期機構と処理結果の受け渡し」のイメージが近い。 - launch::deferredポリシー
- 関数オブジェクトとその引数で遅延関数(deferred function)を構成する。関数オブジェクトの実行は処理結果が必要になるときまで遅延される。
launch::deferred
ポリシーにて動作する場合、新しいスレッド作成は行われないためシングルスレッドプログラムでも利用可能。動作としては「std::bind
で全引数がバインドされたstd::function
オブジェクトをfuture経由でアクセス」のイメージが近い。
async関数は下記2つのオーバーロードを持ち、前者[A]は後者[B]を起動ポリシー(launch::async | launch::deferred
)として呼び出したのと等価*3。複数の起動ポリシーを指定した場合、処理系には指定ポリシーのうち任意動作を選択する自由度が与えられている。
// [A] 既定の起動ポリシーで動作 template <class F, class... Args> future<typename result_of<F(Args...)>::type> async(F&& f, Args&&... args); // [B] 起動ポリシーを指定して動作 template <class F, class... Args> future<typename result_of<F(Args...)>::type> async(launch policy, F&& f, Args&&... args);
下記コードでは、処理系による適切な並行処理が行われることを期待している*4。しかしながら実際の動作は処理系依存であり、C++11標準規格はこのコードがマルチスレッドで並列処理されることを保証しない。
#include <future> int process(int* begin, int* end) { return /* 範囲[begin, end)を処理 */; } int main() { const std::size_t N = /*...*/; int data[N] = /*...*/; // データ範囲を3分割して非同期処理 std::future<int> f1 = std::async(process, data, data + N/3); std::future<int> f2 = std::async(process, data + N/3, data + N*2/3); std::future<int> f3 = std::async(process, data + N*2/3, data + N); // 全ての非同期処理について完了を待機して結果集計 int result = f1.get() + f2.get() + f3.get(); return 0; }
起動ポリシーによらない共通部分
- 処理結果の設定は、関数オブジェクトから普通に値を返す(return)か例外送出(throw)すればよい。
- 処理結果の取り出しは
std::future
オブジェクトを介して行う。
launch::asyncポリシー
launch::deferredポリシー
launch::deferred
ポリシーを指定した場合、shared objectには遅延関数として格納されている。このときタイムアウト付き待機関数(future::wait_for
/wait_until
メンバ関数)は状態std::future_status::deferred
を返す。- 関数オブジェクトの実行は、最初に
future::get
/wait
メンバ関数を呼び出したそのスレッド上で行われる。(タイムアウト付き待機関数の呼び出しでは関数オブジェクトは実行されない。) - いったん関数オブジェクトが実行されると、shared objectには処理結果が格納されてready状態となる。(タイムアウト付き待機関数は状態
future_status::ready
を返す。) launch::async
ポリシーと組み合わせて指定した場合、並行処理のためのリソースが不十分ならば、処理系は関数呼び出しを遅延させるか起動ポリシー選択を遅延させる。*6
その他の起動ポリシー
C++11標準規格はlaunch::async
, launch::deferred
の2つしか定義しないが、処理系による起動ポリシー拡張を明示的に許容している。
N3337 30.6.1/p1-2より宣言とNoteを引用。
namespace std {
enum class launch : unspecified {
async = unspecified,
deferred = unspecified,
implementation-defined
};
}
[Note: Implementations can provide bitmasks to specify restrictions on task interaction by functions launched by
async()
applicable to a corresponding subset of available launch policies. Implementations can extend the behavior of the first overload ofasync()
by adding their extensions to the launch policy under the "as if" rule. -- end note]
関連URL
*1:ここでの “非同期処理” は必ずしもマルチスレッドによる並行処理を指しておらず、「async関数び呼び出し後に任意のタイミングで実行する処理」を意味している。
*2:厳密にはC++標準規格の "as if" ルールに基づき、新しいスレッドが作成されたのと同じように振る舞いさえすればよい。例えば、処理系が用意したスレッドプールを利用してスレッド生成/破棄コストを避ける実装などが考えられる。
*3:std::launch 型はビットマスク(bitmask)型として定義されており、|演算子は通常のビット論理和と同じセマンティクスを持つ。(17.5.2.1.3)
*4:理想的な処理系のもとでは、実行環境におけるプロセッサ数などのハードウェア並列度や動作時のシステム負荷に応じた、処理タスクのスレッドへの最適割当が行われると期待される。一般的に、実行スレッド数がハードウェア並列度を上回る(Oversubscription)と、スレッド制御オーバーヘッドが並行処理で得られるスピードアップを相殺してしまう。理想的にはハードウェア並列度と同数のスレッドを用いてタスク処理を実行するのが望ましい。例:4論理プロセッサシステムでは4スレッドが常に何らかのタスク処理を行っている状態が理想。ただし、この議論ではブロッキングI/O処理によるスレッド休止を考慮しておらず、実際のプログラムではより複雑な状況となる。
*5:2012年2月現在、http://cplusplus.github.com/LWG/lwg-active.html#2100 "2100. timed waiting functions cannot timeout if launch::async policy used" にて「タイムアウト付き待機関数でのthread join動作をpthread環境では実装できない。joinを保証するのはタイムアウトなし待機関数に限定すべき。」という指摘があげられている。
*6:30.6.8/p3 Noteでは、まず遅延関数として保持しておき(deferredポリシー動作)、任意タイミングで新スレッド上にて処理タスクが実行(asyncポリシー動作)される動作を許容している。例えばwork-stealingスケジューラのように、処理タスクは一度タスクキューに投入され、ワーカースレッド群がキュー取り出して実行していく実装が考えられる。