Apache Kafka 統合は、トピック リクエストや失敗などのブローカー指標を収集します。また、ブローカーのパーティションもモニタリングします。この統合により、Kafka ログが収集され、JSON ペイロードが生成されます。結果には、logger、level、message のフィールドが含まれます。
Kafka の詳細については、Apache Kafka のドキュメントをご覧ください。
前提条件
Kafka テレメトリーを収集するには、Ops エージェントをインストールする必要があります。
- 指標の場合は、バージョン 2.10.0 以降をインストールします。
- ログの場合は、バージョン 2.10.0 以降をインストールします。
この統合は、Kafka バージョン 0.8~3.0.0 をサポートしています。
Kafka インスタンスを構成する
JMX エンドポイントを公開するには、JVM の起動時に com.sun.management.jmxremote.port
システム プロパティを設定する必要があります。また、com.sun.management.jmxremote.rmi.port
システム プロパティも同じポートに設定することをおすすめします。JMX エンドポイントをリモートで公開するには、java.rmi.server.hostname
システム プロパティも設定する必要があります。
デフォルトでは、これらのプロパティは Kafka デプロイの bin/kafka-run-class.sh
ファイルで設定されています。
コマンドライン引数を使用してシステム プロパティを設定するには、JVM の起動時にプロパティ名の前に -D
を付けます。たとえば、com.sun.management.jmxremote.port
をポート 9999
に設定するには、JVM の起動時に次のように指定します。
-Dcom.sun.management.jmxremote.port=9999
Kafka 用に Ops エージェントを構成する
Ops エージェントの構成のガイドに従って、Kafka インスタンスからテレメトリーを収集するために必要な要素を追加し、エージェントを再起動します。
構成の例
次のコマンドは、Kafka のテレメトリーを収集して取り込み、Ops エージェントを再起動するための構成を作成します。
ログの収集を構成する
Kafka からログを取り込むには、Kafka が生成するログのレシーバを作成し、新しいレシーバのパイプラインを作成する必要があります。
kafka
ログのレシーバを構成するには、次のフィールドを指定します。
フィールド | デフォルト | 説明 |
---|---|---|
exclude_paths |
include_paths の照合で除外するファイルシステム パスのパターンのリスト。 |
|
include_paths |
[/var/log/kafka/*.log] |
各ファイルのテーリングで読み込むファイルシステムのパスのリスト。パスには、/var/log/kafka*/*.log のように、ワイルドカード * を使用できます。 |
record_log_file_path |
false |
true に設定すると、ログレコードの取得元のファイルのパスが agent.googleapis.com/log_file_path ラベルの値として出力ログエントリに表示されます。ワイルドカードを使用する場合、レコードを取得したファイルのパスのみが記録されます。 |
type |
値は、kafka にする必要があります。 |
|
wildcard_refresh_interval |
60s |
include_paths のワイルドカード ファイルのパスの更新間隔。time.ParseDuration によって解析可能な時間として指定します(例: 30s または 2m )。このプロパティは、ログファイルのローテーションがデフォルトよりも短い間隔で実施され、ロギングのスループットが高い場合に有用な可能性があります。 |
ログの内容
logName
は、構成で指定されたレシーバ ID から取得されます。LogEntry
内の詳細なフィールドは、次のとおりです。
kafka
ログの LogEntry
には次のフィールドが含まれます。
フィールド | タイプ | 説明 |
---|---|---|
jsonPayload.level |
文字列(LogSeverity ) |
ログエントリ レベル |
jsonPayload.logger |
文字列(Timestamp ) |
ログの生成元のロガーの名前。 |
jsonPayload.message |
文字列 | ログ メッセージ(指定した場合の詳細なスタックトレースを含む) |
jsonPayload.source |
文字列 | ログの取得元のモジュール / スレッド。 |
severity |
文字列 | ログエントリ レベル(変換済み)。 |
timestamp |
文字列 | リクエストを受信した時刻。 |
指標の収集を構成する
Kafka から指標を取り込むには、Kafka が生成する指標のレシーバを作成してから、新しいレシーバ用のパイプラインを作成する必要があります。
kafka
指標のレシーバを構成するには、次のフィールドを指定します。
フィールド | デフォルト | 説明 |
---|---|---|
stub_status_url |
localhost:9999 |
JMX サービス URL、またはサービス URL の作成に使用されるホストとポート。service:jmx:<protocol>:<sap> または host:port の形式にする必要があります。host:port フォームの値は、service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi の Service URL を作成するために使用されます。 |
collect_jvm_metrics |
true |
サポートされている JVM 指標も収集するようにレシーバを構成します。 |
collection_interval |
60s |
期間の値(例: 30s 、5m )。 |
password |
JMX が認証を要求するように構成されている場合、構成されたパスワード。 | |
stub_status_url |
localhost:9999 |
JMX サービス URL、またはサービス URL の作成に使用されるホストとポート。この値は、service:jmx: |
type |
値は、kafka にする必要があります。 |
|
username |
JMX が認証を要求するように構成されている場合、構成されたユーザー名。 |
モニタリング対象
次の表に、Ops エージェントが Kafka インスタンスから収集する指標の一覧を示します。
指標タイプ | |
---|---|
種類、タイプ モニタリング対象リソース |
ラベル |
workload.googleapis.com/kafka.isr.operation.count
|
|
CUMULATIVE 、INT64 gce_instance |
operation
|
workload.googleapis.com/kafka.message.count
|
|
CUMULATIVE 、INT64 gce_instance |
|
workload.googleapis.com/kafka.network.io
|
|
CUMULATIVE 、INT64 gce_instance |
state
|
workload.googleapis.com/kafka.partition.count
|
|
GAUGE 、INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.offline
|
|
GAUGE 、INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.under_replicated
|
|
GAUGE 、INT64 gce_instance |
|
workload.googleapis.com/kafka.purgatory.size
|
|
GAUGE 、INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.count
|
|
CUMULATIVE 、INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.failed
|
|
CUMULATIVE 、INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.time.total
|
|
CUMULATIVE 、INT64 gce_instance |
type
|
ダッシュボードの例
Kafka 指標を表示するには、グラフまたはダッシュボードが構成されている必要があります。Cloud Monitoring には、統合用のサンプル ダッシュボードのライブラリが用意されています。ここには、事前に構成されたグラフが含まれています。これらのダッシュボードのインストールについては、サンプル ダッシュボードのインストールをご覧ください。
構成を確認する
このセクションでは、Kafka レシーバが正しく構成されていることを確認する方法について説明します。Ops エージェントがテレメトリーの収集を開始するまでに 1~2 分かかる場合があります。
ログが取り込まれていることを確認するには、ログ エクスプローラに移動し、次のクエリを実行して Kafka のログを表示します。
resource.type="gce_instance"
log_id("kafka")
指標が取り込まれていることを確認するには、Metrics Explorer に移動し、[MQL] タブで次のクエリを実行します。
fetch gce_instance
| metric 'workload.googleapis.com/kafka.message.count'
| every 1m
次のステップ
Ansible を使用して Ops エージェントをインストールし、サードパーティ アプリケーションを構成してサンプル ダッシュボードをインストールする方法については、Ops エージェントをインストールして、サードパーティ アプリケーションのトラブルシューティングを行うの動画をご覧ください。