Dataflow モニタリング インターフェースを使用する

Dataflow マネージド サービスを使用してパイプラインを実行する場合、Dataflow のウェブベースのモニタリング ユーザー インターフェースを使用して、実行中のジョブやその他の情報を表示できます。モニタリング インターフェースでは、Dataflow ジョブを表示して操作できます。

Google Cloud Console を使用すると、Dataflow モニタリング インターフェースにアクセスできます。このモニタリング インターフェースには、次の項目が表示されます。

  • 現在実行中のジョブを含め、過去 30 日間に実行されたすべての Dataflow ジョブのリスト
  • 各パイプラインの図
  • ジョブのステータス、タイプ、SDK のバージョンに関する詳細
  • パイプラインを実行している Google Cloud サービス(Compute EngineCloud Storage など)に関する情報へのリンク。
  • ジョブ中に発生したエラーまたは警告。
  • ジョブに関する追加の診断。

ジョブ モニタリングのグラフは、Dataflow モニタリング インターフェース内で表示できます。これらのグラフには、パイプライン ジョブの期間中の指標が表示され、次の情報が含まれます。

  • パイプラインの遅れを引き起こしている可能性のあるステップを特定するためのステップレベルの可視性。
  • 異常な動作を表面化させることのできる統計情報。
  • 参照元やシンク内のボトルネックの特定に役立つ I/O の指標。

Dataflow モニタリング インターフェースにアクセスする

Dataflow モニタリング インターフェースにアクセスする方法は次のとおりです。

  1. Cloud Console にログインします。
  2. Google Cloud プロジェクトを選択します。
  3. 左上隅のメニューをクリックします。
  4. [ビッグデータ] セクションに移動し、[Dataflow] をクリックします。

Dataflow ジョブのリストとそれぞれのステータスが表示されます。ジョブが表示されない場合は、新しいジョブを実行する必要があります。ジョブの実行方法については、Dataflow のクイックスタートをご覧ください。

ジョブの状態として「実行中」、「失敗」、「成功」が示された Dataflow ジョブのリスト。
図 1: それぞれのステータス(実行中失敗成功)での Cloud Console の Dataflow ジョブのリスト。

ジョブのステータスは以下のとおりです。

  • -: モニタリング インターフェースはまだ Dataflow サービスからステータスを受け取っていません。
  • 実行中: ジョブは実行中です。
  • 開始しています...: ジョブは作成されていますが、起動する前に準備の時間が必要です。
  • キューに格納済み: FlexRS ジョブがキューに格納されています。
  • キャンセルしています...: ジョブはキャンセル中です。
  • キャンセルしました: ジョブはキャンセルされています。
  • ドレインしています...: ジョブはドレイン中です。
  • ドレインされました: ジョブはドレインされます。
  • 更新しています...: ジョブは更新中です。
  • 更新しました: ジョブが更新されます。
  • 完了しました: ジョブは正常に終了しました。
  • 失敗しました: ジョブは完了しませんでした。

パイプラインの詳細については、ジョブの [名前] をクリックしてください。

ジョブ モニタリングのグラフへのアクセス

ジョブのモニタリング グラフにアクセスするには、Dataflow モニタリング インターフェース内のジョブの [名前] をクリックします。[ジョブの詳細] ページが表示されます。このページには、次の情報が含まれています。

  • ジョブのグラフ: パイプラインの視覚的表現
  • 実行の詳細: パイプラインのパフォーマンスを最適化するツール
  • ジョブの指標: ジョブの実行に関する指標
  • ジョブの情報パネル: パイプラインに関する記述情報
  • ジョブのログ: Dataflow サービスによってジョブレベルで生成されたログ
  • ワーカーログ: Dataflow サービスによってワーカーレベルで生成されたログ
  • 診断: 選択したタイムラインでエラーが発生した場所と、パイプラインに対して発生する可能性のある推奨事項を示すテーブル
  • タイムセレクタ: 指標の期間を調整できるツール

Job の詳細ページでは、ジョブのグラフ実行の詳細ジョブの指標でジョブビューを切り替えることができます。

ジョブのグラフタブが選択された Dataflow モニタリング インターフェースのビュー。このモードでは、パイプライン グラフ、ジョブ情報、ジョブログ、ワーカーログ、診断ツール、タイムセレクタ ツールを表示できます。

