Dataflow ジョブのグラフ

特定の Dataflow ジョブを選択すると、モニタリング インターフェースにはパイプラインのグラフィカルに表現したジョブグラフが表示されます。コンソールのジョブグラフ ページには、ジョブの概要、ジョブログ、パイプラインの各ステップに関する情報も表示されます。

パイプラインのジョブグラフでは、パイプライン内の各変換が 1 つのボックスとして表されます。各ボックスには、変換名と以下のようなジョブ ステータス情報が含まれています。

  • 実行中: ステップは実行中です。
  • キューに格納済み: FlexRS ジョブ内のステップがキューに格納されています。
  • 完了しました: ステップは正常に終了しました。
  • 停止しました: ジョブが停止したため、ステップは停止されました。
  • 不明: ステータスを報告できませんでした。
  • 失敗しました: ステップは完了しませんでした。

デフォルトでは、ジョブグラフ ページにグラフビューが表示されます。ジョブグラフを表形式で表示するには、[ジョブステップのビュー] で [表形式] を選択します。表形式には、同じ情報が別の形式で含まれています。表形式は次のシナリオで役立ちます。

  • ジョブに多くのステージがあるため、ジョブグラフを操作しにくい。
  • ジョブステップを特定のプロパティで並べ替えたい。たとえば、経過時間でテーブルを並べ替えて遅いステップを特定できます。

基本的なジョブグラフ

パイプライン コード:

Java

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python

(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go

  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)

ジョブグラフ

Dataflow のモニタリング インターフェースで表示される WordCount パイプラインの実行グラフ。

図 1: Dataflow モニタリング インターフェースの実行グラフに表示された WordCount パイプラインのパイプライン コード。

複合変換

ジョブグラフでは、複合変換(複数のネストしたサブ変換を含む変換)は展開可能です。展開可能な複合変換は、グラフに矢印でマークされます。矢印をクリックして変換を展開し、その中のサブ変換を表示します。

パイプライン コード:

Java

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python

# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go

  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
ジョブグラフ

WordCount パイプラインのジョブグラフと、コンポーネント変換を表示するように展開された CountWords 変換。

図 2: パイプライン全体の展開されたジョブグラフと CountWords 変換のサブステップのパイプライン コード

パイプラインのコードで、複合変換を次のように呼び出した可能性があります。

result = transform.apply(input);

このようにして呼び出された複合変換では予期されるネストが省略されるため、Dataflow Monitoring Interface で展開して表示されることがあります。パイプラインでは、パイプラインの実行時に安定した一意名に関する警告やエラーが生成されることもあります。

これらの問題を回避するには、推奨される形式を使用して変換を呼び出してください。

result = input.apply(transform);

変換名

Dataflow には、モニタリング ジョブ グラフに表示される変換名を取得する方法がいくつかあります。

Java

  • Dataflow では、変換を適用するときに割り当てる名前を使用できますapply メソッドに指定する最初の引数が変換名になります。
  • Cloud Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(ParDo などのコア変換を使用している場合は)DoFn 関数オブジェクトの名前から推定できます。

Python

  • Dataflow では、変換を適用するときに割り当てる名前を使用できます。変換の label 引数を指定することで変換名を設定できます。
  • Cloud Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(ParDo などのコア変換を使用している場合は)DoFn 関数オブジェクトの名前から推定できます。

Go

  • Dataflow では、変換を適用するときに割り当てる名前を使用できます。変換名を設定するには、Scope を指定します。
  • Dataflow では、変換名を推定できます(構造 DoFn を使用している場合は構造体名から、または、関数 DoFn を使用している場合は関数名から)。

指標について

このセクションでは、ジョブグラフに関連した指標について詳しく説明します。

経過時間

ステップをクリックすると、[ステップ情報] パネルに経過時間の指標が表示されます。経過時間は、すべてのワーカーのすべてのスレッドで次のアクションにかかったおおよその合計時間です。

  • ステップの初期化
  • データの処理
  • データのシャッフル
  • ステップの終了

複合ステップの場合は、経過時間はコンポーネントのステップにかかった時間の合計です。この見積もりは、遅いステップを特定し、パイプラインのどの部分が本来よりも時間がかかっているかを診断するのに役立ちます。

パイプラインの各ステップの実行時間を確認できます。
図 3: 経過時間の指標は、パイプラインが効率的に稼働していることを確認するのに役立ちます。

副入力の指標

副入力の指標は、副入力のアクセス パターンとアルゴリズムがパイプラインのパフォーマンスにどのように影響するかを示します。パイプラインで副入力が使用される場合、Dataflow は副入力コレクションを永続レイヤ(ディスクなど)に書き込みます。変換では、この永続的なコレクションから副入力を読み取ります。こうした書き込みと読み取りは、ジョブの実行時間に影響を及ぼします。

