パイプラインのデプロイ

Apache Beam パイプラインを作成してテストしたら、Dataflow マネージド サービスを使用してデプロイして実行できます。Dataflow サービスでは、パイプライン コードが Dataflow ジョブになります。

Dataflow サービスは、Compute EngineCloud Storage などの Google Cloud サービスを完全に管理して Dataflow ジョブを実行し、必要なリソースを自動的に起動および破棄します。Dataflow サービスは、Dataflow モニタリング インターフェースDataflow コマンドライン インターフェースなどのツールを通じてジョブの可視性を提供します。

パイプライン コードで実行パラメータを設定することで、Dataflow サービスがジョブを実行する方法の一部の側面を制御できます。たとえば、実行パラメータはパイプラインのステップをワーカー仮想マシン、Dataflow サービス バックエンド、またはローカルで実行するかどうかを指定します。

Dataflow サービスは、Google Cloud リソースの管理に加え、分散並列処理のさまざまな側面を自動的に実行し、最適化します。たとえば、次のようなものがあります。

  • 並列化と分散。Dataflow は、データを自動的に分割し、並列処理のためにワーカーコードを Compute Engine インスタンスに分散します。
  • 最適化。Dataflow は、パイプラインコードを使用してパイプラインの PCollection と変換を表す実行グラフを作成し、グラフを最適化して、最も効率的なパフォーマンスとリソース使用率を実現します。Dataflow は、データ集計など、コストがかかる可能性のあるオペレーションも自動的に最適化します。
  • 自動チューニング機能。Dataflow サービスには、自動スケーリングや動的作業再調整など、リソース割り当てとデータ分割のオンザフライ調整を提供する機能がいくつか含まれます。これらの機能は、Dataflow サービスがジョブをできるだけ素早く効率的に実行するのに役立ちます。

パイプラインのライフサイクル: パイプライン コードから Dataflow ジョブまで

Dataflow プログラムを実行すると、Dataflow はすべての変換とその関連処理関数(DoFn など)を含む Pipeline オブジェクトを作成するコードから実行グラフを作成します。このフェーズは、グラフ作成時間と呼ばれます。グラフ作成中に、Dataflow はさまざまなエラーをチェックし、パイプライン グラフに無効なオペレーションが含まれないことを確認します。実行グラフは JSON 形式に変換され、JSON 実行グラフは Dataflow サービス エンドポイントに転送されます。

注: グラフ作成は、パイプラインをローカルで実行したときにも行われますが、グラフは JSON に変換されたりサービスに転送されたりしません。代わりに、グラフは Dataflow プログラムを起動したのと同じマシンでローカルに実行されます。詳しくは、ローカル実行の構成をご覧ください。

Dataflow サービスは JSON 実行グラフを検証します。グラフが検証されると、Dataflow サービスでジョブになります。Dataflow モニタリング インターフェースを使用して、ジョブ、その実行グラフ、ステータス、ログ情報を参照できるようになります。

Java: SDK 2.x

Dataflow サービスは、Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Dataflow ジョブの jobId を含むオブジェクト DataflowPipelineJob にカプセル化されます。Dataflow モニタリング インターフェースDataflow コマンドライン インターフェースを使用して、jobId を基準に特定のジョブのモニタリング、追跡、トラブルシューティングを行うことができます。詳しくは、DataflowPipelineJob の API リファレンスをご覧ください。

Python

Dataflow サービスは、Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Dataflow ジョブの job_id を含むオブジェクト DataflowPipelineResult にカプセル化されます。Dataflow モニタリング インターフェースDataflow コマンドライン インターフェースを使用して、job_id を基準に特定のジョブのモニタリング、追跡、トラブルシューティングを行うことができます。

Java: SDK 1.x

実行グラフ

Dataflow は、変換と、Pipeline オブジェクトの作成時に使用したデータに基づいて、パイプラインを表すステップのグラフを作成します。これはパイプライン実行グラフです。

Apache Beam SDK に含まれる WordCount サンプルには、テキストのコレクション内の個々の単語と各単語のオカレンスの読み取り、抽出、カウント、書式設定、書き込みを行う一連の変換が含まれます。次の図は、WordCount パイプライン内の変換が実行グラフにどのように展開されるかを示しています。

Cloud Dataflow サービスで実行するステップの実行グラフに展開された WordCount サンプル プログラムの変換。
図 1: WordCount サンプルの実行グラフ

多くの場合、実行グラフは、パイプラインの作成時に変換を指定した順序とは異なります。これは、Dataflow サービスがマネージド クラウド リソースに対して実行される前に実行グラフに対してさまざまな最適化と融合を実行するためです。Dataflow サービスは、パイプラインの実行時にデータの依存関係を尊重しますが、間にデータ依存関係がないステップは任意の順序で実行できます。

Dataflow モニタリング インターフェースでジョブを選択するときに Dataflow がパイプラインに対して生成した最適化されていない実行グラフを参照できます。

並列化と分散

