# Write from Dataflow to Apache Kafka

This document describes how to write data from Dataflow to Apache Kafka.

For most use cases, consider using the
[Managed I/O connector](/dataflow/docs/guides/managed-io-kafka) to write to
Kafka.

If you need more advanced [performance tuning](#tune-kafka), consider using the
`KafkaIO` connector. The `KafkaIO` connector is available for
[Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.html)
or by using the
[multi-language pipelines framework](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines)
for [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html)
and [Go](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio).
Exactly-once processing
-----------------------

By default, the `KafkaIO` connector doesn't provide
[exactly-once semantics](/dataflow/docs/concepts/exactly-once) for writes. That
means data might be written to your Kafka topic multiple times. To enable
exactly-once writes, call the
[`withEOS`](https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withEOS-int-java.lang.String-)
method. Exactly-once writes guarantee that data is written to the destination
Kafka topic exactly once. However, exactly-once writes also increase the
pipeline cost and decrease throughput.

If you don't have strict requirements for exactly-once semantics, and the logic
in your pipeline can handle duplicate records, consider enabling at-least-once
mode for the entire pipeline to reduce costs. For more information, see
[Set the pipeline streaming mode](/dataflow/docs/guides/streaming-modes).
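As an illustration, the following minimal Java sketch enables exactly-once
writes with `withEOS`. The broker address, topic name, and sink group ID are
placeholder assumptions, and the `Create` input stands in for a real streaming
source:

```
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExactlyOnceKafkaWrite {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // Placeholder input; in a real pipeline this is your streaming source.
        .apply(Create.of(KV.of("key-1", "value-1"), KV.of("key-2", "value-2")))
        .apply(
            KafkaIO.<String, String>write()
                .withBootstrapServers("kafkabroker-1.mydomain.com:9093") // placeholder brokers
                .withTopic("my-topic") // placeholder topic
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
                // withEOS(numShards, sinkGroupId) enables exactly-once writes.
                // The sink group ID identifies this sink and must remain stable
                // across pipeline updates.
                .withEOS(1, "kafka-eos-sink-group")); // placeholder sink group ID

    pipeline.run();
  }
}
```

Because `withEOS` relies on Kafka transactions, the destination cluster must
run Kafka version 0.11 or later; increasing `numShards` raises write
parallelism at the cost of more concurrent transactions.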
### Pipeline drains

If you drain the pipeline, exactly-once semantics are not guaranteed. The only
guarantee is that no acknowledged data is lost. As a result, some data might be
processed while the pipeline is draining, without the commit of read offsets
back to Kafka. To achieve at-least-once semantics for Kafka when you modify a
pipeline, [update the pipeline](/dataflow/docs/guides/updating-a-pipeline)
instead of cancelling the job and starting a new job.
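With the Java runner, an update is typically launched by rerunning the pipeline
with the `--update` flag and the original job name. A sketch, where the job
name and region are placeholders:

```
--runner=DataflowRunner \
--update \
--jobName=my-streaming-job \
--region=us-central1
```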
### Tune Kafka for exactly-once semantics

Adjusting `transaction.max.timeout.ms` and `transactional.id.expiration.ms` can
complement your overall fault-tolerance and exactly-once delivery strategy.
However, their impact depends on the nature of the outage and your specific
configuration. Set `transaction.max.timeout.ms` close to the retention time of
your Kafka topics to prevent data duplication caused by Kafka broker outages.

If a Kafka broker becomes temporarily unavailable (for example, due to a
network partition or node failure), and a producer has ongoing transactions,
those transactions might time out. Increasing the value of
`transaction.max.timeout.ms` gives transactions more time to complete after a
broker recovers, potentially avoiding the need to restart transactions and
resend messages. This mitigation indirectly helps maintain exactly-once
semantics by reducing the chance of duplicate messages caused by transaction
restarts. Conversely, a shorter `transactional.id.expiration.ms` value can help
clean up inactive transactional IDs more quickly, reducing potential resource
usage.
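Both settings are broker-side properties. As a sketch, they might appear in the
brokers' `server.properties` like this; the values shown are Kafka's defaults,
not recommendations, so tune them against your topic retention:

```
# Maximum transaction timeout the broker allows producers to request.
# Setting this close to your topic retention time reduces duplication
# caused by broker outages. (Kafka default: 15 minutes.)
transaction.max.timeout.ms=900000

# How long an unused transactional ID is retained before it expires.
# A shorter value cleans up inactive transactional IDs sooner.
# (Kafka default: 7 days.)
transactional.id.expiration.ms=604800000
```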
Configure networking
--------------------

By default, Dataflow launches instances within your default
Virtual Private Cloud (VPC) network. Depending on your Kafka configuration, you
might need to configure a different network and subnet for Dataflow. For more
information, see
[Specify a network and subnetwork](/dataflow/docs/guides/specifying-networks).
When configuring your network, create
[firewall rules](/vpc/docs/using-firewalls) that allow the Dataflow worker
machines to reach the Kafka brokers.
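For example, the following hypothetical rule allows worker VMs to reach brokers
listening on port 9092. The network name and the `kafka-broker` target tag are
assumptions; Dataflow workers carry the `dataflow` network tag by default:

```
gcloud compute firewall-rules create allow-dataflow-to-kafka \
    --network=dataflow-net \
    --direction=INGRESS \
    --action=ALLOW \
    --rules=tcp:9092 \
    --source-tags=dataflow \
    --target-tags=kafka-broker
```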
If you are using VPC Service Controls, then place the Kafka cluster within the
VPC Service Controls perimeter, or else
[extend the perimeters to the authorized VPN or Cloud Interconnect](/vpc-service-controls/docs/overview#hybrid_access).

If your Kafka cluster is deployed outside of Google Cloud, you must create
a network connection between Dataflow and the Kafka cluster.
There are several networking options with different tradeoffs:

- Connect using a shared RFC 1918 address space, by using one of the following:
  - [Dedicated Interconnect](/network-connectivity/docs/interconnect/concepts/dedicated-overview)
  - [IPsec virtual private network (VPN)](/network-connectivity/docs/vpn/concepts/overview)
- Reach your externally hosted Kafka cluster through public IP addresses, by using one of the following:
  - Public internet
  - [Direct peering](/network-connectivity/docs/direct-peering/direct-peering)
  - [Carrier peering](/network-connectivity/docs/carrier-peering)

Dedicated Interconnect is the best option for predictable performance and
reliability, but it can take longer to set up because third parties must
provision the new circuits. With a public IP-based topology, you can get
started quickly because little networking work needs to be done.

The next two sections describe these options in more detail.
#### Shared RFC 1918 address space

Both Dedicated Interconnect and IPsec VPN give you direct access to RFC 1918 IP
addresses in your Virtual Private Cloud (VPC), which can simplify your Kafka
configuration. If you're using a VPN-based topology, consider setting up a
[high-throughput VPN](/solutions/building-high-throughput-vpns).

By default, Dataflow launches instances on your default
[VPC network](/vpc/docs/vpc). In a private network topology with
[routes explicitly defined in Cloud Router](/network-connectivity/docs/router/concepts/overview)
that connect subnetworks in Google Cloud to that Kafka cluster, you need more
control over where to locate your Dataflow instances. Use the `network` and
`subnetwork`
[execution parameters](/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options)
to control this placement, as shown in the example later in this section.

Make sure that the corresponding subnetwork has enough IP addresses available
for Dataflow to launch instances on when it attempts to scale out. Also, when
you create a separate network for launching your Dataflow instances, ensure
that you have a firewall rule that enables TCP traffic among all virtual
machines in the project. The default network already has this firewall rule
configured.
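For example, the following pipeline options place Dataflow workers on a
specific subnetwork; the network, region, and subnetwork names are
placeholders:

```
--network=dataflow-net \
--subnetwork=regions/us-central1/subnetworks/dataflow-subnet
```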
#### Public IP address space

This architecture uses Transport Layer Security
([TLS](https://wikipedia.org/wiki/Transport_Layer_Security)) to secure traffic
between external clients and Kafka, and uses unencrypted traffic for
inter-broker communication. When the Kafka listener binds to a network
interface that is used for both internal and external communication,
configuring the listener is straightforward. However, in many scenarios, the
externally advertised addresses of the Kafka brokers in the cluster differ from
the internal network interfaces that Kafka uses. In such scenarios, you can use
the `advertised.listeners` property:

```
# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL

# Use plaintext for inter-broker communication
inter.broker.listener.name=INTERNAL

# Specify that Kafka listeners should bind to all local interfaces
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093

# Separately, specify externally visible address
advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
```

External clients connect using port 9093 through an "SSL" channel, and internal
clients connect using port 9092 through a plaintext channel. When you specify an
address under `advertised.listeners`, use DNS names
(`kafkabroker-n.mydomain.com`, in this sample) that resolve to the same instance
for both external and internal traffic. Using public IP addresses might not work
because the addresses might fail to resolve for internal traffic.
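On the client side, a Dataflow pipeline writing through the external listener
targets port 9093 and enables TLS in the producer configuration. A minimal Java
sketch, where the broker and topic names are placeholders and
`withProducerConfigUpdates` passes the setting to the underlying Kafka
producer:

```
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringSerializer;

// A write transform that targets the brokers' external SSL listener.
KafkaIO.Write<String, String> writeViaExternalListener =
    KafkaIO.<String, String>write()
        .withBootstrapServers("kafkabroker-1.mydomain.com:9093") // external listener port
        .withTopic("my-topic") // placeholder topic
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        // Tell the underlying Kafka producer to encrypt the connection with TLS.
        .withProducerConfigUpdates(ImmutableMap.of("security.protocol", "SSL"));
```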
[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["最后更新时间 (UTC):2025-09-04。"],[],[],null,["# Write from Dataflow to Apache Kafka\n\nThis document describes how to write data from Dataflow to Apache\nKafka.\n\nFor most use cases, consider using the\n[Managed I/O connector](/dataflow/docs/guides/managed-io-kafka) to write to\nKafka.\n\nIf you need more advanced [performance tuning](#tune-kafka), consider using the\n`KafkaIO` connector. The `KafkaIO` connector is available for\n[Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.html)\nor by using the\n[multi-language pipelines framework](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines)\nfor [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html)\nand [Go](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio).\n\nExactly-once processing\n-----------------------\n\nBy default, the `KafkaIO` connector doesn't provide\n[exactly-once semantics](/dataflow/docs/concepts/exactly-once) for writes. That\nmeans data might be written to your Kafka topic multiple times. To enable\nexactly-once writes, call the [`withEOS`](https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withEOS-int-java.lang.String-) method. Exactly-once writes\nguarantee that data is written to the destination Kafka topic exactly once.\nHowever, it also increases the pipeline cost and decreases throughput.\n\nIf you don't have strict requirements for exactly-once semantics, and the logic\nin your pipeline can handle duplicate records, consider enabling at-least-once\nmode for the entire pipeline to reduce costs. For more information, see\n[Set the pipeline streaming mode](/dataflow/docs/guides/streaming-modes).\n\n### Pipeline drains\n\nIf you drain the pipeline, exactly-once semantics are not guaranteed. The only\nguarantee is that no acknowledged data is lost. As a result, some data might be\nprocessed while the pipeline is draining, without the commit of read offsets\nback to Kafka. To achieve at-least-once semantics for Kafka when you modify a\npipeline, [update the pipeline](/dataflow/docs/guides/updating-a-pipeline)\ninstead of cancelling the job and starting a new job.\n\n### Tune Kafka for exactly-once semantics\n\nAdjusting `transaction.max.timeout.ms` and `transactional.id.expiration.ms` can\ncomplement your overall fault-tolerance and exactly-once delivery strategy.\nHowever, their impact depends on the nature of the outage and your specific\nconfiguration. Set `transaction.max.timeout.ms` close to the retention time of\nyour Kafka topics to prevent data duplication caused by Kafka broker outages.\n\nIf a Kafka broker becomes temporarily unavailable (for example, due to network\npartition or node failure), and a producer has ongoing transactions, those\ntransactions might time out. Increasing the value of\n`transaction.max.timeout.ms` gives transactions more time to complete after a\nbroker recovers, potentially avoiding the need to restart transactions and\nresend messages. 
What's next
-----------

- [Read from Apache Kafka](/dataflow/docs/guides/read-from-kafka).
- Learn more about [Managed I/O](/dataflow/docs/guides/managed-io).