Cloud Dataflow マネージド サービスを使用してパイプラインを実行する場合、Cloud Dataflow のウェブベースのモニタリング ユーザー インターフェースを使用して、実行中のジョブやその他すべてに関する情報を表示できます。モニタリング インターフェースでは、Cloud Dataflow ジョブを表示して操作できます。
Cloud Dataflow モニタリング インターフェースにアクセスするには、Google Cloud Platform Console を使用します。このモニタリング インターフェースには、次の項目が表示されます。
- 現在実行中のジョブを含め、過去 30 日間に実行されたすべての Cloud Dataflow ジョブのリスト
- 各パイプラインの図
- ジョブのステータス、実行、および SDK のバージョンに関する詳細
- パイプラインを実行している Google Cloud Platform サービス(Compute Engine や Cloud Storage など)に関する情報へのリンク
- ジョブ中に発生したエラーまたは警告
Cloud Dataflow モニタリング インターフェースにアクセスする
Cloud Dataflow モニタリング インターフェースにアクセスするには:
- GCP Console にログインします。
- GCP プロジェクトを選択します。
- 左上隅のメニューをクリックします。
- [ビッグデータ] セクションに移動し、[Dataflow] をクリックします。
Cloud Dataflow ジョブのリストとそれぞれのステータスが表示されます。ジョブが表示されない場合は、新しいジョブを実行する必要があります。ジョブを実行する方法については、Cloud Dataflow クイックスタートをご覧ください。

ジョブのステータスは以下のとおりです。
- -: Monitoring UI はまだ Cloud Dataflow サービスからステータスを受け取っていません。
- 実行中: ジョブは現在実行中です。
- 開始されていません: ジョブが作成されましたが、システムの起動前に準備する時間が必要です。
- キューに格納済み: FlexRS ジョブがキューに格納されています。
- キャンセルしています...: ジョブはキャンセル中です。
- キャンセルされました: ユーザーがジョブをキャンセルしました。
- ドレインしています...: ジョブはドレイン中です。
- ドレインされました: ユーザーがジョブをドレインしました。
- 更新しています...: ジョブは更新中です。
- 更新されました: ユーザーがジョブを更新しました。
- 完了しました: ジョブは正常に終了しました。
- 失敗しました: ジョブは完了しませんでした。
パイプラインの詳細については、そのジョブを選択してください。
パイプラインを表示する
特定の Cloud Dataflow ジョブを選択すると、モニタリング インターフェースにはそのジョブのパイプラインに関する詳細情報が表示されます。この情報には、Cloud Dataflow サービスで実行されるパイプラインの図、ジョブの概要、ジョブのログ、パイプラインの各ステップに関する詳細情報が含まれます。