Dataflow サービスは、パイプラインの処理ロジックを自動的に並列化し、ジョブを実行するために割り当てたワーカーに分散します。Dataflow は、プログラミング モデル内の抽象化を使用して、並列処理機能を表します。たとえば、ParDo 変換により、Dataflow は処理コード(DoFn で表現)を、並列で実行される複数のワーカーに自動的に分散します。

ユーザーコードの構造化

DoFn コードは小さな独立したエンティティと考えることができます。異なるマシンで多くのインスタンスが実行され、それぞれが互いを認識していない可能性があります。このため、純粋な関数(隠蔽された状態や外部状態に依存せず、目に見える副作用がなく、確定的な関数)は、DoFn の並列的で分散的な性質に理想的なコードです。

純粋な関数モデルは厳しく固定されているわけではありません。Dataflow サービスが保証しない事項にコードが依存しない限り、状態情報や外部初期化データは DoFn とその他の関数オブジェクトに対して有効である場合があります。ParDo 変換を構成し、DoFn を作成する場合は、次のガイドラインを念頭に置いてください。

  • Dataflow サービスは、入力 PCollection 内のすべての要素が DoFn インスタンスによって必ず 1 回だけ処理されることを保証します。
  • Dataflow サービスは、DoFn が何回呼び出されるかを保証しません。
  • Dataflow サービスは、分散された要素が厳密にどのようにグループ化されるかを保証しません。つまり、どの要素(存在する場合)がまとめて処理されるかを保証しません。
  • Dataflow サービスは、パイプラインで作成される DoFn インスタンスの正確な数を保証しません。
  • Dataflow サービスはフォールト トレラントであり、ワーカーに問題が発生した場合にコードを複数回再試行することがあります。Dataflow サービスはコードのバックアップ コピーを作成する場合があり、手動の副作用が問題になることがあります(たとえば、コードが一意の名前を持たない一時ファイルに依存するか、これを作成する場合など)。
  • Dataflow サービスは、DoFn インスタンスごとに要素の処理をシリアル化します。コードは厳密にスレッドセーフである必要はありませんが、複数の DoFn インスタンス間で共有される状態はスレッドセーフである必要があります。

ユーザーコードの作成について詳しくは、プログラミング モデル ドキュメントのユーザー指定関数の要件をご覧ください。

エラーと例外の処理

データの処理中にパイプラインによって例外がスローされることがあります。これらのエラーのいくつかは一過性ですが(たとえば、一時的に外部サービスにアクセスできないなど)、破損したか解析不能な入力データや計算中の null ポインタを原因とするエラーなど、一部のエラーは永続的です。

バンドル内のいずれかの要素についてエラーがスローされた場合、Dataflow はそのバンドル内の要素を処理し、バンドル全体を再試行します。バッチモードで実行している場合、失敗した項目を含むバンドルは 4 回再試行されます。単一のバンドルが 4 回失敗した場合はパイプラインが完全に失敗します。ストリーミング モードで実行している場合、失敗した項目を含むバンドルは無期限に再試行され、パイプラインが恒久的に滞るおそれがあります。

融合の最適化

パイプラインの実行グラフの JSON フォームが検証されると、Dataflow サービスは最適化を実行するためにグラフを修正することがあります。このような最適化には、パイプラインの実行グラフ内の複数のステップまたは変換を単一のステップに融合することが含まれます。ステップを融合すると、Dataflow サービスはパイプラインの中間 PCollection をすべて実体化する必要がなくなります。実体化はメモリと処理のオーバーヘッドの点でコストが高くなることがあります。

パイプラインの作成で指定したすべての変換がサービスで実行されますが、それらは異なる順序で実行されることも、パイプラインを最も効率よく実行するために融合された、より大きな変換の一部として実行されることもあります。Dataflow サービスは、実行グラフのステップ間のデータ依存関係を尊重しますが、それ以外ではステップが任意の順序で実行される可能性があります。

融合の例

次の図は、Apache Beam SDK for Java に含まれる WordCount サンプルの実行グラフを、効率的な実行のために Dataflow サービスによって最適化および融合する方法を示しています。

最適化され、Cloud Dataflow サービスによって融合されたステップのある WordCount サンプル プログラムの実行グラフ。
図 2: WordCount サンプルの最適化された実行グラフ

融合の防止

パイプラインで、Dataflow サービスが融合最適化を実行しないようにする必要があるケースがいくつかあります。これらは、Dataflow サービスがパイプラインのオペレーションを融合する最適な方法を間違って推測し、それによって Dataflow サービスがすべての使用可能ワーカーを利用する能力が制限される可能性があるケースです。

たとえば、Dataflow がワーカー使用状況を最適化する能力が融合によって制限される 1 つのケースは、「高ファンアウト」の ParDo です。このようなオペレーションでは、入力コレクションの要素が比較的少数でも、ParDo は数百または数千倍の要素数の出力を生成し、その後に別の ParDo が続くことがあります。Dataflow サービスがこれらの ParDo オペレーションを融合すると、中間 PCollection により多くの要素が含まれている場合でも、このステップの並列性は入力コレクション内のアイテム最大数に制限されます。

