パイプラインのデプロイ

Apache Beam パイプラインを構築してテストしたら、Cloud Dataflow マネージド サービスを使用して、そのパイプラインをデプロイし、実行できます。Cloud Dataflow サービスでは、パイプライン コードが Cloud Dataflow ジョブになります。

Cloud Dataflow サービスは、Compute EngineCloud Storage などの Google Cloud Platform(GCP)サービスを完全に管理して Cloud Dataflow ジョブを実行し、必要なリソースを自動的に起動および破棄します。また、Cloud Dataflow サービスは Cloud Dataflow モニタリング インターフェースCloud Dataflow コマンドライン インターフェースなどのツールを使用してジョブの可視性を提供します。

パイプライン コードで実行パラメータを設定することで、Cloud Dataflow サービスがジョブを実行する方法の一部の側面を制御できます。たとえば、実行パラメータは、パイプラインのステップをワーカー仮想マシン、Cloud Dataflow サービスのバックエンド、またはローカルで実行するかどうかを指定します。

GCP リソースの管理に加えて、Cloud Dataflow サービスは分散並列処理の多くの側面を自動的に実行し、最適化します。たとえば、次のようなものがあります。

  • 並列化と分散。Cloud Dataflow は、データを自動的に分割し、並列処理のためにワーカーコードを Compute Engine インスタンスに分散します。
  • 最適化。Cloud Dataflow は、パイプライン コードを使用して、パイプラインの PCollection と変換を表す実行グラフを作成し、最も効率的なパフォーマンスとリソース使用率を実現するためにグラフを最適化します。Cloud Dataflow は、データ集計など、コストがかかる可能性のあるオペレーションも自動的に最適化します。
  • 自動チューニング機能。Cloud Dataflow サービスには、自動スケーリングや動的作業再調整など、リソース割り当てとデータ パーティショニングのオンザフライ調整を提供する機能がいくつか含まれます。これらの機能は、Cloud Dataflow サービスがジョブをできるだけ素早く効率的に実行するのに役立ちます。

パイプラインのライフサイクル: パイプライン コードから Cloud Dataflow ジョブまで

Cloud Dataflow プログラムを実行すると、Cloud Dataflow はすべての変換とその関連処理関数(DoFn など)を含む Pipeline オブジェクトを作成するコードから実行グラフを作成します。このフェーズは、グラフ作成時間と呼ばれます。グラフ作成中に、Cloud Dataflow はさまざまなエラーをチェックし、パイプライン グラフに無効なオペレーションが含まれないことを確認します。実行グラフは JSON 形式に変換され、JSON 実行グラフは Cloud Dataflow サービス エンドポイントに転送されます。

注: グラフ作成は、パイプラインをローカルで実行したときにも行われますが、グラフは JSON に変換されたりサービスに転送されたりしません。代わりに、グラフは Cloud Dataflow プログラムを起動したマシンと同じマシンでローカルに実行されます。詳しくは、ローカル実行の構成をご覧ください。

Cloud Dataflow サービスは、JSON 実行グラフを検証します。グラフが検証されると、Cloud Dataflow サービスでジョブになります。Cloud Dataflow Monitoring Interface を使用して、ジョブ、その実行グラフ、ステータス、ログ情報を参照できるようになります。

Java: SDK 2.x

Cloud Dataflow サービスは、Cloud Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Cloud Dataflow ジョブの jobId を含むオブジェクト DataflowPipelineJob にカプセル化されます。Cloud Dataflow モニタリング インターフェースCloud Dataflow コマンドライン インターフェースを使用すると、jobId に基づいて特定のジョブをモニタリング、追跡、トラブルシューティングできます。詳しくは、DataflowPipelineJob の API リファレンスをご覧ください。

Python

Cloud Dataflow サービスは、Cloud Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Cloud Dataflow ジョブの job_id を含むオブジェクト DataflowPipelineResult にカプセル化されます。Cloud Dataflow モニタリング インターフェースCloud Dataflow コマンドライン インターフェースを使用すると、job_id に基づいて特定のジョブをモニタリング、追跡、トラブルシューティングできます。

Java: SDK 1.x

Cloud Dataflow サービスは、Cloud Dataflow プログラムを実行したマシンにレスポンスを送信します。このレスポンスは、Cloud Dataflow ジョブの jobId を含むオブジェクト DataflowPipelineJob にカプセル化されます。Cloud Dataflow モニタリング インターフェースCloud Dataflow コマンドライン インターフェースを使用すると、jobId に基づいて特定のジョブをモニタリング、追跡、トラブルシューティングできます。詳しくは、DataflowPipelineJob の API リファレンスをご覧ください。

実行グラフ

Cloud Dataflow は、変換と、Pipeline オブジェクトの作成時に使用したデータに基づいて、パイプラインを表すステップのグラフを作成します。これはパイプライン実行グラフです。

Apache Beam SDK に含まれる WordCount サンプルには、テキストのコレクション内の個々の単語と各単語のオカレンスの読み取り、抽出、カウント、書式設定、書き込みを行う一連の変換が含まれます。次の図は、WordCount パイプライン内の変換が実行グラフにどのように展開されるかを示しています。

Cloud Dataflow サービスで実行するステップの実行グラフに展開された WordCount サンプル プログラムの変換。
図 1: WordCount サンプルの実行グラフ

多くの場合、実行グラフは、パイプラインの作成時に変換を指定した順序とは異なります。これは、Cloud Dataflow サービスがマネージド クラウド リソースに対して実行される前に実行グラフに対してさまざまな最適化と融合を実行するためです。Cloud Dataflow サービスは、パイプラインの実行時にデータの依存関係を尊重しますが、間にデータ依存関係がないステップは任意の順序で実行できます。