ジョブの指標タブが選択された Dataflow モニタリング インターフェースのビュー。このモードでは、ジョブ指標のグラフ、ジョブ情報、ジョブログ、ワーカーログ、診断ツール、タイムセレクタ ツールを表示できます。

Cloud Monitoring アラートの作成

Dataflow は Cloud Monitoring と完全に統合されているので、ジョブがユーザー定義のしきい値を超えたときのアラートを作成できます。指標グラフから Cloud Monitoring アラートを作成するには、[通知ポリシーを作成] をクリックします。

[通知ポリシーを作成] リンクをクリックすると、指標グラフからアラートを作成できます。

これらのアラートの作成手順については、Dataflow パイプラインの Cloud Monitoring を使用するページをご覧ください。モニタリング グラフを表示できない場合やアラートを作成できない場合は、追加のMonitoring 権限が必要な場合があります。

全画面モード

指標のグラフを全画面表示するには、[] をクリックします。

タイムセレクタ ツールの使用

タイムセレクタ ツールを使用すると、指標の期間を調整できます。事前定義された期間を選択するか、カスタムの時間間隔を選択してジョブを分析できます。

タイムセレクタ ツールを使って、時間や日数単位、またはカスタム範囲を指定して期間を選択できます。

ストリーミングまたは処理中のバッチジョブの場合、グラフのデフォルト表示には、そのジョブの過去 6 時間の指標が表示されます。ストリーミング ジョブが停止または完了した場合、グラフのデフォルト表示では、ジョブの実行時間全体が表示されます。

ステップとワーカーの指標

次の指標のグラフを表示できます。

  • データの鮮度(ストリーミング パイプラインのみ)
  • システム レイテンシ(ストリーミング パイプラインのみ)
  • 自動スケーリング
  • スループット
  • CPU 使用率
  • ワーカーのエラーログのカウント
  • 入力と出力の指標

これらのグラフの詳細情報を表示するには、凡例切り替えボタンをクリックして「グラフの凡例を展開」します。

凡例切り替えボタンは、[アラート ポリシーを作成] ボタンの近くにあります。

データの鮮度(ストリーミング パイプラインのみ)

データの鮮度はリアルタイムと出力ウォーターマークの間の経過時間です。パイプラインの各ステップには、出力データ ウォーターマークがあります。出力データ ウォーターマークがTの場合、イベント時間がTより前のすべての要素が計算処理されたことを示します。出力データ ウォーターマークは、すべての上流計算の中で最も早い入力データ ウォーターマークによって制限されます。一部の入力データがまだ処理されていない場合、出力ウォーターマークが保持され、データの更新に影響します。

ストリーミング パイプラインでデータの鮮度を示すデータの画像。

システム レイテンシ(ストリーミング パイプラインのみ)

システムのレイテンシは、データ項目を処理している(または処理を待機している)時間の現時点での最長時間(秒)です。この指標は、パイプライン内のいずれかのソース内で要素が待機する時間を表します。最長時間は処理後に調整されます。追加の考慮事項は次のとおりです。

  • 複数のソースとシンクがある場合、システムのレイテンシは、すべてのシンクに書き込まれるまでにソース内で要素が待機する最長時間になります。
  • 場合によっては、ソース内で要素が待機する時間の値が提供されないこともあります。また、要素にイベント時間を定義するメタデータがない場合があります。この場合、パイプラインが最初に要素を受け取った時点からシステムのレイテンシが計算されます。

ストリーミング パイプラインでのシステムのレイテンシを示すデータの可視化。

自動スケーリング

Dataflow サービスは、自動スケーリングのジョブを実行するために必要なワーカー インスタンスの数を自動的に選択します。ワーカー インスタンスの数は、ジョブの要件に応じて時間とともに変化する可能性があります。

パイプライン内のワーカー数を示すデータの可視化。

自動スケーリングの変更履歴を確認するには、[その他の履歴] ボタンをクリックします。パイプラインのワーカーの履歴情報テーブルが表示されます。

パイプラインのワーカーの履歴を示すテーブル。

スループット

スループットは、任意の時点で処理されたデータ量を表します。このステップごとの指標は 1 秒あたりの要素数で表示されます。この指標を 1 秒あたりのバイト数で表示するには、[スループット(要素数/秒)] 、[スループット(バイト/秒)] の順にクリックします。

