Apache Kafka

Apache Kafka 統合は、トピック リクエストや失敗などのブローカー指標を収集します。また、ブローカーのパーティションもモニタリングします。この統合により、Kafka ログが収集され、JSON ペイロードが生成されます。結果には、logger、level、message のフィールドが含まれます。

Kafka の詳細については、kafka.apache.org/ をご覧ください。

前提条件

Kafka のログと指標を収集して取り込むには、Ops Agent バージョン 2.10.0 以降をインストールする必要があります。

このレシーバは、Apache Kafka バージョン 0.8~3.0.0 をサポートしています。

Kafka インスタンスを構成する

JMX エンドポイントを公開するには、JVM の起動時に com.sun.management.jmxremote.port システム プロパティを設定する必要があります。また、com.sun.management.jmxremote.rmi.port システム プロパティも同じポートに設定することをおすすめします。JMX エンドポイントをリモートで公開するには、java.rmi.server.hostname システム プロパティも設定する必要があります。

デフォルトでは、これらのプロパティは Kafka デプロイの bin/kafka-run-class.sh ファイルで設定されています。

コマンドライン引数を使用してシステム プロパティを設定するには、JVM の起動時にプロパティ名の前に -D を付けます。たとえば、com.sun.management.jmxremote.port をポート 9999 に設定するには、JVM の起動時に次のように指定します。

-Dcom.sun.management.jmxremote.port=9999

Kafka 用に Ops エージェントを構成する

Ops エージェントの構成のガイドに従って、Kafka インスタンスからログと指標を収集するために必要な要素を追加して、エージェントを再起動します。

構成の例

次のコマンドは、Kafka のログと指標を収集して取り込み、Linux で Ops エージェントを再起動するための構成ファイルを作成します。

sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
logging:
  receivers:
    kafka:
      type: kafka
  service:
    pipelines:
      kafka:
        receivers:
          - kafka
metrics:
  receivers:
    kafka:
      type: kafka
  service:
    pipelines:
      kafka:
        receivers:
          - kafka
EOF
sudo service google-cloud-ops-agent restart

ログの収集を構成する

Kafka からログを取り込むには、Kafka が生成するログのレシーバを作成し、新しいレシーバのパイプラインを作成する必要があります。kafka ログのレシーバを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
type 値は kafka を指定してください。
include_paths [/var/log/kafka/*.log] 各ファイルのテーリングで読み込むファイルシステムのパスのリスト。パスには、/var/log/kafka*/*.log のように、ワイルドカード(*)を使用できます。
exclude_paths include_paths の照合で除外するファイルシステム パスのパターンのリスト。
wildcard_refresh_interval 60s include_paths のワイルドカード ファイルのパスの更新間隔。時間を指定します(例: 30s2m)。このプロパティは、ログファイルのローテーションがデフォルトの間隔よりも速く、ロギングのスループットが高い場合に有用です。1 秒の倍数にする必要があります。

ログの内容

kafka ログの logName は、構成で指定されたレシーバ ID から取得されます。LogEntry 内の詳細なフィールドは、次のとおりです。

フィールド タイプ 説明
jsonPayload.source 文字列 ログの取得元のモジュール / スレッド。
jsonPayload.logger 文字列 ログの生成元のロガーの名前。
jsonPayload.message 文字列 ログ メッセージ(指定した場合の詳細なスタック トレースを含む)。
severity 文字列(LogSeverity ログエントリ レベル(変換済み)。
timestamp 文字列(Timestamp リクエストを受信した時刻。

空白または欠落しているフィールドはログエントリに存在しません。

指標の収集を構成する

Kafka から指標を収集するには、Kafka 指標を表示するレシーバを作成し、新しいレシーバ用のパイプラインを作成する必要があります。Kafka 指標のレシーバを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
type 値は kafka を指定してください。
stub_status_url localhost:9999 Service URL の作成に使用される JMX Service URL またはホストとポート。service:jmx:<protocol>:<sap> または host:port の形式にする必要があります。host:port フォームの値は、service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi の Service URL を作成するために使用されます。
collect_jvm_metrics true サポートされている JVM 指標も収集するようにレシーバを構成します。
username JMX が認証を要求するように構成されている場合、構成されたユーザー名。
password JMX が認証を要求するように構成されている場合、構成されたパスワード。
collection_interval 60s time.Duration 値(例: 30s5m)。

モニタリング対象

次の表に、Ops エージェントが Kafka インスタンスから収集する指標の一覧を示します。

指標タイプ
種類、タイプ
モニタリング対象リソース
ラベル
workload.googleapis.com/kafka.isr.operation.count
CUMULATIVEINT64
gce_instance
operation
workload.googleapis.com/kafka.message.count
CUMULATIVEINT64
gce_instance
 
workload.googleapis.com/kafka.network.io
CUMULATIVEINT64
gce_instance
state
workload.googleapis.com/kafka.partition.count
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.partition.offline
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.partition.under_replicated
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.purgatory.size
GAUGEINT64
gce_instance
type
workload.googleapis.com/kafka.request.count
CUMULATIVEINT64
gce_instance
type
workload.googleapis.com/kafka.request.failed
CUMULATIVEINT64
gce_instance
type
workload.googleapis.com/kafka.request.time.total
CUMULATIVEINT64
gce_instance
type

構成を確認する

ログ エクスプローラと Metrics Explorer を使用して、Kafka レシーバが正しく構成されていることを確認できます。Ops エージェントがログと指標の収集を開始するまでに 1~2 分かかる場合があります。

ログが取り込まれていることを確認するには、ログ エクスプローラに移動し、次のクエリを実行して Kafka のログを表示します。

resource.type="gce_instance"
logName=("projects/PROJECT_ID/logs/kafka")


指標が取り込まれていることを確認するには、Metrics Explorer に移動し、[MQL] タブで次のクエリを実行します。

fetch gce_instance
| metric 'workload.googleapis.com/kafka.request.count'
| align rate(1m)
| every 1m

次のステップ

Ansible を使用して Ops エージェントをインストールし、サードパーティ アプリケーションを構成してサンプル ダッシュボードをインストールする方法については、Ops エージェントをインストールして、サードパーティ アプリケーションのトラブルシューティングを行うの動画をご覧ください。