Cloud Dataflow モニタリング インターフェースには、パイプラインの図として「実行グラフ」が表示されます。パイプラインの実行グラフでは、パイプライン内の各変換が 1 つのボックスとして表されます。各ボックスには、変換名と以下のようなジョブ ステータス情報が含まれています。
- 実行中: ステップは現在実行中です。
- キューに格納済み: FlexRS ジョブ内のステップがキューに格納されています。
- 完了しました: ステップは正常に終了しました。
- 停止しました: ジョブが停止したため、ステップは停止されました。
- 不明: ステップは完了しませんでした。
- 失敗しました: ステップは完了しませんでした。
基本的な実行グラフ
パイプライン コード:
Java: SDK 2.x// Read the lines of the input text. p.apply("ReadLines", TextIO.read().from(options.getInputFile())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply("WriteCounts", TextIO.write().to(options.getOutput())) .to(options.getOutput())); Python(p # Read the lines of the input text. | 'ReadLines' >> beam.io.ReadFromText(options.input) # Count the words. | CountWords() # Write the formatted word counts to output. | 'WriteCounts' >> beam.io.WriteToText(options.output)) Java: SDK 1.x// Read the lines of the input text. p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply(TextIO.Write.named("WriteCounts") .to(options.getOutput()) .withNumShards(options.getNumShards())); |
実行グラフ:
![]() |
複合変換
実行グラフでは、複合変換(複数のネストしたサブ変換を含む変換)は展開可能です。展開可能な複合変換は、グラフに矢印でマークされます。矢印をクリックして変換を展開し、その中のサブ変換を表示します。
パイプライン コード:
Java: SDK 2.x// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } 注: 右の画像の Python# The CountWords Composite Transform inside the WordCount pipeline. class CountWords(beam.PTransform): def expand(self, pcoll): return (pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) Java: SDK 1.x// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); // Format each word and count into a printable string. PCollection<String> results = wordCounts.apply( ParDo.of(new FormatCountsFn())); return results; } } |
実行グラフ:
![]() |
変換の名前
Cloud Dataflow でモニタリング実行グラフに表示される変換名を取得するには、いくつかの方法があります。
Java: SDK 2.x
- Cloud Dataflow では、変換を適用するときに割り当てる名前を使用できます。
apply
メソッドに指定する最初の引数が、変換名になります。 - Cloud Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(
ParDo
などのコア変換を使用している場合は)DoFn
関数オブジェクトの名前から推定できます。
Python
- Cloud Dataflow では、変換を適用するときに割り当てる名前を使用できます。変換の
label
引数を指定することで変換名を設定できます。 - Cloud Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(
ParDo
などのコア変換を使用している場合は)DoFn
関数オブジェクトの名前から推定できます。
Java: SDK 1.x
- Cloud Dataflow では、変換を適用するときに割り当てる名前を使用できます。
apply
メソッドに指定する最初の引数が、変換名になります。 - Cloud Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(ParDo などのコア変換を使用している場合は)
DoFn
関数オブジェクトの名前から推定できます。
経過時間
ステップをクリックすると、経過時間の指標が表示されます。経過時間は、すべてのワーカーのすべてのスレッドで次のアクションにかかったおおよその合計時間です。
- ステップの初期化
- データの処理
- データのシャッフル
- ステップの終了
複合ステップの場合は、経過時間はコンポーネントのステップにかかった時間の合計です。この見積もりは、遅いステップを特定し、パイプラインのどの部分が本来よりも時間がかかっているかを診断するのに役立ちます。

