Using Cloud Dataflow to Process Outside-Hosted Messages from Kafka

This article discusses key networking issues to consider when you use Dataflow and the Apache Beam KafkaIO transform to process messages from a Kafka cluster hosted outside of Google Cloud. With the release of the KafkaIO transform, you can apply the power of Apache Beam and Dataflow to those messages from inside Google Cloud. The following figure illustrates a popular scenario: Kafka is hosted either on-premises or in another public cloud such as Amazon Web Services (AWS), and Dataflow processes the messages.

Figure: Processing Kafka messages outside of Google Cloud

Network topology

You have a variety of connectivity options for linking resources on Google Cloud with resources outside of Google Cloud.

On Google Cloud, Dedicated Interconnect is the best option for predictable performance and reliability, but it can take longer to set up because third parties must provision the new circuits. If you're using a VPN–based topology, consider setting up a high-throughput VPN. Both Dedicated Interconnect and IPsec VPN give you direct access to RFC 1918 IP addresses in your virtual private cloud (VPC), which can simplify your Kafka configuration. With a public IP–based topology, you can get started quickly because little networking work needs to be done.

In both topologies, it's a good idea to validate connectivity by producing and consuming messages with a Kafka client that runs on a separate Compute Engine instance in the same subnetwork as your Dataflow instances.
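For example, you can run a quick smoke test with the console clients that ship in the Kafka distribution's bin directory. The broker address and topic name shown here are placeholders for your own values:

# From a Compute Engine instance in the same subnetwork as your Dataflow workers,
# publish a test message ...
echo "connectivity test" | bin/kafka-console-producer.sh \
    --broker-list kafkabroker-n.mydomain.com:9092 \
    --topic connectivity-test

# ... and read it back
bin/kafka-console-consumer.sh \
    --bootstrap-server kafkabroker-n.mydomain.com:9092 \
    --topic connectivity-test \
    --from-beginning --max-messages 1

If both commands succeed, routing and firewall rules between the subnetwork and the Kafka cluster are working.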

Latency is also an important consideration for stream-processing workloads. Pick a Google Cloud region for your Dataflow deployment that is near your Kafka cluster. For tips about optimizing network performance, see 5 steps to better Google Cloud network performance.

Shared RFC 1918 address space

This section discusses the network topology shown in the following diagram.

Figure: Shared RFC 1918 address space

Specifying a Dataflow subnetwork

By default, Dataflow launches instances on your default VPC network, which works if you can reach your externally hosted Kafka cluster through public IP addresses. In a private network topology, with routes explicitly defined in Cloud Router that connect subnetworks in Google Cloud to that Kafka cluster, you need more control over where your Dataflow instances are located. You can control this by setting the network and subnetwork execution parameters, as shown in this code sample:

mvn compile exec:java \
    -Dexec.mainClass=your-pipeline-java-class \
    -Dexec.args="--project=your-gcp-project
    --network=your-dataflow-network \
    --subnetwork=your-dataflow-subnet \
    --runner=DataflowRunner"

Make sure that the corresponding subnetwork has enough available IP addresses for Dataflow to launch new instances on as it scales out. Also, when you create a separate network for launching your Dataflow instances, ensure that you have a firewall rule that enables TCP traffic among all virtual machines in the project. The default network already has this firewall rule configured.
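As one illustration (a sketch, not the only valid rule), Dataflow worker VMs carry the dataflow network tag, so a rule like the following permits TCP traffic on ports 12345-12346, which Dataflow workers use to communicate with each other; the rule and network names are placeholders:

# Allow worker-to-worker TCP traffic between Dataflow VMs on the custom network
gcloud compute firewall-rules create your-dataflow-internal-rule \
    --network=your-dataflow-network \
    --allow=tcp:12345-12346 \
    --source-tags=dataflow \
    --target-tags=dataflow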

Communication between Dataflow and Kafka

In a private network topology, configure Kafka as you normally would and follow best practices for availability, security, and durability.

Public IP address space

The following diagram shows a sample architecture for securely hosting a cluster of three Kafka brokers that you can access over the public internet.

Figure: Public IP address space

Dataflow configuration

Because traffic flows over the public internet, you don't have to configure a network and subnetwork. However, if you have set up a custom network, you can still specify the network and subnetwork, so long as a route exists from that network to the public IP addresses of the Kafka cluster.
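For example, you can check that the network you plan to specify has a default route to the internet; the network name is a placeholder, and the filter expression is one possible way to narrow the listing:

# List routes on the custom network that cover all destinations (0.0.0.0/0)
gcloud compute routes list \
    --filter="network:your-dataflow-network AND destRange=0.0.0.0/0"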

Kafka configuration

Following the standard convention of Kafka documentation and configuration, the architecture shown in the previous diagram uses the term Secure Sockets Layer (SSL), although the protocol that actually secures traffic between external clients and Kafka is Transport Layer Security (TLS). Inter-broker communication uses plaintext. When the Kafka listener binds to a network interface that is used for both internal and external communication, configuring the listener is straightforward. However, in many scenarios, such as when you deploy on AWS, the externally advertised addresses of the Kafka brokers in the cluster differ from the internal network interfaces that Kafka uses. In such scenarios, you can use the advertised.listeners property, as shown in this sample server.properties snippet:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL

# Use plaintext for inter-broker communication
inter.broker.listener.name=INTERNAL

# Specify that Kafka listeners should bind to all local interfaces
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093

# Separately, specify the externally visible addresses
advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

In this configuration, external clients connect using port 9093 through an "SSL" channel, and internal clients connect using port 9092 through a plaintext channel. When you specify an address under advertised.listeners, use DNS names (kafkabroker-n.mydomain.com, in this sample) that resolve to the same instance for both external and internal traffic. Using public IP addresses might not work because the addresses might fail to resolve for internal traffic.
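As a quick sanity check of the external listener, you could point Kafka's console consumer at port 9093 over SSL; the topic name, truststore path, and password here are hypothetical:

# Consume one message over the SSL-secured external listener
bin/kafka-console-consumer.sh \
    --bootstrap-server kafkabroker-n.mydomain.com:9093 \
    --topic your-topic --from-beginning --max-messages 1 \
    --consumer-property security.protocol=SSL \
    --consumer-property ssl.truststore.location=/path/to/client.truststore.jks \
    --consumer-property ssl.truststore.password=your-password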

What's next