パイプラインのライフサイクル

このページでは、パイプライン コードから Dataflow ジョブまでのパイプラインのライフサイクルの概要について説明します。

このページでは、次のことを説明します。

  • 実行グラフの概要と、Apache Beam パイプラインが Dataflow ジョブになる仕組み。
  • Dataflow がパイプラインの処理ロジックを自動的に並列化し、ジョブを実行するワーカーに分散する方法。
  • Dataflow によるエラーの処理方法。
  • Dataflow によるジョブの最適化。

実行グラフ

Dataflow パイプラインを実行すると、Dataflow はすべての変換とその関連する処理関数(DoFn など)を含む Pipeline オブジェクトを作成するコードから実行グラフを作成します。パイプラインの実行グラフで、フェーズはグラフの作成時間と呼ばれます。

グラフの作成中、Apache Beam はパイプライン コードのメイン エントリ ポイントからコードをローカルで実行して、ソース、シンク、変換ステップの呼び出しで停止し、これらの呼び出しをグラフのノードに変えます。したがって、パイプラインのエントリ ポイント(Java と Go の main() メソッド、または Python スクリプトのトップレベル)のコードの一部は、パイプラインを実行するマシンのローカルで実行されますが、DoFn オブジェクトのメソッドで宣言された同じコードは Dataflow ワーカーで実行されます。

たとえば、Apache Beam SDK に含まれる WordCount サンプルには、テキストのコレクション内の個々の単語と各単語のオカレンスの読み取り、抽出、カウント、書式設定、書き込みを行う一連の変換が含まれます。次の図は、WordCount パイプライン内の変換が実行グラフにどのように展開されるかを示しています。

Dataflow サービスで実行するステップの実行グラフに展開された WordCount サンプル プログラムの変換。

図 1: WordCount サンプルの実行グラフ

多くの場合、実行グラフは、パイプラインの作成時に変換を指定した順序とは異なります。これは、Dataflow サービスが管理対象のクラウド リソースに対して実行される前に実行グラフに対してさまざまな最適化と融合を実行するためです。Dataflow サービスは、パイプラインの実行時にデータの依存関係を尊重しますが、間にデータの依存関係がないステップは任意の順序で実行できます。

Dataflow モニタリング インターフェースでジョブを選択するときに Dataflow がパイプラインに対して生成した、最適化されていない実行グラフを参照できます。ジョブの表示の詳細については、Dataflow モニタリング インターフェースの使用をご覧ください。

グラフ作成中に、Apache Beam は、パイプラインによって参照されるリソース(Cloud Storage バケット、BigQuery テーブル、Pub/Sub トピック、サブスクリプションなど)が実際に存在し、アクセス可能であることを検証します。検証は、各サービスに対する標準 API 呼び出しを通じて行われるため、パイプラインの実行に使用されるユーザー アカウントに必要なサービスへの適切な接続が確立され、API を呼び出す権限があることが不可欠です。パイプラインを Dataflow サービスに送信する前に、Apache Beam は他のエラーもチェックし、パイプライン グラフに不正なオペレーションが含まれていないことを確認します。

実行グラフは JSON 形式に変換され、JSON 実行グラフは Dataflow サービス エンドポイントに転送されます。

Dataflow サービスは、JSON 実行グラフを検証します。グラフが検証されると、Dataflow サービスでジョブになります。Dataflow モニタリング インターフェースを使用して、ジョブ、その実行グラフ、ステータス、ログ情報を参照できるようになります。

Java

Dataflow サービスは、Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Dataflow ジョブの jobId を含むオブジェクト DataflowPipelineJob にカプセル化されます。Dataflow モニタリング インターフェースDataflow コマンドライン インターフェースを使用して、jobId を基準にジョブのモニタリング、追跡、トラブルシューティングを行うことができます。詳しくは、DataflowPipelineJob の API リファレンスをご覧ください。

Python

Dataflow サービスは、Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Dataflow ジョブの job_id を含むオブジェクト DataflowPipelineResult にカプセル化されます。Dataflow モニタリング インターフェースDataflow コマンドライン インターフェースを使用して、job_id を基準にジョブのモニタリング、追跡、トラブルシューティングを行うことができます。

Go

Dataflow サービスは、Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Dataflow ジョブの jobID を含むオブジェクト dataflowPipelineResult にカプセル化されます。Dataflow モニタリング インターフェースDataflow コマンドライン インターフェースを使用して、jobID を基準にジョブのモニタリング、追跡、トラブルシューティングを行うことができます。

並列化と分散

Dataflow サービスは、パイプラインの処理ロジックを自動的に並列化し、ジョブを実行するために割り当てたワーカーに分散します。Dataflow は、プログラミング モデルでの抽象化を使用して、並列処理機能を表します。たとえば、パイプラインの ParDo 変換により、Dataflow は並列で実行される複数のワーカーに処理コード(DoFn で表現)を自動的に分散します。

エラーと例外の処理

データの処理中にパイプラインによって例外がスローされることがあります。これらのエラーのいくつかは一過性ですが(たとえば、一時的に外部サービスにアクセスできないなど)、破損したか解析不能な入力データや計算中の null ポインタを原因とするエラーなど、一部のエラーは永続的です。

バンドル内のいずれかの要素についてエラーがスローされた場合、Dataflow はそのバンドル内の要素を処理し、バンドル全体を再試行します。バッチモードで実行している場合、失敗した項目を含むバンドルは 4 回再試行されます。単一のバンドルが 4 回失敗した場合はパイプラインが完全に失敗します。ストリーミング モードで実行している場合、失敗した項目を含むバンドルは無期限に再試行され、パイプラインが恒久的に滞るおそれがあります。