Cloud Dataflow Monitoring Interface でジョブを選択するときに Cloud Dataflow がパイプラインに対して生成した最適化されていない実行グラフを参照できます。

並列化と分散

Cloud Dataflow サービスは、パイプラインの処理ロジックを自動的に並列化し、ジョブを実行するために割り当てたワーカーに分散します。Cloud Dataflow は、プログラミング モデル内の抽象化を使用して並列処理機能を表します。たとえば ParDo 変換の場合、Cloud Dataflow は、(DoFn で表現される)処理コードを、並列で実行される複数のワーカーに自動的に分散します。

ユーザーコードの構造化

DoFn コードは小さな独立したエンティティと考えることができます。異なるマシンで多くのインスタンスが実行され、それぞれが互いを認識していない可能性があります。このため、純粋な関数(隠蔽された状態や外部状態に依存せず、目に見える副作用がなく、確定的な関数)は、DoFn の並列的で分散的な性質に理想的なコードです。

純粋な関数モデルは厳しく固定されているわけではありません。Cloud Dataflow サービスが保証しない事項にコードが依存しない限り、状態情報や外部初期化データは DoFn とその他の関数オブジェクトに対して有効である場合があります。ParDo 変換を構成し、DoFn を作成する場合は、次のガイドラインを念頭に置いてください。

  • Cloud Dataflow サービスは、入力 PCollection 内のすべての要素が DoFn インスタンスによって必ず 1 回だけ処理されることを保証します。
  • Cloud Dataflow サービスは、DoFn が何回呼び出されるかを保証しません。
  • Cloud Dataflow サービスは、分散された要素が厳密にどのようにグループ化されるかを保証しません。つまり、どの要素(存在する場合)がまとめて処理されるかを保証しません。
  • Cloud Dataflow サービスは、パイプラインで作成される DoFn インスタンスの正確な数を保証しません。
  • Cloud Dataflow サービスはフォールト トレラントであり、ワーカーに問題が発生した場合にコードを複数回再試行することがあります。Cloud Dataflow サービスはコードのバックアップ コピーを作成する場合があり、手動の副作用が問題になることがあります(たとえば、コードが一意の名前を持たない一時ファイルに依存するか、これを作成する場合など)。
  • Cloud Dataflow サービスは、DoFn インスタンスごとに要素の処理をシリアル化します。 コードは厳密にスレッドセーフである必要はありませんが、複数の DoFn インスタンス間で共有される状態はスレッドセーフである必要があります。

ユーザーコードの作成について詳しくは、プログラミング モデル ドキュメントのユーザー指定関数の要件をご覧ください。

エラーと例外の処理

データの処理中にパイプラインによって例外がスローされることがあります。これらのエラーのいくつかは一過性です(たとえば一時的に外部サービスにアクセスできないなど)。一方、永続的にエラーもあります(破損または解析不能な入力データや、計算中の null ポインタが原因となるエラーなど)。

バンドル内のいずれかの要素についてエラーがスローされた場合、Cloud Dataflow はそのバンドル内の要素を処理し、バンドル全体を再試行します。バッチモードで実行している場合、失敗した項目を含むバンドルは 4 回にわたり再試行されます。単一のバンドルが 4 回失敗した場合はパイプラインが完全に失敗します。ストリーミング モードで実行している場合、失敗した項目を含むバンドルは無期限に再試行され、パイプラインが恒久的に滞るおそれがあります。

融合の最適化

パイプラインの実行グラフの JSON フォームが検証されると、Cloud Dataflow サービスは最適化を実行するためにグラフを修正することがあります。このような最適化には、パイプラインの実行グラフ内の複数のステップまたは変換を単一のステップに融合することが含まれます。ステップを融合すると、Cloud Dataflow サービスはパイプラインの中間 PCollection をすべて実体化する必要がなくなります。実体化はメモリと処理のオーバーヘッドの点でコストが高くなることがあります。

パイプライン構築で指定したすべての変換がサービスで実行されますが、それらは異なる順序で実行されることがあります。また、パイプラインを最も効率よく実行するために融合された、より大きな変換の一部として実行されることもあります。Cloud Dataflow サービスは、実行グラフのステップ間のデータ依存関係を尊重しますが、それ以外ではステップが任意の順序で実行される可能性があります。

融合の例

次の図は、Cloud Dataflow SDK for Java に含まれる WordCount サンプルの実行グラフを、効率的な実行のために Cloud Dataflow サービスによって最適化および融合する方法を示しています。

最適化され、Cloud Dataflow サービスによって融合されたステップのある WordCount サンプル プログラムの実行グラフ。
図 2: WordCount サンプルの最適化された実行グラフ

融合の防止

パイプラインで、Cloud Dataflow サービスが融合最適化を実行しないようにする必要があるケースがいくつかあります。これらは、Cloud Dataflow サービスがパイプラインのオペレーションを融合する最適な方法を間違って推測し、それによって Cloud Dataflow サービスがすべての使用可能ワーカーを利用する能力が制限される可能性があるケースです。

たとえば、Cloud Dataflow がワーカー使用状況を最適化する能力が融合によって制限される 1 つのケースは、「高ファンアウト」の ParDo です。このようなオペレーションでは、入力コレクションの要素が比較的少数でも、ParDo は数百または数千倍の要素数の出力を生成し、その後に別の ParDo が続くことがあります。Cloud Dataflow サービスがこれらの ParDo オペレーションを融合すると、中間 PCollection により多くの要素が含まれている場合でも、このステップの並列性は入力コレクション内のアイテム最大数に制限されます。

