yohhoyの日記

技術的メモをしていきたい日記

(翻訳)Intel Threading Building Blocks フローグラフを用いた特徴検出の例

元記事:A feature-detection example using the Intel® Threading Building Blocks flow graph | Intel® Software, Michael Voss氏, 2011/9/9

自分自身の理解のために日本語訳を行ったIntel TBB フローグラフ解説記事。

Intel® Threading Building Blocks フローグラフを用いた特徴検出の例

Intel® Threading Building Blocks (Intel® TBB) フローグラフ(flow graph)はIntel® TBB 4.0でフルサポートされました。フローグラフに馴染みがなければ、こちら(訳注:同記事の拙訳)の紹介を参照してください。

下の図1は、単純な特徴検出アプリケーションを実装するフローグラフを表しています。複数の画像をグラフへ入力すると、各画像に対して2種類の特徴検出アルゴリズムがそれぞれ実行されます。いずれかのアルゴリズムが特徴量を検出すると、その画像は後続処理のために格納されます。本記事では、このグラフにおける各ノード/頂点(node)の利用方法を説明し、全体が動作する実装を示しながら説明していきます。

図1
図1: 特徴検出サンプルのIntel® TBB フローグラフ

この図中には、アプリケーションの構築に4種類の異なるノードタイプが用いられています:1個の source_node、1個の queue_node、2個の join_node、そして数個の function_node です。サンプル実装を示す前に、各ノードの概要を説明していきます。

最初のノードタイプは source_node です(下記シンボルで図示)。このノードタイプは先行ノードを持たず、グラフへ投入するメッセージの生成に用いられます。ノードはユーザファンクタ(またはラムダ式)を実行して出力を生成します。シンボルの右側に付いた小さな白丸は、ノード自身の出力をバッファリングし、かつ同バッファが確保(reserve)可能であることを表します。source_nodeは1アイテムだけバッファリングします。バッファが確保済みの場合、呼び出し側が値を消費または解放するまで、その値は呼び出し側にて保持されます。
source_node

2番目のノードタイプは queue_node です(下図シンボル)。queue_nodeは上限なしFIFO(first-in first-out)バッファです。source_nodeと同様に、その出力は確保可能です。(訳注:queue_nodeの入力は黒丸;確保不可となっている。受け取ったアイテムは即座に上限なしFIFOバッファに移動されるため、入力ポートは確保不可でよい。)
queue_node

3番目のノードタイプは join_node で、今回の例では2種類が使われています。join_nodeは複数の入力ポートを持ち、各ポートでの受信値をまとめて単一の出力タプルを生成します。join_nodeでは入力ポートに関して、queueing, reserving, tag_matchingといった異なるポリシーを用います。queueing join_nodeは到着しだい全メッセージを消費し、各入力キューに1つでもアイテムがあれば出力を生成します。reserving join_nodeは各入力ポートにおいてアイテム確保に成功したときだけ、タプルを生成しようとします。もし全入力ポートで確保が成功しなければ、全ての確保を解放し、前回確保できなかったポートからメッセージ受信したときに再試行します。最後のtag_matching join_nodeは入力ポートでのメッセージバッファリングにハッシュテーブルを用います。各ポートでキーが一致するメッセージを受信したときに、該当メッセージをまとめた出力タプルを生成します。図1で利用されているreservingとtag_matching join_nodeのシンボルを下に示します。
reserving_jointag_matching_join

今回の例で用いる最後のノードタイプは function_node です(下図シンボル)。function_nodeは入力メッセージを引数としてユーザ提供のファンクタまたはラムダ式を実行し、その戻り値を後続ノードへ渡します。function_nodeは制限付き(limited)または無制限(unlimited)の並列度を指定して構築されます。無制限並列度のfunction_nodeは、到着メッセージと同数だけファンクタを適用するタスクを生成します。制限付き並列度のfunction_nodeでは許された並列度までのタスクを生成し、必要に応じて入力メッセージをバッファリングするためメッセージ欠落はありません。
function_node

