高度な並列ワークフローのベスト プラクティス

このページでは、パイプラインで外部コードを使用する方法、パイプラインを実行する方法、エラー処理の管理方法など、Dataflow HPC の高度な並列ワークフローを構築および実行する際のベスト プラクティスについて説明します。

パイプラインに外部コードを含める

高度な並列パイプラインの重要な違いは、標準の Apache Beam SDK 言語ではなく、DoFn 内で C++ コードを使用する点です。Java パイプラインの場合は、パイプラインで C++ ライブラリを使いやすくするために、外部プロシージャ呼び出しを使用することをおすすめします。このセクションでは、Java パイプラインで外部コード(C++)を実行するために使用される一般的な方法について説明します。

Apache Beam パイプラインの定義には次の重要なコンポーネントがあります。

  • PCollections は、同種要素の不変のコレクションです。
  • PTransforms は、別の PCollection を生成する PCollection への変換を定義するために使用されます。
  • パイプラインは、コードを介して PTransformsPCollections の間のインタラクションを宣言できるようにする構造です。パイプラインは有向非巡回グラフ(DAG)として表されます。

標準の Apache Beam SDK 言語以外の言語のコードを使用する場合は、DoFn 内の PTransform にコードを配置し、標準のパイプライン自体を定義する SDK 言語を使用します。Python SDK には、他のコードを簡単に使用できるようにするユーティリティ クラスがあります。このため、パイプラインの定義に Apache Beam Python SDK を使用することをおすすめします。ただし、他の Apache Beam SDK も使用できます。

このコードを使用すると、完全なビルドを必要とすることなく素早く実験を行うことができます。本番環境システムでは、通常は独自のバイナリを作成し、必要に応じてプロセスを自由に調整できます。

次の図は、パイプライン データの 2 つの使用法を示しています。

  • データを使用してプロセスを駆動する。
  • データを処理中に取得し、駆動データに結合する。

2 段階のパイプライン データ

このページでは、(ソースからの)プライマリ データを駆動データと呼び、(処理フェーズからの)セカンダリ データを結合データと呼びます。

金融分野の使用例では、駆動データが数十万件の取引になることがあります。各取引は、市場データと併せて処理する必要があります。その場合、市場データが結合データとなります。マスメディアの使用例では、駆動データは、処理は必要とするが他のデータソースは必要としない画像データファイルである可能性があり、したがって結合データは使用しません。

駆動データのサイズに関する考慮事項

駆動データ要素のサイズがメガバイト単位の低い範囲にある場合は、このデータ要素を通常の Apache Beam パラダイムで処理する、つまり、ソースから PCollection オブジェクトを作成し、オブジェクトを Apache Beam 変換に送信して処理する必要があります。

一般的なマスメディアのように、駆動データ要素のサイズがメガバイト単位の高い範囲かギガバイト単位の場合は、駆動データを Cloud Storage に配置できます。次に、開始 PCollection オブジェクトでは、ストレージ URI を参照し、そのデータへの URI 参照のみが使用されます。

結合データのサイズに関する考慮事項

結合データが数百メガバイト以下の場合、副入力を使用してこのデータを Apache Beam 変換に送信します。副入力は、それを必要とするすべてのワーカーにデータパケットを送信します。

結合データがギガバイトまたはテラバイトの範囲にある場合は、Bigtable または Cloud Storage のいずれかを使用して、データの性質に応じて結合データを駆動データにマージします。Bigtable は、市場データが Bigtable から Key-Value ルックアップとしてアクセスされることが多い金融シナリオに最適です。時系列データの使用に関する推奨事項を含め、Bigtable スキーマの設計の詳細については、Bigtable の次のドキュメントを参照してください。

外部コードを実行する

