Apache Kafka

Apache Kafka 統合は、トピック リクエストや失敗などのブローカー指標を収集します。また、ブローカーのパーティションもモニタリングします。この統合により、Kafka ログが収集され、JSON ペイロードが生成されます。結果には、logger、level、message のフィールドが含まれます。

Kafka の詳細については、Apache Kafka のドキュメントをご覧ください。

前提条件

Kafka テレメトリーを収集するには、Ops エージェントをインストールする必要があります。

  • 指標の場合は、バージョン 2.10.0 以降をインストールします。
  • ログの場合は、バージョン 2.10.0 以降をインストールします。

この統合は、Kafka バージョン 0.8~3.0.0 をサポートしています。

Kafka インスタンスを構成する

JMX エンドポイントを公開するには、JVM の起動時に com.sun.management.jmxremote.port システム プロパティを設定する必要があります。また、com.sun.management.jmxremote.rmi.port システム プロパティも同じポートに設定することをおすすめします。JMX エンドポイントをリモートで公開するには、java.rmi.server.hostname システム プロパティも設定する必要があります。

デフォルトでは、これらのプロパティは Kafka デプロイの bin/kafka-run-class.sh ファイルで設定されています。

コマンドライン引数を使用してシステム プロパティを設定するには、JVM の起動時にプロパティ名の前に -D を付けます。たとえば、com.sun.management.jmxremote.port をポート 9999 に設定するには、JVM の起動時に次のように指定します。

-Dcom.sun.management.jmxremote.port=9999

Kafka 用に Ops エージェントを構成する

Ops エージェントの構成のガイドに従って、Kafka インスタンスからテレメトリーを収集するために必要な要素を追加し、エージェントを再起動します。

構成の例

次のコマンドは、Kafka のテレメトリーを収集して取り込み、Ops エージェントを再起動するための構成を作成します。

# Configures Ops Agent to collect telemetry from the app and restart Ops Agent.

set -e

# Create a back up of the existing file so existing configurations are not lost.
sudo cp /etc/google-cloud-ops-agent/config.yaml /etc/google-cloud-ops-agent/config.yaml.bak

# Configure the Ops Agent.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
metrics:
  receivers:
    kafka:
      type: kafka
  service:
    pipelines:
      kafka:
        receivers:
          - kafka
logging:
  receivers:
    kafka:
      type: kafka
  service:
    pipelines:
      kafka:
        receivers:
          - kafka
EOF

sudo service google-cloud-ops-agent restart

ログの収集を構成する

Kafka からログを取り込むには、Kafka が生成するログのレシーバを作成し、新しいレシーバのパイプラインを作成する必要があります。

kafka ログのレシーバを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
exclude_paths include_paths の照合で除外するファイルシステム パスのパターンのリスト。
include_paths [/var/log/kafka/*.log] 各ファイルのテーリングで読み込むファイルシステムのパスのリスト。パスには、/var/log/kafka*/*.log のように、ワイルドカード * を使用できます。
record_log_file_path false true に設定すると、ログレコードの取得元のファイルのパスが agent.googleapis.com/log_file_path ラベルの値として出力ログエントリに表示されます。ワイルドカードを使用する場合、レコードを取得したファイルのパスのみが記録されます。
type 値は、kafka にする必要があります。
wildcard_refresh_interval 60s include_paths のワイルドカード ファイルのパスの更新間隔。time.ParseDuration によって解析可能な時間として指定します(例: 30s または 2m)。このプロパティは、ログファイルのローテーションがデフォルトよりも短い間隔で実施され、ロギングのスループットが高い場合に有用な可能性があります。

ログの内容

logName は、構成で指定されたレシーバ ID から取得されます。LogEntry 内の詳細なフィールドは、次のとおりです。

kafka ログの LogEntry には次のフィールドが含まれます。

フィールド タイプ 説明
jsonPayload.level 文字列(LogSeverity ログエントリ レベル
jsonPayload.logger 文字列(Timestamp ログの生成元のロガーの名前。
jsonPayload.message 文字列 ログ メッセージ(指定した場合の詳細なスタックトレースを含む)
jsonPayload.source 文字列 ログの取得元のモジュール / スレッド。
severity 文字列 ログエントリ レベル(変換済み)。
timestamp 文字列 リクエストを受信した時刻。

指標の収集を構成する

Kafka から指標を取り込むには、Kafka が生成する指標のレシーバーを作成してから、新しいレシーバー用のパイプラインを作成する必要があります。

このレシーバーでは、複数のエンドポイントのモニタリングなど、構成で複数のインスタンスを使用することはできません。このようなインスタンスはすべて同じ時系列に書き込まれるため、Cloud Monitoring ではインスタンスを区別できません。

kafka 指標のレシーバーを構成するには、次のフィールドを指定します。

フィールド デフォルト 説明
stub_status_url localhost:9999 JMX サービス URL、またはサービス URL の作成に使用されるホストとポート。service:jmx:<protocol>:<sap> または host:port の形式にする必要があります。host:port フォームの値は、service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi の Service URL を作成するために使用されます。
collect_jvm_metrics true サポートされている JVM 指標も収集するようにレシーバを構成します。
collection_interval 60s 期間の値(例: 30s5m)。
password JMX が認証を要求するように構成されている場合、構成されたパスワード。
stub_status_url localhost:9999 JMX サービス URL、またはサービス URL の作成に使用されるホストとポート。この値は、service:jmx:: または host:port の形式にする必要があります。host:port フォームの値は、service:jmx:rmi:///jndi/rmi://:/jmxrmi. のサービス URL を作成するために使用されます。
type 値は、kafka にする必要があります。
username JMX が認証を要求するように構成されている場合、構成されたユーザー名。

モニタリング対象

次の表に、Ops エージェントが Kafka インスタンスから収集する指標の一覧を示します。

指標タイプ
種類、タイプ
モニタリング対象リソース
ラベル
workload.googleapis.com/kafka.isr.operation.count
CUMULATIVEINT64
gce_instance
operation
workload.googleapis.com/kafka.message.count
CUMULATIVEINT64
gce_instance
 
workload.googleapis.com/kafka.network.io
CUMULATIVEINT64
gce_instance
state
workload.googleapis.com/kafka.partition.count
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.partition.offline
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.partition.under_replicated
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.purgatory.size
GAUGEINT64
gce_instance
type
workload.googleapis.com/kafka.request.count
CUMULATIVEINT64
gce_instance
type
workload.googleapis.com/kafka.request.failed
CUMULATIVEINT64
gce_instance
type
workload.googleapis.com/kafka.request.time.total
CUMULATIVEINT64
gce_instance
type

構成を確認する

このセクションでは、Kafka レシーバが正しく構成されていることを確認する方法について説明します。Ops エージェントがテレメトリーの収集を開始するまでに 1~2 分かかる場合があります。

Kafka ログが Cloud Logging に送信されていることを確認するには、次のようにします。

  1. Google Cloud コンソールのナビゲーション パネルで、[ロギング] を選択してから、[ログ エクスプローラ] を選択します。

    [ログ エクスプローラ] に移動

  2. エディタに次のクエリを入力し、[クエリを実行] をクリックします。
    resource.type="gce_instance"
    log_id("kafka")
    

Kafka 指標が Cloud Monitoring に送信されていることを確認するには、次のようにします。

  1. Google Cloud コンソールのナビゲーション パネルで [Monitoring] を選択し、次に [ Metrics Explorer] を選択します。

    Metrics Explorer に移動

  2. クエリビルダー ペインのツールバーで、名前が [MQL] または  [PromQL] のボタンを選択します。
  3. [言語] 切り替えで [MQL] が選択されていることを確認します。言語切り替えボタンは、クエリの書式設定を行うのと同じツールバーにあります。
  4. エディタに次のクエリを入力し、[クエリを実行] をクリックします。
    fetch gce_instance
    | metric 'workload.googleapis.com/kafka.message.count'
    | every 1m
    

ダッシュボードを表示する

Kafka 指標を表示するには、グラフまたはダッシュボードが構成されている必要があります。Kafka インテグレーションには、1 つ以上のダッシュボードが含まれています。インテグレーションを構成して Ops エージェントが指標データの収集を開始すると、ダッシュボードは自動的にインストールされます。

インテグレーションをインストールすることなく、ダッシュボードの静的プレビューを表示することもできます。

インストールされているダッシュボードを表示する手順は次のとおりです。

  1. Google Cloud コンソールのナビゲーション パネルで、[Monitoring] を選択してから、[ダッシュボード] を選択します。

    [ダッシュボード] に移動

  2. [ダッシュボード リスト] タブを選択し、[統合] カテゴリを選択します。
  3. 表示するダッシュボードの名前をクリックします。

インテグレーションを構成してもダッシュボードがインストールされていない場合は、Ops エージェントが実行されていることを確認します。ダッシュボードにグラフの指標データがない場合、ダッシュボードのインストールは失敗します。Ops エージェントが指標の収集を開始した後に、ダッシュボードがインストールされます。

ダッシュボードの静的プレビューを表示する手順は次のとおりです。

  1. Google Cloud コンソールのナビゲーション パネルで、[Monitoring] を選択してから、[インテグレーション] を選択します。

    [インテグレーション] に移動

  2. [デプロイメント プラットフォーム] フィルタの [Compute Engine] をクリックします。
  3. Kafka のエントリを見つけて、[詳細を表示] をクリックします。
  4. [ダッシュボード] タブを選択すると、静的プレビューが表示されます。ダッシュボードがインストールされている場合は、[ダッシュボードを表示] をクリックして移動できます。

Cloud Monitoring のダッシュボードについて詳しくは、ダッシュボードとグラフをご覧ください。

[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。

アラート ポリシーをインストールする

アラート ポリシーは、指定した条件が成立した際に通知するように Cloud Monitoring に指示します。Kafka インテグレーションには、使用する 1 つ以上のアラート ポリシーが含まれています。これらのアラート ポリシーは、Monitoring の [インテグレーション] ページで表示してインストールできます。

使用可能なアラート ポリシーの説明を表示してインストールする手順は次のとおりです。

  1. Google Cloud コンソールのナビゲーション パネルで、[Monitoring] を選択してから、[インテグレーション] を選択します。

    [インテグレーション] に移動

  2. Kafka のエントリを見つけて、[詳細を表示] をクリックします。
  3. [アラート] タブを選択します。このタブには、利用可能なアラート ポリシーの説明と、それらをインストールするためのインターフェースが表示されます。
  4. アラート ポリシーをインストールします。アラート ポリシーでは、アラートがトリガーされた通知の送信先を特定する必要があるため、インストール環境の情報が必要になります。アラート ポリシーをインストールする手順は次のとおりです。
    1. 利用可能なアラート ポリシーのリストから、インストールするアラート ポリシーを選択します。
    2. [通知の構成] セクションで、1 つ以上の通知チャンネルを選択します。通知チャンネルの使用を無効にすることもできますが、無効にすると、アラート ポリシーは通知なく起動します。Monitoring でステータスを確認できますが、通知は受信しません。

      通知チャンネルの詳細については、通知チャンネルを管理するをご覧ください。

    3. [ポリシーの作成] をクリックします。

Cloud Monitoring のアラート ポリシーの詳細については、アラートの概要をご覧ください。

[インテグレーション] ページの使用方法については、インテグレーションを管理するをご覧ください。

次のステップ

Ansible を使用して Ops エージェントをインストールし、サードパーティ アプリケーションを構成してサンプル ダッシュボードをインストールする方法については、Ops エージェントをインストールして、サードパーティ アプリケーションのトラブルシューティングを行うの動画をご覧ください。