Dataflow 分析情報を使用する

Dataflow 分析情報を使用すると、ジョブのパフォーマンスを最適化できます。このトピックでは、gcloud または REST API で Dataflow 分析情報を使用する方法を説明します。Dataflow コンソールで分析情報を確認することもできます。Console で分析情報を確認する方法については、推奨事項をご覧ください。

概要

Dataflow 分析情報では、ジョブのパフォーマンスの向上、コストの削減、エラーのトラブルシューティングに関する分析情報が提供されます。Dataflow 分析情報は Recommender サービスの一部であり、google.dataflow.diagnostics.Insight タイプで使用できます。

Dataflow 分析情報を使用する場合、ユースケースに関連しない推奨事項が提供されることがあります。

始める前に

Dataflow 分析情報を使用する前に、次の手順を行う必要があります。

  1. Recommender API を有効にします
  2. アカウントに次の権限が付与されていることを確認します。

    • recommender.dataflowDiagnosticsInsights.get
    • recommender.dataflowDiagnosticsInsights.list
    • recommender.dataflowDiagnosticsInsights.update

    これらの権限は、個別に付与することも、次のいずれかのロールを付与することもできます。

    • roles/recommender.dataflowDiagnosticsViewer
    • roles/recommender.dataflowDiagnosticsAdmin
    • roles/dataflow.viewer
    • roles/dataflow.developer
    • roles/dataflow.admin

Dataflow の分析情報をリクエストする

次のように、Dataflow の分析情報を一覧表示できます。他の種類の分析情報の使い方については、Recommender API の分析情報ガイドをご覧ください。

Dataflow の分析情報を一覧表示する

特定のリージョンのプロジェクトの Dataflow 分析情報を一覧表示するには、次のいずれかの方法を使用します。

gcloud

gcloud recommender insights list コマンドを使用して、指定したリージョンのプロジェクトの Dataflow 分析情報をすべて表示します。

コマンドを実行する前に、次の値を置き換えます。

  • PROJECT_ID: 分析情報を一覧表示するプロジェクトの ID。
  • REGION: Dataflow ジョブが実行されているリージョン。例: us-west1
gcloud recommender insights list --insight-type=google.dataflow.diagnostics.Insight \
  --project=PROJECT_ID \
  --location=REGION

出力には、指定したリージョンのプロジェクトの Dataflow 分析情報が一覧表示されます。

REST

Recommender API の insights.list メソッドを使用して、指定されたリージョンのプロジェクトの Dataflow 分析情報を覧表示できます。

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: 分析情報を一覧表示するプロジェクトの ID。
  • REGION: Dataflow ジョブが実行されているリージョン。例: us-west1

HTTP メソッドと URL:

GET https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights

curl(Linux、macOS、Cloud Shell)を使用してリクエストを送信するには、次のコマンドを実行します。

curl -X GET \
  -H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
  "https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights"

単一の Dataflow 分析情報を取得する

1 つの分析情報に関する詳細(分析情報の説明、ステータス、推奨事項など)を取得するには、次のいずれかの方法を使用します。

gcloud

1 つの分析情報に関する情報を表示するには、gcloud recommender insights describe コマンドを使用して分析情報 ID を指定します。コマンドを実行する前に、次の値を置き換えます。

  • INSIGHT_ID: 表示する分析情報の ID。
  • PROJECT_ID: 分析情報を一覧表示するプロジェクトの ID。
  • REGION: Dataflow ジョブが実行されているリージョン。例: us-west1
gcloud recommender insights describe INSIGHT_ID \
  --insight-type=google.dataflow.diagnostics.Insight \
  --project=PROJECT_ID \
  --location=REGION

出力に分析情報の詳細が表示されます。

REST

1 つの分析情報を取得するには、Recommender API の insights.get メソッドを使用します。リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: 分析情報を一覧表示するプロジェクトの ID。
  • REGION: Dataflow ジョブが実行されているリージョン。例: us-west1
  • INSIGHT_ID: 表示する分析情報の ID。

