Intel TBB フローグラフ(flow graph)のシーケンサノード sequencer_node では、入力アイテムからシーケンス序数(sequence order number)を返すユーザ定義のSequencerファンクタを与えて、後続ノードへ送るアイテム順序を変更する。このシーケンス序数の割当てで落とし穴に嵌ったのでメモ。
結論:アイテムに対するシーケンス序数には、値0から連続する一意な整数値(std::size_t型)を割り当てる必要がある。
期待通り動作しない例
下記コードでは、単純なパイプライン並列処理「並列に入力値を2倍し入力順序で値を出力」を行うフローグラフを構築し、入力数列{0,1,2,...9}に対して出力数列{0,2,4,...18} を期待している。しかし実際にTBB 4.0で動作させると {0} の1要素しか出力されない*1。
#include <iostream> #include "tbb/flow_graph.h" using namespace tbb::flow; graph g; // 処理ノード[並列度:unlimited]: 入力値を2倍する function_node<double, double> proc_node(g, unlimited, [](double x) -> double { return x * 2.0; }); // 整列ノード: 入力順に並び替え sequencer_node<double> seq_node(g, [](double x) -> std::size_t { return std::size_t(x); // NG: 入力値をそのままシーケンス序数とする }); // 出力ノード[並列度:serial(1)]: 標準出力へダンプ function_node<double> dump_node(g, serial, [](double x) { std::cout << x << std::endl; }); // flow graph: proc_node --> seq_node --> dump_node を構築 make_edge(proc_node, seq_node); make_edge(seq_node, dump_node); // proc_nodeに 数列{0,1,2,...9} を入力 for (int n = 0; n < 10; ++n) proc_node.try_put(n); g.wait_for_all();
この動作は、シーケンサノードseq_nodeで割り当てたシーケンス序数(size_t型)が、0からの連続した整数値ではないことによる。ノードseq_nodeは"値=0/序数=0"を通過させた後、序数1となるアイテムが入力されるのを永遠に待機している。出力されなかった後半の数列{2,4,...18}はノードseq_nodeの内部バッファに蓄積されており、後続ノードdump_nodeに渡されないままグラフgが破棄される。
修正方法
sequencer_nodeのSequencerファンクタにて、0からの連続整数値を返すように修正すると期待通りの動作となる。(ただし、この実装では先攻ノードproc_nodeの処理内容に強く依存している。)
// 整列ノード: 入力順に並び替え sequencer_node<double> seq_node(g, [](double x) -> std::size_t { return std::size_t(x / 2.0); // OK: 入力値/2をシーケンス序数とする(前提条件; 入力値/2==入力順序) });
より実用的には、下記実装のように入力順序を保持したままフローグラフで処理する方式が考えられる。
#include <tuple> typedef std::tuple<double, std::size_t> ItemType; // タプル(値, index) で値と入力順序を保持 // 処理ノード: 入力値を2倍する function_node<ItemType, ItemType> proc_node(g, unlimited, [](const ItemType& x) -> ItemType { return std::make_tuple(std::get<0>(x) * 2.0, std::get<1>(x)); }); // 整列ノード: 入力順に並び替え sequencer_node<ItemType> seq_node(g, [](const ItemType& x) -> std::size_t { return std::get<1>(x); // OK: アイテムのindexを返す }); // 出力ノード[並列度:serial(1)]: 標準出力へダンプ function_node<ItemType> dump_node(g, serial, [](const ItemType& x) { std::cout << std::get<0>(x) << std::endl; }); // proc_nodeに 数列{0,1,2,...9} を入力 for (int n = 0; n < 10; ++n) proc_node.try_put(ItemType(n, n)); g.wait_for_all();
*1:期待した動作ではないが決定的(deterministic)であり、プログラムは毎回同じ結果を出力する。