Apache Flink を統合すると、クライアント、JobManager、TaskManager のログが収集され、JSON ペイロードが生成されます。結果には、source、level、message のフィールドが含まれます。
Flink の詳細については、Apache Flink のドキュメントをご覧ください。
前提条件
Flink テレメトリーを収集するには、Ops エージェントをインストールする必要があります。
- 指標の場合は、バージョン 2.18.1 以降をインストールします。
- ログの場合は、バージョン 2.17.0 以降をインストールします。
このインテグレーションは Flink バージョン 1.12.5、1.13.6、1.14.4 をサポートします。
Flink 用に Ops エージェントを構成する
Ops エージェントの構成のガイドに従って、Flink インスタンスからテレメトリーを収集するために必要な要素を追加し、エージェントを再起動します。
構成の例
次のコマンドは、Flink のテレメトリーを収集して取り込む構成を作成します。
これらの変更を有効にするには、Ops エージェントを再起動する必要があります。
Linux
- エージェントを再起動するには、インスタンスで次のコマンドを実行します。sudo systemctl restart google-cloud-ops-agent 
- エージェントが再起動したことを確認するには、次のコマンドを実行して「Metrics Agent」と「Logging エージェント」のコンポーネントが起動したことを確認します。sudo systemctl status "google-cloud-ops-agent*" 
Windows
- RDP または同様のツールを使用してインスタンスに接続し、Windows にログインします。
- PowerShell アイコンを右クリックし、[管理者として実行] を選択して、管理者権限で PowerShell ターミナルを開きます。
- エージェントを再起動するには、次の PowerShell コマンドを実行します。Restart-Service google-cloud-ops-agent -Force 
- エージェントが再起動したことを確認するには、次のコマンドを実行して「Metrics Agent」と「Logging エージェント」のコンポーネントが起動したことを確認します。Get-Service google-cloud-ops-agent* 
ログの収集を構成する
Flink からログを取り込むには、Flink が生成するログのレシーバーを作成してから、新しいレシーバー用のパイプラインを作成する必要があります。
flink ログのレシーバーを構成するには、次のフィールドを指定します。
| フィールド | デフォルト | 説明 | 
|---|---|---|
| exclude_paths | include_pathsの照合で除外するファイルシステム パスのパターンのリスト。 | |
| include_paths | [/opt/flink/log/flink-*-standalonesession-*.log, /opt/flink/log/flink-*-taskexecutor-*.log, /opt/flink/log/flink-*-client-*.log] | 各ファイルのテーリングで読み込むファイルシステムのパスのリスト。パスにはワイルドカード( *)を使用できます。 | 
| record_log_file_path | false | trueに設定すると、ログレコードの取得元のファイルのパスがagent.googleapis.com/log_file_pathラベルの値として出力ログエントリに表示されます。ワイルドカードを使用する場合、レコードを取得したファイルのパスのみが記録されます。 | 
| type | この値は、 flinkにする必要があります。 | |
| wildcard_refresh_interval | 60s | include_pathsのワイルドカード ファイルのパスの更新間隔。期間を指定します(例:30s、2m)。このプロパティは、ログファイルのローテーションがデフォルトの間隔よりも速く、ロギングのスループットが高い場合に有用です。 | 
ログの内容
logName は、構成で指定されたレシーバー ID から取得されます。LogEntry 内の詳細なフィールドは、次のとおりです。
flink ログの LogEntry には次のフィールドが含まれます。
| フィールド | タイプ | 説明 | 
|---|---|---|
| jsonPayload.level | 文字列 | ログエントリ レベル | 
| jsonPayload.message | 文字列 | ログ メッセージ(指定した場合の詳細なスタックトレースを含む) | 
| jsonPayload.source | 文字列 | ログエントリのソース Java クラス | 
| severity | 文字列( LogSeverity) | ログエントリ レベル(変換済み)。 | 
指標の収集を構成する
Flink から指標を取り込むには、Flink が生成する指標のレシーバーを作成してから、新しいレシーバー用のパイプラインを作成する必要があります。
このレシーバーでは、複数のエンドポイントのモニタリングなど、構成で複数のインスタンスを使用することはできません。このようなインスタンスはすべて同じ時系列に書き込まれるため、Cloud Monitoring ではインスタンスを区別できません。
flink 指標のレシーバーを構成するには、次のフィールドを指定します。
| フィールド | デフォルト | 説明 | 
|---|---|---|
| collection_interval | 60s | 期間の値(例: 30s、5m)。 | 
| endpoint | http://localhost:8081 | Flink によって公開される URL。 | 
| type | 値は、 flinkにする必要があります。 | 
モニタリング対象
次の表に、Ops エージェントが Flink インスタンスから収集する指標の一覧を示します。
| 指標タイプ | |
|---|---|
| 種類、タイプ モニタリング対象リソース | ラベル | 
| workload.googleapis.com/flink.job.checkpoint.count | |
| CUMULATIVE、INT64gce_instance | checkpointhost_namejob_name | 
| workload.googleapis.com/flink.job.checkpoint.in_progress | |
| GAUGE、INT64gce_instance | host_namejob_name | 
| workload.googleapis.com/flink.job.last_checkpoint.size | |
| GAUGE、INT64gce_instance | host_namejob_name | 
| workload.googleapis.com/flink.job.last_checkpoint.time | |
| GAUGE、INT64gce_instance | host_namejob_name | 
| workload.googleapis.com/flink.job.restart.count | |
| CUMULATIVE、INT64gce_instance | host_namejob_name | 
| workload.googleapis.com/flink.jvm.class_loader.classes_loaded | |
| CUMULATIVE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.cpu.load | |
| GAUGE、DOUBLEgce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.cpu.time | |
| CUMULATIVE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.gc.collections.count | |
| CUMULATIVE、INT64gce_instance | garbage_collector_namehost_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.gc.collections.time | |
| CUMULATIVE、INT64gce_instance | garbage_collector_namehost_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.direct.total_capacity | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.direct.used | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.heap.committed | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.heap.max | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.heap.used | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.mapped.total_capacity | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.mapped.used | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.metaspace.committed | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.metaspace.max | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.metaspace.used | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.nonheap.committed | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.nonheap.max | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.memory.nonheap.used | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.jvm.threads.count | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.memory.managed.total | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.memory.managed.used | |
| GAUGE、INT64gce_instance | host_nameresource_typetaskmanager_id | 
| workload.googleapis.com/flink.operator.record.count | |
| CUMULATIVE、INT64gce_instance | host_namejob_nameoperator_namerecordsubtask_indextask_nametaskmanager_id | 
| workload.googleapis.com/flink.operator.watermark.output | |
| GAUGE、INT64gce_instance | host_namejob_nameoperator_namesubtask_indextask_nametaskmanager_id | 
| workload.googleapis.com/flink.task.record.count | |
| CUMULATIVE、INT64gce_instance | host_namejob_namerecordsubtask_indextask_nametaskmanager_id | 
構成を確認する
このセクションでは、Flink レシーバが正しく構成されていることを確認する方法について説明します。Ops エージェントがテレメトリーの収集を開始するまでに 1~2 分かかる場合があります。
Flink ログが Cloud Logging に送信されていることを確認するには、次のようにします。
- 
Google Cloud コンソールで [ログ エクスプローラ] ページに移動します。 検索バーを使用してこのページを検索する場合は、小見出しが「Logging」の結果を選択します。 
- エディタに次のクエリを入力し、[クエリを実行] をクリックします。resource.type="gce_instance" log_id("flink")
Flink 指標が Cloud Monitoring に送信されていることを確認するには、次のようにします。
- 
Google Cloud コンソールで [leaderboard Metrics Explorer] ページに移動します。 検索バーを使用してこのページを検索する場合は、小見出しが [Monitoring] の結果を選択します。 
- クエリビルダー ペインのツールバーで、[codeMQL] または [codePROMQL] という名前のボタンを選択します。
- [言語] で [PromQL] が選択されていることを確認します。言語切り替えボタンは、クエリの書式設定と同じツールバーにあります。
- エディタに次のクエリを入力し、[クエリを実行] をクリックします。{"workload.googleapis.com/flink.jvm.memory.heap.used", monitored_resource="gce_instance"}
ダッシュボードを表示する
Flink 指標を表示するには、グラフまたはダッシュボードが構成されている必要があります。Flink インテグレーションには、1 つ以上のダッシュボードが含まれています。インテグレーションを構成して Ops エージェントが指標データの収集を開始すると、ダッシュボードは自動的にインストールされます。
インテグレーションをインストールすることなく、ダッシュボードの静的プレビューを表示することもできます。
インストールされているダッシュボードを表示する手順は次のとおりです。
- 
Google Cloud コンソールで  [ダッシュボード] ページに移動します。 [ダッシュボード] ページに移動します。検索バーを使用してこのページを検索する場合は、小見出しが [Monitoring] の結果を選択します。 
- [ダッシュボード リスト] タブを選択し、[統合] カテゴリを選択します。
- 表示するダッシュボードの名前をクリックします。
インテグレーションを構成してもダッシュボードがインストールされていない場合は、Ops エージェントが実行されていることを確認します。ダッシュボードにグラフの指標データがない場合、ダッシュボードのインストールは失敗します。Ops エージェントが指標の収集を開始した後に、ダッシュボードがインストールされます。
ダッシュボードの静的プレビューを表示する手順は次のとおりです。
- 
Google Cloud コンソールで  [インテグレーション] ページに移動します。 [インテグレーション] ページに移動します。検索バーを使用してこのページを検索する場合は、小見出しが [Monitoring] の結果を選択します。 
- デプロイメント プラットフォーム フィルタの [Compute Engine] をクリックします。
- Flink のエントリを見つけて、[詳細を表示] をクリックします。
- [ダッシュボード] タブを選択すると、静的プレビューが表示されます。ダッシュボードがインストールされている場合は、[ダッシュボードを表示] をクリックして移動できます。
Cloud Monitoring のダッシュボードについて詳しくは、ダッシュボードとグラフをご覧ください。
[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。
アラート ポリシーをインストールする
アラート ポリシーは、指定した条件が成立した際に通知するように Cloud Monitoring に指示します。Flink インテグレーションには、使用する 1 つ以上のアラート ポリシーが含まれています。これらのアラート ポリシーは、Monitoring の [インテグレーション] ページで表示してインストールできます。
使用可能なアラート ポリシーの説明を表示してインストールする手順は次のとおりです。
- 
Google Cloud コンソールで  [インテグレーション] ページに移動します。 [インテグレーション] ページに移動します。検索バーを使用してこのページを検索する場合は、小見出しが [Monitoring] の結果を選択します。 
- Flink のエントリを見つけて、[詳細を表示] をクリックします。
- [アラート] タブを選択します。このタブには、利用可能なアラート ポリシーの説明と、それらをインストールするためのインターフェースが表示されます。
- アラート ポリシーをインストールします。アラート ポリシーでは、アラートがトリガーされた通知の送信先を特定する必要があるため、インストール環境の情報が必要になります。アラート ポリシーをインストールする手順は次のとおりです。
- 利用可能なアラート ポリシーのリストから、インストールするアラート ポリシーを選択します。
- [通知の構成] セクションで、1 つ以上の通知チャンネルを選択します。通知チャンネルの使用を無効にすることもできますが、無効にすると、アラート ポリシーは通知なく起動します。Monitoring でステータスを確認できますが、通知は受信しません。 - 通知チャンネルの詳細については、通知チャンネルを管理するをご覧ください。 
- [ポリシーの作成] をクリックします。
 
Cloud Monitoring のアラート ポリシーの詳細については、アラートの概要をご覧ください。
[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。
次のステップ
Ansible を使用して Ops エージェントをインストールし、サードパーティ アプリケーションを構成してサンプル ダッシュボードをインストールする方法についてのチュートリアルは、Ops エージェントをインストールして、サードパーティ アプリケーションのトラブルシューティングを行うの動画をご覧ください。