Apache Kafka 統合は、トピック リクエストや失敗などのブローカー指標を収集します。また、ブローカーのパーティションもモニタリングします。この統合により、Kafka ログが収集され、JSON ペイロードが生成されます。結果には、logger、level、message のフィールドが含まれます。
Kafka の詳細については、kafka.apache.org/ をご覧ください。
前提条件
Kafka のログと指標を収集して取り込むには、Ops Agent バージョン 2.10.0 以降をインストールする必要があります。
このレシーバは、Apache Kafka バージョン 0.8~3.0.0 をサポートしています。
Kafka インスタンスを構成する
JMX エンドポイントを公開するには、JVM の起動時に com.sun.management.jmxremote.port
システム プロパティを設定する必要があります。また、com.sun.management.jmxremote.rmi.port
システム プロパティも同じポートに設定することをおすすめします。JMX エンドポイントをリモートで公開するには、java.rmi.server.hostname
システム プロパティも設定する必要があります。
デフォルトでは、これらのプロパティは Kafka デプロイの bin/kafka-run-class.sh
ファイルで設定されています。
コマンドライン引数を使用してシステム プロパティを設定するには、JVM の起動時にプロパティ名の前に -D
を付けます。たとえば、com.sun.management.jmxremote.port
をポート 9999
に設定するには、JVM の起動時に次のように指定します。
-Dcom.sun.management.jmxremote.port=9999
Kafka 用に Ops エージェントを構成する
Ops エージェントの構成のガイドに従って、Kafka インスタンスからログと指標を収集するために必要な要素を追加して、エージェントを再起動します。
構成の例
次のコマンドは、Kafka のログと指標を収集して取り込み、Linux で Ops エージェントを再起動するための構成ファイルを作成します。
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
logging:
receivers:
kafka:
type: kafka
service:
pipelines:
kafka:
receivers:
- kafka
metrics:
receivers:
kafka:
type: kafka
service:
pipelines:
kafka:
receivers:
- kafka
EOF
sudo service google-cloud-ops-agent restart
ログの収集を構成する
Kafka からログを取り込むには、Kafka が生成するログのレシーバを作成し、新しいレシーバのパイプラインを作成する必要があります。kafka
ログのレシーバを構成するには、次のフィールドを指定します。
フィールド | デフォルト | 説明 |
---|---|---|
type |
値は kafka を指定してください。 |
|
include_paths |
[/var/log/kafka/*.log] |
各ファイルのテーリングで読み込むファイルシステムのパスのリスト。パスには、/var/log/kafka*/*.log のように、ワイルドカード(* )を使用できます。 |
exclude_paths |
include_paths の照合で除外するファイルシステム パスのパターンのリスト。 |
|
wildcard_refresh_interval |
60s |
include_paths のワイルドカード ファイルのパスの更新間隔。時間を指定します(例: 30s 、2m )。このプロパティは、ログファイルのローテーションがデフォルトの間隔よりも速く、ロギングのスループットが高い場合に有用です。1 秒の倍数にする必要があります。 |
ログの内容
kafka
ログの logName
は、構成で指定されたレシーバ ID から取得されます。LogEntry
内の詳細なフィールドは、次のとおりです。
フィールド | タイプ | 説明 |
---|---|---|
jsonPayload.source |
文字列 | ログの取得元のモジュール / スレッド。 |
jsonPayload.logger |
文字列 | ログの生成元のロガーの名前。 |
jsonPayload.message |
文字列 | ログ メッセージ(指定した場合の詳細なスタック トレースを含む)。 |
severity |
文字列(LogSeverity ) |
ログエントリ レベル(変換済み)。 |
timestamp |
文字列(Timestamp ) |
リクエストを受信した時刻。 |
空白または欠落しているフィールドはログエントリに存在しません。
指標の収集を構成する
Kafka から指標を収集するには、Kafka 指標を表示するレシーバを作成し、新しいレシーバ用のパイプラインを作成する必要があります。Kafka 指標のレシーバを構成するには、次のフィールドを指定します。
フィールド | デフォルト | 説明 |
---|---|---|
type |
値は kafka を指定してください。 |
|
stub_status_url |
localhost:9999 |
Service URL の作成に使用される JMX Service URL またはホストとポート。service:jmx:<protocol>:<sap> または host:port の形式にする必要があります。host:port フォームの値は、service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi の Service URL を作成するために使用されます。 |
collect_jvm_metrics |
true |
サポートされている JVM 指標も収集するようにレシーバを構成します。 |
username |
JMX が認証を要求するように構成されている場合、構成されたユーザー名。 | |
password |
JMX が認証を要求するように構成されている場合、構成されたパスワード。 | |
collection_interval |
60s |
time.Duration 値(例: 30s 、5m )。 |
モニタリング対象
次の表に、Ops エージェントが Kafka インスタンスから収集する指標の一覧を示します。
指標タイプ | |
---|---|
種類、タイプ モニタリング対象リソース |
ラベル |
workload.googleapis.com/kafka.isr.operation.count
|
|
CUMULATIVE 、INT64 gce_instance |
operation
|
workload.googleapis.com/kafka.message.count
|
|
CUMULATIVE 、INT64 gce_instance |
|
workload.googleapis.com/kafka.network.io
|
|
CUMULATIVE 、INT64 gce_instance |
state
|
workload.googleapis.com/kafka.partition.count
|
|
GAUGE 、INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.offline
|
|
GAUGE 、INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.under_replicated
|
|
GAUGE 、INT64 gce_instance |
|
workload.googleapis.com/kafka.purgatory.size
|
|
GAUGE 、INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.count
|
|
CUMULATIVE 、INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.failed
|
|
CUMULATIVE 、INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.time.total
|
|
CUMULATIVE 、INT64 gce_instance |
type
|
構成を確認する
ログ エクスプローラと Metrics Explorer を使用して、Kafka レシーバが正しく構成されていることを確認できます。Ops エージェントがログと指標の収集を開始するまでに 1~2 分かかる場合があります。
ログが取り込まれていることを確認するには、ログ エクスプローラに移動し、次のクエリを実行して Kafka のログを表示します。
resource.type="gce_instance"
logName=("projects/PROJECT_ID/logs/kafka")
指標が取り込まれていることを確認するには、Metrics Explorer に移動し、[MQL] タブで次のクエリを実行します。
fetch gce_instance
| metric 'workload.googleapis.com/kafka.request.count'
| align rate(1m)
| every 1m