The Apache Kafka integration collects broker metrics, such as topic requests and failures. It also monitors the partitions on the broker. The integration collects Kafka logs and parses them into a JSON payload. The result includes fields for logger, level, and message.
For more information about Kafka, see kafka.apache.org/.
Prerequisites
To collect and ingest Kafka logs and metrics, you must install Ops Agent version 2.10.0 or higher.
This receiver supports Apache Kafka versions 0.8 through 3.0.0.
Configure your Kafka instance
To expose a JMX endpoint, you must set the com.sun.management.jmxremote.port system property when starting the JVM. We also recommend setting the com.sun.management.jmxremote.rmi.port system property to the same port. To expose a JMX endpoint remotely, you must also set the java.rmi.server.hostname system property.
By default, these properties are set in a Kafka deployment's bin/kafka-run-class.sh file.
To set system properties by using command-line arguments, prepend the property name with -D when starting the JVM. For example, to set com.sun.management.jmxremote.port to port 9999, specify the following when starting the JVM:
-Dcom.sun.management.jmxremote.port=9999
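As a minimal sketch, the three system properties described above can be assembled into one set of JVM arguments. The variable names and the address 10.0.0.5 are hypothetical placeholders, not values from the Kafka distribution.

```shell
# Hypothetical values: replace with the port and the externally reachable
# address of your broker host.
JMX_PORT_VALUE=9999
JMX_HOSTNAME_VALUE=10.0.0.5

# Use the same port for the JMX registry and the RMI server, as recommended above.
JMX_FLAGS="-Dcom.sun.management.jmxremote.port=${JMX_PORT_VALUE}"
JMX_FLAGS="${JMX_FLAGS} -Dcom.sun.management.jmxremote.rmi.port=${JMX_PORT_VALUE}"

# Only needed when the endpoint must be reachable from another host.
JMX_FLAGS="${JMX_FLAGS} -Djava.rmi.server.hostname=${JMX_HOSTNAME_VALUE}"

echo "${JMX_FLAGS}"
```

How these arguments reach the JVM depends on how the broker is launched. Kafka's launch scripts typically read extra JMX options from an environment variable such as KAFKA_JMX_OPTS, but check the bin/kafka-run-class.sh file in your deployment before relying on that.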
Configure the Ops Agent for Kafka
Following the guide for Configuring the Ops Agent, add the required elements to collect logs and metrics from your Kafka instances, and restart the agent.
Example configuration
The following commands create the configuration file to collect and ingest logs and metrics for Kafka, and then restart the Ops Agent on Linux. Because tee overwrites the file, back up any existing configuration first.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
logging:
  receivers:
    kafka:
      type: kafka
  service:
    pipelines:
      kafka:
        receivers:
          - kafka
metrics:
  receivers:
    kafka:
      type: kafka
  service:
    pipelines:
      kafka:
        receivers:
          - kafka
EOF
sudo service google-cloud-ops-agent restart
Configure logs collection
To ingest logs from Kafka, you must create receivers for the logs Kafka produces and then create a pipeline for the new receivers. To configure a receiver for your kafka logs, specify the following fields:
Field | Default | Description |
---|---|---|
type | | The value must be kafka. |
include_paths | [/var/log/kafka/*.log] | A list of filesystem paths to read by tailing each file. A wildcard (*) can be used in the paths; for example, /var/log/kafka*/*.log. |
exclude_paths | | A list of filesystem path patterns to exclude from the set matched by include_paths. |
record_log_file_path | false | If set to true, then the path to the specific file from which the log record was obtained appears in the output log entry as the value of the agent.googleapis.com/log_file_path label. When using a wildcard, only the path of the file from which the record was obtained is recorded. |
wildcard_refresh_interval | 60s | The interval at which wildcard file paths in include_paths are refreshed. Given as a time duration, for example 30s or 2m. This property might be useful under high logging throughputs where log files are rotated faster than the default interval. Must be a multiple of 1s. |
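The following sketch shows how the optional logging fields in the table might be combined in one receiver. It writes the fragment to a scratch file for review instead of the live agent configuration. The path /opt/kafka/logs/*.log and the exclude pattern are hypothetical examples, so adjust them to match where your broker writes its logs.

```shell
# Scratch file (an assumption for this sketch) so the fragment can be reviewed
# before it is merged into /etc/google-cloud-ops-agent/config.yaml.
CONFIG_FRAGMENT="$(mktemp)"

cat > "$CONFIG_FRAGMENT" << 'EOF'
logging:
  receivers:
    kafka:
      type: kafka
      # Default location plus a hypothetical second install path.
      include_paths:
        - /var/log/kafka/*.log
        - /opt/kafka/logs/*.log
      # Hypothetical pattern: skip JVM garbage-collection logs.
      exclude_paths:
        - /var/log/kafka/*-gc.log
      # Record the source file in the agent.googleapis.com/log_file_path label.
      record_log_file_path: true
      # Re-scan wildcard paths more often than the 60s default.
      wildcard_refresh_interval: 30s
  service:
    pipelines:
      kafka:
        receivers:
          - kafka
EOF

cat "$CONFIG_FRAGMENT"
```

After the fragment is merged into the agent configuration, restart the Ops Agent so that the new receiver settings take effect.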
What is logged
The logName is derived from the receiver IDs specified in the configuration. Detailed fields inside the LogEntry are as follows.
Field | Type | Description |
---|---|---|
jsonPayload.source | string | Module and/or thread where the log originated. |
jsonPayload.logger | string | Name of the logger where the log originated. |
jsonPayload.message | string | Log message, including detailed stacktrace where provided. |
severity | string (LogSeverity) | Log entry level (translated). |
timestamp | string (Timestamp) | Time the request was received. |
Any fields that are blank or missing will not be present in the log entry.
Configure metrics collection
To collect metrics from Kafka, you must create a receiver for Kafka metrics and then create a pipeline for the new receiver. To configure a receiver for your Kafka metrics, specify the following fields:
Field | Default | Description |
---|---|---|
type | | The value must be kafka. |
endpoint | localhost:9999 | The JMX Service URL or host and port used to construct the Service URL. Must be in the form of service:jmx:<protocol>:<sap> or host:port. Values in host:port form will be used to create a Service URL of service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi. |
collect_jvm_metrics | true | Configures the receiver to also collect the supported JVM metrics. |
username | | The configured username if JMX is configured to require authentication. |
password | | The configured password if JMX is configured to require authentication. |
collection_interval | 60s | A time.Duration value, such as 30s or 5m. |
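To make the two accepted address forms concrete, the following sketch expands a host:port value into the Service URL pattern given in the table and passes through values that are already full Service URLs. The function name jmx_service_url is hypothetical and only illustrates the documented pattern; it is not how the Ops Agent is implemented.

```shell
# Expand a host:port value into a JMX Service URL. Values that already start
# with service:jmx: are returned unchanged.
jmx_service_url() {
  case "$1" in
    service:jmx:*)
      printf '%s\n' "$1"
      ;;
    *:*)
      printf 'service:jmx:rmi:///jndi/rmi://%s/jmxrmi\n' "$1"
      ;;
    *)
      echo "expected service:jmx:<protocol>:<sap> or host:port" >&2
      return 1
      ;;
  esac
}

jmx_service_url "localhost:9999"
# prints service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
```

The default value, localhost:9999, matches the port used in the JMX example earlier on this page, so a broker started with that port on the same host needs no explicit address.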
What is monitored
The following table provides the list of metrics that the Ops Agent collects from the Kafka instance.
Metric type | Kind, Type | Monitored resources | Labels |
---|---|---|---|
workload.googleapis.com/kafka.isr.operation.count | CUMULATIVE, INT64 | gce_instance | operation |
workload.googleapis.com/kafka.message.count | CUMULATIVE, INT64 | gce_instance | |
workload.googleapis.com/kafka.network.io | CUMULATIVE, INT64 | gce_instance | state |
workload.googleapis.com/kafka.partition.count | GAUGE, INT64 | gce_instance | |
workload.googleapis.com/kafka.partition.offline | GAUGE, INT64 | gce_instance | |
workload.googleapis.com/kafka.partition.under_replicated | GAUGE, INT64 | gce_instance | |
workload.googleapis.com/kafka.purgatory.size | GAUGE, INT64 | gce_instance | type |
workload.googleapis.com/kafka.request.count | CUMULATIVE, INT64 | gce_instance | type |
workload.googleapis.com/kafka.request.failed | CUMULATIVE, INT64 | gce_instance | type |
workload.googleapis.com/kafka.request.time.total | CUMULATIVE, INT64 | gce_instance | type |
Verify the configuration
You can use the Logs Explorer and Metrics Explorer to verify that you correctly configured the Kafka receiver. It might take one or two minutes for the Ops Agent to begin collecting logs and metrics.
To verify the logs are ingested, go to the Logs Explorer and run the following query to view the Kafka logs:
resource.type="gce_instance"
logName=("projects/PROJECT_ID/logs/kafka")
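The same check can be sketched from a terminal with the gcloud CLI. The helper names kafka_log_filter and read_kafka_logs and the fallback project ID example-project are hypothetical, and the sketch assumes that the receiver ID is kafka, as in the example configuration, and that gcloud is installed and authenticated.

```shell
# Hypothetical fallback: set PROJECT_ID to your own project before running.
PROJECT_ID="${PROJECT_ID:-example-project}"

# Build the Logging filter for a project; assumes the receiver ID is "kafka".
kafka_log_filter() {
  printf 'resource.type="gce_instance" AND logName="projects/%s/logs/kafka"\n' "$1"
}

# Fetch a few recent Kafka log entries. Not called automatically because it
# needs an authenticated gcloud installation.
read_kafka_logs() {
  gcloud logging read "$(kafka_log_filter "$1")" --project="$1" --limit=5 --format=json
}

# Show the filter that would be sent.
kafka_log_filter "$PROJECT_ID"
```

To query the logs, run read_kafka_logs "$PROJECT_ID". An empty result shortly after a restart might only mean that the agent has not shipped any entries yet.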
To verify the metrics are ingested, go to Metrics Explorer and run the following query in the MQL tab:
fetch gce_instance
| metric 'workload.googleapis.com/kafka.request.count'
| align rate(1m)
| every 1m
What's next
For a walkthrough on how to use Ansible to install the Ops Agent, configure a third-party application, and install a sample dashboard, see the Install the Ops Agent to troubleshoot third-party applications video.