Apache Kafka

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

Apache Kafka 統合は、トピック リクエストや失敗などのブローカー指標を収集します。また、ブローカーのパーティションもモニタリングします。この統合により、Kafka ログが収集され、JSON ペイロードが生成されます。結果には、logger、level、message のフィールドが含まれます。

Kafka の詳細については、Apache Kafka のドキュメントをご覧ください。

前提条件

Kafka テレメトリーを収集するには、Ops エージェントをインストールする必要があります。

  • 指標の場合は、バージョン 2.10.0 以降をインストールします。
  • ログの場合は、バージョン 2.10.0 以降をインストールします。

この統合は、Kafka バージョン 0.8~3.0.0 をサポートしています。

Kafka インスタンスを構成する

JMX エンドポイントを公開するには、JVM の起動時に com.sun.management.jmxremote.port システム プロパティを設定する必要があります。また、com.sun.management.jmxremote.rmi.port システム プロパティも同じポートに設定することをおすすめします。JMX エンドポイントをリモートで公開するには、java.rmi.server.hostname システム プロパティも設定する必要があります。

デフォルトでは、これらのプロパティは Kafka デプロイの bin/kafka-run-class.sh ファイルで設定されています。

コマンドライン引数を使用してシステム プロパティを設定するには、JVM の起動時にプロパティ名の前に -D を付けます。たとえば、com.sun.management.jmxremote.port をポート 9999 に設定するには、JVM の起動時に次のように指定します。

-Dcom.sun.management.jmxremote.port=9999

Kafka 用に Ops エージェントを構成する

Ops エージェントの構成のガイドに従って、Kafka インスタンスからテレメトリーを収集するために必要な要素を追加し、エージェントを再起動します。

構成の例

次のコマンドは、Kafka のテレメトリーを収集して取り込み、Ops エージェントを再起動するための構成を作成します。

set -e

# Create a back up of the existing file so existing configurations are not lost.
sudo cp /etc/google-cloud-ops-agent/config.yaml /etc/google-cloud-ops-agent/config.yaml.bak

# Configure the Ops Agent.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
metrics:
  receivers:
    memcached:
      type: memcached
  service:
    pipelines:
      memcached:
        receivers:
          - memcached
EOF

sudo service google-cloud-ops-agent restart
sleep 60

ログの収集を構成する

Kafka からログを取り込むには、Kafka が生成するログのレシーバを作成し、新しいレシーバのパイプラインを作成する必要があります。

kafka ログのレシーバを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
exclude_paths include_paths の照合で除外するファイルシステム パスのパターンのリスト。
include_paths [/var/log/kafka/*.log] 各ファイルのテーリングで読み込むファイルシステムのパスのリスト。パスには、/var/log/kafka*/*.log のように、ワイルドカード * を使用できます。
record_log_file_path false true に設定すると、ログレコードの取得元のファイルのパスが agent.googleapis.com/log_file_path ラベルの値として出力ログエントリに表示されます。ワイルドカードを使用する場合、レコードを取得したファイルのパスのみが記録されます。
type 値は、kafka にする必要があります。
wildcard_refresh_interval 60s include_paths のワイルドカード ファイルのパスの更新間隔。time.ParseDuration によって解析可能な時間として指定します(例: 30s または 2m)。このプロパティは、ログファイルのローテーションがデフォルトよりも短い間隔で実施され、ロギングのスループットが高い場合に有用な可能性があります。

ログの内容

logName は、構成で指定されたレシーバ ID から取得されます。LogEntry 内の詳細なフィールドは、次のとおりです。

kafka ログの LogEntry には次のフィールドが含まれます。

フィールド タイプ 説明
jsonPayload.level 文字列(LogSeverity ログエントリ レベル
jsonPayload.logger 文字列(Timestamp ログの生成元のロガーの名前。
jsonPayload.message 文字列 ログ メッセージ(指定した場合の詳細なスタックトレースを含む)
jsonPayload.source 文字列 ログの取得元のモジュール / スレッド。
severity 文字列 ログエントリ レベル(変換済み)。
timestamp 文字列 リクエストを受信した時刻。

指標の収集を構成する

Kafka から指標を取り込むには、Kafka が生成する指標のレシーバを作成してから、新しいレシーバ用のパイプラインを作成する必要があります。

kafka 指標のレシーバを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
stub_status_url localhost:9999 JMX サービス URL、またはサービス URL の作成に使用されるホストとポート。service:jmx:<protocol>:<sap> または host:port の形式にする必要があります。host:port フォームの値は、service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi の Service URL を作成するために使用されます。
collect_jvm_metrics true サポートされている JVM 指標も収集するようにレシーバを構成します。
collection_interval 60s 期間の値(例: 30s5m)。
password JMX が認証を要求するように構成されている場合、構成されたパスワード。
stub_status_url localhost:9999 JMX サービス URL、またはサービス URL の作成に使用されるホストとポート。この値は、service:jmx:: または host:port の形式にする必要があります。host:port フォームの値は、service:jmx:rmi:///jndi/rmi://:/jmxrmi. のサービス URL を作成するために使用されます。
type 値は、kafka にする必要があります。
username JMX が認証を要求するように構成されている場合、構成されたユーザー名。

モニタリング対象

次の表に、Ops エージェントが Kafka インスタンスから収集する指標の一覧を示します。

指標タイプ
種類、タイプ
モニタリング対象リソース
ラベル
workload.googleapis.com/kafka.isr.operation.count
CUMULATIVEINT64
gce_instance
operation
workload.googleapis.com/kafka.message.count
CUMULATIVEINT64
gce_instance
 
workload.googleapis.com/kafka.network.io
CUMULATIVEINT64
gce_instance
state
workload.googleapis.com/kafka.partition.count
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.partition.offline
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.partition.under_replicated
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.purgatory.size
GAUGEINT64
gce_instance
type
workload.googleapis.com/kafka.request.count
CUMULATIVEINT64
gce_instance
type
workload.googleapis.com/kafka.request.failed
CUMULATIVEINT64
gce_instance
type
workload.googleapis.com/kafka.request.time.total
CUMULATIVEINT64
gce_instance
type

ダッシュボードの例

Kafka 指標を表示するには、グラフまたはダッシュボードが構成されている必要があります。Cloud Monitoring には、統合用のサンプル ダッシュボードのライブラリが用意されています。ここには、事前に構成されたグラフが含まれています。これらのダッシュボードのインストールについては、サンプル ダッシュボードのインストールをご覧ください。

構成を確認する

このセクションでは、Kafka レシーバが正しく構成されていることを確認する方法について説明します。Ops エージェントがテレメトリーの収集を開始するまでに 1~2 分かかる場合があります。

ログが取り込まれていることを確認するには、ログ エクスプローラに移動し、次のクエリを実行して Kafka のログを表示します。

resource.type="gce_instance"
log_id("kafka")

指標が取り込まれていることを確認するには、Metrics Explorer に移動し、[MQL] タブで次のクエリを実行します。

fetch gce_instance
| metric 'workload.googleapis.com/kafka.message.count'
| every 1m

次のステップ

Ansible を使用して Ops エージェントをインストールし、サードパーティ アプリケーションを構成してサンプル ダッシュボードをインストールする方法については、Ops エージェントをインストールして、サードパーティ アプリケーションのトラブルシューティングを行うの動画をご覧ください。