Dataflow サービスに中間 PCollection の実体化を強制するオペレーションをパイプラインに追加することで、このような融合を防ぐことができます。次のオペレーションの 1 つを使用することを検討してください。

  • GroupByKey を挿入し、最初の ParDo の後でグループ化解除できます。Dataflow サービスは、集約で ParDo オペレーションを決して融合しません。
  • 中間 PCollection副入力として別の ParDo に渡すことができます。Dataflow サービスは常に副入力を実体化します。

結合の最適化

集約オペレーションは、大規模なデータ処理における重要なコンセプトです。集約は、概念的に非常に異なるデータをまとめて、関連付けに極めて有用にします。Dataflow プログラミング モデルでは、集約オペレーションを GroupByKeyCoGroupByKeyCombine 変換として表します。

Dataflow の集約オペレーションは、データセット全体でデータを結合します。これには、複数のワーカーにまたがる可能性のあるデータも含まれます。多くの場合、このような集約オペレーション中に、インスタンスをまたがるデータを結合する前にデータをできるだけローカルに結合するのが最も効率的です。GroupByKey または他の集約変換を適用する場合、Dataflow サービスは、メインのグループ化オペレーションの前に部分的なローカル結合を自動的に実行します。

部分結合または複数レベル結合を実行する場合、Dataflow サービスはパイプラインがバッチデータとストリーミング データのどちらを操作するかに基づいて異なる決定を行います。制限付きデータの場合、サービスは効率性を重視し、できるだけローカルの結合を実行します。制限なしデータの場合、サービスは低レイテンシを重視し、部分結合は実行しないことがあります(レイテンシが増加するため)。

自動チューニング機能

Dataflow サービスには、いくつかの自動チューニング機能が含まれます。この機能では、Dataflow ジョブを実行中に動的にさらに最適化できます。こうした機能には、自動スケーリング動的作業再調整が含まれます。

自動スケーリング

自動スケーリングを有効にすると、Dataflow サービスは、ジョブの実行に必要な適切な数のワーカー インスタンスを自動的に選択します。また、Dataflow サービスは、実行時にジョブの特性を考慮して、より多くのワーカーまたは少数のワーカーを動的に再割り当てします。パイプラインの特定の部分は他よりも計算負荷が高い場合があり、Dataflow サービスはジョブのこれらのフェーズ中に追加のワーカーを自動的に起動できます(また、不要になったときにそれらをシャットダウンします)。

Java: SDK 2.x

自動スケーリングは、すべてのバッチ Dataflow ジョブと、Streaming Engine を使用するストリーミング ジョブでデフォルトで有効になります。パイプラインの実行時にフラグ --autoscalingAlgorithm=NONE を明示的に指定することで自動スケーリングを無効化できます。その場合、Dataflow サービスは --numWorkers オプションに基づいてワーカー数を設定します。これはデフォルトで 3 になります。

自動スケーリングを有効にすると、Dataflow サービスはジョブに割り当てられる厳密なワーカー インスタンス数のユーザー制御を許可しません。パイプラインの実行時に --maxNumWorkers オプションを指定することで、引き続きワーカー数を制限できます。

バッチジョブの場合、--maxNumWorkers フラグはオプションです。デフォルトは 1000 です。 Streaming Engine を使用したストリーミング ジョブの場合、--maxNumWorkers フラグはオプションです。デフォルトは 100 です。Streaming Engine を使用しないストリーミング ジョブの場合は、--maxNumWorkers フラグが必要です。

Python

自動スケーリングは、Apache Beam SDK for Python バージョン 0.5.1 以降を使用して作成されたすべてのバッチ Dataflow ジョブではデフォルトで有効化されます。パイプラインの実行時にフラグ --autoscaling_algorithm=NONE を明示的に指定することで自動スケーリングを無効化できます。その場合、Dataflow サービスは --num_workers オプションに基づいてワーカー数を設定します。これはデフォルトで 3 になります。

Java: SDK 1.x

Dataflow は、パイプラインの並列処理に基づいてスケーリングします。パイプラインの並列処理は、所定の時間でデータを最も効率的に処理するために必要なスレッド数の推定です。

並列処理は、外部サービスの帯域幅が低すぎない限り、数分おきに計算されます。並列処理が増えると、Dataflow はスケールアップしてワーカーを追加します。並列処理が減ると、Dataflow はスケールダウンしてワーカーを削除します。

次の表は、自動スケーリングによってバッチストリーミングのパイプラインでワーカーの数が増減する状況をまとめたものです。

バッチ パイプライン ストリーミング パイプライン
スケールアップ

残りの作業が新しいワーカーを起動するよりも時間がかかり、現在のワーカーの CPU 使用率が平均で 5% を超える場合、Dataflow がスケールアップすることがあります。

次のソースでは、新しいワーカーの数が制限されることがあります: 少量のデータ、分割不可能なデータ(圧縮ファイルなど)、データを分割しない I/O モジュールで処理されるデータ。

既存のファイルに書き込む Cloud Storage の宛先みなど、固定数のシャードに書き込むように構成されたシンクは、新しいワーカーの数を制限することがあります。

ストリーミング パイプラインがバックログに記録され、ワーカーの CPU 使用率が平均で 20% を超える場合、Dataflow がスケールアップすることがあります。バックログは、現在のワーカーごとのスループットを考慮して約 150 秒以内にクリアされます。