起動ワーカーのエラー(ワーカーへのパッケージのインストールの失敗など)は一時的なものです。このため、再試行が無期限に発生し、パイプラインが恒久的に滞るおそれがあります。

融合の最適化

パイプラインの実行グラフの JSON フォームが検証されると、Dataflow サービスは最適化を実行するためにグラフを修正することがあります。このような最適化には、パイプラインの実行グラフ内の複数のステップまたは変換を単一のステップに融合することが含まれます。ステップを融合すると、Dataflow サービスはパイプラインの中間 PCollection をすべて実体化する必要がなくなります。実体化はメモリと処理のオーバーヘッドの点でコストが高くなることがあります。

パイプラインの作成で指定したすべての変換がサービスで実行されますが、それらは異なる順序で実行されることも、パイプラインを最も効率よく実行するために融合された、より大きな変換の一部として実行されることもあります。Dataflow サービスは、実行グラフのステップ間のデータ依存関係を尊重しますが、それ以外ではステップが任意の順序で実行される可能性があります。

融合の例

次の図は、Apache Beam SDK for Java に含まれる WordCount サンプルの実行グラフを、効率的な実行のために Dataflow サービスによって最適化および融合する方法を示しています。

最適化され、Dataflow サービスによって融合されたステップのある WordCount サンプル プログラムの実行グラフ。

図 2: WordCount サンプルの最適化された実行グラフ

融合を防ぐ

ただし、パイプラインで Dataflow サービスによる融合の最適化を防ぐべきケースがいくつかあります。これらは、Dataflow サービスがパイプラインの操作を融合する最適な方法を間違って推測し、それによって Dataflow サービスがすべての使用可能ワーカーを利用する能力が制限される可能性があるケースです。

たとえば、Dataflow がワーカー使用状況を最適化する能力が融合によって制限される 1 つのケースは、「高ファンアウト」の ParDo です。このようなオペレーションでは、入力コレクションの要素が比較的少数でも、ParDo は数百または数千倍の要素数の出力を生成し、その後に別の ParDo が続くことがあります。Dataflow サービスでこれらの ParDo オペレーションを融合されると、中間 PCollection にさらに多くの要素が含まれている場合でも、この手順での並列処理は多くても入力コレクション内のアイテム数に制限されます。

Dataflow サービスに中間 PCollection の実体化を強制するオペレーションをパイプラインに追加することで、このような融合を防ぐことができます。次のオペレーションの 1 つを使用することを検討してください。

  • GroupByKey を挿入し、最初の ParDo の後でグループ化解除できます。Dataflow サービスが、集約間で ParDo オペレーションを融合することはありません。
  • 中間 PCollection副入力として別の ParDo に渡すことができます。Dataflow サービスは常に副入力を実体化します。
  • Reshuffle ステップを挿入できます。Reshuffle によって、融合の防止、データのチェックポイント設定、レコードの重複除去が行われます。再シャッフルは、Apache Beam のドキュメントで非推奨とされている場合でも、Dataflow でサポートされています。

融合をモニタリングする

Google Cloud コンソール、gcloud CLI、または API を使用して、最適化されたグラフと融合ステージにアクセスできます。

コンソール

コンソールでグラフの融合ステージとステップを確認するには、Dataflow ジョブの [実行の詳細] タブで [ステージのワークフロー] グラフビューを開きます。

ステージに融合されたコンポーネント ステップを表示するには、グラフで融合ステージをクリックします。[ステージ情報] ペインの [コンポーネントのステップ] 行に融合ステージが表示されます。1 つの複合変換の一部分が複数のステージに融合される場合があります。

gcloud

gcloud CLI を使用して、最適化されたグラフと融合ステージにアクセスするには、次の gcloud コマンドを実行します。

  gcloud dataflow jobs describe --full JOB_ID --format json

JOB_ID は、Dataflow ジョブの ID に置き換えます。

関連するビットを抽出するには、gcloud コマンドの出力を jq にパイプします。

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

出力レスポンス ファイルの融合ステージの説明を表示するには、ComponentTransform 配列内の ExecutionStageSummary オブジェクトを確認します。

API

API を使用して、最適化されたグラフと融合ステージにアクセスするには、project.locations.jobs.get を呼び出します。

出力レスポンス ファイルの融合ステージの説明を表示するには、ComponentTransform 配列内の ExecutionStageSummary オブジェクトを確認します。

結合の最適化

集約オペレーションは、大規模なデータ処理における重要なコンセプトです。集約は、概念的に非常に異なるデータをまとめて、関連付けに極めて有用にします。Dataflow プログラミング モデルでは、集約オペレーションを GroupByKeyCoGroupByKeyCombine 変換として表します。

Dataflow の集約操作は、データセット全体でデータを結合します。これには、複数のワーカーにまたがる可能性のあるデータも含まれます。多くの場合、このような集約オペレーション中に、インスタンスをまたがるデータを結合する前にデータをできるだけローカルに結合するのが最も効率的です。GroupByKey または他の集約変換を適用する場合、Dataflow サービスは、メインのグループ化オペレーションの前に部分的なローカル結合を自動的に実行します。

部分結合または複数レベル結合を実行する場合、Dataflow サービスはパイプラインがバッチデータとストリーミング データのどちらを操作するかに基づいて異なる決定を行います。制限付きデータの場合、サービスは効率性を重視し、できるだけローカルの結合を実行します。制限なしデータの場合、サービスは低レイテンシを重視し、部分結合は実行しないことがあります(レイテンシが増加するため)。