Apache Beam では、さまざまな方法で外部コードを実行できます。

  • Dataflow 変換内の DoFn オブジェクトから呼び出されるプロセスを作成します。

  • Java SDK で JNI を使用します。

  • サブプロセスを DoFn オブジェクトから直接作成します。これは最も効率の良い方法でありませんが、堅牢で実装が簡単です。JNI の使用には潜在的な問題があるため、このページではサブプロセス呼び出しの使用について説明します。

ワークフローを設計する際は、完全なエンドツーエンドのパイプラインを検討します。プロセスの実行方法が非効率的でも、ソースからシンクまでのデータ移動を単一のパイプラインで達成することにより埋め合わせることができます。この方法を他の方法と比較する場合は、パイプラインのエンドツーエンドの時間と費用を確認するようにしてください。

ホストにバイナリを pull する

ネイティブの Apache Beam 言語を使用すると、Apache Beam SDK は必要なすべてのコードを自動的にワーカーに移動します。ただし、外部コードを呼び出すときは、コードを手動で移動する必要があります。

バケットに格納されたバイナリ ファイル

コードを移動するには、次の操作を行います。この例では Apache Beam Java SDK の手順を示します。

  1. コンパイルされた外部コードをバージョニング情報とともに Cloud Storage に保存します。
  2. @Setup メソッドの中に、コードファイルがローカル リソース上で利用可能かどうかを確認するための同期ブロックを作成します。実際のコードの状況を確認するのではなく、最初のスレッドが終了したときの静的変数を使用することで利用可能かどうかが確認できます。
  3. ファイルが利用できない場合は、Cloud Storage クライアント ライブラリを使用して、ファイルを Cloud Storage バケットからローカル ワーカーに pull します。このタスクには Apache Beam の FileSystems クラスを使用することをおすすめします。
  4. ファイルを移動した後、コードファイルに実行ビットが設定されていることを確認します。
  5. 本番環境システムでは、バイナリのハッシュをチェックして、ファイルが正しくコピーされていることを確認します。

Apache Beam の filesToStage 関数を使用するという方法もありますが、Java コードを自動的にパッケージ化して移動できるというランナーの利点を活用できなくなります。さらに、サブプロセスを呼び出すにはファイルの絶対位置が必要なため、クラスパス、つまり filesToStage によって移動されたファイルの位置を判別するためのコードを使用する必要があります。このアプローチは推奨していません。

外部バイナリを実行する

外部コードを実行するには、その前に外部コード用のラッパーを作成する必要があります。このラッパーは、外部コードと同じ言語(C++ など)か、シェル スクリプトとして記述します。ラッパーを使用すると、このページの低い CPU サイクルに対する処理の設計で説明しているように、ファイル ハンドルを渡して、最適化手法を実装できます。ラッパーは高度なものである必要はありません。次のスニペットは、C++ のラッパーの概要を示しています。

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

このコードは、引数リストから 2 つのパラメータを読み取ります。最初のパラメータは、データが push されるリターン ファイルの場所です。2 番目のパラメータは、コードがユーザーに表示するデータです。実際の実装では、このコードは「Hello, world」と表示する以上の機能を果たします。

ラッパーコードを記述したら、次の手順で外部コードを実行します。

  1. データを外部コードのバイナリに送信します。
  2. バイナリを実行してエラーをキャッチし、エラーと結果を記録します。
  3. ロギング情報を処理します。
  4. 完了した処理からデータをキャプチャします。

バイナリにデータを送信する

ライブラリの実行プロセスを開始するには、データを C++ コードに送信します。このステップでは Dataflow と他の Google Cloud ツールの統合の利点を活用できます。Bigtable のようなツールは、非常に大きなデータセットを扱い、低レイテンシで同時実行性に優れたアクセスを処理できるため、何千ものコアがデータセットに同時にアクセスできます。さらに、Bigtable でデータを前処理して、データ整形、拡張、フィルタリングを行うことができます。この作業はすべて、外部コードを実行する前に Apache Beam 変換で行うことができます。