Dataflow モニタリング インターフェースに副入力の指標が表示されるのは、副入力コレクションを作成または使用する変換を選択した場合です。[ステップ情報] パネルの [副入力の指標] セクションで指標を確認できます。

副入力を作成する変換

選択した変換が副入力コレクションを作成する場合、[副入力の指標] セクションに、そのコレクションの名前と次の指標が表示されます。

  • [書き込みに要した時間]: 副入力コレクションの書き込みに費やされた実行時間。
  • [書き込みバイト数]: 副入力コレクションに書き込まれた合計バイト数。
  • [副入力からの読み取り時間と読み取りバイト数]: 副入力コレクションを使用するすべての変換(副入力コンシューマ)に関する追加の指標を表示するテーブル。

[副入力からの読み取り時間と読み取りバイト数] テーブルには、副入力コンシューマのそれぞれについて、次の情報が含まれています。

  • [副入力のコンシューマ]: 副入力コンシューマの変換名。
  • [読み取りに要した時間]: 該当するコンシューマが副入力のコレクションの読み取りに費やした時間。
  • [読み取りバイト数]: 該当するコンシューマが副入力コレクションから読み取ったバイト数。

パイプラインで副入力を作成する複合変換が使用されている場合、副入力を作成する特定のサブ変換が表示されるまで、複合変換を展開してください。その後、[副入力の指標] セクションに表示するサブ変換を選択します。

図 4 は、副入力コレクションを作成する変換での副入力の指標を示しています。

サブ変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。
図 4: 複合変換(MakeMapView)が展開された状態のジョブグラフ。副入力を作成するサブ変換(CreateDataflowView)が選択されていて、[ステップ情報] サイドパネルにその副入力の指標が表示されています。

1 つ以上の副入力を使用する変換

選択した変換が 1 つ以上の副入力を使用する場合、[副入力の指標] セクションには [副入力からの読み取り時間と読み取りバイト数] テーブルが表示されます。このテーブルには、副入力コレクションのそれぞれについて、次の情報が含まれています。

  • [副入力のコレクション]: 副入力コレクションの名前。
  • [読み取りに要した時間]: 変換で副入力コレクションの読み取りに費やされた時間。
  • [読み取りバイト数]: 変換で副入力コレクションから読み取ったバイト数。

パイプラインで副入力を読み取る複合変換が使用されている場合、副入力を読み取る特定のサブ変換が表示されるまで、複合変換を展開してください。その後、[副入力の指標] セクションに表示するサブ変換を選択します。

図 5 は、副入力コレクションから読み取る変換での副入力の指標を示しています。

変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。
図 5: 副入力コレクションから読み取る JoinBothCollections 変換。ジョブグラフで JoinBothCollections が選択されていて、[ステップ情報] サイドパネルに、副入力の指標が表示されます。

副入力のパフォーマンスに関する問題を特定する

副入力に関してよく発生するパフォーマンスの問題は、再反復処理です。副入力 PCollection のサイズが大きすぎると、ワーカーはコレクション全体をメモリ内のキャッシュに保存できません。そのため、ワーカーは永続的な副入力コレクションからの読み取りを繰り返さなければならなくなります。

図 6 の副入力の指標を見ると、副入力コレクションから読み取られた合計バイト数がコレクションのサイズ(合計書き込みバイト数)より遥かに大きいことがわかります。

変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。
図 6: 再イテレーションの例。副入力コレクションのサイズは 563 MB であるのに対し、副入力コレクションを使用する変換で読み取られた合計バイト数は 12 GB 近くになっています。

このパイプラインのパフォーマンスを向上させるには、副入力データの反復処置や再取得が回避されるようにアルゴリズムを再設計する必要があります。この例では、パイプラインによって 2 つのコレクションのデカルト積が生成されています。これは、アルゴリズムでメインのコレクションに含まれる要素ごとに、副入力コレクション全体が反復処理されているためです。したがって、メインのコレクションに含まれる複数の要素をまとめてバッチ処理することで、パイプラインのアクセス パターンを改善できます。このように変更すると、ワーカーが副入力コレクションを再読み取りしなければならない回数が減ります。

パイプラインで結合を実行するために、1 つ以上のサイズの大きい副入力で ParDo を適用すると、別の一般的なパフォーマンス問題が発生することがあります。この場合、ワーカーは結合オペレーションの処理時間の大部分を副入力コレクションからの読み取りに費やします。

図 7 は、この問題に関する副入力の指標の例を示しています。

変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。
図 7: JoinBothCollections 変換の合計実行時間は 18 分 31 秒です。ワーカーは実行時間の大半(10 分 3 秒)をサイズ 10 GB の副入力コレクションの読み取りに費やしています。

このパイプラインのパフォーマンスを向上させるには、副入力の代わりに CoGroupByKey を使用します。