HTTP メソッドと URL:

GET https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights/INSIGHT_ID

curl(Linux、macOS、Cloud Shell)を使用してリクエストを送信するには、次のコマンドを実行します。

curl -X GET \
  -H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
  "https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights/INSIGHT_ID"

Dataflow の分析情報を解釈する

1 つの分析情報を取得したら、その内容を確認して、ハイライトされたリソース使用量のパターンを把握できます。Dataflow の分析情報には、標準の分析情報属性に加えて、次のサブタイプが用意されています。

  • AUTOSCALING_NOT_ENABLED: 自動スケーリングを有効にできます。このジョブは CPU 使用率が高く、ワーカーの最大数を設定しています。自動スケーリングを有効にすると、パフォーマンスが向上する場合があります。
  • HIGH_FAN_OUT: 1 つ以上の変換の後に融合ブレークを挿入すると、並列処理を強化できます。
  • MAX_NUM_WORKERS: 自動スケーリング: ワーカーの最大数を増やせる可能性ジョブは自動スケーリングを使用しており、CPU 使用率が高く、ワーカーの最大数が設定されています。ワーカーの最大数を増やすと、パフォーマンスが向上する場合があります。
  • WORKER_OUT_OF_MEMORY: メモリが不足しているため、ジョブの一部のワーカーが失敗しました。これにより、ジョブが遅くなったり、失敗する可能性があります。
  • PREBUILD_NOT_UTILIZED: ワーカー イメージの事前ビルド ワークフローを使用して、ワーカーの起動時間と自動スケーリングの信頼性を向上させます。
  • ACTIVE_KEYSプレビュー): アクティブなキーの総数がコアの総数より少なく、スケールアップでは解決しません。
  • LONG_WORK_ITEM: 融合ステージでの処理に時間がかかりすぎているため、実行速度が遅いか停止していることを示しています。

Dataflow の分析情報で特定された問題を緩和する方法については、分析情報をご覧ください。

Dataflow 分析情報には content フィールドもあります。このフィールドには、分析情報の追加情報とメタデータを含むサブフィールドが含まれています。ユースケースに応じて、次の content サブフィールドを使用すると便利です。

  • jobName: Dataflow ジョブ名。
  • description: 分析情報の説明(英語)。
  • title: 分析情報のタイトル(英語)。

分析情報

高いファンアウトが検出された

Dataflow がジョブに 1 つ以上「高いファンアウト」の変換があることを検出した場合、次のメッセージが表示されます。

High fan-out detected

このメッセージは、入力対要素数の比率が高い ParDo が、後続の ParDo と融合された場合に表示されます。この状況では、2 番目の ParDo は最初のものと順次実行されます。これにより、特定の入力のすべての出力要素が同じワーカーに強制されるため、並列で処理されずパフォーマンスが低下します。

この問題を解決するには:

  • GroupByKey を挿入し、最初の ParDo の後でグループ化を解除します。Dataflow サービスが、集約間で ParDo オペレーションを融合することはありません。詳細については、融合の最適化をご覧ください。
  • 中間 PCollection を副入力として別の ParDo に渡します。Dataflow サービスは常に副入力を実体化します。
  • 再シャッフル ステップを挿入します。再シャッフルでは、融合が防止され、データがチェックポイント処理され、データが破棄されないようにウィンドウ処理戦略が再構成されます。再シャッフルは、Apache Beam のドキュメントで非推奨とされていても Dataflow でサポートされています(データを再シャッフルすると、パイプラインの実行費用が高くなる可能性があります)。

自動スケーリング: ワーカーの最大数を増やせる可能性

Dataflow が、ジョブで許可されるワーカーの最大数 maxNumWorkers(または max_num_workers)を使用していることを検出し、この最大値が引き上げられるとジョブがより多くのワーカーを使用する可能性がある場合、次のメッセージが表示されます。

maximum number of workers could be increased

