Dataflow パイプラインに Cloud Monitoring を使用する

Cloud Monitoring は、強力なロギングと診断機能を提供します。Dataflow に Monitoring を統合すると、ジョブ ステータス、要素数、システムラグ(ストリーミング ジョブ用)、ユーザー カウンタなどの Dataflow ジョブ指標に Monitoring のダッシュボードからアクセスできるようになります。また、Monitoring アラートを使用して、長いストリーミングのシステムラグや失敗したジョブなど、さまざまな条件の通知を受け取ることもできます。

始める前に

Java クイックスタートPython クイックスタート、または Go クイックスタートに従って、Dataflow プロジェクトを設定します。次に、パイプラインを作成して実行します。

Metrics Explorer でログを表示するには、ワーカー サービス アカウントroles/monitoring.metricWriter ロールが必要です。

カスタム指標

Apache Beam パイプラインで定義した指標は、Dataflow からカスタム指標として Monitoring に通知されます。Apache Beam には、CounterDistributionGauge3 種類のパイプライン指標があります。

  • Dataflow は、Counter 指標と Distribution 指標を Monitoring に報告します。
  • Distribution は、_MAX_MIN_MEAN_COUNT が末尾に付加された 4 種類のサブ指標として通知されます。
  • Dataflow は、Distribution 指標からヒストグラムを作成する機能はサポートしていません。
  • Dataflow は、ほぼ 30 秒ごとに Monitoring へ差分の更新を通知します。
  • 競合を避けるため、Dataflow カスタム指標はすべて double データ型としてエクスポートされます。
  • わかりやすくするため、すべての Dataflow カスタム指標は GAUGE 指標種類としてエクスポートされます。次の Monitoring Query Language の例に示すように、GAUGE 指標の時間枠での差分をモニタリングできます。

     fetch dataflow_job
     | metric 'dataflow.googleapis.com/job/user_counter'
     | filter (metric.job_id == '[JobID]')
     | delta 1m
     | group_by 1m, [value_user_counter_mean: mean(value.user_counter)]
     | every 1m
     | group_by [metric.ptransform, metric.metric_name],
       [value_user_counter_mean_aggregate: aggregate(value_user_counter_mean)]
    
  • Dataflow のカスタム指標は、Monitoring で dataflow.googleapis.com/job/user_counter として metric_name: metric-nameptransform: ptransform-name というラベル付きで表示されます。

  • 下位互換性を維持するために、Dataflow はカスタム指標を custom.googleapis.com/dataflow/metric-name として Monitoring に報告します。

  • Dataflow のカスタム指標には、Monitoring のカーディナリティの制限が適用されます。

  • 各プロジェクトで使用できる Dataflow カスタム指標の上限は 100 個です。これらの指標は custom.googleapis.com/dataflow/metric-name として公開されます。

Monitoring に報告されるカスタム指標は、Cloud Monitoring の料金に基づいて課金されます。

Metrics Explorer を使用する

Monitoring を使用して Dataflow の指標を調べます。このセクションの手順に沿って、Apache Beam パイプラインごとに提供される標準指標を確認します。Metrics Explorer の使用の詳細については、Metrics Explorer でグラフを作成するをご覧ください。

  1. Google Cloud コンソールで [Monitoring] を選択します。

    [Monitoring] に移動

  2. ナビゲーション パネルで、[Metrics Explorer] を選択します。

  3. [指標を選択] ペインで、フィルタに「Dataflow Job」と入力します。

  4. 表示されたリストから、いずれかのジョブについて確認する指標を選択します。

Dataflow ジョブを実行する場合は、ソースとシンクの指標もモニタリングすることをおすすめします。たとえば、BigQuery Storage API 指標をモニタリングできます。詳細については、ダッシュボード、グラフ、アラートの作成BigQuery Data Transfer Service の指標の完全なリストをご覧ください。

アラート ポリシーとダッシュボードを作成する

Monitoring では、Dataflow 関連の指標にアクセスできます。指標の時系列をグラフ化するダッシュボードを作成し、指標が指定値に達したときに通知するアラート ポリシーを作成します。

リソースのグループを作成する

アラートの設定とダッシュボードの作成を簡単にするために、複数の Apache Beam パイプラインを含むリソース グループを作成します。

  1. Google Cloud コンソールで [Monitoring] を選択します。

    [Monitoring] に移動

  2. ナビゲーション パネルで [グループ] を選択します。

  3. [CREATE GROUP] をクリックします。

  4. グループの名前を入力します。

  5. グループに含まれる Dataflow リソースを定義するフィルタ条件を追加します。たとえば、フィルタ条件の 1 つの名前をパイプラインの接頭辞にできます。 グループを作成します。

  6. グループが作成されると、そのグループ内のリソースに関連する基本指標を表示できるようになります。

Dataflow の指標に対するアラート ポリシーを作成する

Monitoring を使用すると、指標が指定されたしきい値を超えたときのアラートを作成し、通知を受け取ることができます。たとえば、ストリーミング パイプラインのシステムラグが事前定義の値を超えて増加した場合に通知を受け取ることができます。

  1. Google Cloud コンソールで [Monitoring] を選択します。

    [Monitoring] に移動

  2. ナビゲーション パネルで [アラート] を選択します。

  3. [CREATE POLICY] をクリックします。

  4. [Create new alerting policy] ページで、アラートの条件と通知チャネルを定義します。
    たとえば、WindowedWordCount Apache Beam パイプライン グループのシステムラグに対するアラートを設定する場合は、次の操作を行います。

    1. [指標を選択] をクリックします。
    2. [指標を選択してください] フィールドで「Dataflow Job」と入力します。
    3. 指標のカテゴリで [Job] を選択します。
    4. 指標で [System lag] を選択します。

    アラートがトリガーされるたびに、インシデントと対応するイベントが作成されます。アラートにメールや SMS などの通知メカニズムを指定した場合は、通知も受け取ります。

カスタム モニタリング ダッシュボードを作成する

最も関連性の高い Dataflow 関連のグラフからなる Monitoring ダッシュボードを作成できます。ダッシュボードにグラフを追加するには、次のようにします。

  1. Google Cloud コンソールで [Monitoring] を選択します。

    [Monitoring] に移動

  2. ナビゲーション パネルで [ダッシュボード] を選択します。

  3. [CREATE DASHBOARD] をクリックします。

  4. [ウィジェットを追加] をクリックします。

  5. [ウィジェットを追加] ウィンドウで、[Data] として [指標] を選択します。

  6. [指標を選択] ペインの [指標] に「Dataflow Job」と入力します。

  7. 指標カテゴリと指標を選択します。

ダッシュボードにいくつでもグラフを追加できます。

Monitoring エージェントからワーカー VM の指標を受信する

Monitoring を使用して、永続ディスク、CPU、ネットワーク、プロセスの指標をモニタリングできます。パイプラインを実行するときに、Dataflow ワーカー VM インスタンスから Monitoring エージェントを有効にします。使用可能な Monitoring エージェントの指標のリストをご覧ください。

Monitoring エージェントを有効にするには、パイプラインの実行時に --experiments=enable_stackdriver_agent_metrics オプションを使用します。ワーカー サービス アカウントには、roles/monitoring.metricWriter ロールが必要です。

パイプラインを停止せずに Monitoring エージェントを無効にするには、--experiments=enable_stackdriver_agent_metrics パラメータを指定せずに置換ジョブを起動して、パイプラインを更新します。

保存と保持

完了またはキャンセルされた Dataflow ジョブに関する情報は 30 日間保存されます。

オペレーション ログは、_Default ログバケットに保存されます。Logging API サービス名は dataflow.googleapis.com です。Cloud Logging で使用される Google Cloud のモニタリング対象リソースタイプとサービスの詳細については、モニタリング対象リソースとサービスをご覧ください。

Logging でログエントリが保持される期間の詳細については、割り当てと上限: ログの保持期間で保持情報をご覧ください。

オペレーション ログの表示方法については、パイプライン ログをモニタリングおよび表示するをご覧ください。

次のステップ

詳細を確認するには、以下の他のリソースをご覧ください。