Mantieni tutto organizzato con le raccolte
Salva e classifica i contenuti in base alle tue preferenze.
Managed I/O supporta la lettura e la scrittura in
Apache Kafka.
Requisiti
I seguenti SDK supportano I/O gestito per Apache Kafka:
Apache Beam SDK per Java versione 2.58.0 o successive
SDK Apache Beam per Python versione 2.61.0 o successive
Configurazione
L'I/O gestito per BigQuery supporta i seguenti parametri di configurazione:
KAFKA Leggi
Configurazione
Tipo
Descrizione
bootstrap_servers
str
Un elenco di coppie host/porta da utilizzare per stabilire la connessione iniziale al cluster Kafka. Il client utilizzerà tutti i server indipendentemente da quali server sono specificati qui per il bootstrapping. Questo elenco influisce solo sugli host iniziali utilizzati per scoprire l'insieme completo di server. Questo elenco deve essere nel formato `host1:port1,host2:port2,...`
topic
str
n/a
confluent_schema_registry_subject
str
n/a
confluent_schema_registry_url
str
n/a
consumer_config_updates
map[str, str]
Un elenco di coppie chiave-valore che fungono da parametri di configurazione per i consumer Kafka. La maggior parte di queste configurazioni non sarà necessaria, ma puoi utilizzarle se devi personalizzare il consumer Kafka. Visualizza un elenco dettagliato: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
file_descriptor_path
str
Il percorso del file del set di descrittori di file del buffer di protocollo. Questo file viene utilizzato per la definizione dello schema e la serializzazione dei messaggi.
formato
str
Il formato di codifica per i dati archiviati in Kafka. Le opzioni valide sono: RAW,STRING,AVRO,JSON,PROTO
message_name
str
Il nome del messaggio Protocol Buffer da utilizzare per l'estrazione dello schema e la conversione dei dati.
schema
str
Lo schema in cui i dati sono codificati nell'argomento Kafka. Per i dati AVRO, si tratta di uno schema definito con la sintassi dello schema AVRO (https://avro.apache.org/docs/1.10.2/spec.html#schemas). Per i dati JSON, si tratta di uno schema definito con la sintassi JSON Schema (https://json-schema.org/). Se viene fornito un URL a Confluent Schema Registry, questo campo viene ignorato e lo schema viene recuperato da Confluent Schema Registry.
KAFKA Scrittura
Configurazione
Tipo
Descrizione
bootstrap_servers
str
Un elenco di coppie host/porta da utilizzare per stabilire la connessione iniziale al cluster Kafka. Il client utilizzerà tutti i server indipendentemente da quali server sono specificati qui per il bootstrapping. Questo elenco influisce solo sugli host iniziali utilizzati per scoprire l'insieme completo di server. | Formato: host1:porta1,host2:porta2,...
formato
str
Il formato di codifica per i dati archiviati in Kafka. Le opzioni valide sono: RAW,JSON,AVRO,PROTO
topic
str
n/a
file_descriptor_path
str
Il percorso del file del set di descrittori di file del buffer di protocollo. Questo file viene utilizzato per la definizione dello schema e la serializzazione dei messaggi.
message_name
str
Il nome del messaggio Protocol Buffer da utilizzare per l'estrazione dello schema e la conversione dei dati.
producer_config_updates
map[str, str]
Un elenco di coppie chiave-valore che fungono da parametri di configurazione per i produttori Kafka. La maggior parte di queste configurazioni non sarà necessaria, ma puoi utilizzarle se devi personalizzare il produttore Kafka. Consulta un elenco dettagliato: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
schema
str
n/a
Passaggi successivi
Per ulteriori informazioni ed esempi di codice, consulta i seguenti argomenti:
[[["Facile da capire","easyToUnderstand","thumb-up"],["Il problema è stato risolto","solvedMyProblem","thumb-up"],["Altra","otherUp","thumb-up"]],[["Difficile da capire","hardToUnderstand","thumb-down"],["Informazioni o codice di esempio errati","incorrectInformationOrSampleCode","thumb-down"],["Mancano le informazioni o gli esempi di cui ho bisogno","missingTheInformationSamplesINeed","thumb-down"],["Problema di traduzione","translationIssue","thumb-down"],["Altra","otherDown","thumb-down"]],["Ultimo aggiornamento 2025-09-10 UTC."],[[["\u003cp\u003eManaged I/O supports reading and writing data to and from Apache Kafka, requiring Apache Beam SDK for Java version 2.58.0 or later.\u003c/p\u003e\n"],["\u003cp\u003eConfiguration for Kafka operations requires specifying \u003ccode\u003ebootstrap_servers\u003c/code\u003e and \u003ccode\u003etopic\u003c/code\u003e, and supports various data formats such as \u003ccode\u003e"AVRO"\u003c/code\u003e, \u003ccode\u003e"JSON"\u003c/code\u003e, \u003ccode\u003e"PROTO"\u003c/code\u003e, and \u003ccode\u003e"RAW"\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eSchemas for Avro, JSON, or Protocol Buffer messages can be provided directly via the \u003ccode\u003eschema\u003c/code\u003e parameter or through a Confluent schema registry using \u003ccode\u003econfluent_schema_registry_url\u003c/code\u003e and \u003ccode\u003econfluent_schema_registry_subject\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eKafka consumer and producer configurations can be customized using \u003ccode\u003econsumer_config_updates\u003c/code\u003e and \u003ccode\u003eproducer_config_updates\u003c/code\u003e respectively, allowing for fine-tuning of the Kafka interaction.\u003c/p\u003e\n"],["\u003cp\u003e\u003ccode\u003eauto_offset_reset_config\u003c/code\u003e is responsible for managing offsets and can be set to \u003ccode\u003e"earliest"\u003c/code\u003e or \u003ccode\u003e"latest"\u003c/code\u003e to handle situations where no offset exists.\u003c/p\u003e\n"]]],[],null,["[Managed I/O](/dataflow/docs/guides/managed-io) supports reading and writing to\nApache Kafka.\n\nRequirements\n\nThe following SDKs support managed I/O for Apache Kafka:\n\n- Apache Beam SDK for Java version 2.58.0 or later\n- Apache Beam SDK for Python version 2.61.0 or later\n\nConfiguration\n\nManaged I/O for BigQuery supports the following configuration\nparameters:\n\n`KAFKA` Read \n\n| Configuration | Type | Description |\n|-----------------------------------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|\n| **bootstrap_servers** | `str` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping---this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form \\`host1:port1,host2:port2,...\\` |\n| **topic** | `str` | n/a |\n| confluent_schema_registry_subject | `str` | n/a |\n| confluent_schema_registry_url | `str` | n/a |\n| consumer_config_updates | `map[`str`, `str`]` | A list of key-value pairs that act as configuration parameters for Kafka consumers. Most of these configurations will not be needed, but if you need to customize your Kafka consumer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html |\n| file_descriptor_path | `str` | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |\n| format | `str` | The encoding format for the data stored in Kafka. Valid options are: RAW,STRING,AVRO,JSON,PROTO |\n| message_name | `str` | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |\n| schema | `str` | The schema in which the data is encoded in the Kafka topic. For AVRO data, this is a schema defined with AVRO schema syntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas). For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/). If a URL to Confluent Schema Registry is provided, then this field is ignored, and the schema is fetched from Confluent Schema Registry. |\n\n\u003cbr /\u003e\n\n`KAFKA` Write \n\n| Configuration | Type | Description |\n|-------------------------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|\n| **bootstrap_servers** | `str` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping---this list only impacts the initial hosts used to discover the full set of servers. \\| Format: host1:port1,host2:port2,... |\n| **format** | `str` | The encoding format for the data stored in Kafka. Valid options are: RAW,JSON,AVRO,PROTO |\n| **topic** | `str` | n/a |\n| file_descriptor_path | `str` | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |\n| message_name | `str` | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |\n| producer_config_updates | `map[`str`, `str`]` | A list of key-value pairs that act as configuration parameters for Kafka producers. Most of these configurations will not be needed, but if you need to customize your Kafka producer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html |\n| schema | `str` | n/a |\n\n\u003cbr /\u003e\n\nWhat's next\n\nFor more information and code examples, see the following topics:\n\n- [Read from Apache Kafka](/dataflow/docs/guides/read-from-kafka)\n- [Write to Apache Kafka](/dataflow/docs/guides/write-to-kafka)"]]