たとえば、maxNumWorkers が 50 に設定されていて、50 個のすべてのワーカーを使用しており、平均ワーカー CPU 使用率が 80% を超えているバッチまたはストリーミング ジョブの場合、この推奨事項が表示されます。この推奨事項は、maxNumWorkers が 50 に設定されていて、50 個のすべてのワーカーを使用しており、平均ワーカー CPU 使用率が 50% を超えていて、ジョブの推定処理時間が 2 分を超えるストリーミング ジョブの場合にも表示されます。

通常、maxNumWorkers を増やすと、パイプラインのスループットが向上します。バッチ パイプラインは短時間で完了するようになります。ストリーミング パイプラインは、より大きなデータスパイクを処理できるようになり、1 秒あたりに処理される要素数が増加します。ただし、費用が増加する可能性があります。詳細については、ワーカー リソースの料金をご覧ください。自動スケーリング アルゴリズムの仕組みとその構成方法については、自動スケーリング ガイドをご覧ください。

この問題を解決するには:

  • maxNumWorkers パイプライン オプションを増やすか、削除します。このオプションを指定しないと、Dataflow は自動スケーリング ガイドに記載されているデフォルトを使用します。
  • パイプラインのパフォーマンスが十分であれば、何もする必要はありません。
    • バッチ パイプラインの場合、合計実行時間が要件を満たしていることを確認します。
    • ストリーミング パイプラインの場合は、ジョブページの [ジョブの指標] タブでデータの更新頻度グラフを確認します。グラフの値が継続的に増加していないことと、範囲内にあることを確認してください。

自動スケーリング: ワーカーの初期数を設定すると、ジョブのパフォーマンスが向上する場合があります

ジョブが実行時間の 50% 超にわたって特定数のワーカーを使用していることを Dataflow が検出した場合は、ワーカーの初期数を推奨値に設定すると、バッチジョブの実行時間を短縮する、またはストリーミング ジョブの更新時にバックログの増加を防止することにより、ジョブのパフォーマンスが向上する可能性があります。

ワーカーが OutOfMemory エラーで失敗する

メモリ不足エラーのためにジョブのワーカーが失敗していることを Dataflow が検出すると、次のメッセージが表示されます。

Some workers are out of memory

メモリ不足のため、ジョブの一部のワーカーが失敗しました。ジョブは完了する可能性がありますが、これらのエラーによってジョブが正常に完了しなかったり、パフォーマンスが低下する場合があります。

以下の方法を試すことをおすすめします。

事前ビルド ワークフローを利用できない

ワーカー イメージの事前ビルド ワークフローを使用していないパイプラインが Dataflow によって検出されると、次のメッセージが表示されます。

pre-build workflow not utilized

ワーカー イメージの事前ビルド ワークフローを使用しない場合は、パイプラインに、実行時に繰り返しインストールされる依存関係があります。この構成ではワーカーの起動時間が長くなるため、ジョブのスループットが低下し、自動スケーリングの信頼性が低下します。

この問題を解決するには、パイプラインの起動時にワーカー イメージの事前ビルド ワークフローを使用します。詳細については、Python 依存関係を事前にビルドするをご覧ください。

カスタマイズされたビルド済みコンテナがすでに使用されている場合は、不要なインストールを避けるために、'--sdk_location=container' オプションを追加し、次のオプションを削除します。

  • '--setup_file'
  • '--requirements_file'
  • '--extra_package(s)'

アクティブなキーが少ない

アクティブなキーの数がコアの総数より少ないためにジョブが遅れていることが Dataflow によって検出されると、次のメッセージが表示されます。この場合、スケールアップでは問題が解決しません。

Active keys can be increased

Dataflow は、ジョブでユーザーコードを実行するためにワーカーを使用します。各スレッドは、処理するデータセットのキーにマッピングされます。また、精度を確保するために、キーは一度に 1 つのコアでしか実行されません。