本番環境システムでは、プロトコル バッファを使用して入力データをカプセル化することをおすすめします。入力データをバイトに変換し、外部ライブラリに渡す前に base64 エンコードできます。このデータを外部ライブラリに渡す方法は 2 つあります。

  • 小さな入力データ。システムのコマンド引数の最大長を超えない小さなデータの場合は、java.lang.ProcessBuilder で構築されているプロセスの 2 番目の位置に引数を渡します。
  • 大きな入力データ。より大きいデータサイズの場合は、プロセスで必要なデータを格納するために、名前に UUID を含むファイルを作成します。

C++ コードの実行、エラーのキャッチ、ロギング

エラー情報のキャプチャと処理はパイプラインの重要な部分です。Dataflow ランナーが使用するリソースは一時的なものであるため、ワーカーログ ファイルを検査することは困難な場合が多いです。すべての有用な情報を取得して Dataflow ランナーログに push し、ロギングデータを 1 つ以上の Cloud Storage バケットに格納する必要があります。

推奨されるアプローチは、stdoutstderr をファイルにリダイレクトすることです。これにより、メモリ不足の問題を回避できます。たとえば、C++ コードを呼び出す Dataflow ランナーでは、次のような行を含めることができます。

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

ロギング情報を処理する

多くの使用例では、何百万もの要素を処理しています。正常な処理で生成されるログは、価値がほとんど、あるいはまったくないため、ログデータの保持に関するビジネス上の決定を下す必要があります。たとえば、すべてのログデータを単純に保持する代わりに、次の方法を検討してみます。

  • 正常に要素を処理した際に得られたログに含まれる情報に価値がない場合は、ログを保持しない。
  • ログエントリ 10,000 件ごとにサンプリングするなど、ログデータをサンプリングするロジックを作成する。処理が同質である場合(すなわち、コードが何回反復されても、生成されるログデータが本質的に同一である場合)、この手法によってログデータの保持と処理の最適化のバランスを効果的にとることができます。

障害が発生している場合、ログに出力されるデータ量が大きくなる可能性があります。大量のエラー ログデータを処理する有効な方法は、ログエントリの最初の数行を読み込み、その行だけを Cloud Logging に push することです。残りのログファイルは Cloud Storage バケットに読み込めます。このアプローチでは、後でエラーログの最初の数行を確認し、必要に応じて Cloud Storage でファイル全体を参照できます。

ログファイルのサイズを確認することも有効です。ファイルサイズがゼロの場合、無視しても問題ありませんが、ファイルにデータがないという簡単なログ メッセージを記録することもできます。

完了した処理からデータを取得する

stdout を使用して、計算結果を DoFn 関数に戻すことはおすすめしません。C++ コードが呼び出す他のコードやユーザー独自のコードが stdout にもメッセージを送信し、ロギングデータを格納する stdoutput ストリームを汚染する可能性があります。代わりに、C++ ラッパーコードを変更して、値を格納するファイルをどこに作成するかを示すパラメータをコードが受け入れられるようにすることをおすすめします。このファイルは、プロトコル バッファを使用して言語に依存しない方法で格納する必要ことが理想的です。これにより、C++ コードでオブジェクトを Java または Python コードに戻すことができます。DoFn オブジェクトではファイルから直接結果が読み込まれ、結果情報が自身の output 呼び出しに渡されます。

これまでの経験から、プロセス自体を扱う単体テストを実行することの重要性は明らかです。Dataflow パイプラインとは独立してプロセスを実行する単体テストを実装することが重要です。プロセスがスタンドアロンで、パイプライン全体を実行する必要がない場合、ライブラリのデバッグをより効率的に行うことができます。

小さな CPU サイクルの処理を設計する

サブプロセスの呼び出しにはオーバーヘッドがあります。ワークロードによっては、行われる作業に占めるプロセスの開始と終了にかかる管理オーバーヘッドの比率を削減するために、余分な作業が必要になる場合があります。