スケールダウン

残りの作業が新しいワーカーを起動するよりも時間がかからず、現在のワーカーの CPU 使用率が平均で 5% を超える場合、Dataflow がスケールダウンすることがあります。

ストリーミング パイプラインのバックログが 20 秒未満で、ワーカーの CPU 使用率が平均で 80% 未満の場合、Dataflow がスケールダウンすることがあります。スケールダウンすると、新しいワーカー数での CPU 使用率は平均で 75% 未満になります。

自動スケーリングなし

I/O がデータ処理よりも時間がかかる場合、またはワーカーの CPU 使用率が平均で 5% 未満の場合、並列処理は再計算されません。

ワーカーの CPU 使用率が平均で 20% 未満の場合、並列処理は再計算されません。

バッチ自動スケーリング

バッチ パイプラインでは、Dataflow はパイプラインの各ステージの作業量とそのステージの現在のスループットに基づいてワーカー数を自動的に選択します。Dataflow は、現在のワーカーセットで処理されているデータの量を判断し、残りの作業の処理にかかる時間を推定します。

ユーザーが実装したカスタム データソースがパイプラインで使用される場合、より多くの情報を Dataflow サービスの自動スケーリング アルゴリズムに提供し、パフォーマンスを向上させる可能性のあるいくつかのメソッドを実装できます。

Java: SDK 2.x

  • BoundedSource サブクラスで、メソッド getEstimatedSizeBytes を実装します。Dataflow サービスは、パイプラインで使用するワーカーの初期数を計算するときに getEstimatedSizeBytes を使用します。
  • BoundedReader サブクラスで、メソッド getFractionConsumed を実装します。Dataflow サービスは、getFractionConsumed を使用して読み取りの進行状況を追跡し、読み取り中に使用する適切なワーカー数に収束します。

Python

  • BoundedSource サブクラスで、メソッド estimate_size を実装します。Dataflow サービスは、パイプラインで使用するワーカーの初期数を計算するときに estimate_size を使用します。
  • RangeTracker サブクラスで、メソッド fraction_consumed を実装します。Dataflow サービスは、fraction_consumed を使用して読み取りの進行状況を追跡し、読み取り中に使用する適切なワーカー数に収束します。

Java: SDK 1.x

ストリーミング自動スケーリング

ストリーミング自動スケーリングにより、Dataflow サービスは負荷とリソースの使用率の変化に応じて、ストリーミング パイプラインを実行するために使用されるワーカーの数を適切に変更できます。ストリーミング自動スケーリングは無料の機能であり、ストリーミング パイプラインの実行時に使用されるリソースのコストを削減するように設計されています。

自動スケーリングを使用しない場合は、numWorkers または num_workers で固定のワーカー数を選択して、パイプラインを実行します。入力ワークロードは時間とともに変化するため、この数は多すぎることも、少なすぎることもあります。プロビジョニングしたワーカーが多すぎると余分なコストがかかり、プロビジョニングしたワーカーが少なすぎると処理されたデータのレイテンシが高くなります。自動スケーリングを有効にすることで、リソースは必要なときにのみ使用されます。

ストリーミング パイプラインの自動スケーリングの目的は、ワーカーの使用率とスループットを最大化しながらバックログを最小限に抑え、負荷の急増に迅速に対応することです。自動スケーリングを有効にすることにより、プロビジョニングについてピーク時の負荷と新しい結果のどちらかを選択する必要はなくなります。CPU 使用率とバックログが増加するとワーカーが追加され、これらの指標が低下すると削除されます。この方法により、必要なものについてのみ支払うことになり、ジョブは可能な限り効率的に処理されます。

Java: SDK 2.x

制限のないカスタムソース

パイプラインでカスタムの制限なしソースが使用されている場合、ソースがバックログについて Dataflow サービスに通知する必要があります。バックログは、ソースによってまだ処理されていない入力の推定値(バイト単位)です。バックログについてサービスに通知するには、UnboundedReader クラスに次のメソッドのいずれかを実装します。

  • getSplitBacklogBytes() - ソースの現在のスプリットのバックログ。サービスで、すべてのスプリットのバックログを集約します。
  • getTotalBacklogBytes() - すべてのスプリットのグローバル バックログ。スプリットごとにバックログを取得できず、すべてのスプリットでのみバックログを計算できる場合があります。最初のスプリット(スプリット ID「0」)のみが合計バックログを提供する必要があります。
Apache Beam リポジトリには、UnboundedReader クラスを実装するカスタムソースのがいくつか含まれています。
ストリーミング自動スケーリングを有効にする

Streaming Engine を使用したストリーミング ジョブの場合、自動スケーリングはデフォルトで有効になっています。

Streaming Engine を使用しないジョブの自動スケーリングを有効にするには、パイプラインを起動するときに次の実行パラメータを設定します。

    --autoscalingAlgorithm=THROUGHPUT_BASED
    --maxNumWorkers=N
    

Streaming Engine を使用しないストリーミング ジョブの場合、ワーカーの最小数は --maxNumWorkers 値の 1/15 です。

