# Write from Dataflow to Apache Kafka

This document describes how to write data from Dataflow to Apache Kafka.

For most use cases, consider using the [Managed I/O connector](/dataflow/docs/guides/managed-io-kafka) to write to Kafka.

If you need more advanced [performance tuning](#tune-kafka), consider using the `KafkaIO` connector. The `KafkaIO` connector is available for [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.html) or by using the [multi-language pipelines framework](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html) and [Go](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio).
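For example, a minimal sketch of a Managed I/O write in Java might look like the following. The broker address, topic, and format are placeholders, and the configuration keys shown are assumptions; check the Managed I/O connector page linked above for the authoritative list:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// A minimal sketch, assuming `rows` is a PCollection<Row> built earlier in
// the pipeline. The broker, topic, and format values are placeholders, and
// the config keys are assumptions taken from the Managed I/O documentation.
Map<String, Object> config = new HashMap<>();
config.put("bootstrap_servers", "kafkabroker-1.mydomain.com:9093");
config.put("topic", "my-topic");
config.put("format", "JSON");

PCollection<Row> rows = /* ... */;
rows.apply(Managed.write(Managed.KAFKA).withConfig(config));
```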
Exactly-once processing
-----------------------

By default, the `KafkaIO` connector doesn't provide [exactly-once semantics](/dataflow/docs/concepts/exactly-once) for writes, which means data might be written to your Kafka topic multiple times. To enable exactly-once writes, call the [`withEOS`](https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withEOS-int-java.lang.String-) method. Exactly-once writes guarantee that data is written to the destination Kafka topic exactly once. However, exactly-once mode also increases pipeline cost and decreases throughput.
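A minimal sketch of an exactly-once write in Java follows. The broker address, topic, shard count, and sink group ID are placeholders; `withEOS` takes the number of shards to use for writing and a sink group ID that identifies this write for transaction bookkeeping:

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringSerializer;

// A minimal sketch, assuming `records` is a PCollection<KV<String, String>>
// built earlier in the pipeline. The broker, topic, shard count, and sink
// group ID are placeholders.
records.apply(
    KafkaIO.<String, String>write()
        .withBootstrapServers("kafkabroker-1.mydomain.com:9093")
        .withTopic("my-topic")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(20, "my-sink-group-id"));
```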
If you don't have strict requirements for exactly-once semantics, and the logic in your pipeline can handle duplicate records, consider enabling at-least-once mode for the entire pipeline to reduce costs. For more information, see [Set the pipeline streaming mode](/dataflow/docs/guides/streaming-modes).
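At-least-once mode is enabled for the whole pipeline through a Dataflow service option rather than in the connector. As a sketch, for a Java pipeline the flag might look like the following; see the linked page for the authoritative syntax:

```
--dataflowServiceOptions=streaming_mode_at_least_once
```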
### Pipeline drains

If you drain the pipeline, exactly-once semantics are not guaranteed. The only guarantee is that no acknowledged data is lost. As a result, some data might be processed while the pipeline is draining, without the commit of read offsets back to Kafka. To achieve at-least-once semantics for Kafka when you modify a pipeline, [update the pipeline](/dataflow/docs/guides/updating-a-pipeline) instead of cancelling the job and starting a new job.
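As a sketch, updating a running Java pipeline in place means relaunching it with the update option and the same job name; the job name below is a placeholder and must match the running job:

```
--update --jobName=my-running-job
```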
### Tune Kafka for exactly-once semantics

Adjusting `transaction.max.timeout.ms` and `transactional.id.expiration.ms` can complement your overall fault-tolerance and exactly-once delivery strategy. However, their impact depends on the nature of the outage and your specific configuration. Set `transaction.max.timeout.ms` close to the retention time of your Kafka topics to prevent data duplication caused by Kafka broker outages.

If a Kafka broker becomes temporarily unavailable (for example, due to a network partition or node failure), and a producer has ongoing transactions, those transactions might time out. Increasing the value of `transaction.max.timeout.ms` gives transactions more time to complete after a broker recovers, potentially avoiding the need to restart transactions and resend messages. This mitigation indirectly helps maintain exactly-once semantics by reducing the chance of duplicate messages caused by transaction restarts. On the other hand, a shorter `transactional.id.expiration.ms` can help clean up inactive transactional IDs more quickly, reducing potential resource usage.
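These are broker-side settings, typically set in `server.properties`. The following sketch uses illustrative values (the Kafka defaults), not recommendations; choose values that align with your topic retention and fault-tolerance strategy:

```
# Allow in-flight transactions up to 15 minutes before the broker aborts them.
transaction.max.timeout.ms=900000

# Expire inactive transactional IDs after 7 days.
transactional.id.expiration.ms=604800000
```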
Configure networking
--------------------

By default, Dataflow launches instances within your default Virtual Private Cloud (VPC) network. Depending on your Kafka configuration, you might need to configure a different network and subnet for Dataflow. For more information, see [Specify a network and subnetwork](/dataflow/docs/guides/specifying-networks). When configuring your network, create [firewall rules](/vpc/docs/using-firewalls) that allow the Dataflow worker machines to reach the Kafka brokers.

If you are using VPC Service Controls, then place the Kafka cluster within the VPC Service Controls perimeter, or else [extend the perimeters to the authorized VPN or Cloud Interconnect](/vpc-service-controls/docs/overview#hybrid_access).

If your Kafka cluster is deployed outside of Google Cloud, you must create a network connection between Dataflow and the Kafka cluster. There are several networking options with different tradeoffs:

- Connect using a shared RFC 1918 address space, by using one of the following:
  - [Dedicated Interconnect](/network-connectivity/docs/interconnect/concepts/dedicated-overview)
  - [IPsec virtual private network (VPN)](/network-connectivity/docs/vpn/concepts/overview)
- Reach your externally hosted Kafka cluster through public IP addresses, by using one of the following:
  - Public internet
  - [Direct peering](/network-connectivity/docs/direct-peering/direct-peering)
  - [Carrier peering](/network-connectivity/docs/carrier-peering)

Dedicated Interconnect is the best option for predictable performance and reliability, but it can take longer to set up because third parties must provision the new circuits. With a public IP-based topology, you can get started quickly because little networking work needs to be done.

The next two sections describe these options in more detail.
#### Shared RFC 1918 address space

Both Dedicated Interconnect and IPsec VPN give you direct access to RFC 1918 IP addresses in your Virtual Private Cloud (VPC) network, which can simplify your Kafka configuration. If you're using a VPN-based topology, consider setting up a [high-throughput VPN](/solutions/building-high-throughput-vpns).

By default, Dataflow launches instances on your default [VPC network](/vpc/docs/vpc). In a private network topology with [routes explicitly defined in Cloud Router](/network-connectivity/docs/router/concepts/overview) that connect subnetworks in Google Cloud to that Kafka cluster, you need more control over where to locate your Dataflow instances. You can use the `network` and `subnetwork` [execution parameters](/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options) to control where Dataflow launches instances (see the sketch at the end of this section).

Make sure that the corresponding subnetwork has enough IP addresses available for Dataflow to launch instances on when it attempts to scale out. Also, when you create a separate network for launching your Dataflow instances, ensure that you have a firewall rule that enables TCP traffic among all virtual machines in the project. The default network already has this firewall rule configured.
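A minimal sketch of setting those execution parameters in Java follows; the network and subnetwork names are placeholders:

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// A minimal sketch. The network and subnetwork names are placeholders;
// the subnetwork is specified relative to its region.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setNetwork("my-kafka-network");
options.setSubnetwork("regions/us-central1/subnetworks/my-kafka-subnet");
```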
#### Public IP address space

This architecture uses Transport Layer Security ([TLS](https://wikipedia.org/wiki/Transport_Layer_Security)) to secure traffic between external clients and Kafka, and uses unencrypted traffic for inter-broker communication. When the Kafka listener binds to a network interface that is used for both internal and external communication, configuring the listener is straightforward. However, in many scenarios, the externally advertised addresses of the Kafka brokers in the cluster differ from the internal network interfaces that Kafka uses. In such scenarios, you can use the `advertised.listeners` property:

```
# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL

# Use plaintext for inter-broker communication
inter.broker.listener.name=INTERNAL

# Specify that Kafka listeners should bind to all local interfaces
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093

# Separately, specify externally visible address
advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
```
External clients connect using port 9093 through an "SSL" channel, and internal clients connect using port 9092 through a plaintext channel. When you specify an address under `advertised.listeners`, use DNS names (`kafkabroker-n.mydomain.com`, in this sample) that resolve to the same instance for both external and internal traffic. Using public IP addresses might not work, because the addresses might fail to resolve for internal traffic.
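On the Dataflow side, a minimal sketch of pointing `KafkaIO` at the external SSL listener in Java follows. The broker address is the placeholder from the sample above, and the truststore settings are assumptions that depend on how you distribute certificates to your workers:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringSerializer;

// A minimal sketch, assuming `records` is a PCollection<KV<String, String>>.
// The truststore location must be readable on the Dataflow workers; the
// path and password here are placeholders.
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("security.protocol", "SSL");
producerConfig.put("ssl.truststore.location", "/tmp/kafka.truststore.jks");
producerConfig.put("ssl.truststore.password", "truststore-password");

records.apply(
    KafkaIO.<String, String>write()
        .withBootstrapServers("kafkabroker-n.mydomain.com:9093")
        .withTopic("my-topic")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withProducerConfigUpdates(producerConfig));
```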
[[["容易理解","easyToUnderstand","thumb-up"],["確實解決了我的問題","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["難以理解","hardToUnderstand","thumb-down"],["資訊或程式碼範例有誤","incorrectInformationOrSampleCode","thumb-down"],["缺少我需要的資訊/範例","missingTheInformationSamplesINeed","thumb-down"],["翻譯問題","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["上次更新時間:2025-09-04 (世界標準時間)。"],[],[],null,["# Write from Dataflow to Apache Kafka\n\nThis document describes how to write data from Dataflow to Apache\nKafka.\n\nFor most use cases, consider using the\n[Managed I/O connector](/dataflow/docs/guides/managed-io-kafka) to write to\nKafka.\n\nIf you need more advanced [performance tuning](#tune-kafka), consider using the\n`KafkaIO` connector. The `KafkaIO` connector is available for\n[Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.html)\nor by using the\n[multi-language pipelines framework](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines)\nfor [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html)\nand [Go](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio).\n\nExactly-once processing\n-----------------------\n\nBy default, the `KafkaIO` connector doesn't provide\n[exactly-once semantics](/dataflow/docs/concepts/exactly-once) for writes. That\nmeans data might be written to your Kafka topic multiple times. To enable\nexactly-once writes, call the [`withEOS`](https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withEOS-int-java.lang.String-) method. Exactly-once writes\nguarantee that data is written to the destination Kafka topic exactly once.\nHowever, it also increases the pipeline cost and decreases throughput.\n\nIf you don't have strict requirements for exactly-once semantics, and the logic\nin your pipeline can handle duplicate records, consider enabling at-least-once\nmode for the entire pipeline to reduce costs. For more information, see\n[Set the pipeline streaming mode](/dataflow/docs/guides/streaming-modes).\n\n### Pipeline drains\n\nIf you drain the pipeline, exactly-once semantics are not guaranteed. The only\nguarantee is that no acknowledged data is lost. As a result, some data might be\nprocessed while the pipeline is draining, without the commit of read offsets\nback to Kafka. To achieve at-least-once semantics for Kafka when you modify a\npipeline, [update the pipeline](/dataflow/docs/guides/updating-a-pipeline)\ninstead of cancelling the job and starting a new job.\n\n### Tune Kafka for exactly-once semantics\n\nAdjusting `transaction.max.timeout.ms` and `transactional.id.expiration.ms` can\ncomplement your overall fault-tolerance and exactly-once delivery strategy.\nHowever, their impact depends on the nature of the outage and your specific\nconfiguration. Set `transaction.max.timeout.ms` close to the retention time of\nyour Kafka topics to prevent data duplication caused by Kafka broker outages.\n\nIf a Kafka broker becomes temporarily unavailable (for example, due to network\npartition or node failure), and a producer has ongoing transactions, those\ntransactions might time out. Increasing the value of\n`transaction.max.timeout.ms` gives transactions more time to complete after a\nbroker recovers, potentially avoiding the need to restart transactions and\nresend messages. 
What's next
-----------

- [Read from Apache Kafka](/dataflow/docs/guides/read-from-kafka).
- Learn more about [Managed I/O](/dataflow/docs/guides/managed-io).