yohhoyの日記

技術的メモをしていきたい日記

並列パイプライン処理のひな形

Intel TBB(Threading Building Blocks)を利用した並列パイプライン処理実装のひな形コード。下記の前提条件・要件に適合する。

  • データの “入力 → 変換 → 出力” 処理フローにおいて、スループット改善を目的として変換処理を並列化する。
  • データ変換は並行実行されうるため、変換処理はデータ単位で分離独立していること。
  • データ入力/出力処理時間に比べて、並列化対象のデータ変換処理時間の方が十分大きいこと。*1
  • データ入力/出力については順序維持(in-order)の逐次処理とする。(例:シーケンシャルファイル処理など)
  • 並列処理ワーカスレッド数は、論理プロセッサ数(またはそれ以下で任意指定)とする。
  • データ変換に必要となる作業領域を、予め並列処理数分だけ確保する。(例:中間データを配置するバッファ領域など)

逐次処理版

struct WorkSet {
  // 入力データ保持領域
  // 出力データ保持領域
  // データ変換に必要な作業領域など
};

bool input_data(WorkSet&);    // データ入力
void process_data(WorkSet&);  // データ変換
void output_data(WorkSet&);   // データ出力

// 逐次処理ループ
WorkSet ws;
while (input_data(ws)) {
  process_data(ws);
  output_data(ws);
}

並列処理版

#include <vector>
#include "tbb/tbb.h"
using namespace tbb;

size_t token_num = task_scheduler_init::default_num_threads();
// default_num_threads()メンバ関数は論理プロセッサ数を返す
// (ワーカスレッド数を制限したい場合はtoken_numを変更)

// 並列処理トークンに紐付くデータを初期化
std::vector<WorkSet> working_set(token_num);
size_t next_token = 0;

// TBB並列パイプライン処理 フィルタチェイン構築
auto filter_chain =
  // 入力フィルタ[serial_in_order]
  make_filter<void, size_t>(
    filter::serial_in_order,
    [&](flow_control& fc) -> size_t {
      size_t token = (next_token++ % working_set.size());
      WorkSet& ws = working_set[token];
      if (!input_data(ws))
        fc.stop();
      return token;
    }) &
  // 変換フィルタ[parallel]
  make_filter<size_t, size_t>(
    filter::parallel,
    [&](size_t token) -> size_t {
      WorkSet& ws = working_set[token];
      process_data(ws);
      return token;
    }) &
  // 出力フィルタ[serial_in_order]
  make_filter<size_t, void>(
    filter::serial_in_order,
    [&](size_t token) -> void {
      WorkSet& ws = working_set[token];
      output_data(ws);
    });

// ワーカスレッド数token_numにて並列パイプラインを実行
task_scheduler_init sched(token_num);
parallel_pipeline(token_num, filter_chain);

parallel_pipeline並列アルゴリズムの最大トークン数は、必要十分なWorkSet領域を確保するためにTBBワーカスレッド数と同一としておく。最大トークン数<ワーカスレッド数とした場合は、ワーカスレッドは作成されるものの最大トークン数以下のスレッドしか利用されない。最大トークン数>ワーカスレッド数とした場合は、必要なWorkSet領域が増大する一方で同時実行タスクはワーカスレッド数以下に制限されるため、今回の単純なフィルタチェインでは処理スループットの改善が期待されない。*2
入/出力フィルタをserial_in_orderとしているため、トークン発行は単純ラウンドロビン方式(next_token++ % N)で良い*3。つまり入力フィルタがトークンを再利用する時点で、同トークンの出力フィルタ処理まで完了済みであることが保証される。
なお、並列処理版でトークン数token_numを1とすると逐次処理版と等価な振る舞いとなる。このとき追加のワーカスレッドは作成されず、各フィルタ処理がメインスレッド上で逐次実行される。

*1:入出力処理時間=S, 変換処理時間=P, 並列度=Nとしたとき、理論上は不等式 (S + P) / S ≧ N を満たすあいだ線形スケーリング性能を得られる。おおよそ変換÷入出力の処理時間比が並列度N以上であればよい。

*2:最大トークン数>ワーカスレッド数の利得が得られるのは、parallel 指定フィルタの処理時間にばらつきが大きく、serial_in_order 指定フィルタで処理開始できないトークンが多発するケースに限られる。parallel 指定フィルタ処理内容がデータに依存せず均質である場合は、必要な WorkSet 領域とのトレードオフから最大トークン数=ワーカスレッド数(+α)が最適となるケースが多い。

*3:http://www.threadingbuildingblocks.org/docs/help/tbb_userguide/Using_Circular_Buffers.htm http://www.xlsoft.com/jp/products/intel/perflib/tbb/41/tbb_userguide_lnx/tbb_userguide/Using_Circular_Buffers.htm