Cloud Dataflow サービスに中間 PCollection の実体化を強制するオペレーションをパイプラインに追加することで、このような融合を防ぐことができます。次のオペレーションの 1 つを使用することを検討してください。

  • GroupByKey を挿入し、最初の ParDo の後でグループ化解除できます。Cloud Dataflow サービスは、集約で ParDo オペレーションを決して融合しません。
  • 中間 PCollection副入力として別の ParDo に渡すことができます。Cloud Dataflow サービスは常に副入力を実体化します。

結合の最適化

集約オペレーションは、大規模なデータ処理における重要なコンセプトです。集約は、概念的に非常に異なるデータをまとめて、関連付けに極めて有用にします。Cloud Dataflow プログラミング モデルは、集約オペレーションを GroupByKeyCoGroupByKeyCombine 変換として表します。

Cloud Dataflow の集約オペレーションは、データセット全体でデータを結合します。これには、複数のワーカーにまたがる可能性のあるデータも含まれます。多くの場合、このような集約オペレーション中に、インスタンスをまたがるデータを結合する前にデータをできるだけローカルに結合するのが最も効率的です。GroupByKey または他の集約変換を適用する場合、Cloud Dataflow サービスは、メインのグループ化オペレーションの前に部分的なローカル結合を自動的に実行します。

部分結合または複数レベル結合を実行する場合、Cloud Dataflow サービスはパイプラインがバッチデータとストリーミング データのどちらを操作するかに基づいて異なる決定を行います。制限付きデータの場合、サービスは効率性を重視し、できるだけローカルの結合を実行します。制限なしデータの場合、サービスは低レイテンシを重視し、部分結合は実行しないことがあります(レイテンシが増加するため)。

自動チューニング機能

Cloud Dataflow サービスには、いくつかの自動チューニング機能が含まれます。この機能では、実行中の Cloud Dataflow ジョブを動的にさらに最適化できます。こうした機能には、自動スケーリング動的作業再調整が含まれます。

自動スケーリング

自動スケーリングを有効にすると、Cloud Dataflow サービスは、ジョブの実行に必要な適切な数のワーカー インスタンスを自動的に選択します。また、Cloud Dataflow サービスは、実行時にジョブの特性を考慮して、より多数のワーカーまたは少数のワーカーを動的に再割り当てします。パイプラインの特定の部分は他よりも計算負荷が高い場合があり、Cloud Dataflow サービスはジョブのこれらのフェーズ中に追加のワーカーを自動的に起動できます(また、不要になったときにそれらをシャットダウンします)。

Java: SDK 2.x

自動スケーリングは、すべてのバッチ Cloud Dataflow ジョブにおいてデフォルトで有効化されます。パイプラインの実行時にオプション --autoscalingAlgorithm=NONE を明示的に指定することで自動スケーリングを無効化できます。その場合、Cloud Dataflow サービスは --numWorkers オプションに基づいてワーカー数を設定します。これはデフォルトで 3 になります。

Cloud Dataflow ジョブが以前のバージョンの SDK を使用する場合は、パイプラインの実行時にオプション --autoscalingAlgorithm=THROUGHPUT_BASED指定することで自動スケーリングを有効化できます。

Python

自動スケーリングは、Cloud Dataflow SDK for Python バージョン 0.5.1 以降を使用して作成されたすべてのバッチ Cloud Dataflow ジョブではデフォルトで有効化されます。パイプラインの実行時にオプション --autoscaling_algorithm=NONE を明示的に指定することで自動スケーリングを無効化できます。その場合、Cloud Dataflow サービスは --num_workers オプションに基づいてワーカー数を設定します。これはデフォルトで 3 になります。

Cloud Dataflow ジョブが以前のバージョンの SDK を使用する場合は、パイプラインの実行時にオプション --autoscaling_algorithm=THROUGHPUT_BASED指定することで自動スケーリングを有効化できます。

Java: SDK 1.x

自動スケーリングは、Cloud Dataflow SDK for Java バージョン 1.6.0 以降を使用して作成されたすべてのバッチ Cloud Dataflow ジョブではデフォルトで有効化されます。パイプラインの実行時にオプション --autoscalingAlgorithm=NONE を明示的に指定することで自動スケーリングを無効化できます。その場合、Cloud Dataflow サービスは --numWorkers オプションに基づいてワーカー数を設定します。これはデフォルトで 3 になります。

Cloud Dataflow ジョブが以前のバージョンの SDK を使用する場合は、パイプラインの実行時にオプション --autoscalingAlgorithm=THROUGHPUT_BASED指定することで自動スケーリングを有効化できます。

バッチ自動スケーリング

バッチモードの制限付きデータでは、Cloud Dataflow はパイプラインの各ステージの作業量とそのステージの現在のスループットに基づいてワーカー数を自動的に選択します。

ユーザーが実装したカスタム データソースがパイプラインで使用される場合、より多くの情報を Cloud Dataflow サービスの自動スケーリング アルゴリズムに提供し、パフォーマンスを向上させる可能性のあるいくつかのメソッドを実装できます。

Java: SDK 2.x

  • BoundedSource サブクラスで、メソッド getEstimatedSizeBytes を実装します。Cloud Dataflow サービスは、パイプラインで使用するワーカーの初期数を計算するときに getEstimatedSizeBytes を使用します。
  • BoundedReader サブクラスで、メソッド getFractionConsumed を実装します。Cloud Dataflow サービスは、getFractionConsumed を使用して読み取りの進行状況を追跡し、読み取り中に使用する適切なワーカー数に収束します。