Cloud Dataflow パイプラインでの時間を理解するの例をご覧ください。ここでは、経過時間(以前は合計実行時間と呼ばれていた)の指標を使用して、パイプラインが想定よりも長い時間を要している原因を調査しています。
副入力の指標
副入力の指標は、副入力のアクセス パターンとアルゴリズムがパイプラインのパフォーマンスにどのように影響するかを示します。パイプラインで副入力が使用される場合、Cloud Dataflow は副入力コレクションを永続レイヤ(ディスクなど)に書き込みます。変換では、この永続的なコレクションから副入力を読み取ります。こうした書き込みと読み取りは、ジョブの実行時間に影響を及ぼします。
Cloud Dataflow モニタリング インターフェースに副入力の指標が表示されるのは、副入力コレクションを作成または使用する変換を選択した場合です。その場合、[ステップ] タブの [副入力の指標] セクションで指標を確認できます。
副入力を作成する変換
選択した変換が副入力コレクションを作成する場合、[副入力の指標] セクションに、そのコレクションの名前と次の指標が表示されます。
- [書き込みに要した時間]: 副入力コレクションの書き込みに費やされた実行時間。
- [書き込みバイト数]: 副入力コレクションに書き込まれた合計バイト数。
- [副入力からの読み取り時間と読み取りバイト数]: 副入力コレクションを使用するすべての変換(副入力コンシューマ)に関する追加の指標を表示するテーブル。
[副入力からの読み取り時間と読み取りバイト数] テーブルには、副入力コンシューマのそれぞれについて、次の情報が含まれています。
- [副入力のコンシューマ]: 副入力コンシューマの変換名。
- [読み取りに要した時間]: 該当するコンシューマが副入力のコレクションの読み取りに費やした時間。
- [読み取りバイト数]: 該当するコンシューマが副入力コレクションから読み取ったバイト数。
パイプラインで副入力を作成する複合変換が使用されている場合、副入力を作成する特定のサブ変換が表示されるまで、複合変換を展開してください。その後、[副入力の指標] セクションに表示するサブ変換を選択します。
図 6 は、副入力コレクションを作成する変換での副入力の指標を示しています。
![サブ変換を選択して、その副入力の指標を [ステップ] タブに表示できます。](https://cloud.google.com/dataflow/images/monitoring-side-input-write.png?hl=ja)
MakeMapView
)が展開された状態の実行グラフ。副入力を作成するサブ変換(CreateDataflowView
)が選択されていて、[ステップ] タブに、そのサブ変換での副入力の指標が表示されています。1 つ以上の副入力を使用する変換
選択した変換が 1 つ以上の副入力を使用する場合、[副入力の指標] セクションには [副入力からの読み取り時間と読み取りバイト数] テーブルが表示されます。このテーブルには、副入力コレクションのそれぞれについて、次の情報が含まれています。
- [副入力のコレクション]: 副入力コレクションの名前。
- [読み取りに要した時間]: 変換で副入力コレクションの読み取りに費やされた時間。
- [読み取りバイト数]: 変換で副入力コレクションから読み取ったバイト数。
パイプラインで副入力を読み取る複合変換が使用されている場合、副入力を読み取る特定のサブ変換が表示されるまで、複合変換を展開してください。その後、[副入力の指標] セクションに表示するサブ変換を選択します。
図 7 は、副入力コレクションから読み取る変換での副入力の指標を示しています。
![変換を選択して、その副入力の指標を [ステップ] タブに表示できます。](https://cloud.google.com/dataflow/images/monitoring-side-input-read.png?hl=ja)
JoinBothCollections
変換。実行グラフで JoinBothCollections
が選択されていて、[ステップ] タブに、この変換での副入力の指標が表示されています。副入力のパフォーマンスに関する問題を識別する
副入力に関してよく発生するパフォーマンスの問題は、再反復処理です。副入力 PCollection
のサイズが大きすぎると、ワーカーはコレクション全体をメモリ内のキャッシュに保存できません。そのため、ワーカーは永続的な副入力コレクションからの読み取りを繰り返さなければならなくなります。
図 8 の副入力の指標を見ると、副入力コレクションから読み取られた合計バイト数がコレクションのサイズ(合計書き込みバイト数)より遥かに大きいことがわかります。
![変換を選択して、その副入力の指標を [ステップ] タブに表示できます。](https://cloud.google.com/dataflow/images/monitoring-side-input-reiteration.png?hl=ja)
このパイプラインのパフォーマンスを向上させるには、副入力データの反復処置や再取得が回避されるようにアルゴリズムを再設計する必要があります。この例では、パイプラインによって 2 つのコレクションのデカルト積が生成されています。これは、アルゴリズムでメインのコレクションに含まれる要素ごとに、副入力コレクション全体が反復処理されているためです。したがって、メインのコレクションに含まれる複数の要素をまとめてバッチ処理することで、パイプラインのアクセス パターンを改善できます。このように変更すると、ワーカーが副入力コレクションを再読み取りしなければならない回数が減ります。
パイプラインで結合を実行するために、1 つ以上のサイズの大きい副入力で ParDo
を適用すると、別の一般的なパフォーマンス問題が発生することがあります。その場合、ワーカーは結合の実行時間のほとんどの部分を副入力コレクションからの読み取りに費やします。
図 9 は、この問題に関する副入力の指標の例を示しています。
![変換を選択して、その副入力の指標を [ステップ] タブに表示できます。](https://cloud.google.com/dataflow/images/monitoring-side-input-read.png?hl=ja)
JoinBothCollections
変換の合計実行時間は 18 分 31 秒です。ワーカーは実行時間の大半(10 分 3 秒)をサイズ 10 GB の副入力コレクションの読み取りに費やしています。このパイプラインのパフォーマンスを向上させるには、副入力の代わりに CoGroupByKey を使用します。
システムのレイテンシとデータの鮮度
[ジョブの概要] パネルには、過去 6 時間の [システムのレイテンシ] と [データの鮮度] の指標を示すグラフが表示されます。このデータは 60 秒ごとにサンプリングされます。サンプリング後、データは最長 180 秒間表示されません。
システムのレイテンシは、データ項目が処理待ち状態になっていた期間の、現在の最長時間です。この指標は、パイプライン内のいずれかのソース内で要素が待機する時間を秒単位で示します。最長時間は処理後に調整されます。追加の考慮事項は次のとおりです。
- 複数のソースとシンクがある場合、システムのレイテンシは、すべてのシンクに書き込まれるまでにソース内で要素が待機する最長時間になります。
- ソース内で要素が待機する時間がソースコードから提供されず、要素のメタデータにイベント時間が定義されていない場合、パイプラインが最初に要素を受信した時間からシステムのレイテンシが計算されます。
データの鮮度とは、完全に処理された直近のデータ項目の経過時間(イベント タイムスタンプからの時間)です。パイプラインに複数のブランチとシンクがあり、データを複数のパスから受信している場合、すべての要素の中で最大のイベント タイムスタンプがデータの鮮度として使用されます。
パイプラインの各ステップには、出力データ ウォーターマークがあります。出力データ ウォーターマークが T の場合、イベント時間が T より前のすべての要素が計算処理に使用されます。出力データ ウォーターマークは、すべての上流計算の中で最も早い入力データ ウォーターマークによって制限されます。一部の入力データがまだ処理されていない場合、出力ウォーターマークが保持され、データの更新に影響します。
グラフの下にある [アラートを作成] リンクをクリックすると、Stackdriver Monitoring ダッシュボードの [Create new alerting policy] ページが開きます。

自動スケーリング
[ジョブの概要] パネルで、自動スケーリング ジョブの情報を表示できます。Cloud Dataflow サービスは、自動スケーリング ジョブを実行するのに必要なワーカー インスタンスの数を自動的に選択します。ワーカー インスタンスの数は、ジョブのニーズに応じて経時的に変更される可能性があります。
![[概要] タブで、自動スケーリングのパイプラインによって使用されるワーカー インスタンスの数や、その他の情報を確認できます。](https://cloud.google.com/dataflow/images/dataflow-job-exec-autoscaling-summary.png?hl=ja)
自動スケーリングの変更履歴を確認するには、[その他の履歴を表示] リンクをクリックします。パイプラインのワーカーの履歴に関する情報のポップアップ ウィンドウが表示されます。
![自動スケーリングの変更履歴を確認するには、[その他の履歴を表示] リンクをクリックします。](https://cloud.google.com/dataflow/images/dataflow-job-exec-autoscaling-history.png?hl=ja)
注: 2016 年 12 月 12 日以降に実施された、ストリーミング パイプラインの実行や更新に関する自動スケーリングの詳細を確認できます。パイプラインの実行や最後の更新が 12 月 12 日よりも前である場合は、パイプラインを更新した後の自動スケーリングの詳細のみを確認できます。
Error Reporting
Stackdriver Error Reporting インターフェースは、パイプラインで発生したエラーを集約して表示します。
エラーレポートには次のものが含まれます。
- エラーとエラー メッセージのリスト。
- 各エラーが発生した回数。
- 各エラーがいつ発生したかを示すヒストグラム。
- 直近のエラーの発生時刻。
パイプラインのエラーレポートを表示するには、パイプライン グラフの上にある [ログ] メニューをクリックし、パイプライン グラフの下にある [スタック トレース] タブをクリックします。Cloud Dataflow モニタリング インターフェースには、記録された各エラーの概要と発生した回数が表示されます。

エラーの詳細を表示するには、エラーの概要をクリックします。Stackdriver Error Reporting インターフェースが表示されます。

Stackdriver Error Reporting インターフェースには、このほかの機能もあります。パイプラインで発生するエラーについて詳しくは、エラーの表示をご覧ください。