将 Dataflow 写入 Apache Kafka

本文档介绍了如何将数据从 Dataflow 写入 Apache Kafka。

Apache Beam Kafka I/O 连接器 (KafkaIO) 原生适用于 Java,并且还适用于使用 Apache Beam 多语言流水线框架PythonGo

对于 Java 流水线,请考虑使用受管 I/O 连接器从 Kafka 读取数据。

一次性处理

默认情况下,KafkaIO 连接器不会为写入操作提供正好传送一次语义。这意味着数据可能会多次写入 Kafka 主题。如需启用“正好一次”写入,请调用 withEOS 方法。确切写入可确保数据仅写入目标 Kafka 主题一次。不过,这也会增加流水线成本并降低吞吐量。

如果您对“正好一次”语义没有严格的要求,并且流水线中的逻辑可以处理重复记录,请考虑为整个流水线启用“至少一次”模式,以降低费用。如需了解详情,请参阅设置流水线流处理模式

流水线排空

如果您排空流水线,则无法保证“正好一次”语义。唯一的保证是,不会丢失已确认的数据。因此,在流水线排空时,可能会处理一些数据,而无需将读取偏移量提交回 Kafka。如需在修改流水线时实现 Kafka 的至少一次语义,请更新流水线,而不是取消作业并启动新作业。

针对“正好一次”语义调整 Kafka

调整 transaction.max.timeout.mstransactional.id.expiration.ms 可以补充您的整体容错和恰好一次交付策略。不过,其影响取决于中断的性质和您的具体配置。将 transaction.max.timeout.ms 设置为接近 Kafka 主题的保留时间,以防止 Kafka 代理中断导致数据重复。

如果 Kafka 代理暂时不可用(例如,由于网络分区或节点故障),并且生产者有正在进行的交易,则这些交易可能会超时。增加 transaction.max.timeout.ms 的值可让事务在代理恢复后有更多时间完成,从而可能避免重启事务和重新发送消息。此缓解措施通过减少因交易重启而导致重复消息的可能性,间接有助于保持“恰好一次”语义。另一方面,较短的过期时间有助于更快地清理无效的事务 ID,从而减少潜在资源用量。

配置网络

默认情况下,Dataflow 会在您的默认 Virtual Private Cloud (VPC) 网络中启动实例。根据您的 Kafka 配置,您可能需要为 Dataflow 配置不同的网络和子网。如需了解详情,请参阅指定网络和子网。 配置网络时,请创建防火墙规则,允许 Dataflow 工作器机器访问 Kafka 代理。

如果您在使用 VPC Service Controls,请将 Kafka 集群放在 VPC Service Controls 边界内,或者将边界扩展到授权 VPN 或 Cloud Interconnect 范围

如果您的 Kafka 集群部署在 Google Cloud 之外,您必须在 Dataflow 和 Kafka 集群之间创建网络连接。有几种网络选项可供您选择,它们各有利弊:

专用互连是实现可预测性能和可靠性的最佳选择,但由于第三方必须预配新线路,因此设置专用互连可能需要更长时间。使用基于公共 IP 的拓扑,您可以快速开始,因为只需完成很少的网络操作。

接下来的两个部分更详细地介绍了这些选项。

共享的 RFC 1918 地址空间

使用专用互连和 IPsec VPN,均可以直接访问虚拟私有云 (VPC) 中的 RFC 1918 IP 地址,从而简化 Kafka 配置。如果您使用基于 VPN 的拓扑,请考虑设置高吞吐量 VPN

默认情况下,Dataflow 会在您的默认 VPC 网络中启动实例。在专用网络拓扑中(其中的路由已在 Cloud Router 中明确定义并将 Google Cloud 中的子网连接到该 Kafka 集群),您需要更好地控制 Dataflow 实例的放置位置。您可以使用 Dataflow 配置 networksubnetwork 执行参数

确保相应子网具有足够的 IP 地址,以便 Dataflow 在尝试横向扩容时可在这些地址上启动实例。此外,在创建用于启动 Dataflow 实例的单独网络时,请确保您具有可在项目中的所有虚拟机之间启用 TCP 流量的防火墙规则。默认网络已配置此防火墙规则。

公共 IP 地址空间

此架构使用传输层安全协议 (TLS) 来保护外部客户端与 Kafka 之间的流量,并使用未加密的流量进行代理间通信。当 Kafka 侦听器绑定到用于内部和外部通信的网络接口时,配置侦听器就非常简单了。但在许多情况下,集群中 Kafka broker 的外部通告地址将与 Kafka 使用的内部网络接口不同。在这种情况下,您可以使用 advertised.listeners 属性:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

外部客户端使用端口 9093 通过“SSL”通道连接,内部客户端使用端口 9092 通过明文通道连接。在 advertised.listeners 下指定地址时,请使用解析为外部和内部流量的相同实例的 DNS 名称(在此示例中为 kafkabroker-n.mydomain.com)。使用公共 IP 地址可能不起作用,因为该地址可能无法解析内部流量。

日志记录

KafkaIO 记录日志可能会非常冗长。请考虑在生产环境中降低日志记录级别,如下所示:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

如需了解详情,请参阅设置流水线工作器日志级别