Python

  • BoundedSource サブクラスで、メソッド estimate_size を実装します。Cloud Dataflow サービスは、パイプラインで使用するワーカーの初期数を計算するときに estimate_size を使用します。
  • RangeTracker サブクラスで、メソッド fraction_consumed を実装します。Cloud Dataflow サービスは、fraction_consumed を使用して読み取りの進行状況を追跡し、読み取り中に使用する適切なワーカー数に収束します。

Java: SDK 1.x

  • BoundedSource サブクラスで、メソッド getEstimatedSizeBytes を実装します。Cloud Dataflow サービスは、パイプラインで使用するワーカーの初期数を計算するときに getEstimatedSizeBytes を使用します。
  • BoundedReader サブクラスで、メソッド getFractionConsumed を実装します。Cloud Dataflow サービスは、getFractionConsumed を使用して読み取りの進行状況を追跡し、読み取り中に使用する適切なワーカー数に収束します。

ストリーミング自動スケーリング

Java: SDK 2.x

ストリーミング自動スケーリングにより、Cloud Dataflow サービスは負荷とリソースの使用率の変化に応じて、ストリーミング パイプラインを実行するために使用されるワーカーの数を適切に変更できます。ストリーミング自動スケーリングは無料の機能であり、ストリーミング パイプラインの実行時に使用されるリソースのコストを削減するように設計されています。

自動スケーリングを使用しない場合は、(--numWorkers を指定して)固定のワーカー数を選択し、パイプラインを実行することになります。入力ワークロードは時間とともに変化するため、この数は多すぎたり少なすぎたりする可能性があります。プロビジョニングしたワーカーが多すぎると余分なコストがかかり、プロビジョニングしたワーカーが少なすぎると処理されたデータのレイテンシが高くなります。自動スケーリングを有効にすることで、リソースは必要なときにのみ使用されます。

スケーリングを決定するために、自動スケーリングは、ビジーワーカーの状況とワーカーが入力ストリームから遅れないかどうかを評価するいくつかの指標に依存します。主要な指標には、CPU 使用率、スループット、バックログなどがあります。その目的は、ワーカーの使用率とスループットを最大化しながらバックログを最小限に抑え、負荷の急増に迅速に対応することです。自動スケーリングを有効にすることにより、プロビジョニングについてピーク時の負荷と新しい結果のどちらかを選択する必要はなくなります。CPU 使用率とバックログが増加するとワーカーが追加され、これらの指標が低下すると削除されます。この方法により、必要なものについてのみ支払うことになり、ジョブは可能な限り効率的に処理されます。

パイプラインでカスタムの制限なしソースが使用されている場合、ソースがバックログについて Cloud Dataflow サービスに通知することが不可欠です。バックログは、ソースによってまだ処理されていない入力の推定値(バイト単位)です。バックログについてサービスに通知するには、UnboundedReader クラスに次のメソッドのいずれかを実装します。

  • getSplitBacklogBytes() - ソースの現在のスプリットのバックログ。サービスで、すべてのスプリットのバックログを集約します。
  • getTotalBacklogBytes() - すべてのスプリットのグローバル バックログ。スプリットごとにバックログを取得できず、すべてのスプリットでのみバックログを計算できる場合があります。最初のスプリット(スプリット ID「0」)のみが合計バックログを提供する必要があります。
Apache Beam リポジトリには、UnboundedReader クラスを実装するカスタムソースのがいくつか含まれています。
ストリーミング自動スケーリングを有効にする

自動スケーリングを有効にするには、パイプラインを起動するときに次の実行パラメータを設定します。

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=<N>

自動スケーリングは、パイプラインの実行中に N÷15~N ワーカーの間で変動する場合があります。N は --maxNumWorkers の値です。たとえば、パイプラインで定常状態のワーカーが 3 つまたは 4 つ必要な場合、--maxNumWorkers=15 を設定できます。これにより、パイプラインは 1~15 の間でワーカーの自動スケーリングを行います。

ストリーミング パイプラインは、--maxNumWorkers と同じ数の永続ディスクの固定プールを伴ってデプロイされます。--maxNumWorkers を指定するときはこのことを考慮し、この値がパイプラインにとって十分なディスク数であることを確認してください。

使用量と料金

Compute Engine の使用量は平均ワーカー数に基づき、永続ディスクの使用量は --maxNumWorkers の数に基づきます。永続ディスクは、各ワーカーの接続ディスク数が等しくなるように再配布されます。

上記の例(--maxNumWorkers=15)では、1~15 個の Compute Engine インスタンスと 15 個の永続ディスクに対して支払いが発生します。

Python

この機能は Apache Beam SDK for Python ではまだサポートされていません。

Java: SDK 1.x

ストリーミング自動スケーリングにより、Cloud Dataflow サービスは負荷とリソースの使用率の変化に応じて、ストリーミング パイプラインを実行するために使用されるワーカーの数を適切に変更できます。ストリーミング自動スケーリングは無料の機能であり、ストリーミング パイプラインの実行時に使用されるリソースのコストを削減するように設計されています。

自動スケーリングを使用しない場合は、(--numWorkers を指定して)固定のワーカー数を選択し、パイプラインを実行することになります。入力ワークロードは時間とともに変化するため、この数は多すぎたり少なすぎたりする可能性があります。プロビジョニングしたワーカーが多すぎると余分なコストがかかり、プロビジョニングしたワーカーが少なすぎると処理されたデータのレイテンシが高くなります。自動スケーリングを有効にすることで、リソースは必要なときにのみ使用されます。