パイプライン内の 4 つのステップのスループットを示すデータの画像。

ワーカーのエラーログのカウント

ワーカーのエラーログのカウントには、ある時点ですべてのワーカーにわたって観測されたエラーの割合が表示されます。

記録された各エラーの概要と発生回数。

CPU 使用率

CPU 使用率は、使用されている CPU の量を処理可能な CPU の量で割ったものです。このワーカーごとの指標は割合として表示されます。

1 つの Dataflow ワーカーでの CPU 使用率を示すデータの画像。

入力と出力の指標

ストリーミング Dataflow ジョブが Pub/Sub を使用してレコードを読み書きする場合、入力と出力の指標が表示されます。

デフォルトでは、すべての入力指標と出力指標が表示されます。表示する指標を変更するには、各セクションのフィルタ プルダウンを使用します。次の画像は、利用可能なすべてのフィルタを示しています。

Dataflow ジョブの入力指標に使用できるフィルタ プルダウン。 Dataflow ジョブの出力指標に使用できるフィルタ プルダウン。

次の 2 つのグラフが、[入力の指標] セクションと [出力の指標] セクションに表示されます。

Dataflow ジョブの入力と出力の指標を示す一連のグラフ。

リクエスト数/秒

1 秒あたりのリクエスト数は、一定期間に発生したデータの読み取りまたは書き込みを行う API リクエストの頻度をソース別またはシンク別に集計した値です。レートが 0 になるか、長時間にわたり、予想される動作よりも大幅に低下している場合は、パイプラインによる特定のオペレーションの実行がブロックされている可能性があります。また、読み取るデータがない可能性もあります。このような場合は、システム ウォーターマークの高いジョブステップを確認します。また、ワーカーログを調べて、エラーや処理が遅いことを示す兆候がないか調べます。

一定期間に発生したデータの読み取りまたは書き込みを行う API リクエストの数をソース別またはシンク別に示すグラフ。

エラーの種類別のレスポンス エラー数/秒

1 秒あたりのエラーの種類別のレスポンス エラーは、一定期間に発生したデータの読み取りまたは書き込みを行う API リクエストの失敗の頻度をソース別またはシンク別に集計した値です。このようなエラーが頻繁に発生すると、API リクエストの処理が遅くなる可能性があります。このようなエラーが発生している API リクエストは調査する必要があります。これらの問題のトラブルシューティングを行う際は、I/O エラーコードのドキュメントと、ソースまたはシンクで使用される特定のエラーコード(Pub/Sub のエラーコードなど)のドキュメントをご確認ください。

一定期間に発生したデータの読み取りまたは書き込みを行う API リクエストの失敗の頻度をソースまたはシンク別に示すグラフ。

Metrics Explorer の使用

次の Dataflow I/O 指標は、Metrics Explorer で表示できます。

  • job/pubsub/write_count: Dataflow ジョブの PubsubIO.Write からの Pub/Sub Publish リクエスト。
  • job/pubsub/read_count: Dataflow ジョブの PubsubIO.Read からの Pub/Sub Pull リクエスト。
  • job/bigquery/write_count: Dataflow ジョブの BigQueryIO.Write からの BigQuery Publish リクエスト。job/bigquery/write_count 指標は、WriteToBigQuery 変換を使用した Python パイプラインで使用可能で、Apache Beam v2.28.0 以降では method='STREAMING_INSERTS' が有効になっています。

Dataflow の指標の完全なリストについては、Google Cloud の指標のドキュメントをご覧ください。

パイプラインを表示する

特定の Dataflow ジョブを選択すると、モニタリング インターフェースにはそのジョブのパイプラインに関する情報が表示されます。この情報には、Dataflow サービスで実行されるパイプラインの図、ジョブの概要、ジョブのログ、パイプラインの各ステップに関する詳細情報が含まれます。

Dataflow のモニタリング インターフェースでは、パイプラインの図である実行グラフが提供されます。パイプラインの実行グラフでは、パイプライン内の各変換が 1 つのボックスとして表されます。各ボックスには、変換名と以下のようなジョブ ステータス情報が含まれています。

  • 実行中: ステップは実行中です。
  • キューに格納済み: FlexRS ジョブ内のステップがキューに格納されています。
  • 完了しました: ステップは正常に終了しました。
  • 停止しました: ジョブが停止したため、ステップは停止されました。
  • 不明: ステータスを報告できませんでした。
  • 失敗しました: ステップは完了しませんでした。

