Apache Flink

Apache Flink を統合すると、クライアント、JobManager、TaskManager のログが収集され、JSON ペイロードが生成されます。結果には、logger、level、message のフィールドが含まれます。

Flink の詳細については、Apache Flink のドキュメントをご覧ください。

前提条件

Flink テレメトリーを収集するには、Ops エージェントをインストールする必要があります。

  • ログの場合は、バージョン 2.17.0 以降をインストールします。
  • 指標の場合は、バージョン 2.18.1 以降をインストールします。

このインテグレーションは Flink バージョン 1.12.5、1.13.6、1.14.4 をサポートします。

Flink 用に Ops エージェントを構成する

Ops エージェントの構成のガイドに従って、Flink インスタンスからテレメトリーを収集するために必要な要素を追加し、エージェントを再起動します。

構成の例

次のコマンドは、Flink のテレメトリーを収集して取り込み、Ops エージェントを再起動するための構成を作成します。

# Configures Ops Agent to collect telemetry from the app and restart Ops Agent.

set -e

# Create a back up of the existing file so existing configurations are not lost.
sudo cp /etc/google-cloud-ops-agent/config.yaml /etc/google-cloud-ops-agent/config.yaml.bak

# Configure the Ops Agent.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
metrics:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
logging:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
EOF

sudo service google-cloud-ops-agent restart
sleep 30

Flink からログを取り込むには、Flink が生成するログのレシーバを作成してから、新しいレシーバ用のパイプラインを作成する必要があります。

flink ログのレシーバを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
exclude_paths include_paths の照合で除外するファイルシステム パスのパターンのリスト。
include_paths [/opt/flink/log/flink-*-standalonesession-*.log, /opt/flink/log/flink-*-taskexecutor-*.log, /opt/flink/log/flink-*-client-*.log] 各ファイルのテーリングで読み込むファイルシステムのパスのリスト。パスにはワイルドカード(*)を使用できます。
record_log_file_path false true に設定すると、ログレコードの取得元のファイルのパスが agent.googleapis.com/log_file_path ラベルの値として出力ログエントリに表示されます。ワイルドカードを使用する場合、レコードを取得したファイルのパスのみが記録されます。
type 値には flink を指定してください。
wildcard_refresh_interval 60s include_paths のワイルドカード ファイルのパスの更新間隔。時間を指定します(例: 30s2m)。このプロパティは、ログファイルのローテーションがデフォルトの間隔よりも速く、ロギングのスループットが高い場合に有用です。

ログの内容

logName は、構成で指定されたレシーバ ID から取得されます。LogEntry 内の詳細なフィールドは、次のとおりです。

flink ログの LogEntry には次のフィールドが含まれます。

フィールド タイプ 説明
jsonPayload.level 文字列 ログエントリ レベル
jsonPayload.message 文字列 ログ メッセージ(指定した場合の詳細なスタック トレースを含む)。
jsonPayload.source 文字列 ログエントリのソース Java クラス。
severity 文字列(LogSeverity ログエントリ レベル(変換済み)。

Flink から指標を取り込むには、Flink が生成する指標のレシーバーを作成してから、新しいレシーバー用のパイプラインを作成する必要があります。

このレシーバーでは、複数のエンドポイントのモニタリングなど、構成で複数のインスタンスを使用することはできません。このようなインスタンスはすべて同じ時系列に書き込まれるため、Cloud Monitoring ではインスタンスを区別できません。

flink 指標のレシーバーを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
collection_interval 60s time.Duration 値(例: 30s5m)。
endpoint http://localhost:8081 Flink によって公開される URL。
type 値は flink を指定してください。

モニタリング対象

次の表に、Ops エージェントが Flink インスタンスから収集する指標の一覧を示します。

指標タイプ
種類、タイプ
モニタリング対象リソース
ラベル
workload.googleapis.com/flink.job.checkpoint.count
CUMULATIVEINT64
gce_instance
host_name
job_name
checkpoint
workload.googleapis.com/flink.job.checkpoint.in_progress
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.size
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.time
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.restart.count
CUMULATIVEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.jvm.class_loader.classes_loaded
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.load
GAUGEDOUBLE
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.time
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.gc.collections.count
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
garbage_collector_name
workload.googleapis.com/flink.jvm.gc.collections.time
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
garbage_collector_name
workload.googleapis.com/flink.jvm.memory.direct.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.direct.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.threads.count
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.total
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.operator.record.count
CUMULATIVEINT64
gce_instance
host_name
taskmanager_id
job_name
operator_name
task_name
subtask_index
record
workload.googleapis.com/flink.operator.watermark.output
GAUGEINT64
gce_instance
host_name
job_name
operator_name
subtask_index
task_name
taskmanager_id
workload.googleapis.com/flink.task.record.count
CUMULATIVEINT64
gce_instance
host_name
taskmanager_id
job_name
task_name
subtask_index
record

構成を確認する

このセクションでは、Flink レシーバが正しく構成されていることを確認する方法について説明します。Ops エージェントがテレメトリーの収集を開始するまでに 1~2 分かかる場合があります。

Flink ログが Cloud Logging に送信されていることを確認するには、次のようにします。

  1. Google Cloud コンソールのナビゲーション パネルで、[ロギング] を選択してから、[ログ エクスプローラ] を選択します。

    [ログ エクスプローラ] に移動

  2. エディタに次のクエリを入力し、[クエリを実行] をクリックします。
    resource.type="gce_instance"
    log_id("flink")
    

Flink 指標が Cloud Monitoring に送信されていることを確認するには、次のようにします。

  1. Google Cloud コンソールのナビゲーション パネルで [Monitoring] を選択し、次に [ Metrics Explorer] を選択します。

    Metrics Explorer に移動

  2. クエリビルダー ペインのツールバーで、[MQL] または [PROMQL] という名前のボタンを選択します。
  3. [言語] で [MQL] が選択されていることを確認します。言語切り替えボタンは、クエリの書式設定と同じツールバーにあります。
  4. エディタに次のクエリを入力し、[クエリを実行] をクリックします。
    fetch gce_instance
    | metric 'workload.googleapis.com/flink.jvm.memory.heap.used'
    | every 1m
    

ダッシュボードを表示する

Flink 指標を表示するには、グラフまたはダッシュボードが構成されている必要があります。Flink インテグレーションには、1 つ以上のダッシュボードが含まれています。インテグレーションを構成して Ops エージェントが指標データの収集を開始すると、ダッシュボードは自動的にインストールされます。

インテグレーションをインストールすることなく、ダッシュボードの静的プレビューを表示することもできます。

インストールされているダッシュボードを表示する手順は次のとおりです。

  1. Google Cloud コンソールのナビゲーション パネルで、[Monitoring] を選択してから、[ダッシュボード] を選択します。

    [ダッシュボード] に移動

  2. [ダッシュボード リスト] タブを選択し、[統合] カテゴリを選択します。
  3. 表示するダッシュボードの名前をクリックします。

インテグレーションを構成してもダッシュボードがインストールされていない場合は、Ops エージェントが実行されていることを確認します。ダッシュボードにグラフの指標データがない場合、ダッシュボードのインストールは失敗します。Ops エージェントが指標の収集を開始した後に、ダッシュボードがインストールされます。

ダッシュボードの静的プレビューを表示する手順は次のとおりです。

  1. Google Cloud コンソールのナビゲーション パネルで、[Monitoring] を選択してから、[インテグレーション] を選択します。

    [インテグレーション] に移動

  2. [デプロイメント プラットフォーム] フィルタの [Compute Engine] をクリックします。
  3. Flink のエントリを見つけて [詳細を表示] をクリックします。
  4. [ダッシュボード] タブを選択すると、静的プレビューが表示されます。ダッシュボードがインストールされている場合は、[ダッシュボードを表示] をクリックして移動できます。

Cloud Monitoring のダッシュボードについて詳しくは、ダッシュボードとグラフをご覧ください。

[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。

アラート ポリシーをインストールする

アラート ポリシーは、指定した条件が成立した際に通知するように Cloud Monitoring に指示します。Flink インテグレーションには、使用する 1 つ以上のアラート ポリシーが含まれています。これらのアラート ポリシーは、Monitoring の [インテグレーション] ページで表示してインストールできます。

使用可能なアラート ポリシーの説明を表示してインストールする手順は次のとおりです。

  1. Google Cloud コンソールのナビゲーション パネルで、[Monitoring] を選択してから、[インテグレーション] を選択します。

    [インテグレーション] に移動

  2. Flink のエントリを見つけて [詳細を表示] をクリックします。
  3. [アラート] タブを選択します。このタブには、利用可能なアラート ポリシーの説明と、それらをインストールするためのインターフェースが表示されます。
  4. アラート ポリシーをインストールします。アラート ポリシーでは、アラートがトリガーされた通知の送信先を特定する必要があるため、インストール環境の情報が必要になります。アラート ポリシーをインストールする手順は次のとおりです。
    1. 利用可能なアラート ポリシーのリストから、インストールするアラート ポリシーを選択します。
    2. [通知の構成] セクションで、1 つ以上の通知チャンネルを選択します。通知チャンネルの使用を無効にすることもできますが、無効にすると、アラート ポリシーは通知なく起動します。Monitoring でステータスを確認できますが、通知は受信しません。

      通知チャンネルの詳細については、通知チャンネルを管理するをご覧ください。

    3. [ポリシーの作成] をクリックします。

Cloud Monitoring のアラート ポリシーの詳細については、アラートの概要をご覧ください。

[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。

次のステップ

Ansible を使用して Ops エージェントをインストールし、サードパーティ アプリケーションを構成してサンプル ダッシュボードをインストールする方法については、Ops エージェントをインストールして、サードパーティ アプリケーションのトラブルシューティングを行うの動画をご覧ください。