スケーリングを決定するために、自動スケーリングは、ビジーワーカーの状況とワーカーが入力ストリームから遅れないかどうかを評価するいくつかの指標に依存します。主要な指標には、CPU 使用率、スループット、バックログなどがあります。その目的は、ワーカーの使用率とスループットを最大化しながらバックログを最小限に抑え、負荷の急増に迅速に対応することです。自動スケーリングを有効にすることにより、プロビジョニングについてピーク時の負荷と新しい結果のどちらかを選択する必要はなくなります。CPU 使用率とバックログが増加するとワーカーが追加され、これらの指標が低下すると削除されます。この方法により、必要なものについてのみ支払うことになり、ジョブは可能な限り効率的に処理されます。

パイプラインでカスタムの制限なしソースが使用されている場合、ソースがバックログについて Cloud Dataflow サービスに通知することが不可欠です。バックログは、ソースによってまだ処理されていない入力の推定値(バイト単位)です。バックログについてサービスに通知するには、UnboundedReader クラスに次の 2 つのメソッドのいずれかを実装します。

  • getSplitBacklogBytes() - ソースの現在のスプリットのバックログ。サービスで、すべてのスプリットのバックログを集約します。
  • getTotalBacklogBytes() - すべてのスプリットのグローバル バックログ。スプリットごとにバックログを取得できず、すべてのスプリットでのみバックログを計算できる場合があります。最初のスプリット(スプリット ID「0」)のみが合計バックログを提供する必要があります。
ストリーミング自動スケーリングを有効にする

自動スケーリングを有効にするには、パイプラインを起動するときに次の実行パラメータを設定します。

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=<N>

自動スケーリングは、パイプラインの実行中に N÷15~N ワーカーの間で変動する場合があります。N は --maxNumWorkers の値です。たとえば、パイプラインで定常状態のワーカーが 3 つまたは 4 つ必要な場合、--maxNumWorkers=15 を設定できます。これにより、パイプラインは 1~15 の間でワーカーの自動スケーリングを行います。

ストリーミング パイプラインは、--maxNumWorkers と同じ数の永続ディスクの固定プールを伴ってデプロイされます。--maxNumWorkers を指定するときはこのことを考慮し、この値がパイプラインにとって十分なディスク数であることを確認してください。

現在、ストリーミング パイプラインでの自動スケーリングをサポートする唯一のソースは、PubsubIO です。SDK で提供されているすべてのシンクがサポートされています。このベータリリースでは、小さいバッチで公開されたトピックに関連付けられた Cloud Pub/Sub サブスクリプションから読み取るときや、低レイテンシのシンクに書き込むときに、自動スケーリングは最もスムーズに機能します。極端な場合(つまり、大きい公開バッチでの Cloud Pub/Sub サブスクリプションや非常に高レイテンシのシンク)には、自動スケーリングは大まかになることが知られています。これは今後のリリースで改善される予定です。

使用量と料金

Compute Engine の使用量は平均ワーカー数に基づき、永続ディスクの使用量は --maxNumWorkers の数に基づきます。永続ディスクは、各ワーカーの接続ディスク数が等しくなるように再配布されます。

上記の例(--maxNumWorkers=15)では、1~15 個の Compute Engine インスタンスと 15 個の永続ディスクに対して支払いが発生します。

ストリーミング パイプラインを手動でスケーリングする

Java: SDK 2.x

ストリーミング モードでの自動スケーリングが一般提供されるまでは、Cloud Dataflow の更新機能を使用して、ストリーミング パイプラインを実行するワーカー数を手動でスケーリングできます。

実行中にストリーミング パイプラインのスケーリングが必要になることがわかっている場合は、パイプラインの起動時に次の実行パラメータを設定してください。

  • --maxNumWorkers は、パイプラインで使用可能にするワーカーの最大数と同じ値に設定します。
  • --numWorkers は、実行開始時にパイプラインで使用するワーカーの初期数と同じ値に設定します。

パイプラインが稼働状態になったら、パイプラインを更新し、--numWorkers パラメータを使って新規ワーカー数を指定します。新しい --numWorkers に設定する値は N から --maxNumWorkers までの範囲でなければなりません。ここで、N--maxNumWorkers÷15 です。

更新により、実行中のジョブが(新しいワーカー数を使って)新規ジョブに置換されますが、前のジョブに関連付けられている状態情報はすべて保持されます。

Python

この機能は Apache Beam SDK for Python ではまだサポートされていません。

Java: SDK 1.x

ストリーミング モードでの自動スケーリングが一般提供されるまでは、Cloud Dataflow の更新機能を使用して、ストリーミング パイプラインを実行するワーカー数を手動でスケーリングできます。

実行中にストリーミング パイプラインのスケーリングが必要になることがわかっている場合は、パイプラインの起動時に次の実行パラメータを設定してください。

  • --maxNumWorkers は、パイプラインで使用可能にするワーカーの最大数と同じ値に設定します。
  • --numWorkers は、実行開始時にパイプラインで使用するワーカーの初期数と同じ値に設定します。

パイプラインが稼働状態になったら、パイプラインを更新し、--numWorkers パラメータを使って新規ワーカー数を指定します。新しい --numWorkers に設定する値は N から --maxNumWorkers までの範囲でなければなりません。ここで、N--maxNumWorkers÷15 です。

更新により、実行中のジョブが(新しいワーカー数を使って)新規ジョブに置換されますが、前のジョブに関連付けられている状態情報はすべて保持されます。

動的作業再調整

Cloud Dataflow サービスの動的作業再調整機能では、サービスが実行時の条件に基づいて作業を動的に再分割できます。これらの条件は、次のものが含まれることがあります。

  • 作業割り当ての不均衡
  • 終了に予想より長い時間がかかるワーカー
  • 予想よりも早く終了するワーカー

Cloud Dataflow サービスはこれらの条件を自動的に検出し、未使用または十分に使用されていないワーカーに作業を動的に再割り当てして、ジョブの全体的な処理時間を短縮します。

制限事項

