Apache Kafka

The Apache Kafka integration collects broker metrics, such as topic requests and failures. It also monitors the partitions on the broker. The integration collects Kafka logs and parses them into a JSON payload. The result includes fields for logger, level, and message.

For more information about Kafka, see the Apache Kafka documentation.

Prerequisites

To collect Kafka telemetry, you must install the Ops Agent, version 2.10.0 or higher, for both metrics and logs.

This integration supports Kafka versions 0.8 through 3.0.0.

Configure your Kafka instance

To expose a JMX endpoint, you must set the com.sun.management.jmxremote.port system property when starting the JVM. We also recommend setting the com.sun.management.jmxremote.rmi.port system property to the same port. To expose a JMX endpoint remotely, you must also set the java.rmi.server.hostname system property.

By default, these properties are set in a Kafka deployment's bin/kafka-run-class.sh file.

To set system properties by using command-line arguments, prepend the property name with -D when starting the JVM. For example, to set com.sun.management.jmxremote.port to port 9999, specify the following when starting the JVM:

-Dcom.sun.management.jmxremote.port=9999
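
For example, the following sketch starts a broker with a local JMX endpoint on port 9999, assuming a standard Kafka tarball layout. The kafka-run-class.sh script reads the JMX_PORT environment variable and expands it into the -D flag shown above, plus unauthenticated-access defaults; the exact variables honored can vary by Kafka version.

# Sketch: start the broker with JMX exposed locally on port 9999.
# kafka-run-class.sh turns JMX_PORT into -Dcom.sun.management.jmxremote.port=9999
# and adds the other com.sun.management.jmxremote.* defaults.
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties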

Configure the Ops Agent for Kafka

Following the guide for Configuring the Ops Agent, add the required elements to collect telemetry from Kafka instances, and restart the agent.

Example configuration

The following commands create the configuration to collect and ingest telemetry for Kafka and restart the Ops Agent.

set -e

# Create a backup of the existing file so any existing configuration is not lost.
sudo cp /etc/google-cloud-ops-agent/config.yaml /etc/google-cloud-ops-agent/config.yaml.bak

# Configure the Ops Agent.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
logging:
  receivers:
    kafka:
      type: kafka
  service:
    pipelines:
      kafka:
        receivers:
          - kafka
metrics:
  receivers:
    kafka:
      type: kafka
  service:
    pipelines:
      kafka:
        receivers:
          - kafka
EOF

sudo service google-cloud-ops-agent restart
sleep 60

Configure logs collection

To ingest logs from Kafka, you must create receivers for the logs that Kafka produces and then create a pipeline for the new receivers.

To configure a receiver for your Kafka logs, specify the following fields:

  • exclude_paths: A list of filesystem path patterns to exclude from the set matched by include_paths.
  • include_paths (default: [/var/log/kafka/*.log]): A list of filesystem paths to read by tailing each file. A wildcard (*) can be used in the paths; for example, /var/log/kafka*/*.log.
  • record_log_file_path (default: false): If set to true, then the path to the specific file from which the log record was obtained appears in the output log entry as the value of the agent.googleapis.com/log_file_path label. When using a wildcard, only the path of the file from which the record was obtained is recorded.
  • type: This value must be kafka.
  • wildcard_refresh_interval (default: 60s): The interval at which wildcard file paths in include_paths are refreshed. Given as a time duration parsable by time.ParseDuration, for example 30s or 2m. This property might be useful under high logging throughputs where log files are rotated faster than the default interval.
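
As an illustration, a logging receiver that overrides these defaults might look like the following sketch; the include_paths value and the 30s refresh interval are arbitrary example values, not requirements.

logging:
  receivers:
    kafka:
      type: kafka
      include_paths:
        - /var/log/kafka/*.log
      record_log_file_path: true
      wildcard_refresh_interval: 30s
  service:
    pipelines:
      kafka:
        receivers:
          - kafka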

What is logged

The logName is derived from the receiver IDs specified in the configuration. Kafka logs contain the following fields in the LogEntry:

  • jsonPayload.level (string): Log entry level.
  • jsonPayload.logger (string): Name of the logger where the log originated.
  • jsonPayload.message (string): Log message, including detailed stacktrace where provided.
  • jsonPayload.source (string): Module and/or thread where the log originated.
  • severity (string, LogSeverity): Log entry level (translated).
  • timestamp (string, Timestamp): Time that the entry was logged.
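
As a hypothetical illustration (all values invented), a parsed broker log line might surface in a LogEntry similar to the following:

{
  "jsonPayload": {
    "level": "INFO",
    "logger": "kafka.server.KafkaServer",
    "source": "main",
    "message": "[KafkaServer id=0] started (kafka.server.KafkaServer)"
  },
  "severity": "INFO",
  "timestamp": "2024-01-01T00:00:00Z",
  "logName": "projects/my-project/logs/kafka"
}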

Configure metrics collection

To ingest metrics from Kafka, you must create receivers for the metrics that Kafka produces and then create a pipeline for the new receivers.

To configure a receiver for your Kafka metrics, specify the following fields:

  • collect_jvm_metrics (default: true): Configures the receiver to also collect the supported JVM metrics.
  • collection_interval (default: 60s): A time duration value, such as 30s or 5m.
  • endpoint (default: localhost:9999): The JMX Service URL, or the host and port used to construct the Service URL. This value must be in the form of service:jmx:<protocol>:<sap> or host:port. Values in host:port form are used to create a Service URL of service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi.
  • password: The configured password if JMX is configured to require authentication.
  • type: This value must be kafka.
  • username: The configured username if JMX is configured to require authentication.
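
For example, a metrics receiver that points at the JMX endpoint configured earlier might look like the following sketch. The username and password values are hypothetical and are only needed if JMX requires authentication.

metrics:
  receivers:
    kafka:
      type: kafka
      endpoint: localhost:9999
      collection_interval: 30s
      # username: monitoring      # hypothetical; only if JMX requires auth
      # password: example-secret  # hypothetical; only if JMX requires auth
  service:
    pipelines:
      kafka:
        receivers:
          - kafka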

What is monitored

The following table provides the list of metrics that the Ops Agent collects from the Kafka instance.

Metric type                                                Kind, Type         Monitored resources  Labels
workload.googleapis.com/kafka.isr.operation.count          CUMULATIVE, INT64  gce_instance         operation
workload.googleapis.com/kafka.message.count                CUMULATIVE, INT64  gce_instance
workload.googleapis.com/kafka.network.io                   CUMULATIVE, INT64  gce_instance         state
workload.googleapis.com/kafka.partition.count              GAUGE, INT64       gce_instance
workload.googleapis.com/kafka.partition.offline            GAUGE, INT64       gce_instance
workload.googleapis.com/kafka.partition.under_replicated   GAUGE, INT64       gce_instance
workload.googleapis.com/kafka.purgatory.size               GAUGE, INT64       gce_instance         type
workload.googleapis.com/kafka.request.count                CUMULATIVE, INT64  gce_instance         type
workload.googleapis.com/kafka.request.failed               CUMULATIVE, INT64  gce_instance         type
workload.googleapis.com/kafka.request.time.total           CUMULATIVE, INT64  gce_instance         type
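
Because the CUMULATIVE metrics are monotonically increasing counters, they are usually charted as rates. For example, the following MQL query (a sketch) plots the per-minute rate of failed requests:

fetch gce_instance
| metric 'workload.googleapis.com/kafka.request.failed'
| align rate(1m)
| every 1m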

Sample dashboard

To view your Kafka metrics, you must have a chart or dashboard configured. Cloud Monitoring provides a library of sample dashboards for integrations, which contain preconfigured charts. For information about installing these dashboards, see Installing sample dashboards.

Verify the configuration

This section describes how to verify that you correctly configured the Kafka receiver. It might take one or two minutes for the Ops Agent to begin collecting telemetry.
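
Before querying for telemetry, you can confirm that the agent is running. The following check is a sketch that assumes a systemd-based Linux VM:

sudo systemctl status google-cloud-ops-agent"*"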

To verify that the logs are ingested, go to the Logs Explorer and run the following query to view the Kafka logs:

resource.type="gce_instance"
log_id("kafka")

To verify that the metrics are ingested, go to Metrics Explorer and run the following query in the MQL tab:

fetch gce_instance
| metric 'workload.googleapis.com/kafka.message.count'
| every 1m

What's next

For a walkthrough on how to use Ansible to install the Ops Agent, configure a third-party application, and install a sample dashboard, see the Install the Ops Agent to troubleshoot third-party applications video.