Apache Beam を使用してグリッド コンピューティング ワークロードのための C++ バイナリを実行する

この記事では、大量のグリッド コンピューティング ワークロードを処理するために Cloud Dataflow で C++ コード(およびその他の「非ネイティブ」コード)を実行する方法について説明します。

概要

銀行、マスメディア、ライフ サイエンス企業にとって、驚異的並列ワークロードは珍しいことではありません。このような並列ワークロードを実現するため、グリッド コンピューティングと呼ばれる構成で、それぞれが独立した処理タスクを実行する複数のコンピューティング ノードからなるクラスタをデプロイすることがよくあります。こうした並列ワークロードのデータ処理に、Apache Beam を使用できます。

Beam には現在、Java および Python 用のネイティブ SDK が用意されています。しかし、驚異的並列ワークロードの多くは C++ で書かれたコードを使用します。この記事では、Apache Beam、Cloud Dataflow ランナー、その他の Google Cloud Platform(GCP)サービスを使用して、C++ バイナリ(ライブラリ)を外部(または非ネイティブ)コードとして実行する方法について説明します。C++ バイナリを使用すると、フルマネージド サービスを使用してこういったタイプのワークロードに対応することができ、バッチモードとストリーミング モードの両方で高度な有向非巡回グラフ(DAG)を使用してエンドツーエンド パイプラインを構築できます。

この記事で説明しているアプローチは、C++ バイナリの実行に重点を置いています。ただし、このアプローチはスタンドアロン バイナリをコンパイルできる他の言語で書かれたコードにも適用できます。

このシナリオで使用するコンポーネント

グリッド コンピューティング アプリケーションでは、多数のコアで実行されている機能にデータを分散する必要があります。このようなパターンでは、並行性の高い読み取りが必要になることが多く、下流のシステムに取り込まれる大規模なデータ ファンアウトがしばしば発生します。

この記事で説明するアーキテクチャでは、次の GCP リソースを使用しています。

  • Apache Beam の Cloud Dataflow ランナー。このランナーは、有向非巡回グラフ(DAG)から導出された処理フローを使用して、各グリッドノードに作業を分配します。単一の Beam DAG を使用することで、複雑なマルチステージ パイプラインであっても、副入力結合を使って並列するパイプライン ステージが集約されるように定義できます。

  • Cloud Storage。このサービスにより、C++ バイナリをステージングする場所が提供されます。マスメディアのユースケースの多くがそうであるように、大きなファイルを保存する必要がある場合は、そのファイルも Cloud Storage に保存されます。

この記事で説明するシナリオでは、Cloud BigtableBigQuery も使用しています。これらのサービスは、ソースにもシンクにも使用されています。以下の図に、アーキテクチャの概要を示します。

グリッド コンピューティング ソリューションのアーキテクチャ

他のストレージ システムも使用できます。詳細は、Apache Beam のドキュメントのパイプライン I/O ページのストレージ システムとストリーミング ソースのリストを参照してください。

Apache Beam の Cloud Dataflow ランナー

Cloud Dataflow はフルマネージド型の GCP サービスであり、ストリーミング(リアルタイム)モードやバッチ(履歴)モードのデータを、信頼性と表現力を損なうことなく変換および拡張できます。Cloud Dataflow は Apache Beam に基づいています。

Cloud Storage

Cloud Storage は、ライブデータ配信、データ分析、機械学習(ML)、データ アーカイブを網羅する統合型オブジェクト ストレージです。この記事で説明するシナリオでは、Cloud Storage は C++ バイナリへのアクセスを提供します。一部の使用例では、Cloud Storage は処理段階で必要なデータ用のロケーションも提供します。

グリッド コンピューティングで必要とされる高速読み込みについては、Cloud Storage のパフォーマンス特性を理解している必要があります。Cloud Storage のデータ配信パフォーマンスの詳細については、Cloud Storage ドキュメントのリクエスト レートとアクセス配信のガイドラインを参照してください。

Cloud Bigtable

Cloud Bigtable は、大規模な分析ワークロードや運用ワークロード用に最適化された高性能 NoSQL データベース サービスです。Cloud Bigtable は、その主な特徴である低レイテンシの読み書き(90 パーセンタイルで 6 ミリ秒)により、何千もの同時クライアントや大規模なワークロードを処理できるため、Cloud Dataflow を補完します。このような機能を持つ Cloud Bigtable はシンクとしてのみならず、Cloud Dataflow でデータを処理する際に使用する DoFn 関数のデータソースとしても最適です。この点については後で説明します。

BigQuery

BigQuery は、大規模なデータ分析に対応した、高速で経済的なエンタープライズ向けフルマネージド データ ウェアハウスです。グリッドの結果は分析に使用されることが多く、グリッドのデータ出力に対して大規模な集計を実行できます。

Cloud Dataflow DAG

Beam SDK を使用すると、表現力の高い DAG を構築できます。これにより、ストリームまたはバッチのマルチステージ パイプラインを簡単に作成できます。データの移動はランナーによって処理され、データはイミュータブルな並列要素コレクションである PCollection オブジェクトとして表現されます。

次の図は、このフローを示しています。

DAG を使用したフロー

Apache Beam SDK を使用すると DAG を定義できます。DAG ではユーザー定義コードを関数として含めることができます。通常、DAG の宣言とユーザー定義コードの両方に対して同じプログラミング言語(Java または Python)が使用されます。ただし、ここで説明するシナリオについては、ユーザー定義コードは C++ です。

DAG の作成の詳細については、Cloud Dataflow のドキュメントを参照してください。

外部コードを実行する一般的なアプローチ

このセクションでは、外部(C++)コードの実行に使用する一般的なアプローチについて説明します。

このコードを使用すると、完全なビルドを必要とすることなく素早く実験を行うことができます。本番環境システムでは、通常は独自のバイナリを作成し、必要に応じてプロセスを自由に調整できます。

次の図は、パイプライン データの 2 つの使用法を示しています。

  • データを使用してプロセスを駆動する。
  • データを処理中に取得し、駆動データに結合する。

2 段階のパイプライン データ

この記事では、ソースからのプライマリ データを駆動データと呼び、処理段階からのセカンダリ データを結合データと呼びます。

金融分野の使用例では、駆動データが数十万件の取引になることがあります。各取引は、市場データと併せて処理する必要があります。その場合、市場データが結合データとなります。マスメディアの使用例では、駆動データは、処理は必要とするが他のデータソースは必要としない画像データファイルである可能性があり、したがって結合データは使用しません。

駆動データのサイズに関する考慮事項

駆動データ要素のサイズがメガバイト単位の低い範囲にある場合は、このデータ要素を通常の Beam パラダイムで処理する、つまり、ソースから PCollection オブジェクトを作成し、オブジェクトを Beam トランスフォームに送信して処理する必要があります。

一般的なマスメディアのように、駆動データ要素のサイズがメガバイト単位の高い範囲かギガバイト単位の場合は、駆動データを Cloud Storage に配置できます。次に、開始 PCollection オブジェクトでは、ストレージ URI を参照し、そのデータへの URI 参照のみが使用されます。このアプローチについては、この記事の後半にある外部ライブラリを実行するに詳しい説明があります。

結合データのサイズに関する考慮事項

結合データが数百メガバイト以下の場合、副入力を使用してこのデータを Beam トランスフォームに送信します。副入力は、それを必要とするすべてのワーカーにデータパケットを送信します。

結合データがギガバイトまたはテラバイトの範囲にある場合は、Cloud Bigtable または Cloud Storage のいずれかを使用して、データの性質に応じて結合データを駆動データにマージします。Cloud Bigtable は、市場データが Cloud Bigtable から Key-Value ルックアップとしてアクセスされることが多い金融シナリオに最適です。時系列データの使用に関する推奨事項を含め、Cloud Bigtable スキーマの設計の詳細については、Cloud Bigtable の次のドキュメントを参照してください。

外部コードの実行

Beam では、さまざまな方法で外部コードを実行できます。たとえば、マイクロサービスを作成して Cloud Dataflow 変換の中で DoFn オブジェクトから呼び出すという方法や、Java SDK を使用するときに JNI を使用するという方法があります。この記事で説明するアプローチでは、サブプロセスを DoFn オブジェクトから直接作成するという方法を使用しています。このアプローチが最も効率的というわけではありませんが、堅牢で実装が簡単です。

この記事を読んで、エンドツーエンドの完全なパイプラインを検討してください。プロセスの実行方法が非効率的でも、ソースからシンクまでのデータ移動を単一のパイプラインで達成することにより埋め合わせることができます。この方法を他の方法と比較する場合は、パイプラインのエンドツーエンドの時間と費用を確認するようにしてください。

バイナリをホストに pull する

ネイティブの Apache Beam 言語(Java または Python)を使用すると、Beam SDK は必要なすべてのコードを自動的にワーカーに移動します。ただし、外部コードを呼び出すときは、コードを手動で移動する必要があります。

バケットに格納されたバイナリ ファイル

コードを移動するには、次の操作を行います。

  1. コンパイルされた外部コードをバージョニング情報とともに Cloud Storage に保存します。
  2. @Setup メソッドの中に、コードファイルがローカル リソース上で利用可能かどうかを確認するための同期ブロックを作成します。実際のコードの状況を確認するのではなく、最初のスレッドが終了したときの静的変数を使用することで利用可能かどうかが確認できます。
  3. ファイルが利用できない場合は、Cloud Storage クライアント ライブラリを使用して、ファイルを Cloud Storage バケットからローカル ワーカーに pull します。このタスクには Beam の FileSystems クラスを使用することをおすすめします。
  4. ファイルを移動した後、コードファイルに実行ビットが設定されていることを確認します。
  5. 本番環境システムでは、バイナリのハッシュをチェックして、ファイルが正しくコピーされていることを確認します。

外部バイナリの実行

外部コードを実行するには、その前に外部コード用のラッパーを作成する必要があります。このラッパーは、外部コードと同じ言語(C++ など)か、シェル スクリプトとして記述します。ラッパーを使用すると、後述の低い CPU サイクルに対する処理の設計で説明しているように、ファイル ハンドルを渡して、最適化手法を実装できます。ラッパーは高度なものである必要はありません。次のスニペットは、C++ のラッパーの概要を示しています。

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr &lt;&lt; "Required return file and data to process" &lt;&lt; '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

このコードは、引数リストから 2 つのパラメータを読み取ります。最初のパラメータは、データが push されるリターン ファイルの場所です。2 番目のパラメータは、コードがユーザーに表示するデータです。(実際の実装では、このコードは「Hello, world」と表示する以上の機能を果たします。)

ラッパーコードを記述したら、次のようにして外部コードを実行します。

  1. データを外部コードのバイナリに送信します。
  2. バイナリを実行し、エラーをキャッチし、エラーと結果を記録します。
  3. ロギング情報を処理します。
  4. 完了した処理からデータをキャプチャします。

データをバイナリに送信する

ライブラリの実行プロセスを開始するには、データを C++ コードに送信します。ここでは Cloud Dataflow と他の GCP ツールの統合の利点を活用できます。Cloud Bigtable のようなツールは、非常に大きなデータセットを扱い、低レイテンシかつ並行性の高いアクセスを処理できます。これにより、何千ものコアが同時にデータセットにアクセスできます。さらに、Cloud Bigtable はデータを事前処理して、データ整形、拡張、フィルタリングを行うことができます。この作業はすべて、外部コードを実行する前に Beam 変換で行うことができます。

本番環境システムでは、プロトコル バッファを使用して入力データをカプセル化することをおすすめします。入力データはバイトに変換され、外部ライブラリに渡される前に base64 でエンコードされます。このデータを外部ライブラリに渡すには、次の 2 つの方法があります。

  • 小さな入力データ。システムのコマンド引数の最大長を超えない小さなデータの場合は、java.lang.ProcessBuilder で構築されているプロセスの 2 番目の位置に引数を渡します。
  • 大きい入力データ。より大きいデータサイズの場合は、プロセスで必要なデータを格納するために、名前に UUID を含むファイルを作成します。

C++ コードの実行、エラーのキャッチ、ロギング

エラー情報の取得と処理は、このソリューションの重要な部分です。Cloud Dataflow ランナーが使用するリソースは一時的なものであるため、ワーカーログ ファイルを検査することは困難な場合が多いです。すべての有用な情報を取得して Cloud Dataflow ランナーログに push し、ログデータを 1 つ以上の Cloud Storage バケットに格納する必要があります。

推奨されるアプローチは、stdoutstderr をファイルにリダイレクトすることです。これにより、メモリ不足の問題を回避できます。たとえば、C++ コードを呼び出す Cloud Dataflow ランナーでは、次のような行を含めることができます。

import java.lang.ProcessBuilder.Redirect;
...
    processbuilder.redirectError(Redirect.appendTo(errfile));
    processbuilder.redirectOutput(Redirect.appendTo(outFile));</pre>

ログ情報の処理

多くの使用例では、何百万もの要素を処理しています。正常な処理で生成されるログは、価値がほとんど、あるいはまったくないため、ログデータの保持に関するビジネス上の決定を下す必要があります。たとえば、すべてのログデータを単純に保持する代わりに、次の方法を検討してみます。

  • 正常に要素を処理した際に得られたログに含まれる情報に価値がない場合は、ログを保持しない。
  • ログエントリ 10,000 件ごとにサンプリングするなど、ログデータをサンプリングするロジックを作成する。処理が同質である場合(すなわち、コードが何回反復されても、生成されるログデータが本質的に同一である場合)、この手法によってログデータの保持と処理の最適化のバランスを効果的にとることができます。

障害が発生している場合、ログに出力されるデータ量が大きくなる可能性があります。大量のエラー ログデータを処理する有効な方法は、ログエントリの最初の数行を読み込み、その行だけを Stackdriver Logging に push することです。残りのログファイルは Cloud Storage バケットに読み込むことができます。これにより、後でエラーログの最初の数行を確認し、必要に応じて Cloud Storage でファイル全体を調べることができます。

ログファイルのサイズはいつでも確認するに足ります。ファイルサイズがゼロの場合、無視しても問題ありませんが、ファイルにデータがないという簡単なログメッセージを記録することもできます。

完了した処理からのデータのキャプチャ

stdout を使用して、計算結果を DoFn 関数に戻すことはおすすめしません。C++ コードが呼び出す他のコードやユーザー独自のコードが stdout にもメッセージを送信し、ログデータを格納する stdoutput ストリームを汚染する可能性があります。代わりに、C++ ラッパーコードを変更して、値を格納する .ret ファイルをどこに作成するかを示すパラメータをコードが受け入れられるようにすることをおすすめします。このファイルは、プロトコル バッファを使用して言語に依存しない方法で格納する必要ことが理想的です。これにより、C++ コードでオブジェクトを Java コードに戻すことができます。DoFn オブジェクトは .ret ファイルから直接結果を読み込み、結果情報を自身の output 呼び出しに渡すことができます。

その他の考慮事項

このセクションでは、Cloud Dataflow で C++ コードと他の非ネイティブ バイナリを実行する際の追加の考慮事項について説明します。

低い CPU サイクルに対する処理の設計

サブプロセスの呼び出しには、ある程度のオーバーヘッドを伴います。ワークロードによっては、行われる作業に占めるプロセスの開始と終了にかかる管理オーバーヘッドの比率を削減するために、余分な作業が必要になる場合があります。

マスメディアの使用例の場合、各データ要素の処理に数分かかることがあります。その場合、サブプロセスを呼び出すコストは全体の処理時間に比べて重要ではありません。この状況での最善のアプローチは、単一の要素にそれ自身のプロセスを開始させることです。

ただし、他の使用例(金融など)では、処理に必要な CPU 時間はごくわずか(数十ミリ秒)です。その場合、サブプロセスを呼び出すオーバーヘッドは釣り合わないほどに大きくなります。この問題の簡単な解決策の 1 つは、Beam の GroupByKey 変換を利用して 50〜100 要素からなるバッチを作成し、これをプロセスに渡すというものです。たとえば、次のことを行うことができます。

  • DoFn 関数の中で、Key-Value ペアを作成します。金融取引の処理を行う場合は、取引番号をキーとして使用できます。または、キーとして使用する固有の番号がない場合は、データからチェックサムを生成し、モジュロ関数を使用して 50 の要素を持つパーティションを作成できます。
  • キーを GroupByKey.create 関数に送信すると、50 の要素を含んだ KV<key,Iterable<data>> コレクションが返されるので、これをプロセスに送ることができます。

ワーカーの並列性を制限する

Cloud Dataflow ランナーでネイティブにサポートされている Java や Python などの言語を処理する場合、ワーカーに何が起こっているのかを考える必要はありません。Cloud Dataflow には、フロー制御とスレッドをバッチモードまたはストリーム モードで監督するプロセスが多数あります。

しかし、C++ のような外部言語を使用している場合は、サブプロセスを開始することによって、通常とは違ったことをしていることに注意する必要があります。バッチモードでは、Cloud Dataflow ランナーはストリーム モードを使用する場合と比較して、CPU に対する作業スレッドの使用割合が小さくなります。特にストリーム モードでは、クラス内でセマフォを作成して、個々のワーカーの並列処理をより直接的に制御することをおすすめします。

たとえばメディア処理では、何百ものコード変換要素を 1 つのワーカーで並列で処理したくない場合があります。このような場合、DoFn 関数に対して作業の実行の許可を出すユーティリティ クラスを作成できます。このクラスを使用すると、パイプライン内のワーカー スレッドを直接制御できます。

GCP で大容量データシンクを使用する

データは処理された後、データシンクに送信されます。シンクはグリッド処理ソリューションによって作成される大量の結果を処理できる必要があります。

次の図は、Cloud Dataflow がグリッド ワークロードを実行しているときに GCP で使用可能なシンクの一部を示しています。

GCP で利用可能なシンク

Cloud Bigtable、BigQuery、Cloud Pub/Sub はすべて非常に大きなデータ ストリームに対応できます。たとえば、Cloud Bigtable の各ノードは最大 1,000 のサイズの挿入を 1 秒あたり 10,000 件処理でき、水平方向に容易に拡張可能です。これにより、100 ノードの Cloud Bigtable クラスタでは、Cloud Dataflow グリッドによって生成される 1 秒あたり 1,000,000 件のメッセージに対応できます。

次のステップ

Beam で外部ライブラリを使用する方法を示すコード例を入手するには、Apache サイトで Add examples of running external libraries on workers の Jira リクエストに従ってください。

DAG の作業の標準的なパターンをいくつか調べるには、次のブログ記事をご覧ください。

その他の GCP 機能を試すには、チュートリアルをご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...