Dataflow マネージド サービスを使用してパイプラインを実行する場合、Dataflow のウェブベースのモニタリング ユーザー インターフェースを使用して、実行中のジョブやその他すべてに関する情報を表示できます。モニタリング インターフェースを使用すると、Dataflow ジョブを表示して操作できます。
Google Cloud Console を使用すると、Dataflow モニタリング インターフェースにアクセスできます。このモニタリング インターフェースには、次の項目が表示されます。
- 現在実行中のジョブを含め、過去 30 日間に実行されたすべての Dataflow ジョブのリスト
- 各パイプラインの図
- ジョブのステータス、実行、および SDK のバージョンに関する詳細
- パイプラインを実行している Google Cloud サービス(Compute Engine や Cloud Storage など)に関する情報へのリンク。
- ジョブ中に発生したエラーまたは警告
- ジョブの追加の診断。
ジョブ モニタリングのグラフは、Dataflow モニタリング インターフェース内で表示できます。これらのグラフには、パイプライン ジョブの期間中の指標が表示され、次の情報が含まれます。
- パイプラインの遅れを引き起こしている可能性のあるステップを特定するためのステップレベルの可視性。
- 異常な動作を表面化させることのできる統計情報。
- 参照元やシンク内のボトルネックの特定に役立つ I/O の指標。
Dataflow モニタリング インターフェースにアクセスする
Dataflow モニタリング インターフェースにアクセスする方法は次のとおりです。
- Cloud Console にログインします。
- Google Cloud プロジェクトを選択します。
- 左上隅のメニューをクリックします。
- [ビッグデータ] セクションに移動し、[Dataflow] をクリックします。
Dataflow ジョブのリストとそれぞれのステータスが表示されます。ジョブが表示されない場合は、新しいジョブを実行する必要があります。ジョブの実行方法については、Dataflow のクイックスタートをご覧ください。

