本文档介绍了如何将数据从 Apache Kafka 读取到 Dataflow,并提供了一些性能提示和最佳实践。
对于大多数用例,请考虑使用托管式 I/O 连接器从 Kafka 读取数据。
如果您需要更高级的性能调优,不妨考虑使用 KafkaIO
连接器。KafkaIO
连接器适用于 Java,或者通过使用适用于 Python 和 Go 的多语言流水线框架。
最大并行数量
并行处理受两个因素的限制:工作器的最大数量 (max_num_workers
) 和 Kafka 分区的数量。Dataflow 默认的并行度扇出为 4 x max_num_workers
。不过,扇出受分区数量的限制。例如,如果有 100 个 vCPU 可用,但流水线仅从 10 个 Kafka 分区读取数据,则最大并行度为 10。
为尽可能提高并行度,建议至少有 4 x max_num_workers
个 Kafka 分区。如果作业使用 Runner v2,请考虑将并行处理数设置得更高。一个不错的起点是将分区数设为工作器 vCPU 数量的两倍。
如果您无法增加分区数量,可以通过调用 KafkaIO.Read.withRedistribute
来提高并行性。此方法会向流水线添加 Redistribute
转换,从而为 Dataflow 提供提示,以便更高效地重新分布数据并实现数据并行化。您还可以通过调用 KafkaIO.Read.withRedistributeNumKeys
来指定在重新分片步骤中使用的最佳分片数量。Dataflow 将此值视为优化提示。重新分发数据会增加一些额外的开销,以执行 shuffle 步骤。如需了解详情,请参阅阻止融合。
尽量确保各分区之间的负载相对均匀,而不是出现倾斜。如果负载出现偏差,可能会导致工作器的利用率较低。从负载较轻的分区读取数据的工作器可能相对空闲,而从负载较重的分区读取数据的工作器可能会落后。 Dataflow 会提供每个分区的积压指标。
如果负载出现倾斜,动态工作负载平衡有助于分配工作。例如,Dataflow 可能会分配一个工作器来读取多个低容量分区,并分配另一个工作器来读取单个高容量分区。不过,两个工作器无法从同一分区读取数据,因此负载过重的分区仍可能导致流水线滞后。
最佳做法
本部分包含有关从 Kafka 读取数据到 Dataflow 的建议。
低搜索量主题
一种常见的情况是同时从多个低容量主题读取数据,例如每个客户对应一个主题。为每个主题创建单独的 Dataflow 作业效率不高,因为每个作业至少需要一个完整的工作器。请改为考虑以下方案:
合并主题。在将主题提取到 Dataflow 之前对其进行合并。与提取许多低流量主题相比,提取少量高流量主题的效率要高得多。每个高容量主题都可以由一个充分利用其工作器的 Dataflow 作业来处理。
读取多个主题。如果您无法在将主题提取到 Dataflow 之前合并主题,请考虑创建可从多个主题读取数据的流水线。此方法可让 Dataflow 将多个主题分配给同一个工作器。您可以通过以下两种方式实现此方法:
单次读取步骤。创建
KafkaIO
连接器的单个实例,并将其配置为读取多个主题。然后按主题名称进行过滤,以便针对每个主题应用不同的逻辑。如需查看代码示例,请参阅从多个主题读取数据。如果您的所有主题都位于同一集群中,请考虑使用此选项。一个缺点是,单个接收器或转换出现问题可能会导致所有主题都积累待处理项。对于更高级的用例,请传入一组
KafkaSourceDescriptor
对象,用于指定要从中读取的主题。使用KafkaSourceDescriptor
可让您稍后根据需要更新主题列表。此功能需要使用 Java 和 Runner v2。多个读取步骤。如需从位于不同集群中的主题读取数据,您的流水线可以包含多个
KafkaIO
实例。作业运行时,您可以使用转换映射来更新各个来源。只有在使用 Runner v2 时,才支持设置新主题或集群。 这种方法可能会带来可观测性方面的挑战,因为您需要监控每个单独的读取转换,而不是依赖于流水线级指标。
提交回 Kafka
默认情况下,KafkaIO
连接器不使用 Kafka 偏移量来跟踪进度,也不会提交回 Kafka。如果您调用 commitOffsetsInFinalize
,连接器会在 Dataflow 中提交记录后尽最大努力提交回 Kafka。Dataflow 中已提交的记录可能未完全处理,因此,如果您取消流水线,则可能会提交偏移量,而记录未完全处理。
由于设置 enable.auto.commit=True
会在从 Kafka 读取偏移量后立即提交偏移量,而无需 Dataflow 进行任何处理,因此我们不建议使用此选项。建议同时设置 enable.auto.commit=False
和 commitOffsetsInFinalize=True
。如果您将 enable.auto.commit
设置为 True
,而流水线又在处理期间中断,则数据可能会丢失。已在 Kafka 中提交的记录可能会被丢弃。
水印
默认情况下,KafkaIO
连接器使用当前处理时间来分配输出水位和事件时间。如需更改此行为,请调用 withTimestampPolicyFactory
并分配 TimestampPolicy
。Beam 提供了 TimestampPolicy
的实现,可根据 Kafka 的日志附加时间或消息创建时间计算水位。
Runner 注意事项
KafkaIO
连接器具有两种用于 Kafka 读取的底层实现:较旧的 ReadFromKafkaViaUnbounded
和较新的 ReadFromKafkaViaSDF
。Dataflow 会根据您的 SDK 语言和作业要求自动选择最适合您作业的实现。除非您需要仅在该实现中提供的特定功能,否则应避免明确请求 runner 或 Kafka 实现。如需详细了解如何选择 Runner,请参阅使用 Dataflow Runner v2。
如果您的流水线使用 withTopic
或 withTopics
,则旧版实现会在流水线构建时查询 Kafka 以获取可用的分区。创建流水线的机器必须具有连接到 Kafka 的权限。如果您收到权限错误,请验证您是否拥有在本地连接到 Kafka 的权限。您可以使用 withTopicPartitions
来避免此问题,该函数在流水线构建时不会连接到 Kafka。
部署到生产环境
在生产环境中部署解决方案时,建议使用 Flex 模板。通过使用 Flex 模板,流水线可以从一致的环境中启动,这有助于缓解本地配置问题。
来自 KafkaIO
的日志可能非常详细。请考虑在生产环境中降低日志记录级别,如下所示:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
如需了解详情,请参阅设置流水线工作器日志级别。
配置网络
默认情况下,Dataflow 会在您的默认 Virtual Private Cloud (VPC) 网络中启动实例。根据您的 Kafka 配置,您可能需要为 Dataflow 配置不同的网络和子网。如需了解详情,请参阅指定网络和子网。 配置网络时,请创建防火墙规则,允许 Dataflow 工作器机器访问 Kafka 代理。
如果您在使用 VPC Service Controls,请将 Kafka 集群放在 VPC Service Controls 边界内,或者将边界扩展到授权 VPN 或 Cloud Interconnect 范围。
如果您的 Kafka 集群部署在 Google Cloud之外,您必须在 Dataflow 和 Kafka 集群之间创建网络连接。有几种网络选项可供您选择,它们各有利弊:
- 通过以下选项之一,使用共享 RFC 1918 地址空间进行连接:
- 使用以下选项之一,通过公共 IP 地址访问外部托管的 Kafka 集群:
专用互连是实现可预测性能和可靠性的最佳选择,但由于第三方必须预配新线路,因此设置专用互连可能需要更长时间。使用基于公共 IP 的拓扑,您可以快速开始,因为只需完成很少的网络操作。
接下来的两个部分更详细地介绍了这些选项。
共享的 RFC 1918 地址空间
使用专用互连和 IPsec VPN,均可以直接访问虚拟私有云 (VPC) 中的 RFC 1918 IP 地址,从而简化 Kafka 配置。如果您使用基于 VPN 的拓扑,请考虑设置高吞吐量 VPN。
默认情况下,Dataflow 会在您的默认 VPC 网络中启动实例。在专用网络拓扑中(其中的路由已在 Cloud Router 中明确定义并将 Google Cloud 中的子网连接到该 Kafka 集群),您需要更好地控制 Dataflow 实例的放置位置。您可以使用 Dataflow 配置 network
和 subnetwork
执行参数。
确保相应子网具有足够的 IP 地址,以便 Dataflow 在尝试横向扩容时可在这些地址上启动实例。此外,在创建用于启动 Dataflow 实例的单独网络时,请确保您具有可在项目中的所有虚拟机之间启用 TCP 流量的防火墙规则。默认网络已配置此防火墙规则。
公共 IP 地址空间
此架构使用传输层安全协议 (TLS) 来保护外部客户端与 Kafka 之间的流量,并使用未加密的流量进行代理间通信。当 Kafka 侦听器绑定到用于内部和外部通信的网络接口时,配置侦听器就非常简单了。但在许多情况下,集群中 Kafka broker 的外部通告地址将与 Kafka 使用的内部网络接口不同。在这种情况下,您可以使用 advertised.listeners
属性:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
外部客户端使用端口 9093 通过“SSL”通道连接,内部客户端使用端口 9092 通过明文通道连接。在 advertised.listeners
下指定地址时,请使用解析为外部和内部流量的相同实例的 DNS 名称(在此示例中为 kafkabroker-n.mydomain.com
)。使用公共 IP 地址可能不起作用,因为该地址可能无法解析内部流量。
调整 Kafka
Kafka 集群和 Kafka 客户端设置会对性能产生很大影响。具体而言,以下设置可能过低。本部分提供了一些建议的初始值,但您应针对自己的特定工作负载对这些值进行实验。
unboundedReaderMaxElements
。默认值为 10,000。较高的值(例如 100,000)可以增加软件包的大小,如果流水线包含聚合,则可以显著提升性能。不过,较高的值也可能会增加延迟时间。如需设置值,请使用setUnboundedReaderMaxElements
。 此设置不适用于 Runner v2。unboundedReaderMaxReadTimeMs
。默认值为 10,000 毫秒。较高的值(例如 20,000 毫秒)可能会增加捆绑包大小,而较低的值(例如 5, 000 毫秒)可能会缩短延迟时间或减少积压。如需设置该值,请使用setUnboundedReaderMaxReadTimeMs
。此设置不适用于 Runner v2。max.poll.records
。默认值为 500。通过一次性检索更多传入记录,较高的值可能会带来更好的效果,尤其是在使用 Runner v2 时。如需设置值,请调用withConsumerConfigUpdates
。fetch.max.bytes
。默认值为 1MB。较高的值可能会通过减少请求数量来提高吞吐量,尤其是在使用 Runner v2 时。不过,如果将该值设置得过高,可能会增加延迟时间,尽管下游处理更有可能成为主要瓶颈。建议的初始值为 100MB。如需设置值,请调用withConsumerConfigUpdates
。max.partition.fetch.bytes
。默认值为 1MB。此参数用于设置服务器返回的每个分区的最大数据量。增加该值可以减少请求数量,从而提高吞吐量,尤其是在使用 Runner v2 时。不过,如果将该值设置得过高,可能会增加延迟时间,尽管下游处理更有可能成为主要瓶颈。建议的初始值为 100MB。如需设置值,请调用withConsumerConfigUpdates
。consumerPollingTimeout
。默认值为 2 秒。如果使用方客户端在读取任何记录之前超时,请尝试设置更高的值。此设置最常在执行跨区域读取或通过慢速网络读取时使用。如需设置值,请调用withConsumerPollingTimeout
。
确保 receive.buffer.bytes
足够大,可以处理消息的大小。如果该值过小,日志可能会显示使用方不断重新创建并寻找特定偏移量。
示例
以下代码示例展示了如何创建从 Kafka 读取数据的 Dataflow 流水线。将应用默认凭据与 Google Cloud Managed Service for Apache Kafka 提供的回调处理程序搭配使用时,需要使用 kafka-clients
版本 3.7.0 或更高版本。
从单个主题中读取
此示例使用托管式 I/O 连接器。该示例展示了如何从 Kafka 主题读取数据,并将消息载荷写入文本文件。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
Python
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
从多个主题读取数据
此示例使用 KafkaIO
连接器。此示例展示了如何从多个 Kafka 主题读取数据,并为每个主题应用单独的流水线逻辑。
对于更高级的用例,请动态传入一组 KafkaSourceDescriptor
对象,以便您可以更新要读取的主题列表。此方法需要使用 Java 和 Runner v2。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
Python
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
后续步骤
- 写入 Apache Kafka。
- 详细了解托管式 I/O。