ストリーミング パイプラインは、--maxNumWorkers と同じ数の永続ディスクの固定プールを伴ってデプロイされます。--maxNumWorkers を指定するときはこのことを考慮し、この値がパイプラインにとって十分なディスク数であることを確認してください。

使用量と料金

Compute Engine の使用量は平均ワーカー数に基づき、永続ディスクの使用量は --maxNumWorkers の値に基づきます。永続ディスクは、各ワーカーの接続ディスク数が等しくなるように再配布されます。

上記の例(--maxNumWorkers=15)では、1~15 個の Compute Engine インスタンスと 15 個の永続ディスクに対して支払いが発生します。

Python

ストリーミング自動スケーリングを有効にする

自動スケーリングを有効にするには、パイプラインを起動するときに次の実行パラメータを設定します。

    --autoscaling_algorithm=THROUGHPUT_BASED
    --max_num_workers=N
    

Streaming Engine を使用しないストリーミング ジョブの場合、ワーカーの最小数は --maxNumWorkers 値の 1/15 です。

ストリーミング パイプラインは、--maxNumWorkers と同じ数の永続ディスクの固定プールを伴ってデプロイされます。--maxNumWorkers を指定するときはこのことを考慮し、この値がパイプラインにとって十分なディスク数であることを確認してください。

使用量と料金

Compute Engine の使用量は平均ワーカー数に基づき、永続ディスクの使用量は --max_num_workers の値に基づきます。永続ディスクは、各ワーカーの接続ディスク数が等しくなるように再配布されます。

上記の例(--max_num_workers=15)では、1~15 個の Compute Engine インスタンスと 15 個の永続ディスクに対して支払いが発生します。

Java: SDK 1.x

ストリーミング パイプラインを手動でスケーリングする

ストリーミング モードでの自動スケーリングが一般提供されるまでは、Dataflow の更新機能を使用して、ストリーミング パイプラインを実行するワーカー数を手動でスケーリングできます。

Java: SDK 2.x

実行中にストリーミング パイプラインのスケーリングを行う場合は、パイプラインの起動時に次の実行パラメータを設定してください。

  • --maxNumWorkers は、パイプラインで使用可能にするワーカーの最大数と同じ値に設定します。
  • --numWorkers は、実行開始時にパイプラインで使用するワーカーの初期数と同じ値に設定します。

パイプラインが稼働状態になったら、パイプラインを更新し、--numWorkers パラメータを使って新規ワーカー数を指定します。新しい --numWorkers に設定する値は N から --maxNumWorkers までの範囲でなければなりません。ここで、N--maxNumWorkers ÷15 です。

パイプラインの更新により、実行中のジョブが(新しいワーカー数を使って)新規ジョブに置換されますが、前のジョブに関連付けられている状態情報はすべて保持されます。

Python

実行中にストリーミング パイプラインのスケーリングを行う場合は、パイプラインの起動時に次の実行パラメータを設定してください。

  • --max_num_workers は、パイプラインで使用可能にするワーカーの最大数と同じ値に設定します。
  • --num_workers は、実行開始時にパイプラインで使用するワーカーの初期数と同じ値に設定します。

パイプラインが稼働状態になったら、パイプラインを更新し、--num_workers パラメータを使って新規ワーカー数を指定します。新しい --num_workers に設定する値は N から --max_num_workers までの範囲でなければなりません。ここで、N--max_num_workers ÷15 です。

パイプラインの更新により、実行中のジョブが(新しいワーカー数を使って)新規ジョブに置換されますが、前のジョブに関連付けられている状態情報はすべて保持されます。

Java: SDK 1.x

動的作業再調整

Dataflow サービスの動的作業再調整機能では、サービスが実行時の条件に基づいて作業を動的に再分割できます。これらの条件は、次のものが含まれることがあります。

  • 作業割り当ての不均衡
  • 終了に予想より長い時間がかかるワーカー
  • 予想よりも早く終了するワーカー

Dataflow サービスはこれらの条件を自動的に検出し、未使用または十分に使用されていないワーカーに作業を動的に再割り当てして、ジョブの全体的な処理時間を短縮します。

制限事項

動的作業再調整は、Dataflow サービスが一部の入力データを並列に処理している場合にのみ行われます。データを外部入力ソースから読み取っている場合、実体化された中間 PCollection を操作している場合、または GroupByKey などの集約の結果を操作している場合です。ジョブの多数のステップが融合される場合、ジョブの中間 PCollection は少なくなり、動的作業再調整は、ソースの実体化された PCollection 内の要素の数に限定されます。パイプラインの特定の PCollection に動的作業再調整が確実に適用されるようにするには、いくつかの異なる方法で融合を防止することで、動的並列性が確保されます。

動的作業再調整は、単一のレコードよりも細かくデータを再並列化できません。処理時間の大幅な遅延を引き起こす個別レコードがデータに含まれる場合、Dataflow は個々の「ホット」レコードを分割して複数のワーカーに再分散できないため、ジョブが遅延する可能性があります。

Java: SDK 2.x

