适用于 Apache Kafka 的 Dataflow 托管式 I/O

托管式 I/O 支持对 Apache Kafka 执行读写操作。

使用要求

需要使用 Java 版 Apache Beam SDK 2.58.0 或更高版本。

配置

受管 I/O 为 Apache Kafka 使用以下配置参数。

读取和写入配置 数据类型 说明
bootstrap_servers 字符串 必填。Kafka 引导服务器的英文逗号分隔列表。示例:localhost:9092
topic 字符串 必填。要读取或写入的 Kafka 主题。
file_descriptor_path 字符串 协议缓冲区文件描述符集的路径。仅在 data_format"PROTO" 时适用。
data_format 字符串 消息的格式。支持的值:"AVRO""JSON""PROTO""RAW"。默认值为 "RAW",用于读取或写入消息载荷的原始字节。
message_name 字符串 协议缓冲区消息的名称。如果 data_format"PROTO",则必须提供此值。
schema 字符串

Kafka 消息架构。预期的架构类型取决于数据格式:

对于读取流水线,如果设置了 confluent_schema_registry_url,则会忽略此参数。

读取配置 数据类型 说明
auto_offset_reset_config 字符串

指定在 Kafka 服务器上没有初始偏移量或当前偏移量不存在时执行的行为。支持以下值:

  • "earliest":将偏移重置为最早的偏移。
  • "latest":将偏移重置为最新偏移。

默认值为 "latest"

confluent_schema_registry_subject 字符串 Confluent 架构注册表的主题。如果指定了 confluent_schema_registry_url,则必须使用此属性
confluent_schema_registry_url 字符串 Confluent 架构注册表的网址。如果指定,则系统会忽略 schema 参数。
consumer_config_updates 地图 为 Kafka 使用方设置配置参数。如需了解详情,请参阅 Kafka 文档中的使用方配置。您可以使用此参数自定义 Kafka 使用方。
max_read_time_seconds int 最大读取时间,以秒为单位。此选项会生成有边界的 PCollection,主要用于测试或其他非生产场景。
写入配置 数据类型 说明
producer_config_updates 地图 为 Kafka 提供方设置配置参数。如需了解详情,请参阅 Kafka 文档中的提供方配置。您可以使用此参数来自定义 Kafka 提供方。

如需读取 Avro 或 JSON 消息,您必须指定消息架构。如需直接设置架构,请使用 schema 参数。如需通过 Confluent 架构注册表提供架构,请设置 confluent_schema_registry_urlconfluent_schema_registry_subject 参数。

如需读取或写入 Protocol Buffer 消息,请指定消息架构或设置 file_descriptor_path 参数。

如需了解详情和代码示例,请参阅以下主题: