yohhoyの日記

技術的メモをしていきたい日記

tbb::flow::sequencer_nodeとsequence order numberの落とし穴

Intel TBB フローグラフ(flow graph)のシーケンサノード sequencer_node では、入力アイテムからシーケンス序数(sequence order number)を返すユーザ定義のSequencerファンクタを与えて、後続ノードへ送るアイテム順序を変更する。このシーケンス序数の割当てで落とし穴に嵌ったのでメモ。

結論:アイテムに対するシーケンス序数には、値0から連続する一意な整数値(std::size_t型)を割り当てる必要がある。

期待通り動作しない例

下記コードでは、単純なパイプライン並列処理「並列に入力値を2倍し入力順序で値を出力」を行うフローグラフを構築し、入力数列{0,1,2,...9}に対して出力数列{0,2,4,...18} を期待している。しかし実際にTBB 4.0で動作させると {0} の1要素しか出力されない*1

#include <iostream>
#include "tbb/flow_graph.h"
using namespace tbb::flow;

graph g;
// 処理ノード[並列度:unlimited]: 入力値を2倍する
function_node<double, double> proc_node(g, unlimited, [](double x) -> double {
  return x * 2.0;
});
// 整列ノード: 入力順に並び替え
sequencer_node<double> seq_node(g, [](double x) -> std::size_t {
  return std::size_t(x);  // NG: 入力値をそのままシーケンス序数とする
});
// 出力ノード[並列度:serial(1)]: 標準出力へダンプ
function_node<double> dump_node(g, serial, [](double x) {
  std::cout << x << std::endl;
});

// flow graph: proc_node --> seq_node --> dump_node を構築
make_edge(proc_node, seq_node);
make_edge(seq_node, dump_node);

// proc_nodeに 数列{0,1,2,...9} を入力
for (int n = 0; n < 10; ++n)
  proc_node.try_put(n);
g.wait_for_all();

この動作は、シーケンサノードseq_nodeで割り当てたシーケンス序数(size_t型)が、0からの連続した整数値ではないことによる。ノードseq_nodeは"値=0/序数=0"を通過させた後、序数1となるアイテムが入力されるのを永遠に待機している。出力されなかった後半の数列{2,4,...18}はノードseq_nodeの内部バッファに蓄積されており、後続ノードdump_nodeに渡されないままグラフgが破棄される。

修正方法

sequencer_nodeのSequencerファンクタにて、0からの連続整数値を返すように修正すると期待通りの動作となる。(ただし、この実装では先攻ノードproc_nodeの処理内容に強く依存している。)

// 整列ノード: 入力順に並び替え
sequencer_node<double> seq_node(g, [](double x) -> std::size_t {
  return std::size_t(x / 2.0);  // OK: 入力値/2をシーケンス序数とする(前提条件; 入力値/2==入力順序)
});

より実用的には、下記実装のように入力順序を保持したままフローグラフで処理する方式が考えられる。

#include <tuple>
typedef std::tuple<double, std::size_t> ItemType;  // タプル(値, index) で値と入力順序を保持

// 処理ノード: 入力値を2倍する
function_node<ItemType, ItemType> proc_node(g, unlimited, [](const ItemType& x) -> ItemType {
  return std::make_tuple(std::get<0>(x) * 2.0, std::get<1>(x));
});
// 整列ノード: 入力順に並び替え
sequencer_node<ItemType> seq_node(g, [](const ItemType& x) -> std::size_t {
  return std::get<1>(x);  // OK: アイテムのindexを返す
});
// 出力ノード[並列度:serial(1)]: 標準出力へダンプ
function_node<ItemType> dump_node(g, serial, [](const ItemType& x) {
  std::cout << std::get<0>(x) << std::endl;
});

// proc_nodeに 数列{0,1,2,...9} を入力
for (int n = 0; n < 10; ++n)
  proc_node.try_put(ItemType(n, n));
g.wait_for_all();

*1:期待した動作ではないが決定的(deterministic)であり、プログラムは毎回同じ結果を出力する。