パイプラインの最終出力に固定数のシャードを(たとえば、TextIO.Write.withNumShards を使用してデータを書き込むことで)設定した場合、並列化は選択したシャード数に基づいて制限されます。

Python

パイプラインの最終出力に固定数のシャードを(たとえば、beam.io.WriteToText(..., num_shards=...) を使用してデータを書き込むことで)設定した場合、Dataflow は選択したシャード数に基づいて並列化を制限します。

Java: SDK 1.x

固定シャード制限は一時的とみなすことができ、Dataflow サービスの今後のリリースで変更の対象となる可能性があります。

カスタム データソースの操作

Java: SDK 2.x

自分が提供するカスタム データソースをパイプラインで使用する場合は、splitAtFraction メソッドを実装して、ソースが動的作業再調整機能と連動できるようにする必要があります。

Python

自分が提供するカスタム データソースをパイプラインで使用する場合は、RangeTrackertry_claimtry_splitposition_at_fractionfraction_consumed を実装して、ソースが動的作業再調整機能と連動できるようにする必要があります。

詳しくは、RangeTracker の API リファレンス情報をご覧ください。

Java: SDK 1.x

リソースの使用率と管理

Dataflow サービスは、Google Cloud のリソースをジョブごとに完全に管理します。これには Compute Engine インスタンス(ワーカーまたは VM とも呼ばれる)の起動と停止、I/O と一時ファイル ステージングの両方に対するプロジェクトの Cloud Storage バケットへのアクセスが含まれます。ただし、パイプラインで BigQueryCloud Pub/Sub などの Google Cloud データ ストレージ テクノロジーを操作する場合は、こうしたサービスのリソースと割り当てを管理する必要があります。

Dataflow は、ユーザーが提供する Cloud Storage 内の場所をファイルのステージングに使用します。この場所はユーザーの制御下にあり、ジョブがそこから読み取る限り、場所の有効性が維持されることを確認する必要があります。SDK の組み込みキャッシュはジョブの開始時間を短縮できるため、複数のジョブ実行に同じステージング場所を再利用できます。

求人

Google Cloud プロジェクトあたり 100 個までの Dataflow ジョブを同時に実行できます。

Dataflow サービスは現在、20 MB 以下のサイズの JSON ジョブ リクエストの処理に限定されています。ジョブ リクエストのサイズはパイプラインの JSON 表現に関連付けられています。パイプラインが大きいほど、リクエストが大きくなります。

パイプラインの JSON リクエストのサイズを見積もるには、次のオプションを指定してパイプラインを実行します。

Java: SDK 2.x

    --dataflowJobFile=< path to output file >
    

Python

    --dataflow_job_file=< path to output file >
    

Java: SDK 1.x

このコマンドは、ジョブの JSON 表現をファイルに書き込みます。シリアル化されたファイルのサイズは、リクエストのサイズの適切な推定値となります。リクエストにはいくつかの追加情報が含まれるので、実際のサイズはわずかに大きくなります。

詳細については、「413 Request Entity Too Large」(413 リクエスト エンティティが大きすぎます)/「The size of serialized JSON representation of the pipeline exceeds the allowable limit」(パイプラインのシリアル化 JSON 表現のサイズが上限を超えています)のトラブルシューティング ページをご覧ください。

また、ジョブグラフのサイズは 10 MB を超えないでください。詳細については、「The job graph is too large. Please try again with a smaller job graph, or split your job into two or more smaller jobs.」(ジョブグラフが大きすぎます。小さいジョブグラフでもう一度試すか、ジョブを複数の小さいジョブに分割してください)のトラブルシューティング ページをご覧ください。

ワーカー

Dataflow サービスでは、現在最大でジョブあたり 1,000 個の Compute Engine インスタンスが許可されます。 バッチジョブのデフォルトのマシンタイプは n1-standard-1、ストリーミングは n1-standard-4 です。したがって、デフォルトのマシンタイプを使用している場合、Dataflow サービスは最大でジョブあたり 4,000 コアを割り当てることができます。

Dataflow は、カスタムマシン タイプと同様に n1 シリーズのワーカーもサポートします。パイプラインの作成時に適切な実行パラメータを設定することで、パイプラインのマシンタイプを指定できます。

Java: SDK 2.x

マシンタイプを変更するには、--workerMachineType オプションを設定します。

Python

マシンタイプを変更するには、--worker_machine_type オプションを設定します。

Java: SDK 1.x

リソース割り当て

Dataflow サービスは、ジョブの開始とワーカー インスタンスの最大数へのスケーリングのために、ジョブの実行に必要な Compute Engine リソース割り当てが Google Cloud プロジェクトにあることをチェックします。十分なリソース割り当てが使用可能でない場合、ジョブの開始は失敗します。

Dataflow の自動スケーリング機能は、プロジェクトの使用可能 Compute Engine 割り当てによって制限されます。ジョブの開始時に十分な割り当てがあっても、別のジョブがプロジェクトの使用可能割り当ての残りを使用する場合、最初のジョブは実行されますが、完全にはスケーリングできません。

しかし、Dataflow サービスはプロジェクトのリソース割り当てを超えるジョブの割り当ての増加を管理しません。ユーザーは、追加のリソース割り当てについて必要なリクエストを行う責任があります。これは、Google Cloud Console で行えます。

永続ディスク リソース

Dataflow サービスは、ストリーミング ジョブの実行時に、ワーカー インスタンスあたり 15 個の永続ディスクに制限されています。各永続ディスクは、個々の Compute Engine 仮想マシンに対してローカルです。ジョブは永続ディスクより多くのワーカーを持つことができません。ワーカーとディスク間の 1:1 の比率が最小リソース割り当てです。

ワーカー VM で実行中のジョブの場合、各永続ディスクのデフォルト サイズは、バッチモードで 250 GBストリーミング モードで 400 GB です。Streaming Engine または Dataflow Shuffle を使用するジョブは Dataflow サービスのバックエンドで実行され、より小さいディスクを使用します。

場所

デフォルトでは、Dataflow サービスは、us-central1 リージョンの us-central1-f ゾーンに Compute Engine リソースをデプロイします。--region パラメータを指定することで、この設定をオーバーライドできます。リソースに特定のゾーンを使用する必要がある場合は、パイプラインを作成するときに --zone パラメータを使用します。ただし、リージョンのみを指定し、ゾーンを指定しないでおくことをおすすめします。こうすることで、Dataflow サービスは、ジョブ作成リクエスト時に使用可能なゾーン容量に基づいて、リージョン内の最適なゾーンを自動的に選択できます。詳細については、リージョン エンドポイントのドキュメントをご覧ください。

Streaming Engine

現在、Dataflow パイプライン ランナーは、ストリーミング パイプラインのステップをすべてワーカー仮想マシンで実行し、ワーカーの CPU、メモリ、永続ディスク ストレージを消費します。Dataflow の Streaming Engine は、パイプラインの実行をワーカー VM から Dataflow サービスのバックエンドに移動します。

Streaming Engine の利点

Streaming Engine モデルには、次の利点があります。

  • ワーカー VM 上の CPU、メモリ、永続ディスク ストレージ リソースの消費量が削減されます。 Streaming Engine はより小さいワーカー マシンタイプ(n1-standard-4 ではなく n1-standard-2)の場合に最適に機能し、小さいワーカーのブートディスクを超える永続ディスクを必要としないので、リソースと割り当ての消費量が少なくなります。
  • 受信データ量の変動に対応する、レスポンスに優れた自動スケーリング。Streaming Engine によって、ワーカーのスケーリングがよりスムーズできめ細かになります。
  • サービスの更新を適用するためにパイプラインを再デプロイする必要がないので、サポート性が向上しました。

ワーカー リソースの削減のほとんどは、処理を Dataflow サービスにオフロードすることによって実現されます。そのため、Streaming Engine の使用に関する料金がかかります。ただし、Streaming Engine を使用する Cloud Dataflow パイプラインの合計請求額は、このオプションを使用しない Dataflow パイプラインの総コストとほぼ同じであると予想されます。

Streaming Engine の使用

現在、次のリージョンでストリーミング パイプラインに Streaming Engine を使用できます。他のリージョンでも今後公開していく予定です。

  • us-central1(アイオワ)
  • us-east1(サウスカロライナ)
  • us-west1(オレゴン)
  • europe-west1(ベルギー)
  • europe-west4(オランダ)
  • asia-east1(台湾)
  • asia-northeast1(東京)

Java: SDK 2.x

ストリーミング パイプラインに Streaming Engine を使用するには、次のパラメータを指定します。

  • Apache Beam SDK for Java バージョン 2.11.0 以降を使用している場合は、--enableStreamingEngine
  • Apache Beam SDK for Java バージョン 2.10.0 を使用している場合は、--experiments=enable_streaming_engine

パイプラインに Dataflow Streaming Engine を使用する場合は、--zone パラメータを指定しないでください。代わりに --region パラメータを指定し、その値を、現在 Streaming Engine を使用できるリージョンのいずれかに設定します。Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Dataflow はエラーを報告します。

Streaming Engine は小規模なワーカー マシンタイプで最適に機能するので、--workerMachineType=n1-standard-2 を設定することをおすすめします。また、Streaming Engine はワーカーのブートイメージ用とローカルログ用の容量だけを必要とするので、--diskSizeGb=30 を設定することもできます。これらの値はデフォルト値です。

Python

ストリーミング パイプラインに Streaming Engine を使用するには、次のパラメータを指定します。

--enable_streaming_engine

パイプラインに Dataflow Streaming Engine を使用する場合は、--zone パラメータを指定しないでください。代わりに --region パラメータを指定し、その値を、現在 Streaming Engine を使用できるリージョンのいずれかに設定します。Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Dataflow はエラーを報告します。

Streaming Engine は小規模なワーカー マシンタイプで最適に機能するので、--machine_type=n1-standard-2 を設定することをおすすめします。また、Streaming Engine はワーカーのブートイメージ用とローカルログ用の容量だけを必要とするので、--disk_size_gb=30 を設定することもできます。これらの値はデフォルト値です。

Java: SDK 1.x

Dataflow Shuffle