基本的な実行グラフ

パイプライン コード:

Java: SDK 2.x


  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python


(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Java: SDK 1.x

実行グラフ:

Dataflow のモニタリング インターフェースで表示される WordCount パイプラインの実行グラフ。

図 2: Dataflow モニタリング インターフェースの実行グラフに表示された WordCount パイプラインのパイプライン コード。

複合変換

実行グラフでは、複合変換(複数のネストしたサブ変換を含む変換)は展開可能です。展開可能な複合変換は、グラフに矢印でマークされます。矢印をクリックして変換を展開し、その中のサブ変換を表示します。

パイプライン コード:

Java: SDK 2.x


  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

注: 右の画像の FormatCounts は、この SDK とは関係ありません。

Python


# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Java: SDK 1.x

実行グラフ:

WordCount パイプラインの実行グラフと、コンポーネント変換を表示するように展開された CountWords 変換。

図 3: パイプライン全体の展開された実行グラフと CountWords 変換のサブステップのパイプライン コード

変換の名前

Dataflow には、モニタリング実行グラフに表示される変換名を取得する方法がいくつかあります。

Java: SDK 2.x

  • Dataflow では、変換を適用するときに割り当てる名前を使用できますapply メソッドに指定する最初の引数が変換名になります。
  • Cloud Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(ParDo などのコア変換を使用している場合は)DoFn 関数オブジェクトの名前から推定できます。

Python

  • Dataflow では、変換を適用するときに割り当てる名前を使用できます。変換の label 引数を指定することで変換名を設定できます。
  • Cloud Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(ParDo などのコア変換を使用している場合は)DoFn 関数オブジェクトの名前から推定できます。

Java: SDK 1.x

指標について

経過時間

ステップをクリックすると、ステップ情報パネルに経過時間の指標が表示されます。経過時間は、すべてのワーカーのすべてのスレッドで次のアクションにかかったおおよその合計時間です。

  • ステップの初期化
  • データの処理
  • データのシャッフル
  • ステップの終了

複合ステップの場合は、経過時間はコンポーネントのステップにかかった時間の合計です。この見積もりは、遅いステップを特定し、パイプラインのどの部分が本来よりも時間がかかっているかを診断するのに役立ちます。

パイプラインの各ステップの実行時間を確認できます。
図 4: 経過時間の指標は、パイプラインが効率的に稼働していることを確認するのに役立ちます。

副入力の指標

副入力の指標は、副入力のアクセス パターンとアルゴリズムがパイプラインのパフォーマンスにどのように影響するかを示します。パイプラインで副入力が使用される場合、Dataflow は副入力コレクションを永続レイヤ(ディスクなど)に書き込みます。変換では、この永続的なコレクションから副入力を読み取ります。こうした書き込みと読み取りは、ジョブの実行時間に影響を及ぼします。

Dataflow モニタリング インターフェースに副入力の指標が表示されるのは、副入力コレクションを作成または使用する変換を選択した場合です。ステップ情報パネルの副入力の指標セクションで指標を確認できます。

副入力を作成する変換

選択した変換が副入力コレクションを作成する場合、[副入力の指標] セクションに、そのコレクションの名前と次の指標が表示されます。

  • [書き込みに要した時間]: 副入力コレクションの書き込みに費やされた実行時間。
  • [書き込みバイト数]: 副入力コレクションに書き込まれた合計バイト数。
  • [副入力からの読み取り時間と読み取りバイト数]: 副入力コレクションを使用するすべての変換(副入力コンシューマ)に関する追加の指標を表示するテーブル。

[副入力からの読み取り時間と読み取りバイト数] テーブルには、副入力コンシューマのそれぞれについて、次の情報が含まれています。

  • [副入力のコンシューマ]: 副入力コンシューマの変換名。
  • [読み取りに要した時間]: 該当するコンシューマが副入力のコレクションの読み取りに費やした時間。
  • [読み取りバイト数]: 該当するコンシューマが副入力コレクションから読み取ったバイト数。

パイプラインで副入力を作成する複合変換が使用されている場合、副入力を作成する特定のサブ変換が表示されるまで、複合変換を展開してください。その後、[副入力の指標] セクションに表示するサブ変換を選択します。

図 5 は、副入力コレクションを作成する変換での副入力の指標を示しています。

サブ変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。
図 5: 複合変換(MakeMapView)が展開された状態の実行グラフ。副入力を作成するサブ変換(CreateDataflowView)が選択されていて、[ステップ情報] サイドパネルに、そのサブ変換での副入力の指標が表示されています。

1 つ以上の副入力を使用する変換

選択した変換が 1 つ以上の副入力を使用する場合、[副入力の指標] セクションには [副入力からの読み取り時間と読み取りバイト数] テーブルが表示されます。このテーブルには、副入力コレクションのそれぞれについて、次の情報が含まれています。

  • [副入力のコレクション]: 副入力コレクションの名前。
  • [読み取りに要した時間]: 変換で副入力コレクションの読み取りに費やされた時間。
  • [読み取りバイト数]: 変換で副入力コレクションから読み取ったバイト数。

パイプラインで副入力を読み取る複合変換が使用されている場合、副入力を読み取る特定のサブ変換が表示されるまで、複合変換を展開してください。その後、[副入力の指標] セクションに表示するサブ変換を選択します。

図 6 は、副入力コレクションから読み取る変換での副入力の指標を示しています。

変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。
図 6: 副入力コレクションから読み取る JoinBothCollections 変換。実行グラフで JoinBothCollections が選択されていて、[ステップ情報] サイドパネルに、副入力の指標が表示されます。

副入力のパフォーマンスに関する問題を識別する

副入力に関してよく発生するパフォーマンスの問題は、再反復処理です。副入力 PCollection のサイズが大きすぎると、ワーカーはコレクション全体をメモリ内のキャッシュに保存できません。そのため、ワーカーは永続的な副入力コレクションからの読み取りを繰り返さなければならなくなります。

図 7 の副入力の指標を見ると、副入力コレクションから読み取られた合計バイト数がコレクションのサイズ(合計書き込みバイト数)より遥かに大きいことがわかります。

変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。
図 7: 再イテレーションの例。副入力コレクションのサイズは 563 MB であるのに対し、副入力コレクションを使用する変換で読み取られた合計バイト数は 12 GB 近くになっています。

このパイプラインのパフォーマンスを向上させるには、副入力データの反復処置や再取得が回避されるようにアルゴリズムを再設計する必要があります。この例では、パイプラインによって 2 つのコレクションのデカルト積が生成されています。これは、アルゴリズムでメインのコレクションに含まれる要素ごとに、副入力コレクション全体が反復処理されているためです。したがって、メインのコレクションに含まれる複数の要素をまとめてバッチ処理することで、パイプラインのアクセス パターンを改善できます。このように変更すると、ワーカーが副入力コレクションを再読み取りしなければならない回数が減ります。

パイプラインで結合を実行するために、1 つ以上のサイズの大きい副入力で ParDo を適用すると、別の一般的なパフォーマンス問題が発生することがあります。この場合、ワーカーは結合オペレーションの処理時間の大部分を副入力コレクションからの読み取りに費やします。

図 8 は、この問題に関する副入力の指標の例を示しています。

変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。
図 8: JoinBothCollections 変換の合計実行時間は 18 分 31 秒です。ワーカーは実行時間の大半(10 分 3 秒)をサイズ 10 GB の副入力コレクションの読み取りに費やしています。

このパイプラインのパフォーマンスを向上させるには、副入力の代わりに CoGroupByKey を使用します。

診断

[ログ] の [診断] タブに、パイプラインで発生したエラーが表示されます。

エラーレポートには次の情報が含まれます。

  • エラーとエラー メッセージのリスト。
  • 各エラーが発生した回数。
  • 各エラーがいつ発生したかを示すヒストグラム。
  • 直近のエラーの発生時刻。
  • エラーが最初に発生した時刻。
  • エラーのステータス。

特定のエラーのエラーレポートを表示するには、[エラー] 列の説明をクリックします。[エラーレポーティング] ページが表示されます。このページの詳細については、エラーの表示をご覧ください。

次のステップ

  • 実行の詳細を使用して Dataflow ジョブを最適化する方法を確認する。

  • Cloud Monitoring について調べ、アラートを作成して Dataflow 指標(カスタム指標を含む)を確認する。