マスメディアのユースケースでは、駆動データ要素のサイズがメガバイト単位の高い範囲かギガバイト単位になります。その結果、各データ要素の処理に数分かかることがあります。その場合、サブプロセスを呼び出すコストは全体の処理時間に比べて重要ではありません。この状況での最善のアプローチは、単一の要素にそれ自身のプロセスを開始させることです。

ただし、財務などのユースケースでは、CPU 時間を非常に短く必要があります(数十ミリ秒)。その場合、サブプロセスを呼び出すオーバーヘッドは無視できないほど大きくなります。この問題の解決策として、Apache Beam の GroupByKey 変換を利用して、50~100 要素からなるバッチを作成し、プロセスに組み入れます。たとえば、次の操作を行います。

  • DoFn 関数の中で、Key-Value ペアを作成します。金融取引の処理を行う場合は、取引番号をキーとして使用できます。または、キーとして使用する固有の番号がない場合は、データからチェックサムを生成し、モジュロ関数を使用して 50 の要素を持つパーティションを作成できます。
  • キーを GroupByKey.create 関数に送信すると、50 の要素を含んだ KV<key,Iterable<data>> コレクションが返されるので、これをプロセスに送ることができます。

ワーカーの並列処理を制限する

Dataflow ランナーでネイティブにサポートされている言語を使用する場合は、ワーカーで何が起こっているかを考える必要はありません。Dataflow には、フロー制御とスレッドをバッチモードまたはストリーム モードで監督するプロセスが多数あります。

しかし、C++ のような外部言語を使用している場合は、サブプロセスを開始することによって、通常とは違ったことをしていることに注意してください。バッチモードでは、Dataflow ランナーはストリーミング モードの場合と比較して、CPU に対する作業スレッドの使用割合が小さくなります。特にストリーミング モードでは、クラス内でセマフォを作成して、個々のワーカーの並列処理をより直接的に制御することをおすすめします。

たとえばメディア処理では、何百ものコード変換要素を 1 つのワーカーで並列処理をしたくない場合があります。このような場合、DoFn 関数に対して作業の実行の許可を出すユーティリティ クラスを作成できます。このクラスを使用すると、パイプライン内のワーカー スレッドを直接制御できます。

Google Cloud で大容量のデータシンクを使用する

データは処理された後、データシンクに送信されます。シンクはグリッド処理ソリューションによって作成される大量の結果を処理できる必要があります。

次の図は、Dataflow がグリッド ワークロードを実行しているときに Google Cloud で使用可能なシンクの一部を示しています。

Google Cloud で利用可能なシンク

Bigtable、BigQuery、Pub/Sub はすべて非常に大きなデータ ストリームに対応できます。たとえば、Bigtable の各ノードは最大 1K のサイズの挿入を 1 秒あたり 10,000 件処理でき、水平方向へのスケールも容易です。その結果、100 ノードの Bigtable クラスタでは、Dataflow グリッドによって生成された 1 秒あたり 1,000,000 件のメッセージを取り込めます。

segfaults を管理する

パイプライン内で C++ コードを使用する場合、セグメンテーションを適切に処理しないとローカル以外で予期しない問題が発生するため、segfaults の管理方法を決めておく必要があります。Dataflow ランナーは、Java、Python、または Go で必要に応じてプロセスを作成し、バンドル形式でプロセスに作業を割り当てます。

C++ コードの呼び出しが JNI や Cython などの密結合されたツールを使用して行われ、C++ が segfaults を処理していると、呼び出し側のプロセスと Java 仮想マシン(JVM)もクラッシュします。このシナリオでは、不適切なデータポイントをキャッチできません。不適切なデータポイントをキャッチするには、緩い結合を使用します。これにより、不適切なデータが分散し、パイプラインが続行されます。ただし、すべてのデータ バリエーションに対して完全にテストされた成熟した C++ コードでは、Cython などのメカニズムを使用できます。

次のステップ