ジョブのステータスは以下のとおりです。
- -: モニタリング UI はまだ Dataflow サービスからステータスを受け取っていません。
- 実行中: ジョブは現在実行中です。
- 開始しています...: ジョブは作成されていますが、起動する前に準備の時間が必要です。
- キューに格納済み: FlexRS ジョブがキューに格納されています。
- キャンセルしています...: ジョブはキャンセル中です。
- キャンセルされました: ユーザーがジョブをキャンセルしました。
- ドレインしています...: ジョブはドレイン中です。
- ドレインされました: ユーザーがジョブをドレインしました。
- 更新しています...: ジョブは更新中です。
- 更新されました: ユーザーがジョブを更新しました。
- 完了しました: ジョブは正常に終了しました。
- 失敗しました: ジョブは完了しませんでした。
パイプラインの詳細については、ジョブの [名前] をクリックしてください。
ジョブ モニタリングのグラフへのアクセス
ジョブのモニタリング グラフにアクセスするには、Dataflow モニタリング インターフェース内のジョブの [名前] をクリックします。[ジョブの詳細] ページが開いて、次の情報が表示されます。
- ジョブのグラフ: パイプラインの視覚的表現
- ジョブの指標: ジョブの実行に関する指標
- ジョブの情報パネル: パイプラインに関する記述情報
- ジョブのログ: Dataflow サービスによってジョブレベルで生成されたログ
- ワーカーログ: Dataflow サービスによってワーカーレベルで生成されたログ
- 診断: 選択したタイムラインでエラーが発生した場所と、パイプラインに対して発生する可能性のある推奨事項を示すテーブル
- タイムセレクタ: 指標の期間を調整できるツール
ジョブの詳細ページでは、[ジョブのグラフ] と [ジョブの指標] タブを使用してジョブビューを切り替えることができます。
Cloud Monitoring アラートの作成
Dataflow は Cloud Monitoring と完全に統合されているので、ジョブがユーザー定義のしきい値を超えたときのアラートを作成できます。指標グラフから Cloud Monitoring アラートを作成するには、[通知ポリシーを作成] をクリックします。
これらのアラートの作成手順については、Dataflow パイプラインの Cloud Monitoring を使用するページをご覧ください。
全画面モード
指標のグラフを全画面表示するには、[fullscreen] をクリックします。
タイムセレクタ ツールの使用
タイムセレクタ ツールを使用すると、指標の期間を調整できます。事前定義された期間を選択するか、カスタムの時間間隔を選択してジョブを分析できます。
ストリーミングまたは処理中のバッチジョブの場合、グラフのデフォルト表示には、そのジョブの過去 6 時間の指標が表示されます。ストリーミング ジョブが停止または完了した場合、グラフのデフォルト表示では、ジョブの実行時間全体が表示されます。
ステップとワーカーの指標
次の指標のグラフを表示できます。
- データの鮮度(ストリーミング パイプラインのみ)
- システム レイテンシ(ストリーミング パイプラインのみ)
- 自動スケーリング
- スループット
- CPU 使用率
- ワーカーのエラーログのカウント
- 入力と出力の指標
これらのグラフの詳細情報を表示するには、凡例切り替えボタンをクリックして「グラフの凡例を展開」します。
データの鮮度(ストリーミング パイプラインのみ)
データの鮮度はリアルタイムと出力ウォーターマークの間の経過時間です。パイプラインの各ステップには、出力データ ウォーターマークがあります。出力データ ウォーターマークが T の場合、イベント時間が T より前のすべての要素が計算処理に使用されます。出力データ ウォーターマークは、すべての上流計算の中で最も早い入力データ ウォーターマークによって制限されます。一部の入力データがまだ処理されていない場合、出力ウォーターマークが保持され、データの更新に影響します。
システム レイテンシ(ストリーミング パイプラインのみ)
システムのレイテンシは、データ項目が処理を行っているまたは処理を待機している、現時点での最長時間です。この指標は、パイプライン内のいずれかのソース内で要素が待機する時間を秒単位で示します。最長時間は処理後に調整されます。追加の考慮事項は次のとおりです。
- 複数のソースとシンクがある場合、システムのレイテンシは、すべてのシンクに書き込まれるまでにソース内で要素が待機する最長時間になります。
- ソース内で要素が待機する時間がソースコードから提供されず、要素のメタデータにイベント時間が定義されていない場合、パイプラインが最初に要素を受信した時間からシステムのレイテンシが計算されます。
自動スケーリング
Dataflow サービスは、自動スケーリングのジョブを実行するために必要なワーカー インスタンスの数を自動的に選択します。ワーカー インスタンスの数は、ジョブのニーズに応じて経時的に変更される可能性があります。
自動スケーリングの変更履歴を確認するには、[その他の履歴] ボタンをクリックします。パイプラインのワーカーの履歴に関する情報を含むテーブルがグラフの下に表示されます。
スループット
スループットは、いずれかの時点で処理されているデータの量です。このステップごとの指標は、1 秒あたりの要素数として表示されます。この指標を 1 秒あたりのバイト数で表示するには、[スループット(要素数/秒)] arrow_drop_down、[スループット(バイト/秒)] の順にクリックします。
CPU 使用率
CPU 使用率は、使用されている CPU の量を処理可能な CPU の量で割ったものです。このワーカーごとの指標は割合として表示されます。
入力と出力の指標
ストリーミング Dataflow ジョブが Pub/Sub を使用してレコードの読み取りまたは書き込みをした場合、入力の指標と出力の指標が表示されます。
次の 3 つのグラフが、[入力の指標] セクションと [出力の指標] セクションに表示されます。
リクエスト数/秒
1 秒あたりのリクエスト数は、一定期間に発生したデータの読み取りまたは書き込みを行う API リクエストの頻度をソース別またはシンク別に集計した値です。この頻度が 0 に低下した場合、または、予想される動作に対して長時間にわたって大幅に低下している場合は、特定のオペレーションの実行がブロックされている(または、読み取るデータがない)可能性があるため、システム ウォーターマークが高いステップを確認する必要があります。また、ワーカーログを調査して、エラーや、遅い処理が発生している兆候を探します。
エラーの種類別のレスポンス エラー数/秒
1 秒あたりのエラーの種類別のレスポンス エラーは、一定期間に発生したデータの読み取りまたは書き込みを行う API リクエストの失敗の頻度をソース別またはシンク別に集計した値です。エラーが頻繁に繰り返し発生する場合は、API リクエストの処理が遅くなる可能性があるため、調査する必要があります。これらの問題のトラブルシューティングを行う際は、I/O エラーコードのドキュメントと、ソースまたはシンクで使用される特定のエラーコード(Pub/Sub のエラーコードなど)のドキュメントをご確認ください。
リクエストのレイテンシ
リクエストのレイテンシは、さまざまなパーセンタイルでの応答時間の長さです。たとえば、95 パーセンタイルは、95% のリクエストの所要時間が、表示されている時間よりも短いことを示します。特定の入力または出力に関して 99 パーセンタイルの応答時間が非常に長い場合、ホットスポット化の問題を示している可能性があります。この問題は、多数の要素が読み取りまたは書き込みされていて、サービスがリソースの過負荷を引き起こしているときに発生します。例については、Cloud Bigtable のパフォーマンスについてをご覧ください。
Metrics Explorer の使用
次の Dataflow I/O 指標は、Metrics Explorer で表示できます。
job/pubsub/write_count
: Dataflow ジョブの PubsubIO.Write からの Pub/Sub Publish リクエスト。job/pubsub/read_count
: Dataflow ジョブの Pubsub.IO.Read からの Pub/Sub Pull リクエスト。job/pubsub/write_latencies
: Dataflow ジョブの PubsubIO.Write からの Pub/Sub Publish リクエストのレイテンシ。job/pubsub/read_latencies
: Dataflow ジョブの Pubsub.IO.Read からの Pub/Sub Pull リクエストのレイテンシ。
Dataflow の指標の完全なリストについては、Google Cloud の指標のドキュメントをご覧ください。
Cloud Monitoring アラートの作成
Dataflow が Cloud Monitoring と完全統合されているので、ジョブがユーザー定義しきい値を超えたときのアラートを作成できます。
これらのアラートの作成手順については、Dataflow パイプラインの Cloud Monitoring を使用するページをご覧ください。
パイプラインを表示する
特定の Dataflow ジョブを選択すると、モニタリング インターフェースにはそのジョブのパイプラインに関する詳細情報が表示されます。この情報には、Dataflow サービスで実行されるパイプラインの図、ジョブの概要、ジョブのログ、パイプラインの各ステップに関する詳細情報が含まれます。
Dataflow のモニタリング インターフェースでは、パイプラインの図である実行グラフが提供されます。パイプラインの実行グラフでは、パイプライン内の各変換が 1 つのボックスとして表されます。各ボックスには、変換名と以下のようなジョブ ステータス情報が含まれています。
- 実行中: ステップは現在実行中です。
- キューに格納済み: FlexRS ジョブ内のステップがキューに格納されています。
- 完了しました: ステップは正常に終了しました。
- 停止しました: ジョブが停止したため、ステップは停止されました。
- 不明: ステータスを報告できませんでした。
- 失敗しました: ステップは完了しませんでした。
基本的な実行グラフ
パイプライン コード:
Java: SDK 2.x// Read the lines of the input text. p.apply("ReadLines", TextIO.read().from(options.getInputFile())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply("WriteCounts", TextIO.write().to(options.getOutput())) .to(options.getOutput())); Python( p # Read the lines of the input text. | 'ReadLines' >> beam.io.ReadFromText(options.input) # Count the words. | CountWords() # Write the formatted word counts to output. | 'WriteCounts' >> beam.io.WriteToText(options.output)) Java: SDK 1.x |
実行グラフ:
![]() |
複合変換
実行グラフでは、複合変換(複数のネストしたサブ変換を含む変換)は展開可能です。展開可能な複合変換は、グラフに矢印でマークされます。矢印をクリックして変換を展開し、その中のサブ変換を表示します。
パイプライン コード:
Java: SDK 2.x// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } 注: 右の画像の Python# The CountWords Composite Transform inside the WordCount pipeline. class CountWords(beam.PTransform): def expand(self, pcoll): return ( pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) Java: SDK 1.x |
実行グラフ:
![]() |
変換の名前
Dataflow には、モニタリング実行グラフに表示される変換名を取得する方法がいくつかあります。
Java: SDK 2.x
- Dataflow では、変換を適用するときに割り当てる名前を使用できます。
apply
メソッドに指定する最初の引数が、変換名になります。 - Cloud Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(
ParDo
などのコア変換を使用している場合は)DoFn
関数オブジェクトの名前から推定できます。
Python
- Dataflow では、変換を適用するときに割り当てる名前を使用できます。変換の
label
引数を指定することで変換名を設定できます。 - Cloud Dataflow では、変換名を推定できます。(カスタム変換を作成した場合は)クラス名から、または(
ParDo
などのコア変換を使用している場合は)DoFn
関数オブジェクトの名前から推定できます。
Java: SDK 1.x
指標について
経過時間
ステップをクリックすると、経過時間の指標が表示されます。経過時間は、すべてのワーカーのすべてのスレッドで次のアクションにかかったおおよその合計時間です。
- ステップの初期化
- データの処理
- データのシャッフル
- ステップの終了
複合ステップの場合は、経過時間はコンポーネントのステップにかかった時間の合計です。この見積もりは、遅いステップを特定し、パイプラインのどの部分が本来よりも時間がかかっているかを診断するのに役立ちます。

