托管式 I/O 支持对 Apache Kafka 执行读写操作。
使用要求
需要使用 Java 版 Apache Beam SDK 2.58.0 或更高版本。
配置
受管 I/O 为 Apache Kafka 使用以下配置参数。
读取和写入配置 | 数据类型 | 说明 |
---|---|---|
bootstrap_servers |
字符串 | 必填。Kafka 引导服务器的英文逗号分隔列表。示例:localhost:9092 。 |
topic |
字符串 | 必填。要读取或写入的 Kafka 主题。 |
file_descriptor_path |
字符串 | 协议缓冲区文件描述符集的路径。仅在 data_format 为 "PROTO" 时适用。 |
data_format |
字符串 | 消息的格式。支持的值:"AVRO" 、"JSON" 、"PROTO" 、"RAW" 。默认值为 "RAW" ,用于读取或写入消息载荷的原始字节。 |
message_name |
字符串 | 协议缓冲区消息的名称。如果 data_format 为 "PROTO" ,则必须提供此值。 |
schema |
字符串 | Kafka 消息架构。预期的架构类型取决于数据格式:
对于读取流水线,如果设置了 |
读取配置 | 数据类型 | 说明 |
auto_offset_reset_config |
字符串 | 指定在 Kafka 服务器上没有初始偏移量或当前偏移量不存在时执行的行为。支持以下值:
默认值为 |
confluent_schema_registry_subject |
字符串 | Confluent 架构注册表的主题。如果指定了 confluent_schema_registry_url ,则必须使用此属性 |
confluent_schema_registry_url |
字符串 | Confluent 架构注册表的网址。如果指定,则系统会忽略 schema 参数。 |
consumer_config_updates |
地图 | 为 Kafka 使用方设置配置参数。如需了解详情,请参阅 Kafka 文档中的使用方配置。您可以使用此参数自定义 Kafka 使用方。 |
max_read_time_seconds |
int | 最大读取时间,以秒为单位。此选项会生成有边界的 PCollection ,主要用于测试或其他非生产场景。 |
写入配置 | 数据类型 | 说明 |
producer_config_updates |
地图 | 为 Kafka 提供方设置配置参数。如需了解详情,请参阅 Kafka 文档中的提供方配置。您可以使用此参数来自定义 Kafka 提供方。 |
如需读取 Avro 或 JSON 消息,您必须指定消息架构。如需直接设置架构,请使用 schema
参数。如需通过 Confluent 架构注册表提供架构,请设置 confluent_schema_registry_url
和 confluent_schema_registry_subject
参数。
如需读取或写入 Protocol Buffer 消息,请指定消息架构或设置 file_descriptor_path
参数。
如需了解详情和代码示例,请参阅以下主题: