Apache Flink を統合すると、クライアント、JobManager、TaskManager のログが収集され、JSON ペイロードが生成されます。結果には、logger、level、message のフィールドが含まれます。
Flink の詳細については、Apache Flink のドキュメントをご覧ください。
前提条件
Flink テレメトリーを収集するには、Ops エージェントをインストールする必要があります。
- ログの場合は、バージョン 2.17.0 以降をインストールします。
- 指標の場合は、バージョン 2.18.1 以降をインストールします。
このインテグレーションは Flink バージョン 1.12.5、1.13.6、1.14.4 をサポートします。
Flink 用に Ops エージェントを構成する
Ops エージェントの構成のガイドに従って、Flink インスタンスからテレメトリーを収集するために必要な要素を追加し、エージェントを再起動します。
構成の例
次のコマンドは、Flink のテレメトリーを収集して取り込み、Ops エージェントを再起動するための構成を作成します。
ログの収集を構成する
Flink からログを取り込むには、Flink が生成するログのレシーバを作成してから、新しいレシーバ用のパイプラインを作成する必要があります。
flink
ログのレシーバを構成するには、次のフィールドを指定します。
フィールド | デフォルト | 説明 |
---|---|---|
exclude_paths |
include_paths の照合で除外するファイルシステム パスのパターンのリスト。 |
|
include_paths |
[/opt/flink/log/flink-*-standalonesession-*.log, /opt/flink/log/flink-*-taskexecutor-*.log, /opt/flink/log/flink-*-client-*.log] |
各ファイルのテーリングで読み込むファイルシステムのパスのリスト。パスにはワイルドカード(* )を使用できます。 |
record_log_file_path |
false |
true に設定すると、ログレコードの取得元のファイルのパスが agent.googleapis.com/log_file_path ラベルの値として出力ログエントリに表示されます。ワイルドカードを使用する場合、レコードを取得したファイルのパスのみが記録されます。 |
type |
値には flink を指定してください。 |
|
wildcard_refresh_interval |
60s |
include_paths のワイルドカード ファイルのパスの更新間隔。時間を指定します(例: 30s 、2m )。このプロパティは、ログファイルのローテーションがデフォルトの間隔よりも速く、ロギングのスループットが高い場合に有用です。 |
ログの内容
logName
は、構成で指定されたレシーバ ID から取得されます。LogEntry
内の詳細なフィールドは、次のとおりです。
flink
ログの LogEntry
には次のフィールドが含まれます。
フィールド | タイプ | 説明 |
---|---|---|
jsonPayload.level |
文字列 | ログエントリ レベル |
jsonPayload.message |
文字列 | ログ メッセージ(指定した場合の詳細なスタック トレースを含む)。 |
jsonPayload.source |
文字列 | ログエントリのソース Java クラス。 |
severity |
文字列(LogSeverity ) |
ログエントリ レベル(変換済み)。 |
指標の収集を構成する
Flink から指標を取り込むには、Flink が生成する指標のレシーバーを作成してから、新しいレシーバー用のパイプラインを作成する必要があります。
このレシーバーでは、複数のエンドポイントのモニタリングなど、構成で複数のインスタンスを使用することはできません。このようなインスタンスはすべて同じ時系列に書き込まれるため、Cloud Monitoring ではインスタンスを区別できません。
flink
指標のレシーバーを構成するには、次のフィールドを指定します。
フィールド | デフォルト | 説明 |
---|---|---|
collection_interval |
60s |
time.Duration 値(例: 30s 、5m )。 |
endpoint |
http://localhost:8081 |
Flink によって公開される URL。 |
type |
値は flink を指定してください。 |
モニタリング対象
次の表に、Ops エージェントが Flink インスタンスから収集する指標の一覧を示します。
指標タイプ | |
---|---|
種類、タイプ モニタリング対象リソース |
ラベル |
workload.googleapis.com/flink.job.checkpoint.count
|
|
CUMULATIVE 、INT64 gce_instance |
host_name
job_name
checkpoint
|
workload.googleapis.com/flink.job.checkpoint.in_progress
|
|
GAUGE 、INT64 gce_instance |
host_name
job_name
|
workload.googleapis.com/flink.job.last_checkpoint.size
|
|
GAUGE 、INT64 gce_instance |
host_name
job_name
|
workload.googleapis.com/flink.job.last_checkpoint.time
|
|
GAUGE 、INT64 gce_instance |
host_name
job_name
|
workload.googleapis.com/flink.job.restart.count
|
|
CUMULATIVE 、INT64 gce_instance |
host_name
job_name
|
workload.googleapis.com/flink.jvm.class_loader.classes_loaded
|
|
CUMULATIVE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.cpu.load
|
|
GAUGE 、DOUBLE gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.cpu.time
|
|
CUMULATIVE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.gc.collections.count
|
|
CUMULATIVE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
garbage_collector_name
|
workload.googleapis.com/flink.jvm.gc.collections.time
|
|
CUMULATIVE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
garbage_collector_name
|
workload.googleapis.com/flink.jvm.memory.direct.total_capacity
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.direct.used
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.heap.committed
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.heap.max
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.heap.used
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.mapped.total_capacity
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.mapped.used
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.metaspace.committed
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.metaspace.max
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.metaspace.used
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.nonheap.committed
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.nonheap.max
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.memory.nonheap.used
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.jvm.threads.count
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.memory.managed.total
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.memory.managed.used
|
|
GAUGE 、INT64 gce_instance |
host_name
resource_type
taskmanager_id
|
workload.googleapis.com/flink.operator.record.count
|
|
CUMULATIVE 、INT64 gce_instance |
host_name
taskmanager_id
job_name
operator_name
task_name
subtask_index
record
|
workload.googleapis.com/flink.operator.watermark.output
|
|
GAUGE 、INT64 gce_instance |
host_name
job_name
operator_name
subtask_index
task_name
taskmanager_id
|
workload.googleapis.com/flink.task.record.count
|
|
CUMULATIVE 、INT64 gce_instance |
host_name
taskmanager_id
job_name
task_name
subtask_index
record
|
構成を確認する
このセクションでは、Flink レシーバが正しく構成されていることを確認する方法について説明します。Ops エージェントがテレメトリーの収集を開始するまでに 1~2 分かかる場合があります。
Flink ログが Cloud Logging に送信されていることを確認するには、次のようにします。
-
Google Cloud コンソールのナビゲーション パネルで、[ロギング] を選択してから、[ログ エクスプローラ] を選択します。
- エディタに次のクエリを入力し、[クエリを実行] をクリックします。
resource.type="gce_instance" log_id("flink")
Flink 指標が Cloud Monitoring に送信されていることを確認するには、次のようにします。
-
Google Cloud コンソールのナビゲーション パネルで [Monitoring] を選択し、次に [leaderboard Metrics Explorer] を選択します。
- クエリビルダー ペインのツールバーで、[codeMQL] または [codePROMQL] という名前のボタンを選択します。
- [言語] で [MQL] が選択されていることを確認します。言語切り替えボタンは、クエリの書式設定と同じツールバーにあります。
- エディタに次のクエリを入力し、[クエリを実行] をクリックします。
fetch gce_instance | metric 'workload.googleapis.com/flink.jvm.memory.heap.used' | every 1m
ダッシュボードを表示する
Flink 指標を表示するには、グラフまたはダッシュボードが構成されている必要があります。Flink インテグレーションには、1 つ以上のダッシュボードが含まれています。インテグレーションを構成して Ops エージェントが指標データの収集を開始すると、ダッシュボードは自動的にインストールされます。
インテグレーションをインストールすることなく、ダッシュボードの静的プレビューを表示することもできます。
インストールされているダッシュボードを表示する手順は次のとおりです。
-
Google Cloud コンソールのナビゲーション パネルで、[Monitoring] を選択してから、[ダッシュボード] を選択します。
- [ダッシュボード リスト] タブを選択し、[統合] カテゴリを選択します。
- 表示するダッシュボードの名前をクリックします。
インテグレーションを構成してもダッシュボードがインストールされていない場合は、Ops エージェントが実行されていることを確認します。ダッシュボードにグラフの指標データがない場合、ダッシュボードのインストールは失敗します。Ops エージェントが指標の収集を開始した後に、ダッシュボードがインストールされます。
ダッシュボードの静的プレビューを表示する手順は次のとおりです。
-
Google Cloud コンソールのナビゲーション パネルで、[Monitoring] を選択してから、[インテグレーション] を選択します。
- [デプロイメント プラットフォーム] フィルタの [Compute Engine] をクリックします。
- Flink のエントリを見つけて [詳細を表示] をクリックします。
- [ダッシュボード] タブを選択すると、静的プレビューが表示されます。ダッシュボードがインストールされている場合は、[ダッシュボードを表示] をクリックして移動できます。
Cloud Monitoring のダッシュボードについて詳しくは、ダッシュボードとグラフをご覧ください。
[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。
アラート ポリシーをインストールする
アラート ポリシーは、指定した条件が成立した際に通知するように Cloud Monitoring に指示します。Flink インテグレーションには、使用する 1 つ以上のアラート ポリシーが含まれています。これらのアラート ポリシーは、Monitoring の [インテグレーション] ページで表示してインストールできます。
使用可能なアラート ポリシーの説明を表示してインストールする手順は次のとおりです。
-
Google Cloud コンソールのナビゲーション パネルで、[Monitoring] を選択してから、[インテグレーション] を選択します。
- Flink のエントリを見つけて [詳細を表示] をクリックします。
- [アラート] タブを選択します。このタブには、利用可能なアラート ポリシーの説明と、それらをインストールするためのインターフェースが表示されます。
- アラート ポリシーをインストールします。アラート ポリシーでは、アラートがトリガーされた通知の送信先を特定する必要があるため、インストール環境の情報が必要になります。アラート ポリシーをインストールする手順は次のとおりです。
- 利用可能なアラート ポリシーのリストから、インストールするアラート ポリシーを選択します。
[通知の構成] セクションで、1 つ以上の通知チャンネルを選択します。通知チャンネルの使用を無効にすることもできますが、無効にすると、アラート ポリシーは通知なく起動します。Monitoring でステータスを確認できますが、通知は受信しません。
通知チャンネルの詳細については、通知チャンネルを管理するをご覧ください。
- [ポリシーの作成] をクリックします。
Cloud Monitoring のアラート ポリシーの詳細については、アラートの概要をご覧ください。
[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。
次のステップ
Ansible を使用して Ops エージェントをインストールし、サードパーティ アプリケーションを構成してサンプル ダッシュボードをインストールする方法については、Ops エージェントをインストールして、サードパーティ アプリケーションのトラブルシューティングを行うの動画をご覧ください。