Write from Dataflow to Apache Kafka

This document describes how to write data from Dataflow to Apache Kafka.

The Apache Beam Kafka I/O connector (KafkaIO) is available natively for Java, and is also available for Python and Go using the Apache Beam multi-language pipelines framework.

For Java pipelines, consider using the Managed I/O connector to read from Kafka.

Exactly-once processing

By default, the KafkaIO connector doesn't provide exactly-once semantics for writes. That means data might be written to your Kafka topic multiple times. To enable exactly-once writes, call the withEOS method. Exactly-once writes guarantee that data is written to the destination Kafka topic exactly once. However, it also increases the pipeline cost and decreases throughput.

If you don't have strict requirements for exactly-once semantics, and the logic in your pipeline can handle duplicate records, consider enabling at-least-once mode for the entire pipeline to reduce costs. For more information, see Set the pipeline streaming mode.

Pipeline drains

If you drain the pipeline, exactly-once semantics are not guaranteed. The only guarantee is that no acknowledged data is lost. As a result, some data might be processed while the pipeline is draining, without the commit of read offsets back to Kafka. To achieve at-least-once semantics for Kafka when you modify a pipeline, update the pipeline instead of cancelling the job and starting a new job.

Tune Kafka for exactly-once semantics

Adjusting transaction.max.timeout.ms and transactional.id.expiration.ms can complement your overall fault-tolerance and exactly-once delivery strategy. However, their impact depends on the nature of the outage and your specific configuration. Set transaction.max.timeout.ms close to the retention time of your Kafka topics to prevent data duplication caused by Kafka broker outages.

If a Kafka broker becomes temporarily unavailable (for example, due to network partition or node failure), and a producer has ongoing transactions, those transactions might time out. Increasing the value of transaction.max.timeout.ms gives transactions more time to complete after a broker recovers, potentially avoiding the need to restart transactions and resend messages. This mitigation indirectly helps maintain exactly-once semantics, by reducing the chance of duplicate messages caused by transaction restarts. On the other hand, a shorter expiration time can help clean up inactive transactional IDs more quickly, reducing potential resource usage.

Configure networking

By default, Dataflow launches instances within your default Virtual Private Cloud (VPC) network. Depending on your Kafka configuration, you might need to configure a different network and subnet for Dataflow. For more information, see Specify a network and subnetwork. When configuring your network, create firewall rules that allow the Dataflow worker machines to reach the Kafka brokers.

If you are using VPC Service Controls, then place the Kafka cluster within the VPC Service Controls perimeter, or else extend the perimeters to the authorized VPN or Cloud Interconnect.

If your Kafka cluster is deployed outside of Google Cloud, you must create a network connection between Dataflow and the Kafka cluster. There are several networking options with different tradeoffs:

Dedicated Interconnect is the best option for predictable performance and reliability, but it can take longer to set up because third parties must provision the new circuits. With a public IP–based topology, you can get started quickly because little networking work needs to be done.

The next two sections describe these options in more detail.

Shared RFC 1918 address space

Both Dedicated Interconnect and IPsec VPN give you direct access to RFC 1918 IP addresses in your Virtual Private Cloud (VPC), which can simplify your Kafka configuration. If you're using a VPN–based topology, consider setting up a high-throughput VPN.

By default, Dataflow launches instances on your default VPC network. In a private network topology with routes explicitly defined in Cloud Router that connect subnetworks in Google Cloud to that Kafka cluster, you need more control over where to locate your Dataflow instances. You can use Dataflow to configure the network and subnetwork execution parameters.

Make sure that the corresponding subnetwork has enough IP addresses available for Dataflow to launch instances on when it attempts to scale out. Also, when you create a separate network for launching your Dataflow instances, ensure that you have a firewall rule that enables TCP traffic among all virtual machines in the project. The default network already has this firewall rule configured.

Public IP address space

This architecture uses Transport Layer Security (TLS) to secure traffic between external clients and Kafka, and uses unencrypted traffic for inter-broker communication. When the Kafka listener binds to a network interface that is used for both internal and external communication, configuring the listener is straightforward. However, in many scenarios, the externally advertised addresses of the Kafka brokers in the cluster differ from the internal network interfaces that Kafka uses. In such scenarios, you can use the advertised.listeners property:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

External clients connect using port 9093 through an "SSL" channel, and internal clients connect using port 9092 through a plaintext channel. When you specify an address under advertised.listeners, use DNS names (kafkabroker-n.mydomain.com, in this sample) that resolve to the same instance for both external and internal traffic. Using public IP addresses might not work because the addresses might fail to resolve for internal traffic.

Logging

Logging from KafkaIO can be quite verbose. Consider reducing the logging level in production as follows:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

For more information, see Set pipeline worker log levels.