動的作業再調整は、Cloud Dataflow サービスが一部の入力データを並列に処理している場合にのみ行われます。データを外部入力ソースから読み取っている場合、実体化された中間 PCollection を操作している場合、または GroupByKey などの集約の結果を操作している場合です。ジョブの多数のステップが融合される場合、ジョブの中間 PCollection は少なくなり、動的作業再調整は、ソースの実体化された PCollection 内の要素の数に限定されます。パイプラインの特定の PCollection に動的作業再調整が確実に適用されるようにするには、いくつかの異なる方法で融合を防止することで、動的並列性が確保されます。

動的作業再調整は、単一のレコードよりも細かくデータを再並列化できません。処理時間の大幅な遅延を引き起こす個別レコードがデータに含まれる場合、Cloud Dataflow は個々の「ホット」レコードを分割して複数のワーカーに再分散できないため、ジョブが遅延する可能性があります。

Java: SDK 2.x

パイプラインの最終出力に固定数のシャードを(たとえば、TextIO.Write.withNumShards を使用してデータを書き込むことで)設定した場合、並列化は選択したシャード数に基づいて制限されます。

Python

パイプラインの最終出力に固定数のシャードを(たとえば、beam.io.WriteToText(..., num_shards=...) を使用してデータを書き込むことで)設定した場合、Cloud Dataflow は選択したシャード数に基づいて並列化を制限します。

Java: SDK 1.x

パイプラインの最終出力に固定数のシャードを(たとえば、TextIO.Write.withNumShards を使用してデータを書き込むことで)設定した場合、並列化は選択したシャード数に基づいて制限されます。

固定シャード制限は一時的とみなすことができ、Cloud Dataflow サービスの今後のリリースで変更の対象となる可能性があります。

カスタム データソースの操作

Java: SDK 2.x

自分が提供するカスタム データソースをパイプラインで使用する場合は、splitAtFraction メソッドを実装して、ソースが動的作業再調整機能と連動できるようにする必要があります。

Python

自分が提供するカスタム データソースをパイプラインで使用する場合は、RangeTrackertry_claimtry_splitposition_at_fractionfraction_consumed を実装して、ソースが動的作業再調整機能と連動できるようにする必要があります。

詳しくは、RangeTracker の API リファレンス情報をご覧ください。

Java: SDK 1.x

自分が提供するカスタム データソースをパイプラインで使用する場合は、splitAtFraction メソッドを実装して、ソースが動的作業再調整機能と連動できるようにする必要があります。

リソースの使用率と管理

Cloud Dataflow サービスは、GCP のリソースをジョブごとに完全に管理します。これには Compute Engine インスタンス(ワーカーまたは VM とも呼ばれる)の起動と停止、I/O と一時ファイル ステージングの両方に対するプロジェクトの Cloud Storage バケットへのアクセスが含まれます。ただし、パイプラインで BigQueryCloud Pub/Sub などの GCP データ ストレージ テクノロジーを操作する場合は、こうしたサービスのリソースと割り当てを管理する必要があります。

Cloud Dataflow は、ユーザーが提供する Cloud Storage 内の場所をファイルのステージングに使用します。この場所はユーザーの制御下にあり、ジョブがそこから読み取る限り、場所の有効性が維持されることを確認する必要があります。SDK の組み込みキャッシュはジョブの開始時間を短縮できるため、複数のジョブ実行に同じステージング場所を再利用できます。

ジョブ

GCP プロジェクトあたり 25 個までの Cloud Dataflow ジョブを同時に実行できます。

Cloud Dataflow サービスは現在、20 MB 以下のサイズの JSON ジョブ リクエストの処理に限定されています。ジョブ リクエストのサイズはパイプラインの JSON 表現に相関します。パイプラインが大きいほど、リクエストが大きくなります。

パイプラインの JSON リクエストのサイズを見積もるには、次のオプションを指定してパイプラインを実行します。

Java: SDK 2.x

--dataflowJobFile=< path to output file >

Python

--dataflow_job_file=< path to output file >

Java: SDK 1.x

--dataflowJobFile=< path to output file >

このコマンドは、ジョブの JSON 表現をファイルに書き込みます。シリアル化されたファイルのサイズは、リクエストのサイズの適切な推定値となります。リクエストにはいくつかの追加情報が含まれるので、実際のサイズはわずかに大きくなります。

詳細については、「413 Request Entity Too Large」(413 リクエスト エンティティが大きすぎます)/「The size of serialized JSON representation of the pipeline exceeds the allowable limit」(パイプラインのシリアル化 JSON 表現のサイズが上限を超えています)のトラブルシューティング ページをご覧ください。

また、ジョブグラフのサイズは 10 MB を超えないでください。詳細については、「The job graph is too large. Please try again with a smaller job graph, or split your job into two or more smaller jobs.」(ジョブグラフが大きすぎます。小さいジョブグラフでもう一度試すか、ジョブを複数の小さいジョブに分割してください)のトラブルシューティング ページをご覧ください。

ワーカー

Cloud Dataflow サービスでは、現在最大でジョブあたり 1,000 個の Compute Engine インスタンスが許可されます。バッチジョブのデフォルトのマシンタイプは n1-standard-1、ストリーミングは n1-standard-4 です。したがって、デフォルトのマシンタイプを使用している場合、Cloud Dataflow サービスは最大でジョブあたり 4,000 コアを割り当てることができます。

Cloud Dataflow では、n1 シリーズのワーカーとカスタム マシンタイプがサポートされます。パイプラインの作成時に適切な実行パラメータを設定することで、パイプラインのマシンタイプを指定できます。

Java: SDK 2.x

マシンタイプを変更するには、--workerMachineType オプションを設定します。