Dataflow Shuffle は、GroupByKeyCoGroupByKeyCombine などの Dataflow 変換の背後で行われる基本オペレーションです。Dataflow Shuffle オペレーションは、スケーラブルで効率的、かつフォールト トレラントな方法で、キーごとにデータを分割およびグループ化します。現在、Dataflow はシャッフル実装を使用しています。シャッフル実装はすべてワーカー仮想マシンで実行され、ワーカーの CPU、メモリ、永続ディスク ストレージを消費します。バッチ パイプラインのみで利用可能なサービスベースの Dataflow Shuffle 機能では、シャッフル オペレーションがワーカー VM から Dataflow サービスのバックエンドに移動されます。

Dataflow Shuffle の利点

サービスベースの Dataflow Shuffle には、次の利点があります。

  • パイプライン ジョブタイプの大部分で、バッチ パイプラインの実行時間が短縮されます。
  • ワーカー VM 上の CPU、メモリ、永続ディスク ストレージ リソースの消費量が削減されます。
  • VM はシャッフル データを保持しなくなるため、自動スケーリングが改善され、より早期にスケールダウンできます。
  • フォールト トレランスが強化されます。この機能を使用していない場合のように、Dataflow Shuffle データを保持する VM の異常によってジョブ全体が失敗することはありません。

ワーカー リソースの削減のほとんどは、シャッフル処理を Dataflow サービスにオフロードすることによって実現されます。そのため、Dataflow Shuffle の使用に関する料金がかかります。ただし、サービスベースの Dataflow 実装を使用する Dataflow パイプラインの合計請求額は、このオプションを使用しない Dataflow パイプラインのコスト以下であると予想されます。

大部分のパイプライン ジョブタイプでは、Dataflow Shuffle のほうが、ワーカー VM で実行されるシャッフル実装よりも高速に実行されます。ただし、実行時間はそのつど異なります。重要な期限があるパイプラインを実行している場合は、期限までに十分なバッファ時間を割り当てることをおすすめします。さらに、Shuffle の割り当て量を増やすことも検討してください。

ディスクに関する考慮事項

サービスベースの Dataflow Shuffle 機能を使用する場合、ワーカー VM に大きな永続ディスクを接続する必要はありません。Dataflow によって 25 GB の小さなブートディスクが自動的に接続されます。ただし、このように小さなディスクサイズであることから、Dataflow Shuffle を使用する際は、留意すべき重要な考慮事項があります。

  • ワーカー VM は 25 GB のディスク容量の一部をオペレーティング システム、バイナリ、ログ、コンテナに使用します。Dataflow Shuffle を使用する場合、大量のディスク容量を使用するジョブが残りのディスク容量を使い果たして失敗することがあります。
  • 小さなディスクのパフォーマンスにより、ディスク I/O を大量に使用するジョブは処理に時間がかかることがあります。 ディスクサイズ間のパフォーマンスの違いについて詳しくは、Compute Engine 永続ディスクのパフォーマンスに関するページをご覧ください。

以上の考慮事項が 1 つでもジョブに当てはまる場合は、パイプライン オプションを使用してより大きなディスクサイズを指定できます。

Dataflow Shuffle の使用

サービスベースの Dataflow Shuffle は現在、次のリージョンで利用できます。

  • us-central1(アイオワ)
  • us-east1(サウスカロライナ)
  • us-west1(オレゴン)
  • europe-west1(ベルギー)
  • europe-west4(オランダ)
  • asia-east1(台湾)
  • asia-northeast1(東京)

Dataflow Shuffle は、他のリージョンでも今後利用可能になる予定です。

Java: SDK 2.x

バッチ パイプラインでサービスベースの Dataflow Shuffle を使用するには、次のパラメータを指定します。
--experiments=shuffle_mode=service

パイプラインに Dataflow Shuffle を使用する場合は、--zone パラメータを指定しないでください。代わりに、--region パラメータを指定し、その値を、現在 Shuffle を使用できるリージョンのいずれかに設定します。Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Dataflow はエラーを報告します。

Python

バッチ パイプラインでサービスベースの Dataflow Shuffle を使用するには、次のパラメータを指定します。
--experiments=shuffle_mode=service

パイプラインに Dataflow Shuffle を使用する場合は、--zone パラメータを指定しないでください。代わりに、--region パラメータを指定し、その値を、現在 Shuffle を使用できるリージョンのいずれかに設定します。Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Dataflow はエラーを報告します。

Java: SDK 1.x

Dataflow Flexible Resource Scheduling

Dataflow FlexRS は、高度なスケジューリング手法Dataflow Shuffle サービス、プリエンプティブル仮想マシン(VM)インスタンスと通常の VM の組み合わせを使用することで、バッチ処理コストを削減します。プリエンプティブル VM と通常の VM を並列実行することで、システム イベント中に Compute Engine がプリエンプティブル VM インスタンスを停止した場合のDataflow のユーザー エクスペリエンスが向上します。FlexRS は、プリエンプティブル VM が Compute Engine によってプリエンプトされたときに、パイプラインの処理を続行して以前の作業を確実に保持するのに役立ちます。FlexRS の詳細については、Dataflow での Flexible Resource Scheduling の使用をご覧ください。