適用於 Apache Kafka 的 Dataflow 代管 I/O

受管理 I/O 支援讀取及寫入 Apache Kafka。

需求條件

下列 SDK 支援 Apache Kafka 的受管理 I/O:

  • Java 適用的 Apache Beam SDK 2.58.0 以上版本
  • Python 適用的 Apache Beam SDK 2.61.0 以上版本

設定

BigQuery 的受管理 I/O 支援下列設定參數:

KAFKA 閱讀

設定 類型 說明
bootstrap_servers str 用於建立與 Kafka 叢集初始連線的主機/通訊埠組合清單。無論這裡指定哪些伺服器用於啟動程序,用戶端都會使用所有伺服器,這份清單只會影響用於探索完整伺服器集的初始主機。這份清單的格式應為 `host1:port1,host2:port2,...`
主題 str 不適用
confluent_schema_registry_subject str 不適用
confluent_schema_registry_url str 不適用
consumer_config_updates map[str, str] 鍵/值組合清單,做為 Kafka 消費者的設定參數。大部分情況下不需要這些設定,但如果需要自訂 Kafka 消費者,可以使用這些設定。如需詳細清單,請參閱 https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
file_descriptor_path str 通訊協定緩衝區檔案描述元集檔案的路徑。這個檔案用於結構定義和訊息序列化。
format str 儲存在 Kafka 中的資料編碼格式。有效選項包括:RAW、STRING、AVRO、JSON、PROTO
message_name str 用於擷取結構定義和轉換資料的通訊協定緩衝區訊息名稱。
結構定義 str 資料在 Kafka 主題中編碼的結構定義。如果是 AVRO 資料,這是以 AVRO 結構定義語法 (https://avro.apache.org/docs/1.10.2/spec.html#schemas) 定義的結構定義。如果是 JSON 資料,這是以 JSON 結構定義語法 (https://json-schema.org/) 定義的結構定義。如果提供 Confluent Schema Registry 的網址,系統會忽略這個欄位,並從 Confluent Schema Registry 擷取結構定義。

KAFKA 寫入

設定 類型 說明
bootstrap_servers str 用於建立與 Kafka 叢集初始連線的主機/通訊埠組合清單。無論這裡指定哪些伺服器用於啟動程序,用戶端都會使用所有伺服器,這份清單只會影響用於探索完整伺服器集的初始主機。| 格式:host1:port1,host2:port2,...
格式 str 儲存在 Kafka 中的資料編碼格式。有效選項為:RAW、JSON、AVRO、PROTO
主題 str 不適用
file_descriptor_path str 通訊協定緩衝區檔案描述元集檔案的路徑。這個檔案用於結構定義和訊息序列化。
message_name str 用於擷取結構定義和轉換資料的通訊協定緩衝區訊息名稱。
producer_config_updates map[str, str] 鍵/值組合清單,做為 Kafka 製作工具的設定參數。您不需要使用大部分的設定,但如果需要自訂 Kafka 製作工具,可以使用這項設定。如需詳細清單,請參閱 https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
結構定義 str 不適用

後續步驟

如需更多資訊和程式碼範例,請參閱下列主題: