Dataflow ジョブのグラフ

Dataflow モニタリング インターフェースには、各ジョブをグラフィカルに表現したジョブグラフが表示されます。ジョブグラフには、ジョブの概要、ジョブログ、パイプラインの各ステップに関する情報も表示されます。

ジョブのジョブグラフを表示する手順は次のとおりです。

  1. Google Cloud コンソールで、[Dataflow] > [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. ジョブを選択します。

  3. [ジョブグラフ] タブをクリックします。

パイプラインのジョブグラフでは、パイプライン内の各変換が 1 つのボックスとして表されます。各ボックスには、変換名と以下のようなジョブ ステータス情報が含まれています。

  • 実行中: ステップが実行中
  • キューに格納済み: FlexRS ジョブ内のステップがキューに格納されている
  • 完了しました: ステップが正常に終了
  • 停止しました: ジョブが停止したため、ステップが停止された
  • 不明: ステータスを報告できなかった
  • 失敗しました: ステップは完了しなかった

デフォルトでは、ジョブグラフ ページにグラフビューが表示されます。ジョブグラフを表形式で表示するには、[ジョブステップのビュー] で [表形式] を選択します。表形式には、同じ情報が別の形式で含まれています。表形式は次のシナリオで役立ちます。

  • ジョブに多くのステージがあるため、ジョブグラフを操作しにくい。
  • ジョブステップを特定のプロパティで並べ替えたい。たとえば、経過時間でテーブルを並べ替えると、遅いステップを特定できます。

基本的なジョブグラフ

パイプライン コード:

Java

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python

(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go

  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)
ジョブグラフ

Dataflow のモニタリング インターフェースで表示される WordCount パイプラインの実行グラフ。

図 1: Dataflow モニタリング インターフェースの実行グラフに表示された WordCount パイプラインのパイプライン コード。

複合変換

複合変換は、複数のネストされたサブ変換を含む変換です。ジョブグラフでは、複合変換は展開可能です。変換を展開してサブ変換を表示するには、矢印をクリックします。

パイプライン コード:

Java

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python

# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go

  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
ジョブグラフ

WordCount パイプラインのジョブグラフと、コンポーネント変換を表示するように展開された CountWords 変換。

図 2: CountWords 変換のサブステップのパイプライン コード。パイプライン全体の展開されたジョブグラフとともに表示されています。

パイプラインのコードで、次のようなコードを使用して複合変換を呼び出す場合があります。

result = transform.apply(input);

このようにして呼び出された複合変換では、予期されるネストが省略されるため、Dataflow モニタリング インターフェースで展開して表示されることがあります。パイプラインでは、パイプラインの実行時に安定した一意名に関する警告やエラーが生成されることもあります。

これらの問題を回避するには、推奨される形式を使用して変換を呼び出してください。

result = input.apply(transform);

変換の名前

Dataflow には、モニタリング ジョブ グラフに表示される変換名を取得する方法がいくつかあります。変換名は、Dataflow モニタリング インターフェース、ログファイル、デバッグツールなど、一般公開されている場所で使用されます。個人を特定できる情報(ユーザー名や組織名など)を含む変換名は使用しないでください。

Java

  • Dataflow では、変換を適用するときに割り当てる名前を使用できますapply メソッドに指定する最初の引数が変換名になります。
  • Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(ParDo などのコア変換を使用している場合は)DoFn 関数オブジェクトの名前から推定できます。

Python

  • Dataflow では、変換を適用するときに割り当てる名前を使用できます。変換の label 引数を指定することで変換名を設定できます。
  • Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(ParDo などのコア変換を使用している場合は)DoFn 関数オブジェクトの名前から推定できます。

Go

  • Dataflow では、変換を適用するときに割り当てる名前を使用できます。変換名を設定するには、Scope を指定します。
  • Dataflow では、変換名を推定できます(構造 DoFn を使用している場合は構造体名から、または、関数 DoFn を使用している場合は関数名から)。

ステップ情報を確認する

ジョブグラフのステップをクリックすると、[ステップ情報] パネルにステップの詳細情報が表示されます。詳細については、ジョブのステップ情報をご覧ください。