一部のコアの負荷が過剰で、他のコアがアイドル状態になっている場合もあります。この問題を解決するには、キーの数を増やします。これにより、アクティブなスレッドの数も増加します。

キーを増やす場合に可能な解決策: - より具体的なキータイプを使用して、キーの数を増やすことができます。たとえば、キータイプが IP address の場合、使用できるキーは少なくなります。ただし、キータイプを IP + [user identifier] に変更すると、使用可能なキーが増え、並列処理が向上します。- シンクがボトルネックとなる可能性がある BigQuery に書き込むパイプラインについては、こちらの記事をご覧ください。- 他のソースまたはシンクの場合は、numShards パラメータが含まれているかどうかをチェックして、キーを増やします。一般に、1 つのシャードは 1 つのキーにマッピングされます。- 実行モデルに関する一般的なガイダンスについては、こちらの記事をご覧ください。- ファンアウトを使用して単一の入力キーを取得し、それにハッシュを追加して複数の出力キーを生成できます。リファレンス

ステージの作業に時間がかかりすぎている

処理の完了に時間がかかりすぎることが Dataflow によって検出されると、次のメッセージが表示されます。

Stage spending too long on work

Dataflow は、処理する要素のバンドルに含まれる融合ステージに作業を送信します。ステージのすべての要素とその出力が処理されると、各バンドルは完了したものと見なされます。ストリーミング パイプラインは、処理が完了するまでの時間が 1 分未満の作業バンドルを中心に最適化されているため、処理時間が長くなると、パイプラインのパフォーマンスがさらに問題になる可能性があります。

この問題は、ユーザーの変換が停止しているか遅い場合に発生することがあります。Cloud Logging とその [診断] タブに出力される警告で、このような変換は「オペレーションの進行中」、「処理が停止しています」などのキーフレーズで識別できます。この問題がユーザー変換によって引き起こされたかどうかを診断するには、Cloud Profiler を使用してユーザー変換のパフォーマンスを調べます。次に、速度低下の原因となっているコードと頻度をトレースします。詳細については、一般的な Dataflow エラーのトラブルシューティングをご覧ください。

調査の結果、長時間の処理がユーザー変換によるものでないことが判明した場合は、Cloud サポートに連絡し、実施した調査手順について説明することをおすすめします。

ジョブが作業アイテムから先に進まない

1 つの作業アイテムが繰り返し失敗して再試行されたためにキーが停止したことを Dataflow が検出すると、次のメッセージが表示されます。

Job is stuck due to failed and retried work item

Dataflow では、パイプライン内のすべてのメッセージが特定のキーで処理されます。メッセージの処理中にエラーが発生すると、そのメッセージは再試行されます。メッセージが 2、3 回再試行されても許容範囲内ですが、エラーが繰り返し発生する場合は(10 回連続など)、パイプライン コードに根本的な問題があることを示しています。キーの特定のメッセージが再試行されると、同じキーの他のメッセージは処理を続行できなくなります。メッセージが 10 回以上失敗した場合、問題は自然に解決されない可能性が高くなります。このメッセージの失敗により、パイプラインで次のような問題が発生する可能性があります。

  • ウォーターマークが遅延する
  • バックログが発生する
  • ドレイン オペレーションが完了しない

この問題をデバッグするには、推奨事項で報告されたステージを調査し、ログを参照して問題のあるコードを特定します。その後、新しいパイプライン コードでジョブを更新し、ジョブが停止しないようにします。

Streaming Engine が有効になっていない

ストリーミング ジョブで Streaming Engine が有効になっていないことを Dataflow が検出すると、次のメッセージが表示されます。

This job isn't using Streaming Engine. It might benefit from having Streaming Engine enabled.

Streaming Engine を使用すると、水平自動スケーリングの強化、サポート性の向上、ワーカー VM の CPU、メモリ、Persistent Disk ストレージ リソースの使用量の削減など、さまざまなメリットが期待できます。Streaming Engine はリソースベースの課金もサポートしています。