Python

マシンタイプを変更するには、--worker_machine_type オプションを設定します。

Java: SDK 1.x

マシンタイプを変更するには、--workerMachineType オプションを設定します。

リソース割り当て

Cloud Dataflow サービスは、ジョブの開始とワーカー インスタンスの最大数へのスケーリングのために、ジョブの実行に必要な Compute Engine リソース割り当てが GCP プロジェクトにあることをチェックします。十分なリソース割り当てが使用可能でない場合、ジョブの開始は失敗します。

Cloud Dataflow の自動スケーリング機能は、プロジェクトの使用可能 Compute Engine 割り当てによって制限されます。ジョブの開始時に十分な割り当てがあっても、別のジョブがプロジェクトの使用可能割り当ての残りを使用する場合、最初のジョブは実行されますが、完全にはスケーリングできません。

しかし、Cloud Dataflow サービスはプロジェクトのリソース割り当てを超えるジョブの割り当ての増加を管理しません。ユーザーは、追加のリソース割り当てについて必要なリクエストを行う責任があります。これは、Google Cloud Platform Console で行えます。

永続ディスク リソース

Cloud Dataflow サービスは、ストリーミング ジョブの実行時に、ワーカー インスタンスあたり 15 個の永続ディスクに制限されています。各永続ディスクは、個々の Compute Engine 仮想マシンに対してローカルです。ジョブは永続ディスクより多くのワーカーを持つことができません。ワーカーとディスク間の 1:1 の比率が最小リソース割り当てです。

各永続ディスクのデフォルト サイズは、バッチモードで 250 GBストリーミング モードで 400 GB です。

ロケーション

デフォルトでは、Cloud Dataflow サービスは、us-central1 リージョンの us-central1-f ゾーンに Compute Engine リソースをデプロイします。--region パラメータを指定することで、この設定をオーバーライドできます。リソースに特定のゾーンを使用する必要がある場合は、パイプラインを作成するときに --zone パラメータを使用します。ただし、リージョンのみを指定し、ゾーンを指定しないでおくことをおすすめします。こうすることで、Cloud Dataflow サービスは、ジョブ作成リクエスト時に使用可能なゾーン容量に基づいて、リージョン内の最適なゾーンを自動的に選択できます。詳細については、リージョン エンドポイントのドキュメントをご覧ください。

Streaming Engine

現在、Cloud Dataflow パイプライン ランナーは、ストリーミング パイプラインのステップをすべてワーカー仮想マシンで実行し、ワーカーの CPU、メモリ、永続ディスク ストレージを消費します。Cloud Dataflow の Streaming Engine は、パイプラインの実行をワーカー VM から Cloud Dataflow サービスのバックエンドに移動します。

Streaming Engine の利点

Streaming Engine モデルには、次の利点があります。

  • ワーカー VM 上の CPU、メモリ、永続ディスク ストレージ リソースの消費量が削減されます。Streaming Engine はより小さいワーカー マシンタイプ(n1-standard-4 ではなく n1-standard-2)の場合に最適に機能し、小さいワーカーのブートディスクを超える永続ディスクを必要としないので、リソースと割り当ての消費量が少なくなります。
  • 着信データ量の変動に対応する、レスポンスに優れた自動スケーリング。Streaming Engine によって、ワーカーのスケーリングがよりスムーズできめ細かになります。
  • サービスの更新を適用するためにパイプラインを再デプロイする必要がないので、サポート性が向上しました。

ワーカー リソースの削減のほとんどは、処理を Cloud Dataflow サービスにオフロードすることによって実現されます。そのため、Streaming Engine の使用に関する料金がかかります。ただし、Streaming Engine を使用する Cloud Dataflow パイプラインの合計請求額は、このオプションを使用しない Cloud Dataflow パイプラインの総コストとほぼ同じであると予想されます。

Streaming Engine の使用

現在、次のリージョンでストリーミング パイプラインに Streaming Engine を使用できます。他のリージョンでも今後公開していく予定です。

  • us-central1(アイオワ)
  • europe-west1(ベルギー)
  • europe-west4(オランダ)
  • asia-northeast1(東京)

Java: SDK 2.x

ストリーミング パイプラインに Streaming Engine を使用するには、次のパラメータを指定します。

  • --enableStreamingEngine(Apache Beam SDK for Java バージョン 2.11.0 以降を使用している場合)。
  • --experiments=enable_streaming_engine(Apache Beam SDK for Java バージョン 2.10.0 を使用している場合)。

パイプラインに Cloud Dataflow Streaming Engine を使用する場合は、--zone パラメータを指定しないでください。代わりに --region パラメータを指定し、その値を、現在 Streaming Engine を使用できるリージョンのいずれかに設定します。Cloud Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Cloud Dataflow はエラーを報告します。

Streaming Engine は小規模なワーカー マシンタイプで最適に機能するので、--workerMachineType=n1-standard-2 を設定することをおすすめします。また、Streaming Engine はワーカーのブートイメージ用とローカルログ用の容量だけを必要とするので、--diskSizeGb=30 を設定することもできます。こうした値を明示的に設定しない場合は、デフォルト値が使用されます。

Python

この機能は Apache Beam SDK for Python ではまだサポートされていません。

Java: SDK 1.x

Streaming Engine は、Cloud Dataflow SDK for Java バージョン 1.x ではサポートされていません。この機能を使用するには、Apache Beam SDK for Java 2.8.0 以降を使用する必要があります。

Cloud Dataflow Shuffle