スペース節約のため、今回の例中では偽の画像処理を行います。特に、画像を単に文字の配列としています。文字'A'を含む画像はアルゴリズムAで認識される特徴をもち、文字'B'を含む画像はアルゴリズムBで認識される特徴をもつとしています。本記事では、図1で示した構造のフローグラフを構築・実行する完全なコードを示していきますが、実際の計算を行う箇所はトリビアルな処理で置き換えています。

下記コードはimage構造体の宣言、および関数ノードの本体に用いるトリビアルな実装です。関数get_next_imageはノードsource_nodeで使われ、処理対象となる画像を生成します。関数get_next_imageでは、画像11枚毎にアルゴリズムAで検知される特徴が、13枚毎にアルゴリズムBで検知される特徴が含まれることに気付くと思います。関数preprocess_imageでは各文字に対してオフセット値を加算し、関数detect_with_Aとdetect_with_Bではそれぞれ文字'A'および'B'を検索しています。

#include <cstring>
#include <cstdio>

const int num_image_buffers = 100;
int image_size = 10000000;

struct image {
   const int N;
   char *data;
   image();
   image( int image_number, bool a, bool b );
};

image::image() : N(image_size) {
   data = new char[N];
}

image::image( int image_number, bool a, bool b ) : N(image_size) {
    data = new char[N];
    memset( data, '\0', N );
    data[0] = (char)image_number - 32;
    if ( a ) data[N-2] = 'A';
    if ( b ) data[N-1] = 'B';
}

int img_number = 0;
int num_images = 64;
const int a_frequency = 11;
const int b_frequency = 13;

image *get_next_image() {
    bool a = false, b = false;
    if ( img_number < num_images ) {
        if ( img_number%a_frequency == 0 ) a = true;
        if ( img_number%b_frequency == 0 ) b = true;
        return new image( img_number++, a, b );
    } else {
        return 0;
    }
}

void preprocess_image( image *input_image, image *output_image ) {
    for ( int i = 0; i < input_image->N; ++i ) {
        output_image->data[i] = input_image->data[i] + 32;
    }
}

bool detect_with_A( image *input_image ) {
    for ( int i = 0; i < input_image->N; ++i ) {
        if ( input_image->data[i] == 'a' )
            return true;
    }
    return false;
}

bool detect_with_B( image *input_image ) {
    for ( int i = 0; i < input_image->N; ++i ) {
        if ( input_image->data[i] == 'b' )
            return true;
    }
    return false;
}

void output_image( image *input_image, bool found_a, bool found_b ) {
    bool a = false, b = false;
    int a_i = -1, b_i = -1;
    for ( int i = 0; i < input_image->N; ++i ) {
        if ( input_image->data[i] == 'a' ) { a = true; a_i = i; }
        if ( input_image->data[i] == 'b' ) { b = true; b_i = i; }
    }
    printf("Detected feature (a,b)=(%d,%d)=(%d,%d) at (%d,%d) for image %p:%d\n",
           a, b, found_a, found_b, a_i, b_i, input_image, input_image->data[0]);
}

フローグラフを実装するコードは、下記のmain関数内にあります。フローグラフコンポーネントを使う箇所を示すため、mainコードの途中にテキストを挟んでいます。(訳注:コメント"Put image buffers into the buffer queue"のこと。)このサンプルコードをビルドしたい場合は、全てのコード片を上から順に連結して単一ファイルへコピー&ペーストしてください。

int num_graph_buffers = 8;

#include "tbb/flow_graph.h"

using namespace tbb;
using namespace tbb::flow;