Dataflow パイプラインでの時間を理解するの例をご覧ください。ここでは、経過時間(以前は合計実行時間と呼ばれていた)の指標を使用して、パイプラインが想定よりも長い時間を要している原因を調査しています。
副入力の指標
副入力の指標は、副入力のアクセス パターンとアルゴリズムがパイプラインのパフォーマンスにどのように影響するかを示します。パイプラインで副入力が使用される場合、Dataflow は副入力コレクションを永続レイヤ(ディスクなど)に書き込みます。変換では、この永続的なコレクションから副入力を読み取ります。こうした書き込みと読み取りは、ジョブの実行時間に影響を及ぼします。
Dataflow モニタリング インターフェースに副入力の指標が表示されるのは、副入力コレクションを作成または使用する変換を選択した場合です。その場合、[ステップ] タブの [副入力の指標] セクションで指標を確認できます。
副入力を作成する変換
選択した変換が副入力コレクションを作成する場合、[副入力の指標] セクションに、そのコレクションの名前と次の指標が表示されます。
- [書き込みに要した時間]: 副入力コレクションの書き込みに費やされた実行時間。
- [書き込みバイト数]: 副入力コレクションに書き込まれた合計バイト数。
- [副入力からの読み取り時間と読み取りバイト数]: 副入力コレクションを使用するすべての変換(副入力コンシューマ)に関する追加の指標を表示するテーブル。
[副入力からの読み取り時間と読み取りバイト数] テーブルには、副入力コンシューマのそれぞれについて、次の情報が含まれています。
- [副入力のコンシューマ]: 副入力コンシューマの変換名。
- [読み取りに要した時間]: 該当するコンシューマが副入力のコレクションの読み取りに費やした時間。
- [読み取りバイト数]: 該当するコンシューマが副入力コレクションから読み取ったバイト数。
パイプラインで副入力を作成する複合変換が使用されている場合、副入力を作成する特定のサブ変換が表示されるまで、複合変換を展開してください。その後、[副入力の指標] セクションに表示するサブ変換を選択します。
図 5 は、副入力コレクションを作成する変換での副入力の指標を示しています。
![サブ変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。](https://cloud.google.com/dataflow/images/monitoring-side-input-write.png?hl=ja)
MakeMapView
)が展開された状態の実行グラフ。副入力を作成するサブ変換(CreateDataflowView
)が選択されていて、[ステップ情報] サイドパネルに、そのサブ変換での副入力の指標が表示されています。1 つ以上の副入力を使用する変換
選択した変換が 1 つ以上の副入力を使用する場合、[副入力の指標] セクションには [副入力からの読み取り時間と読み取りバイト数] テーブルが表示されます。このテーブルには、副入力コレクションのそれぞれについて、次の情報が含まれています。
- [副入力のコレクション]: 副入力コレクションの名前。
- [読み取りに要した時間]: 変換で副入力コレクションの読み取りに費やされた時間。
- [読み取りバイト数]: 変換で副入力コレクションから読み取ったバイト数。
パイプラインで副入力を読み取る複合変換が使用されている場合、副入力を読み取る特定のサブ変換が表示されるまで、複合変換を展開してください。その後、[副入力の指標] セクションに表示するサブ変換を選択します。
図 6 は、副入力コレクションから読み取る変換での副入力の指標を示しています。
![変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。](https://cloud.google.com/dataflow/images/monitoring-side-input-read.png?hl=ja)
JoinBothCollections
変換。実行グラフで JoinBothCollections
が選択されていて、[ステップ情報] サイドパネルに、副入力の指標が表示されます。副入力のパフォーマンスに関する問題を識別する
副入力に関してよく発生するパフォーマンスの問題は、再反復処理です。副入力 PCollection
のサイズが大きすぎると、ワーカーはコレクション全体をメモリ内のキャッシュに保存できません。そのため、ワーカーは永続的な副入力コレクションからの読み取りを繰り返さなければならなくなります。
図 7 の副入力の指標を見ると、副入力コレクションから読み取られた合計バイト数がコレクションのサイズ(合計書き込みバイト数)より遥かに大きいことがわかります。
![変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。](https://cloud.google.com/dataflow/images/monitoring-side-input-reiteration.png?hl=ja)
このパイプラインのパフォーマンスを向上させるには、副入力データの反復処置や再取得が回避されるようにアルゴリズムを再設計する必要があります。この例では、パイプラインによって 2 つのコレクションのデカルト積が生成されています。これは、アルゴリズムでメインのコレクションに含まれる要素ごとに、副入力コレクション全体が反復処理されているためです。したがって、メインのコレクションに含まれる複数の要素をまとめてバッチ処理することで、パイプラインのアクセス パターンを改善できます。このように変更すると、ワーカーが副入力コレクションを再読み取りしなければならない回数が減ります。
パイプラインで結合を実行するために、1 つ以上のサイズの大きい副入力で ParDo
を適用すると、別の一般的なパフォーマンス問題が発生することがあります。その場合、ワーカーは結合の実行時間のほとんどの部分を副入力コレクションからの読み取りに費やします。
図 8 は、この問題に関する副入力の指標の例を示しています。
![変換を選択して、その副入力の指標を [ステップ情報] サイドパネルに表示できます。](https://cloud.google.com/dataflow/images/monitoring-side-input-read.png?hl=ja)
JoinBothCollections
変換の合計実行時間は 18 分 31 秒です。ワーカーは実行時間の大半(10 分 3 秒)をサイズ 10 GB の副入力コレクションの読み取りに費やしています。このパイプラインのパフォーマンスを向上させるには、副入力の代わりに CoGroupByKey を使用します。
エラー報告
Error Reporting インターフェースは、パイプラインで発生したエラーを集約して表示します。
エラーレポートには次のものが含まれます。
- エラーとエラー メッセージのリスト。
- 各エラーが発生した回数。
- 各エラーがいつ発生したかを示すヒストグラム。
- 直近のエラーの発生時刻。
パイプラインのエラーレポートを表示するには、パイプライン グラフの上にある [ログ] メニューをクリックし、パイプライン グラフの下にある [スタック トレース] タブをクリックします。Dataflow Monitoring Interface に、記録された各エラーの概要と発生回数が表示されます。
ワーカーのエラーログのカウントパネルには、特定の時点でのすべてのワーカーで観測されたエラーの割合が表示されます。

エラーの詳細を表示するには、エラーの概要をクリックします。Error Reporting インターフェースが表示されます。

Error Reporting インターフェースには、このほかの機能もあります。パイプラインで発生するエラーについて詳しくは、エラーの表示をご覧ください。