Tetap teratur dengan koleksi
Simpan dan kategorikan konten berdasarkan preferensi Anda.
Managed I/O mendukung pembacaan dan penulisan ke
Apache Kafka.
Persyaratan
SDK berikut mendukung I/O terkelola untuk Apache Kafka:
Apache Beam SDK untuk Java versi 2.58.0 atau yang lebih baru
Apache Beam SDK untuk Python versi 2.61.0 atau yang lebih baru
Konfigurasi
I/O Terkelola untuk BigQuery mendukung parameter konfigurasi berikut:
KAFKA Baca
Konfigurasi
Jenis
Deskripsi
bootstrap_servers
str
Daftar pasangan host/port yang akan digunakan untuk membuat koneksi awal ke cluster Kafka. Klien akan menggunakan semua server, terlepas dari server mana yang ditentukan di sini untuk bootstrapping—daftar ini hanya memengaruhi host awal yang digunakan untuk menemukan kumpulan lengkap server. Daftar ini harus dalam format `host1:port1,host2:port2,...`
topik
str
t/a
confluent_schema_registry_subject
str
t/a
confluent_schema_registry_url
str
t/a
consumer_config_updates
map[str, str]
Daftar pasangan nilai kunci yang berfungsi sebagai parameter konfigurasi untuk konsumen Kafka. Sebagian besar konfigurasi ini tidak akan diperlukan, tetapi jika Anda perlu menyesuaikan konsumen Kafka, Anda dapat menggunakannya. Lihat daftar mendetail: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
file_descriptor_path
str
Jalur ke file Set Deskriptor File Buffer Protokol. File ini digunakan untuk definisi skema dan serialisasi pesan.
format
str
Format encoding untuk data yang disimpan di Kafka. Opsi yang valid adalah: RAW,STRING,AVRO,JSON,PROTO
message_name
str
Nama pesan Protocol Buffer yang akan digunakan untuk ekstraksi skema dan konversi data.
schema
str
Skema tempat data dienkode dalam topik Kafka. Untuk data AVRO, ini adalah skema yang ditentukan dengan sintaksis skema AVRO (https://avro.apache.org/docs/1.10.2/spec.html#schemas). Untuk data JSON, ini adalah skema yang ditentukan dengan sintaksis JSON-schema (https://json-schema.org/). Jika URL ke Confluent Schema Registry disediakan, kolom ini akan diabaikan, dan skema akan diambil dari Confluent Schema Registry.
KAFKA Menulis
Konfigurasi
Jenis
Deskripsi
bootstrap_servers
str
Daftar pasangan host/port yang akan digunakan untuk membuat koneksi awal ke cluster Kafka. Klien akan menggunakan semua server, terlepas dari server mana yang ditentukan di sini untuk bootstrapping—daftar ini hanya memengaruhi host awal yang digunakan untuk menemukan kumpulan lengkap server. | Format: host1:port1,host2:port2,...
format
str
Format encoding untuk data yang disimpan di Kafka. Opsi yang valid adalah: RAW,JSON,AVRO,PROTO
topik
str
t/a
file_descriptor_path
str
Jalur ke file Set Deskriptor File Buffer Protokol. File ini digunakan untuk definisi skema dan serialisasi pesan.
message_name
str
Nama pesan Protocol Buffer yang akan digunakan untuk ekstraksi skema dan konversi data.
producer_config_updates
map[str, str]
Daftar key-value pair yang berfungsi sebagai parameter konfigurasi untuk produser Kafka. Sebagian besar konfigurasi ini tidak akan diperlukan, tetapi jika Anda perlu menyesuaikan produser Kafka, Anda dapat menggunakan konfigurasi ini. Lihat daftar mendetail: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
schema
str
t/a
Langkah berikutnya
Untuk informasi dan contoh kode selengkapnya, lihat topik berikut:
[[["Mudah dipahami","easyToUnderstand","thumb-up"],["Memecahkan masalah saya","solvedMyProblem","thumb-up"],["Lainnya","otherUp","thumb-up"]],[["Sulit dipahami","hardToUnderstand","thumb-down"],["Informasi atau kode contoh salah","incorrectInformationOrSampleCode","thumb-down"],["Informasi/contoh yang saya butuhkan tidak ada","missingTheInformationSamplesINeed","thumb-down"],["Masalah terjemahan","translationIssue","thumb-down"],["Lainnya","otherDown","thumb-down"]],["Terakhir diperbarui pada 2025-09-10 UTC."],[[["\u003cp\u003eManaged I/O supports reading and writing data to and from Apache Kafka, requiring Apache Beam SDK for Java version 2.58.0 or later.\u003c/p\u003e\n"],["\u003cp\u003eConfiguration for Kafka operations requires specifying \u003ccode\u003ebootstrap_servers\u003c/code\u003e and \u003ccode\u003etopic\u003c/code\u003e, and supports various data formats such as \u003ccode\u003e"AVRO"\u003c/code\u003e, \u003ccode\u003e"JSON"\u003c/code\u003e, \u003ccode\u003e"PROTO"\u003c/code\u003e, and \u003ccode\u003e"RAW"\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eSchemas for Avro, JSON, or Protocol Buffer messages can be provided directly via the \u003ccode\u003eschema\u003c/code\u003e parameter or through a Confluent schema registry using \u003ccode\u003econfluent_schema_registry_url\u003c/code\u003e and \u003ccode\u003econfluent_schema_registry_subject\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eKafka consumer and producer configurations can be customized using \u003ccode\u003econsumer_config_updates\u003c/code\u003e and \u003ccode\u003eproducer_config_updates\u003c/code\u003e respectively, allowing for fine-tuning of the Kafka interaction.\u003c/p\u003e\n"],["\u003cp\u003e\u003ccode\u003eauto_offset_reset_config\u003c/code\u003e is responsible for managing offsets and can be set to \u003ccode\u003e"earliest"\u003c/code\u003e or \u003ccode\u003e"latest"\u003c/code\u003e to handle situations where no offset exists.\u003c/p\u003e\n"]]],[],null,["[Managed I/O](/dataflow/docs/guides/managed-io) supports reading and writing to\nApache Kafka.\n\nRequirements\n\nThe following SDKs support managed I/O for Apache Kafka:\n\n- Apache Beam SDK for Java version 2.58.0 or later\n- Apache Beam SDK for Python version 2.61.0 or later\n\nConfiguration\n\nManaged I/O for BigQuery supports the following configuration\nparameters:\n\n`KAFKA` Read \n\n| Configuration | Type | Description |\n|-----------------------------------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|\n| **bootstrap_servers** | `str` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping---this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form \\`host1:port1,host2:port2,...\\` |\n| **topic** | `str` | n/a |\n| confluent_schema_registry_subject | `str` | n/a |\n| confluent_schema_registry_url | `str` | n/a |\n| consumer_config_updates | `map[`str`, `str`]` | A list of key-value pairs that act as configuration parameters for Kafka consumers. Most of these configurations will not be needed, but if you need to customize your Kafka consumer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html |\n| file_descriptor_path | `str` | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |\n| format | `str` | The encoding format for the data stored in Kafka. Valid options are: RAW,STRING,AVRO,JSON,PROTO |\n| message_name | `str` | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |\n| schema | `str` | The schema in which the data is encoded in the Kafka topic. For AVRO data, this is a schema defined with AVRO schema syntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas). For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/). If a URL to Confluent Schema Registry is provided, then this field is ignored, and the schema is fetched from Confluent Schema Registry. |\n\n\u003cbr /\u003e\n\n`KAFKA` Write \n\n| Configuration | Type | Description |\n|-------------------------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|\n| **bootstrap_servers** | `str` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping---this list only impacts the initial hosts used to discover the full set of servers. \\| Format: host1:port1,host2:port2,... |\n| **format** | `str` | The encoding format for the data stored in Kafka. Valid options are: RAW,JSON,AVRO,PROTO |\n| **topic** | `str` | n/a |\n| file_descriptor_path | `str` | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |\n| message_name | `str` | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |\n| producer_config_updates | `map[`str`, `str`]` | A list of key-value pairs that act as configuration parameters for Kafka producers. Most of these configurations will not be needed, but if you need to customize your Kafka producer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html |\n| schema | `str` | n/a |\n\n\u003cbr /\u003e\n\nWhat's next\n\nFor more information and code examples, see the following topics:\n\n- [Read from Apache Kafka](/dataflow/docs/guides/read-from-kafka)\n- [Write to Apache Kafka](/dataflow/docs/guides/write-to-kafka)"]]