yohhoyの日記

技術的メモをしていきたい日記

threadの利用と例外安全(その2)

C++11標準ライブラリとBoost.Threadライブラリ(Boost 1.48.0)に含まれる、threadオブジェクトのデストラクタの振る舞いと例外安全に関するメモ。その1 の続き。

2020-12-02追記:C++2a(C++20)標準ライブラリでは、デストラクタで自動的にjoinを呼び出すstd::jthreadが追加される。std::thread動作はC++11時点と同一。

2013-02-05追記:Boost.Thread 1.50.0〜1.56.0では記事内容に関する破壊的変更が行われる。id:yohhoy:20120206 も参照のこと。

問題の要因

この深刻な非決定的メモリ破壊の問題は、下記状況が重なることで引き起こされる。

  • [1] ローカルな変数への参照またはポインタを別スレッドへ渡した後に、
  • [2] 例外スローというコード上に直接表現されない制御パスで関数を抜けるとき、
  • [3] threadデストラクタにより意図せず実行中スレッドのdetachが行われれる。

要因[1]は、シングルスレッドプログラムでの王道禁じ手「ローカル変数への参照またはポインタを関数の外へ返す」を、マルチスレッドプログラム上での表現に変形したものと考えることができる*1。また要因[2]は、通常returnと例外スローともに関数を抜ける制御パスではあるが、処理途中での通常returnがソースコード上に明に現れるのに対して、例外スローはあらゆる箇所で暗黙的に生じうるため問題を引き起こしやすい。なおC++11標準ライブラリstd::threadでは、デストラクタ動作仕様を変更して要因[3]に対処している。

対処策0: グローバル変数の利用

下記コードのようにグローバル変数*2を利用すればオブジェクト生存期間の問題を回避できる。しかし、この対処策を受容できるケースはほぼ無いと思われる...*3

#include <boost/thread/thread.hpp>

int resultA, resultB;  // 並行処理結果

void funcA() { /* 処理A; 結果をresultAに設定 */ }
void funcB() { /* 処理B; 結果をresultBに設定 */ }

int call()
{
  boost::thread th1(funcA);
  funcB();  // (1) funcBが例外送出すると...
  th1.join();
  return (resultA + resultB);
  // (2) 別スレッド上のfuncA処理は続行するが問題は生じない
}

対処策1: packaged_task+unique_future

下記コードでは、処理結果を関数戻り値として返すように変更し、別スレッド上の処理完了待機をthread::joinメンバ関数ではなくunique_future::get関数にて行っている。この構造ではローカル変数への参照/ポインタを別スレッドに渡す必要がないため、オブジェクト生存期間の問題は存在しない。

#include <boost/thread.hpp>

int funcA() { /* 処理A; 結果をreturn */ }
int funcB() { /* 処理B; 結果をreturn */ }

int call()
{
  boost::packaged_task<int> task(funcA);
  boost::unique_future<int> ftr = task.get_future();
  // 新スレッドでfuncAを実行(スレッドは即座にdetach)
  boost::thread( boost::move(task) ).detach();

  int b = funcB();  // (1) funcBが例外送出すると...
  int a = ftr.get();
  return (a + b);
  // (2) ftrデストラクタは何もしない。またtaskはムーブ済み。
  // (3) 別スレッドのfuncA戻り値は誰にも必要とされないため破棄される。
}

対処策2: std::async(C++11のみ)

C++11標準ライブラリが利用可能であれば、packaged_taskよりもstd:async関数の方が扱いが簡単。(→id:yohhoy:20120203

#include <future>

int funcA() { /* 処理A; 結果をreturn */ }
int funcB() { /* 処理B; 結果をreturn */ }

int call()
{
  auto ftr = std::async(std::launch::async, funcA);
  int b = funcB();
  int a = ftr.get();
  return (a + b);
}

対処策3: RAIIイディオムによる自動join

データグループを複数域に分割し並列処理するケース(データ並列)では、依存関係のないタスクを並列処理するケース(タスク並列)に比べて、futureによる処理結果の返却機構を利用しずらい。下記の並列ソートアルゴリズムでは、thread_joinerデストラクタにてスレッドjoinを必ず行うようにし、並行処理完了を待機してからmy_sort関数を抜けることを保証している*4

#include <algorithm>
#include <boost/thread.hpp>

// デストラクタでthread::join()を行うクラス
struct thread_joiner {
  boost::thread& th;
  thread_joiner(boost::thread& t) : th(t) {}
  ~thread_joiner() {
    boost::this_thread::disable_interruption di;
    if (th.joinable())
      th.join();
  }
};

// 並列ソートアルゴリズム
void my_sort(int* begin, int* end)
{
  // 本関数のどこかで例外を送出しうる(という設定)

  std::ptrdiff_t size = std::distance(begin, end);
  if (size <= 10000) {
    // 処理対象範囲が閾値以下なら逐次アルゴリズムで処理
    std::sort(begin, end);
    return;
  }
  int* middle = begin + size / 2;
  std::nth_element(begin, middle, end);

  boost::packaged_task<void> task(boost::bind(my_sort, middle, end));
  boost::unique_future<void> ftr = task.get_future();
  boost::thread th(boost::move(task));
  thread_joiner guard(th);

  my_sort(begin, middle);  // (1) 例外送出すると...
  ftr.get();
  // (2) guardデストラクタによりth.join()が呼ばれスレッド完了を待機
  // (3) thデストラクタは既にjoin済みのため何もしない。
}

注意:この並列ソートアルゴリズムはデータ並列の説明用コードであり、正しく機能するものの実行効率はさほど高くない。さらに O(N) オーダー個のスレッドを生成するため大量のリソースを消費することに注意*5。単に並列ソート処理が必要ならIntel TBBMicrosoft PPL(のSample Pack)に含まれるparallel_sortアルゴリズム利用を検討すること。

その3 に続く。

関連URL

*1:シングルスレッド版の “禁じ手” とは異なり、マルチスレッド版のそれはオブジェクト生存期間にさえ十分注意すれば禁じ手というわけではない。Boost.Threadのマニュアルでも別スレッドへローカル変数への参照を渡すときは生存期間について考慮するよう記載がある。

*2:静的記憶域期間(static storage duration)を持つ変数であればよい。

*3:実際にこの回避策を適用した場合は次の問題が生じる:[1] call関数を同時に1つしか呼び出せないため、スレッドセーフではなく Composability に欠ける。[2] 前回のcall関数でのスレッドが依然としてfuncA実行中かもしれず、"次の" call関数を安全に呼び出す事ができない(問題[1]の変形)。

*4:この例で boost::thread へ直接 my_sort 関数を指定せずに packaged_task / unique_future を利用しているのは、新スレッド my_sort 関数が例外を送出したときに unique_future::get メンバ関数まで例外伝搬させるため。仮に boost::thread th(my_sort, middle, end); とした場合、新スレッド上の my_sort 関数が例外送出すると例外キャッチが行われず std::terminate でプログラムが終了してしまう。これ動作はC++11標準ライブラリの std::thread においても同様。

*5:逐次アルゴリズムへの切替閾値を1とすると、入力データ100要素に対して最大99個のスレッドが同時生成される。