Apache Kafka

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

Apache Kafka 集成会收集代理指标,例如主题请求数和失败数。它还会监控代理上的分区。此外,该集成还会收集 Kafka 日志并将其解析为 JSON 载荷。结果将包含日志记录器、级别和消息字段。

如需详细了解 Kafka,请参阅 Apache Kafka 文档

前提条件

如需收集 Kafka 遥测数据,您必须安装 Ops Agent

  • 对于指标,请安装 2.10.0 版或更高版本。
  • 对于日志,请安装 2.10.0 版或更高版本。

此集成支持 Kafka 0.8 到 3.0.0 版。

配置 Kafka 实例

如需公开 JMX 端点,您必须在启动 JVM 时设置 com.sun.management.jmxremote.port 系统属性。我们还建议您将 com.sun.management.jmxremote.rmi.port 系统属性设置为同一端口。 如需远程公开 JMX 端点,您还必须设置 java.rmi.server.hostname 系统属性。

默认情况下,这些属性在 Kafka 部署的 bin/kafka-run-class.sh 文件中设置。

如需使用命令行参数设置系统属性,请在启动 JVM 时在属性名称前加上 -D。 例如,如需将 com.sun.management.jmxremote.port 设置为端口 9999,请在启动 JVM 时指定以下内容:

-Dcom.sun.management.jmxremote.port=9999

为 Kafka 配置 Ops Agent

按照配置 Ops Agent 指南,添加从 Kafka 实例收集遥测数据所需的元素并重启代理

配置示例

以下命令会创建相关配置来收集和注入 Kafka 的遥测数据,并重启 Ops Agent。

set -e

# Create a back up of the existing file so existing configurations are not lost.
sudo cp /etc/google-cloud-ops-agent/config.yaml /etc/google-cloud-ops-agent/config.yaml.bak

# Configure the Ops Agent.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
metrics:
  receivers:
    memcached:
      type: memcached
  service:
    pipelines:
      memcached:
        receivers:
          - memcached
EOF

sudo service google-cloud-ops-agent restart
sleep 60

配置日志收集

如需从 Kafka 注入日志,您必须为 Kafka 生成的日志创建接收器,然后为新的接收器创建流水线。

如需为 kafka 日志配置接收器,请指定以下字段:

字段 默认 说明
exclude_paths 要从 include_paths 匹配的集合中排除的文件系统路径模式列表。
include_paths [/var/log/kafka/*.log] 要通过跟踪每个文件读取的文件系统路径列表。路径中可以使用通配符 *;例如 /var/log/kafka*/*.log
record_log_file_path false 如果设置为 true,则从中获取日志记录的特定文件的路径将作为 agent.googleapis.com/log_file_path 标签的值显示在输出日志条目中。使用通配符时,系统只会记录从中获取记录的文件的路径。
type 该值必须为 kafka
wildcard_refresh_interval 60s include_paths 中通配符文件路径的刷新间隔。指定为可由 time.ParseDuration 解析的时长,例如 30s2m。该属性在高日志记录吞吐量下可能很有用,因为日志文件的轮替速度快于默认时间间隔。

记录的内容

logName 派生自配置中指定的接收器 ID。LogEntry 中的详细字段如下所示。

kafka 日志包含 LogEntry 中的以下字段:

字段 类型 说明
jsonPayload.level 字符串 (LogSeverity) 日志条目级别
jsonPayload.logger 字符串 (Timestamp) 发起日志的日志记录器的名称。
jsonPayload.message 字符串 日志消息,包括详细的堆栈轨迹(如果提供)
jsonPayload.source 字符串 发起日志的模块和/或线程。
severity 字符串 日志条目级别(已转换)。
timestamp 字符串 收到请求的时间。

配置指标收集

如需从 Kafka 注入指标,您必须为 Kafka 生成的指标创建接收器,然后为新的接收器创建流水线。

如需为 kafka 指标配置接收器,请指定以下字段:

字段 默认 说明
stub_status_url localhost:9999 JMX 服务网址或用于构造服务网址的主机和端口。必须采用 service:jmx:<protocol>:<sap>host:port 格式。host:port 中的值将用于创建 service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi 的服务网址。
collect_jvm_metrics true 配置接收器以同时收集支持的 JVM 指标
collection_interval 60s 时长值,例如 30s5m
password 将 JMX 配置为需要身份验证时配置的密码。
stub_status_url localhost:9999 JMX 服务网址或用于构造服务网址的主机和端口。该值必须采用以下格式:service:jmx:: 或 host:port。host:port 表单中的值用于创建 service:jmx:rmi:///jndi/rmi://:/jmxrmi. 的服务网址
type 该值必须为 kafka
username 将 JMX 配置为需要身份验证时配置的用户名。

监控的内容

下表提供了 Ops Agent 从 Kafka 实例收集的指标列表。

指标类型
种类、类型
受监控的资源
标签
workload.googleapis.com/kafka.isr.operation.count
CUMULATIVEINT64
gce_instance
operation
workload.googleapis.com/kafka.message.count
CUMULATIVEINT64
gce_instance
 
workload.googleapis.com/kafka.network.io
CUMULATIVEINT64
gce_instance
state
workload.googleapis.com/kafka.partition.count
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.partition.offline
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.partition.under_replicated
GAUGEINT64
gce_instance
 
workload.googleapis.com/kafka.purgatory.size
GAUGEINT64
gce_instance
type
workload.googleapis.com/kafka.request.count
CUMULATIVEINT64
gce_instance
type
workload.googleapis.com/kafka.request.failed
CUMULATIVEINT64
gce_instance
type
workload.googleapis.com/kafka.request.time.total
CUMULATIVEINT64
gce_instance
type

示例信息中心

如需查看 Kafka 指标,您必须配置一个图表或信息中心。Cloud Monitoring 提供了一个用于集成的示例信息中心库,其中包含一些预配置的图表。如需了解如何安装这些信息中心,请参阅安装示例信息中心

验证配置

本部分介绍如何验证您是否正确配置了 Kafka 接收器。Ops Agent 可能需要一两分钟才会开始收集遥测数据。

如需验证日志已注入,请前往日志浏览器并运行以下查询来查看 Kafka 日志:

resource.type="gce_instance"
log_id("kafka")

如需验证指标已注入,请前往 Metrics Explorer 并在 MQL 标签页中运行以下查询:

fetch gce_instance
| metric 'workload.googleapis.com/kafka.message.count'
| every 1m

后续步骤

如需查看如何使用 Ansible 安装 Ops Agent、配置第三方应用和安装示例信息中心的演示,请参阅安装 Ops Agent 以排查第三方应用的问题视频。