このドキュメントでは、費用を最小限に抑えることを目標に Dataflow ジョブを最適化するためのベスト プラクティスについて説明します。費用に影響する要因について説明し、それらの費用をモニタリングして管理する手法について説明します。
Dataflow ジョブの費用の計算方法の詳細については、Dataflow の料金をご覧ください。
ジョブの費用に大きく影響する要因はいくつかあります。
- ランタイム設定
- パイプライン パフォーマンス
- パイプラインのスループット要件
以降のセクションでは、ジョブのモニタリング方法、ジョブ費用に影響する要因、パイプラインの効率を改善するためのヒントについて説明します。
SLO を定義する
最適化を開始する前に、パイプラインのサービスレベル目標(SLO)を定義します。特に、スループットとレイテンシに注意してください。これらの要件は、コストとその他の要素のトレードオフを検討する際に役立ちます。
- パイプラインでエンドツーエンドの取り込みレイテンシを低く抑える必要がある場合、パイプラインの費用が高くなる可能性があります。
- 遅延して到着したデータを処理する必要がある場合、パイプラインの全体的なコストが高くなる可能性があります。
- ストリーミング パイプラインで処理する必要があるデータの急増が発生した場合、パイプラインで追加の容量が必要になり、費用が増加する可能性があります。
ジョブのモニタリング
ジョブを最適化する方法を決定するには、まずジョブの動作を理解する必要があります。Dataflow モニタリング ツールを使用して、パイプラインの実行をモニタリングします。その後、この情報を使用してパフォーマンスと効率を改善します。
費用のモニタリング
次の手法を使用して、費用を予測してモニタリングします。
- 本番環境でパイプラインを実行する前に、データのサブセットで 1 つ以上の小規模なジョブを実行します。多くのパイプラインでは、この手法を使用して費用の見積もりを取得できます。
- Dataflow モニタリング インターフェースの [費用] ページを使用して、ジョブの推定費用をモニタリングします。推定費用は、契約上の割引など、さまざまな理由で実際のジョブ費用が反映されていない場合がありますが、費用の最適化に適したベースラインを提供できます。詳細については、費用モニタリングをご覧ください。
- Cloud Billing データを BigQuery にエクスポートし、課金データのエクスポート テーブルで費用分析を行います。Cloud Billing エクスポートを使用すると、 Google Cloud の詳細な課金データを BigQuery データセットに終日自動的にエクスポートできます。請求データには、使用量、費用の見積もり、料金データが含まれます。
- 予期しない費用が発生しないように、Dataflow ジョブが定義したしきい値を超えたときにモニタリング アラートを作成します。詳細については、Dataflow パイプラインで Cloud Monitoring を使用するをご覧ください。
ジョブのモニタリング
ジョブをモニタリングし、パイプラインの効率を改善できる部分を特定します。
- Dataflow のジョブ モニタリング インターフェースを使用して、パイプラインの問題を特定します。モニタリング インターフェースには、各パイプラインのジョブグラフと実行の詳細が表示されます。どちらのツールも、パイプラインを理解し、遅いステージ、停止したステージ、経過時間が長すぎるステップを特定するのに役立ちます。
- Metrics Explorer を使用して、Dataflow ジョブの詳細な指標を確認します。カスタム指標を使用してパフォーマンス データを取得できます。
Distribution
指標は、パフォーマンス データを収集する場合に特に役立ちます。 - CPU 使用率の高いパイプラインの場合は、Cloud Profiler を使用して、パイプライン コードの中で最もリソースを消費している部分を特定します。
- データ サンプリングを使用して、データの問題を特定します。データ サンプリングを使用すると、Dataflow パイプラインの各ステップでデータをモニタリングできます。実行中のジョブや完了したジョブの実際の入出力を確認できるので、この情報はパイプラインに関する問題のデバッグに役立ちます。
大量のパイプラインで要素ごとの処理指標をロギングすることはおすすめしません。ロギングには上限があり、過剰なロギングによりジョブのパフォーマンスが低下する可能性があるためです。
ランタイム設定を最適化する
次のランタイム設定は費用に影響する可能性があります。
- ストリーミング ジョブとバッチジョブのどちらを実行するか
- ジョブの実行に使用するサービス(Streaming Engine や FlexRS など)
- ワーカー VM のマシンタイプ、ディスクサイズ、GPU 数
- 自動スケーリング モード
- 初期ワーカー数と最大ワーカー数
- ストリーミング モード(1 回限りモードまたは 1 回以上モード)
このセクションでは、ジョブを最適化するために行うことができる変更について説明します。これらの推奨事項がワークロードに適しているかどうかを判断するには、パイプラインの設計と要件を検討してください。すべての候補がすべてのパイプラインに適切または有用であるとは限りません。
大規模な変更を行う前に、データのサブセットを使用する小さなパイプラインで変更をテストします。詳細については、「大規模バッチ パイプラインのベスト プラクティス」の大規模なジョブに対して小規模なテストを実施するをご覧ください。
勤務地
ほとんどの Dataflow ジョブは、データストアやメッセージング システムなどの他のサービスとやり取りします。これらの場所を検討してください。
- ジョブが使用するリソースと同じリージョンでジョブを実行します。
- ジョブと同じリージョンに、ステージング ファイルと一時ジョブ ファイルを保存する Cloud Storage バケットを作成します。詳細については、
gcpTempLocation
とtemp_location
のパイプライン オプションをご覧ください。
マシンタイプを調整する
ワーカー VM に対して次の調整を行うと、費用対効果が向上する可能性があります。
- 必要な最小マシンタイプでジョブを実行します。パイプラインの要件に応じて、必要に応じてマシンタイプを調整します。たとえば、CPU 使用率の高いパイプラインを使用するストリーミング ジョブでは、マシンタイプをデフォルトから変更するとメリットが得られる場合があります。詳細については、マシンタイプをご覧ください。
- メモリ使用量の多いワークロードやコンピューティング負荷の高いワークロードの場合は、適切なマシンタイプを使用します。詳細については、VM の CoreMark スコア(ファミリー別)をご覧ください。
- 初期ワーカー数を設定します。ジョブをスケールアップする場合は、新しい VM に作業を再分散する必要があります。ジョブに必要なワーカー数がわかっている場合は、初期ワーカー数を設定することで、この費用を回避できます。ワーカーの初期数を設定するには、
numWorkers
またはnum_workers
パイプライン オプションを使用します。 - ワーカーの最大数を設定します。このパラメータに値を設定すると、ジョブの総費用を制限できます。パイプラインを初めてテストする場合は、比較的低い最大値から始めます。次に、本番環境ワークロードを実行するのに十分な値になるまで値を増やします。最大値を設定する前に、パイプラインの SLO を検討してください。詳細については、水平自動スケーリングをご覧ください。
- パイプラインによっては、GPU を使用するメリットがあります。詳細については、Dataflow での GPU の使用をご覧ください。
- 特にオンプレミス データにアクセスする必要がある場合は、ワーカー VM からデータにアクセスするのに十分なネットワーク帯域幅があることを確認してください。
バッチジョブの設定を最適化する
このセクションでは、バッチジョブのランタイム設定を最適化するためのヒントを紹介します。バッチジョブの場合、ジョブステージは順番に実行されるため、パフォーマンスと費用に影響する可能性があります。
柔軟なリソース スケジューリングを使用する
バッチジョブに時間的な制約がない場合は、Flexible Resource Scheduling(FlexRS)の使用を検討してください。FlexRS は、ジョブを開始するのに最適なタイミングを特定し、プリエンプティブル VM インスタンスと標準 VM を組み合わせて使用することで、バッチ処理コストを削減します。プリエンプティブル VM は、標準 VM と比べてはるかに低価格で利用できるため、総費用を削減できます。FlexRS は、プリエンプティブル VM と標準 VM を組み合わせて使用することで、Compute Engine がプリエンプティブル VM をプリエンプトした場合でも、パイプラインの処理が確実に進むようにします。
非常に小さなジョブを実行しない
可能であれば、ごく少量のデータを処理するジョブの実行は避けてください。可能であれば、大規模なデータセットで実行するジョブの数を減らします。ワーカー VM の起動と停止にはコストが発生するため、より多くのデータに対してより少ないジョブを実行すると、効率を高めることができます。
Dataflow Shuffle が有効になっていることを確認します。バッチジョブは、デフォルトで Dataflow Shuffle を使用します。
自動スケーリングの設定を調整する
バッチジョブはデフォルトで自動スケーリングを使用します。短時間実行ジョブなど、一部のジョブでは自動スケーリングは必要ありません。パイプラインで自動スケーリングのメリットが得られないと思われる場合は、自動スケーリングをオフにします。詳細については、水平自動スケーリングをご覧ください。
動的スレッド スケーリングを使用して、Dataflow が CPU 使用率に基づいてスレッド数を調整することもできます。または、ジョブに最適なスレッド数がわかっている場合は、numberOfWorkerHarnessThreads
または number_of_worker_harness_threads
パイプライン オプションを使用して、ワーカーあたりのスレッド数を明示的に設定します。
長時間実行ジョブを停止する
事前定義された実行時間が超過した場合にジョブが自動的に停止するように設定します。ジョブの実行時間がおおよそわかっている場合は、max_workflow_runtime_walltime_seconds
サービス オプションを使用して、想定よりも長く実行された場合にジョブを自動的に停止します。
ストリーミング ジョブの設定を最適化する
このセクションでは、ストリーミング ジョブのランタイム設定を最適化するためのヒントを紹介します。
Streaming Engine を使用する
Streaming Engine は、パイプラインの実行をワーカー VM から Dataflow サービスのバックエンドに移動して効率を高めます。ストリーミング ジョブには Streaming Engine を使用することをおすすめします。
1 回以上モードを検討する
Dataflow は、ストリーミング ジョブに対して 2 つのモード(exactly-once モードと at-least-once モード)をサポートしています。ワークロードで重複レコードを許容できる場合は、1 回以上のモードを使用すると、ジョブの費用を大幅に削減できます。1 回以上モードを有効にする前に、パイプラインでレコードの1 回限りの処理が必要かどうかを評価します。詳細については、パイプライン ストリーミング モードを設定するをご覧ください。
料金モデルを選択する
Dataflow ストリーミング ジョブの確約利用割引(CUD)では、一定量の Dataflow コンピューティング リソースを 1 年以上継続的に使用することを確約することで、割引料金が適用されます。Dataflow CUD は、ストリーミング ジョブの Dataflow コンピューティング容量に対する費用に、1 年以上の期間確約できる予測可能な最小値が含まれる場合に便利です。CUD を使用すると、Dataflow ジョブの費用を削減できる可能性があります。
また、リソースベースの課金の使用も検討してください。リソースベースの課金では、ジョブによって使用された Streaming Engine リソースが、Streaming Engine コンピューティング単位数で測定されます。ワーカー CPU、ワーカーメモリ、Streaming Engine コンピューティング単位数に対して課金されます。
自動スケーリングの設定を調整する
自動スケーリングのヒントを使用して、自動スケーリングの設定を調整します。詳細については、ストリーミング パイプラインの水平自動スケーリングを調整するをご覧ください。Streaming Engine を使用するストリーミング ジョブの場合、ジョブの停止や置換を行わずに自動チューニング設定を更新できます。詳細については、処理中のジョブ オプションの更新をご覧ください。
パイプラインで自動スケーリングのメリットが得られないと思われる場合は、自動スケーリングをオフにします。詳細については、水平自動スケーリングをご覧ください。
ジョブに最適なスレッド数がわかっている場合は、numberOfWorkerHarnessThreads
または number_of_worker_harness_threads
パイプライン オプションを使用して、ワーカーあたりのスレッド数を明示的に設定します。
長時間実行ジョブを停止する
ストリーミング ジョブの場合、Dataflow は失敗した作業項目をいつまでも再試行します。ジョブが終了することはありません。ただし、問題が解決するまでジョブが機能停止する可能性があります。モニタリング ポリシーを作成して、システム レイテンシの増加やデータの更新頻度の低下など、機能停止したパイプラインの兆候を検出します。パイプライン コードにエラーロギングを実装することで、繰り返し失敗する作業項目を特定できます。
- パイプライン エラーをモニタリングするには、ワーカーのエラーログのカウントをご覧ください。
- エラーのトラブルシューティングについては、Dataflow エラーのトラブルシューティングをご覧ください。
パイプライン パフォーマンス
パイプラインの実行速度が速いほど、費用を削減できます。パイプラインのパフォーマンスに影響する要因は次のとおりです。
- ジョブで使用できる並列処理
- パイプラインで使用される変換、I/O コネクタ、コーダの効率
- データのロケーション
パイプラインのパフォーマンスを改善する最初のステップは、処理モデルを理解することです。
- Apache Beam モデルと Apache Beam 実行モデルについて学習する。
- Dataflow が並列処理を管理する方法や、使用する最適化戦略など、パイプラインのライフサイクルの詳細を確認する。Dataflow ジョブは複数のワーカー VM を使用し、各ワーカーは複数のスレッドを実行します。
PCollection
の要素バンドルは、各ワーカー スレッドに分散されます。
パイプライン コードを作成する際は、次のベスト プラクティスに従ってください。
- 可能であれば、サポートされている最新の Apache Beam SDK バージョンを使用してください。リリースノートで、各バージョンの変更内容を確認してください。
- パイプライン コードの作成に関するベスト プラクティスに従います。
- I/O コネクタのベスト プラクティスに従う。
- Python パイプラインの場合は、カスタム コンテナの使用を検討してください。依存関係を事前にパッケージ化すると、ワーカーの起動時間が短縮されます。
ロギング
ロギングを行う際は、次のベスト プラクティスに従ってください。
- ロギングを過度に行うと、パフォーマンスが低下する可能性があります。
- ログの量を減らすには、パイプラインのログレベルを変更することを検討してください。詳細については、ログの量を制御するをご覧ください。
- 個々の要素はログに記録しないでください。代わりにデータ サンプリングを有効にします。
- 各エラーをロギングするのではなく、要素ごとのエラーにデッドレター パターンを使用します。
テスト
パイプラインをテストすることには、SDK のアップグレード、パイプラインのリファクタリング、コードレビューの支援など、多くのメリットがあります。CPU 使用率の高いカスタム変換の変更など、多くの最適化は、Dataflow でジョブを実行しなくてもローカルでテストできます。
バッチ パイプラインの要素の合計数、ストリーミング パイプラインの 1 秒あたりの要素数、要素サイズ、キー数など、ワークロードの現実的なテストデータを使用して大規模なパイプラインをテストします。パイプラインを、安定状態と、クラッシュ復旧をシミュレートするために大量のバックログを処理する 2 つのモードでテストします。
単体テスト、統合テスト、エンドツーエンド テストの作成の詳細については、パイプラインをテストするをご覧ください。テストの例については、dataflow-ordered-processing
GitHub リポジトリをご覧ください。