int main() {

最初にグラフgを作成します。全てのノードはこのグラフに所属します。joinノードの出力への言及を簡単にするために、typedefを幾つか行います。

    graph g;

    typedef std::tuple< image *, image * > resource_tuple;
    typedef std::pair< image *, bool > detection_pair;
    typedef std::tuple< detection_pair, detection_pair > detection_tuple;

次に、画像を保持するqueue_nodeノードと、2つのjoinノードを作成します。もう一度、resource_joinはreservingポリシーを、detection_joinはtag_matchingポリシーを持つことに留意してください。tag_matchingポリシーを利用するには、ユーザはアイテムからタグ抽出を行うファンクタを提供しなければいけません;それぞれコンストラクタの追加引数として指定します。

    queue_node< image * > buffers( g );
    join_node< resource_tuple, reserving > resource_join( g );
    join_node< detection_tuple, tag_matching > detection_join( g,
       [](const detection_pair &p) -> size_t { return (size_t)p.first; },
       [](const detection_pair &p) -> size_t { return (size_t)p.first; }  );

続いて、1個のsource_nodeと4個のfunction_nodeを含む、ユーザコードを実行するノード群を作成します。ユーザコードはC++ラムダ式(関数オブジェクトでも良い)を用いて各ノードへ渡します。各ラムダ式の大半は前述の関数を呼び出す小さなラッパコードであり、必要に応じて入力取得や出力生成を行います。make_edge関数呼び出しにより、図1に示す通りにノード間を接続します。

    source_node< image * > src( g,
                                []( image* &next_image ) -> bool {
                                    next_image = get_next_image();
                                    if ( next_image ) return true;
                                    else return false;
                                }
                              );
    make_edge(src, input_port<0>(resource_join) );
    make_edge(buffers, input_port<1>(resource_join) );

    function_node< resource_tuple, image * >
        preprocess_function( g, unlimited,
                             []( const resource_tuple &in ) -> image * {
                                 image *input_image = std::get<0>(in);
                                 image *output_image = std::get<1>(in);
                                 preprocess_image( input_image, output_image );
                                 delete input_image;
                                 return output_image;
                             }
                           );

    make_edge(resource_join, preprocess_function );

    function_node< image *, detection_pair >
        detect_A( g, unlimited,
                 []( image *input_image ) -> detection_pair {
                    bool r = detect_with_A( input_image );
                    return std::make_pair( input_image, r );
                 }
               );

    function_node< image *, detection_pair >
        detect_B( g, unlimited,
                 []( image *input_image ) -> detection_pair {
                    bool r = detect_with_B( input_image );
                    return std::make_pair( input_image, r );
                 }
               );

    make_edge(preprocess_function, detect_A );
    make_edge(detect_A, input_port<0>(detection_join) );
    make_edge(preprocess_function, detect_B );
    make_edge(detect_B, input_port<1>(detection_join) );

    function_node< detection_tuple, image * >
        decide( g, serial,
                 []( const detection_tuple &t ) -> image * {
                     const detection_pair &a = std::get<0>(t);
                     const detection_pair &b = std::get<1>(t);
                     image *img = a.first;
                     if ( a.second || b.second ) {
                         output_image( img, a.second, b.second );
                     }
                     return img;
                 }
               );

    make_edge(detection_join, decide);
    make_edge(decide, buffers);

グラフ先頭にreserving joinノードがあるため、バッファキュー中に有効な画像バッファが存在しない間はグラフは待機状態のままです。下記のforループにて画像バッファを確保しキューへ追加します。ループ後のg.wait_for_all()呼び出しは、全ての画像が処理されてグラフが再び待機状態に戻るまでブロッキングします。

    // Put image buffers into the buffer queue
    for ( int i = 0; i < num_graph_buffers; ++i ) {
        image *img = new image;
        buffers.try_put( img );
    }
    g.wait_for_all();

グラフが待機状態の場合、全ての画像バッファは再びバッファキューに戻っています。そのためqueue_nodeから画像バッファを取り出し解放する必要があります。

    for ( int i = 0; i < num_graph_buffers; ++i ) {
        image *img = NULL;
        if ( !buffers.try_get(img) )
            printf("ERROR: lost a buffer\n");
        else
            delete img;
    }
    return 0;
}

今回の特徴検出サンプルが、ノード間メッセージ送受信を行うフローグラフをどのように実装するかの例示となることを望みます。Intel® Threading Building Blocks 4.0の新機能については http://www.threadingbuildingblocks.org を、またIntel® TBB フローグラフについては http://software.intel.com/en-us/blogs/tag/flow_graph/ にあるブログ記事をチェックしてください。