Intel TBB(Threading Building Blocks)を利用した並列パイプライン処理実装のひな形コード。下記の前提条件・要件に適合する。
- データの “入力 → 変換 → 出力” 処理フローにおいて、スループット改善を目的として変換処理を並列化する。
- データ変換は並行実行されうるため、変換処理はデータ単位で分離独立していること。
- データ入力/出力処理時間に比べて、並列化対象のデータ変換処理時間の方が十分大きいこと。*1
- データ入力/出力については順序維持(in-order)の逐次処理とする。(例:シーケンシャルファイル処理など)
- 並列処理ワーカスレッド数は、論理プロセッサ数(またはそれ以下で任意指定)とする。
- データ変換に必要となる作業領域を、予め並列処理数分だけ確保する。(例:中間データを配置するバッファ領域など)
逐次処理版
struct WorkSet { // 入力データ保持領域 // 出力データ保持領域 // データ変換に必要な作業領域など }; bool input_data(WorkSet&); // データ入力 void process_data(WorkSet&); // データ変換 void output_data(WorkSet&); // データ出力 // 逐次処理ループ WorkSet ws; while (input_data(ws)) { process_data(ws); output_data(ws); }
並列処理版
#include <vector> #include "tbb/tbb.h" using namespace tbb; size_t token_num = task_scheduler_init::default_num_threads(); // default_num_threads()メンバ関数は論理プロセッサ数を返す // (ワーカスレッド数を制限したい場合はtoken_numを変更) // 並列処理トークンに紐付くデータを初期化 std::vector<WorkSet> working_set(token_num); size_t next_token = 0; // TBB並列パイプライン処理 フィルタチェイン構築 auto filter_chain = // 入力フィルタ[serial_in_order] make_filter<void, size_t>( filter::serial_in_order, [&](flow_control& fc) -> size_t { size_t token = (next_token++ % working_set.size()); WorkSet& ws = working_set[token]; if (!input_data(ws)) fc.stop(); return token; }) & // 変換フィルタ[parallel] make_filter<size_t, size_t>( filter::parallel, [&](size_t token) -> size_t { WorkSet& ws = working_set[token]; process_data(ws); return token; }) & // 出力フィルタ[serial_in_order] make_filter<size_t, void>( filter::serial_in_order, [&](size_t token) -> void { WorkSet& ws = working_set[token]; output_data(ws); }); // ワーカスレッド数token_numにて並列パイプラインを実行 task_scheduler_init sched(token_num); parallel_pipeline(token_num, filter_chain);
parallel_pipeline並列アルゴリズムの最大トークン数は、必要十分なWorkSet
領域を確保するためにTBBワーカスレッド数と同一としておく。最大トークン数<ワーカスレッド数とした場合は、ワーカスレッドは作成されるものの最大トークン数以下のスレッドしか利用されない。最大トークン数>ワーカスレッド数とした場合は、必要なWorkSet
領域が増大する一方で同時実行タスクはワーカスレッド数以下に制限されるため、今回の単純なフィルタチェインでは処理スループットの改善が期待されない。*2
入/出力フィルタをserial_in_order
としているため、トークン発行は単純ラウンドロビン方式(next_token++ % N)
で良い*3。つまり入力フィルタがトークンを再利用する時点で、同トークンの出力フィルタ処理まで完了済みであることが保証される。
なお、並列処理版でトークン数token_num
を1とすると逐次処理版と等価な振る舞いとなる。このとき追加のワーカスレッドは作成されず、各フィルタ処理がメインスレッド上で逐次実行される。
*1:入出力処理時間=S, 変換処理時間=P, 並列度=Nとしたとき、理論上は不等式 (S + P) / S ≧ N を満たすあいだ線形スケーリング性能を得られる。おおよそ変換÷入出力の処理時間比が並列度N以上であればよい。
*2:最大トークン数>ワーカスレッド数の利得が得られるのは、parallel 指定フィルタの処理時間にばらつきが大きく、serial_in_order 指定フィルタで処理開始できないトークンが多発するケースに限られる。parallel 指定フィルタ処理内容がデータに依存せず均質である場合は、必要な WorkSet 領域とのトレードオフから最大トークン数=ワーカスレッド数(+α)が最適となるケースが多い。
*3:http://www.threadingbuildingblocks.org/docs/help/tbb_userguide/Using_Circular_Buffers.htm http://www.xlsoft.com/jp/products/intel/perflib/tbb/41/tbb_userguide_lnx/tbb_userguide/Using_Circular_Buffers.htm