このドキュメントでは、費用を最小限に抑えることを目的として Dataflow ジョブを最適化するためのベスト プラクティスについて説明します。費用に影響する要因について説明し、費用をモニタリングして管理する手法を紹介します。
Dataflow ジョブの費用の計算方法の詳細については、Dataflow の料金をご覧ください。
ジョブの費用に大きな影響を与える要因はいくつかあります。
- 実行時間の設定
- パイプライン パフォーマンス
- パイプラインのスループット要件
以降のセクションでは、ジョブのモニタリング方法、ジョブの費用に影響する要因、パイプラインの効率を改善するための提案について詳しく説明します。
SLO を定義する
最適化を開始する前に、パイプラインのサービスレベル目標(SLO)(特にスループットとレイテンシ)を定義します。これらの要件は、費用とその他の要素のトレードオフについて検討する際に役立ちます。
- パイプラインでエンドツーエンドの取り込みレイテンシを短くする必要がある場合、パイプラインの費用が高くなる可能性があります。
- 遅延して到着したデータを処理する必要がある場合、パイプラインの全体的な費用が高くなる可能性があります。
- ストリーミング パイプラインに処理が必要なデータスパイクがある場合、パイプラインに追加の容量が必要になるため、費用が増加する可能性があります。
ジョブのモニタリング
ジョブを最適化する方法を判断するには、まずジョブの動作を理解する必要があります。Dataflow モニタリング ツールを使用して、パイプラインの実行をモニタリングします。この情報を使用して、パフォーマンスと効率を改善します。
費用のモニタリング
次の手法を使用して、費用を予測してモニタリングします。
- 本番環境でパイプラインを実行する前に、データのサブセットで 1 つ以上の小規模なジョブを実行します。多くのパイプラインで、この手法を使用して費用を見積もることが可能です。
- Dataflow モニタリング インターフェースの [料金] ページを使用して、ジョブの推定料金をモニタリングします。推定料金は、契約上の割引など、さまざまな理由で実際のジョブの料金が反映されていない場合がありますが、費用の最適化の適切なベースラインとして使用できます。詳細については、費用のモニタリングをご覧ください。
- Cloud Billing データを BigQuery にエクスポートし、請求エクスポート テーブルでコスト分析を行います。Cloud Billing のエクスポート機能を使用すると、詳細な Google Cloud 請求データを 1 日を通して BigQuery データセットに自動的にエクスポートできます。請求データには、使用量、費用の見積もり、料金データが含まれます。
- 予期しない費用を回避するには、Dataflow ジョブが定義したしきい値を超えたときにモニタリング アラートを作成します。詳細については、Dataflow パイプラインに Cloud Monitoring を使用するをご覧ください。
ジョブのモニタリング
ジョブをモニタリングし、パイプラインの効率を改善できる可能性がある領域を特定します。
- Dataflow ジョブ モニタリング インターフェースを使用して、パイプラインの問題を特定します。モニタリング インターフェースには、各パイプラインのジョブグラフと実行の詳細が表示されます。これらのツールはどちらも、パイプラインを理解し、遅いステージ、停止したステージ、経過時間が長すぎるステップを特定するのに役立ちます。
- Metrics Explorer を使用して、Dataflow ジョブの詳細な指標を表示します。カスタム指標を使用して、パフォーマンス データを取得できます。
Distribution
指標は、パフォーマンス データの収集に特に役立ちます。 - CPU 使用率の高いパイプラインの場合は、Cloud Profiler を使用して、パイプライン コードで最も多くのリソースを消費する部分を特定します。
- データ サンプリングを使用して、データに関する問題を特定します。データ サンプリングを使用すると、Dataflow パイプラインの各ステップでデータをモニタリングできます。実行中のジョブや完了したジョブの実際の入出力を確認できるので、この情報はパイプラインに関する問題のデバッグに役立ちます。
- プロジェクト モニタリング ダッシュボードをカスタマイズして、費用が高くなる可能性のあるジョブを表示します。詳細については、Dataflow モニタリング ダッシュボードをカスタマイズするをご覧ください。
ロギングには上限があり、ロギングが過剰になるとジョブのパフォーマンスが低下する可能性があるため、大容量のパイプラインで要素ごとの処理指標をロギングすることはおすすめしません。
ランタイム設定を最適化する
次のランタイム設定は、費用に影響する可能性があります。
- ストリーミング ジョブとバッチジョブのどちらを実行するか
- ジョブの実行に使用するサービス(Streaming Engine や FlexRS など)
- ワーカー VM のマシンタイプ、ディスクサイズ、GPU の数
- 自動スケーリング モード
- 最初のワーカー数とワーカーの最大数
- ストリーミング モード(1 回限りモードまたは 1 回以上モード)
このセクションでは、ジョブを最適化するために行う可能性のある変更について説明します。これらの推奨事項がワークロードに適しているかどうかを判断するには、パイプラインの設計と要件を考慮してください。すべての提案がすべてのパイプラインに適しているとは限りません。
大規模な変更を行う前に、データのサブセットを使用する小規模なパイプラインで変更をテストします。詳細については、「大規模バッチ パイプラインのベスト プラクティス」の大規模なジョブに対して小規模なテストを実施するをご覧ください。
ジョブのロケーション
ほとんどの Dataflow ジョブは、データストアやメッセージング システムなどの他のサービスとやり取りします。これらのロケーションを検討します。
- ジョブで使用するリソースと同じリージョンでジョブを実行します。
- ステージング ファイルと一時ジョブファイルを保存する Cloud Storage バケットを、ジョブと同じリージョンに作成します。詳細については、
gcpTempLocation
とtemp_location
のパイプライン オプションをご覧ください。
マシンタイプを調整する
ワーカー VM を次のように調整すると、費用対効果が向上する可能性があります。
- 必要な最小のマシンタイプでジョブを実行します。パイプラインの要件に基づいて、必要に応じてマシンタイプを調整します。たとえば、CPU 使用率の高いパイプラインを使用するストリーミング ジョブでは、マシンタイプをデフォルトから変更するとメリットが得られることがあります。詳細については、マシンタイプをご覧ください。
- メモリやコンピューティングを多用するワークロードには、適切なマシンタイプを使用します。詳細については、VM の CoreMark スコア(ファミリー別)をご覧ください。
- 最初のワーカー数を設定します。ジョブがスケールアップされると、新しい VM に作業を再分配する必要があります。ジョブに必要なワーカーの数がわかっている場合は、最初のワーカー数を設定することで、この費用を回避できます。最初のワーカー数を設定するには、
numWorkers
またはnum_workers
パイプライン オプションを使用します。 - ワーカーの最大数を設定します。このパラメータに値を設定すると、ジョブの総費用を制限できる可能性があります。パイプラインを初めてテストするときは、比較的低い最大値から始めます。次に、本番環境ワークロードを実行するのに十分な値になるまで値を増やします。最大値を設定する前に、パイプラインの SLO を検討してください。詳細については、水平自動スケーリングをご覧ください。
- Right Fitting を使用して、特定のパイプライン ステップのリソース要件をカスタマイズします。
- パイプラインによっては、GPU を使用するとメリットがあります。詳細については、GPU と Dataflow をご覧ください。Right Fitting を使用すると、パイプラインの特定のステップに GPU を構成できます。
- 特に、オンプレミス データにアクセスする必要がある場合は、ワーカー VM からデータにアクセスするのに十分なネットワーク帯域幅があることを確認します。
バッチジョブの設定を最適化する
このセクションでは、バッチジョブのランタイム設定を最適化するための推奨事項について説明します。バッチジョブの場合、ジョブステージは順番に実行されるため、パフォーマンスと費用に影響する可能性があります。
柔軟なリソース スケジューリングを使用する
バッチジョブが時間の制約が厳しくない場合は、Flexible Resource Scheduling(FlexRS)の使用を検討してください。FlexRS は、ジョブを開始する最適なタイミングを見つけて、プリエンプティブル VM インスタンスと標準 VM の組み合わせを使用することで、バッチ処理のコストを削減します。プリエンプティブル VM は標準 VM と比べてはるかに低価格で利用できるため、総費用を削減できます。FlexRS は、プリエンプティブル VM と標準 VM の組み合わせを使用することで、Compute Engine がプリエンプティブル VM をプリエンプトした場合でも、パイプラインの処理を続行できるようにします。
非常に小さなジョブの実行を避ける
可能であれば、処理するデータ量が非常に少ないジョブの実行は避けてください。可能であれば、より大きなデータセットに対して、より少ないジョブを実行します。ワーカー VM の起動と停止には費用が発生するため、より多くのデータで実行するジョブの数を減らすことで、効率を向上させることができます。
Dataflow Shuffle が有効になっていることを確認します。バッチジョブは、デフォルトで Dataflow Shuffle を使用します。
自動スケーリングの設定を調整する
デフォルトでは、バッチジョブは自動スケーリングを使用します。短時間で実行されるジョブなど、一部のジョブでは自動スケーリングは必要ありません。パイプラインで自動スケーリングのメリットが得られないと思われる場合は、自動スケーリングを無効にします。詳細については、水平自動スケーリングをご覧ください。
Dynamic Thread Scaling を使用して、CPU 使用率に基づいて Dataflow がスレッド数を調整できるようにすることも可能です。または、ジョブに最適なスレッド数がわかっている場合は、numberOfWorkerHarnessThreads
または number_of_worker_harness_threads
パイプライン オプションを使用して、ワーカーあたりのスレッド数を明示的に設定します。
長時間実行ジョブを停止する
ジョブが事前に設定された実行時間を超えた場合に自動的に停止するように設定します。ジョブの実行にかかるおおよその時間がわかっている場合は、max_workflow_runtime_walltime_seconds
サービス オプションを使用して、ジョブが想定よりも長く実行された場合に自動的に停止します。
ストリーミング ジョブの設定を最適化する
このセクションでは、ストリーミング ジョブのランタイム設定を最適化するための推奨事項について説明します。
Streaming Engine を使用する
Streaming Engine は、パイプラインの実行をワーカー VM から Dataflow サービスのバックエンドに移動し、効率を高めます。ストリーミング ジョブには Streaming Engine を使用することをおすすめします。
1 回以上モードを検討する
Dataflow は、ストリーミング ジョブに対して 2 つのモード(1 回限りモードと 1 回以上モード)をサポートしています。ワークロードで重複レコードを許容できる場合は、1 回以上モードを使用すると、ジョブの費用を大幅に削減できます。1 回以上モードを有効にする前に、パイプラインでレコードの 1 回限りの処理が必要かどうかを評価します。詳細については、パイプライン ストリーミング モードを設定するをご覧ください。
料金モデルを選択する
Dataflow ストリーミング ジョブの確約利用割引(CUD)では、一定量の Dataflow コンピューティング リソースを 1 年以上継続的に使用することを確約することで、割引料金が適用されます。 Dataflow CUD は、ストリーミング ジョブの Dataflow のコンピューティング容量に対する費用に、1 年以上の期間確約できる予測可能な最小値が含まれる場合に有益です。CUD を使用すると、Dataflow ジョブの費用を削減できる可能性があります。
リソースベースの課金の使用も検討してください。リソースベースの課金では、ジョブによって使用された Streaming Engine リソースが、Streaming Engine コンピューティング単位数で測定されます。ワーカー CPU、ワーカーメモリ、Streaming Engine コンピューティング単位数に対して課金されます。
自動スケーリングの設定を調整する
自動スケーリングのヒントを使用して、自動スケーリングの設定を調整します。詳細については、ストリーミング パイプラインの水平自動スケーリングを調整するをご覧ください。Streaming Engine を使用するストリーミング ジョブの場合、ジョブの停止や置換を行わずに自動チューニング設定を更新できます。詳細については、処理中のジョブ オプションの更新をご覧ください。
パイプラインで自動スケーリングのメリットが得られないと思われる場合は、自動スケーリングを無効にします。詳細については、水平自動スケーリングをご覧ください。
ジョブに最適なスレッド数がわかっている場合は、numberOfWorkerHarnessThreads
または number_of_worker_harness_threads
パイプライン オプションを使用して、ワーカーあたりのスレッド数を明示的に設定します。
長時間実行ジョブを停止する
ストリーミング ジョブの場合、Dataflow は失敗した作業項目をいつまでも再試行します。ジョブが終了することはありません。ただし、問題が解決するまでジョブが機能停止する可能性があります。モニタリング ポリシーを作成して、システム レイテンシの増加やデータの更新頻度の低下など、機能停止したパイプラインの兆候を検出します。パイプライン コードにエラーロギングを実装することで、繰り返し失敗する作業項目を特定できます。
- パイプライン エラーをモニタリングするには、ワーカーのエラーログのカウントをご覧ください。
- エラーのトラブルシューティングについては、Dataflow エラーのトラブルシューティングをご覧ください。
パイプライン パフォーマンス
実行速度が速いパイプラインは、コストが低くなる可能性があります。パイプラインのパフォーマンスに影響を与える要因は次のとおりです。
- ジョブで使用可能な並列処理
- パイプラインで使用される変換、I/O コネクタ、コーダーの効率
- データのロケーション
パイプラインのパフォーマンスを改善する最初の手順は、処理モデルを理解することです。
- Apache Beam モデルと Apache Beam 実行モデルについて学習します。
- Dataflow が並列処理を管理する方法や、使用する最適化戦略など、パイプラインのライフサイクルについて学習します。Dataflow ジョブは複数のワーカー VM を使用し、各ワーカーは複数のスレッドを実行します。
PCollection
からの要素バンドルは、各ワーカー スレッドに分散されます。
パイプライン コードを作成する際は、次のベスト プラクティスに従ってください。
- 可能な場合は、サポートされている最新の Apache Beam SDK バージョンを使用します。さまざまなバージョンの変更内容については、リリースノートをご覧ください。
- パイプライン コードの作成に関するベスト プラクティスに従います。
- I/O コネクタのベスト プラクティスに従います。
- Python パイプラインの場合は、カスタム コンテナの使用を検討してください。依存関係の事前パッケージングにより、ワーカーの起動時間が短縮されます。
ロギング
ロギングを行う際は、次のベスト プラクティスに従ってください。
- 過剰なロギングはパフォーマンスを低下させる可能性があります。
- ログの量を減らすには、パイプラインのログレベルの変更を検討してください。詳細については、ログの量を制御するをご覧ください。
- 個々の要素はログに記録しないでください。代わりにデータ サンプリングを有効にします。
- 要素ごとのエラーには、各エラーをロギングするのではなく、デッドレター パターンを使用します。
テスト
パイプラインをテストすると、SDK のアップグレード、パイプラインのリファクタリング、コードレビューなどに役立ちます。CPU 使用率の高いカスタム変換の再設計など、多くの最適化は、Dataflow でジョブを実行しなくてもローカルでテストできます。
バッチ パイプラインの要素の合計数、ストリーミング パイプラインの 1 秒あたりの要素数、要素のサイズ、キーの数など、ワークロードの現実的なテストデータを使用して、大規模なパイプラインをテストします。パイプラインを 2 つのモードでテストします。安定状態と、クラッシュ復旧をシミュレートするための大量のバックログの処理です。
単体テスト、統合テスト、エンドツーエンド テストの作成の詳細については、パイプラインをテストするをご覧ください。テストの例については、dataflow-ordered-processing
GitHub リポジトリをご覧ください。