Cloud Dataflow Shuffle は、GroupByKeyCoGroupByKeyCombine などの Cloud Dataflow 変換の背後で行われる基本オペレーションです。Cloud Dataflow Shuffle オペレーションは、スケーラブルで効率的、かつフォールト トレラントな方法に従い、キーによってデータを分割してグループ化します。現在、Cloud Dataflow はシャッフル実装を使用しています。シャッフル実装はすべてワーカー仮想マシンで実行され、ワーカーの CPU、メモリ、永続ディスク ストレージを消費します。バッチ パイプラインのみで利用可能なサービスベースの新しい Cloud Dataflow Shuffle 機能では、シャッフル オペレーションがワーカー VM から Cloud Dataflow サービスのバックエンドに移動されます。

Cloud Dataflow Shuffle の利点

サービスベースの Cloud Dataflow Shuffle には、次の利点があります。

  • パイプライン ジョブタイプの大部分で、バッチ パイプラインの実行時間が短縮されます。
  • ワーカー VM 上の CPU、メモリ、永続ディスク ストレージ リソースの消費量が削減されます。
  • VM はシャッフル データを保持しなくなるため、自動スケーリングが改善され、より早期にスケールダウンできます。
  • フォールト トレランスが強化されます。この機能を使用していない場合のように、Cloud Dataflow Shuffle データを保持する VM の異常によってジョブ全体が失敗することはありません。

ワーカー リソースの削減のほとんどは、シャッフル処理を Dataflow サービスにオフロードすることによって実現されます。そのため、Cloud Dataflow Shuffle の使用に関する料金がかかります。ただし、サービスベースの Cloud Dataflow 実装を使用する Dataflow パイプラインの合計請求額は、このオプションを使用しない Dataflow パイプラインのコスト以下であると予想されます。

大部分のパイプライン ジョブタイプでは、Cloud Dataflow Shuffle のほうが、ワーカー VM で実行されるシャッフル実装よりも高速に実行されます。ただし、実行時間はそのつど異なります。重要な期限があるパイプラインを実行している場合は、期限までに十分なバッファ時間を割り当てることをおすすめします。さらに、シャッフルの割り当て量を増やすことも検討してください。

ディスクに関する考慮事項

サービスベースの Cloud Dataflow Shuffle 機能を使用する場合、ワーカー VM に大きな永続ディスクを接続する必要はありません(Cloud Dataflow によって 25 GB の小さなブートディスクが自動的に接続されます)。ただし、ディスクサイズこのように小さいので、Cloud Dataflow Shuffle を使用する際は、留意すべき重要な考慮事項があります。

  • ワーカー VM は 25 GB のディスク容量の一部をオペレーティング システム、バイナリ、ログ、コンテナに使用します。Cloud Dataflow Shuffle を使用する場合、大量のディスク容量を使用するジョブが残りのディスク容量を使い果たして失敗することがあります。
  • 小さなディスクのパフォーマンスにより、ディスク I/O を大量に使用するジョブは処理に時間がかかることがあります。ディスクサイズ間のパフォーマンスの違いについて詳しくは、Compute Engine 永続ディスクのパフォーマンスに関するページをご覧ください。

以上の考慮事項が 1 つでもジョブに当てはまる場合は、パイプライン オプションを使用してより大きなディスクサイズを指定できます。

Cloud Dataflow Shuffle を使用する

サービスベースの Cloud Dataflow Shuffle は現在、次のリージョンで利用できます。

  • us-central1(アイオワ)
  • europe-west1(ベルギー)
  • europe-west4(オランダ)
  • asia-northeast1(東京)

Cloud Dataflow Shuffle は、他のリージョンでも今後利用可能になる予定です。

Java: SDK 2.x

バッチ パイプラインでサービスベースの Cloud Dataflow Shuffle を使用するには、次のパラメータを指定します。
--experiments=shuffle_mode=service

パイプラインに Cloud Dataflow Shuffle を使用する場合は、--zone パラメータを指定しないでください。代わりに、--region パラメータを指定し、その値を、現在 Shuffle を使用できるリージョンのいずれかに設定します。Cloud Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Cloud Dataflow はエラーを報告します。

Python

バッチ パイプラインでサービスベースの Cloud Dataflow Shuffle を使用するには、次のパラメータを指定します。
--experiments=shuffle_mode=service

パイプラインに Cloud Dataflow Shuffle を使用する場合は、--zone パラメータを指定しないでください。代わりに、--region パラメータを指定し、その値を、現在 Shuffle を使用できるリージョンのいずれかに設定します。Cloud Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Cloud Dataflow はエラーを報告します。

Java: SDK 1.x

バッチ パイプラインでサービスベースの Cloud Dataflow Shuffle を使用するには、次のパラメータを指定します。
--experiments=shuffle_mode=service

パイプラインに Cloud Dataflow Shuffle を使用する場合は、--zone パラメータを指定しないでください。代わりに、--region パラメータを指定し、その値を、現在 Shuffle を使用できるリージョンのいずれかに設定します。Cloud Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Cloud Dataflow はエラーを報告します。

Cloud Dataflow Flexible Resource Scheduling

Cloud Dataflow FlexRS は、高度なスケジューリング手法Cloud Dataflow Shuffle サービス、プリエンプティブ仮想マシン(VM)インスタンスと通常の VM を組み合わせて使用することで、バッチ処理のコストを最小限に抑えます。プリエンプティブル VM と通常の VM を並列実行することで、システム イベント中に Compute Engine がプリエンプティブル VM インスタンスを停止した場合の Cloud Dataflow のユーザー エクスペリエンスが向上します。FlexRS は、プリエンプティブ VM が Compute Engine によってプリエンプトされたときに、パイプラインの処理を続行して以前の作業を確実に保持するのに役立ちます。FlexRS の詳細については、Cloud Dataflow での Flexible Resource Scheduling の使用をご覧ください。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。