Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Verwaltete E/A unterstützt das Lesen und Schreiben von Daten in Apache Kafka.
Voraussetzungen
Die folgenden SDKs unterstützen die verwaltete E/A für Apache Kafka:
Apache Beam SDK für Java Version 2.58.0 oder höher
Apache Beam SDK für Python Version 2.61.0 oder höher
Konfiguration
Managed I/O für BigQuery unterstützt die folgenden Konfigurationsparameter:
KAFKA Lesen
Konfiguration
Typ
Beschreibung
bootstrap_servers
str
Eine Liste von Host/Port-Paaren, die zum Herstellen der ersten Verbindung zum Kafka-Cluster verwendet werden. Der Client verwendet alle Server, unabhängig davon, welche Server hier für das Bootstrapping angegeben sind. Diese Liste wirkt sich nur auf die anfänglichen Hosts aus, die zum Ermitteln der vollständigen Servergruppe verwendet werden. Diese Liste muss das Format „host1:port1,host2:port2,...“ haben.
thema
str
–
confluent_schema_registry_subject
str
–
confluent_schema_registry_url
str
–
consumer_config_updates
map[str, str]
Eine Liste von Schlüssel/Wert-Paaren, die als Konfigurationsparameter für Kafka-Consumer dienen. Die meisten dieser Konfigurationen sind nicht erforderlich. Wenn Sie Ihren Kafka-Consumer jedoch anpassen müssen, können Sie diese verwenden. Eine detaillierte Liste finden Sie unter https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html.
file_descriptor_path
str
Der Pfad zur Datei mit dem Satz von Protokollzwischenspeicher-Dateideskriptoren. Diese Datei wird für die Schemadefinition und die Nachrichtenserialisierung verwendet.
Format
str
Das Codierungsformat für die in Kafka gespeicherten Daten. Gültige Optionen: RAW,STRING,AVRO,JSON,PROTO
message_name
str
Der Name der Protocol Buffer-Nachricht, die für die Schemaextraktion und Datenkonvertierung verwendet werden soll.
schema
str
Das Schema, in dem die Daten im Kafka-Thema codiert sind. Für AVRO-Daten ist dies ein Schema, das mit der AVRO-Schemasyntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas) definiert wird. Für JSON-Daten ist dies ein Schema, das mit der JSON-Schema-Syntax (https://json-schema.org/) definiert wird. Wenn eine URL zur Confluent Schema Registry angegeben wird, wird dieses Feld ignoriert und das Schema wird aus der Confluent Schema Registry abgerufen.
KAFKA Schreiben
Konfiguration
Typ
Beschreibung
bootstrap_servers
str
Eine Liste von Host/Port-Paaren, die zum Herstellen der ersten Verbindung zum Kafka-Cluster verwendet werden. Der Client verwendet alle Server, unabhängig davon, welche Server hier für das Bootstrapping angegeben sind. Diese Liste wirkt sich nur auf die anfänglichen Hosts aus, die zum Ermitteln der vollständigen Servergruppe verwendet werden. | Format: host1:port1,host2:port2,...
format
str
Das Codierungsformat für die in Kafka gespeicherten Daten. Gültige Optionen sind: RAW,JSON,AVRO,PROTO
thema
str
–
file_descriptor_path
str
Der Pfad zur Datei mit dem Satz von Protokollzwischenspeicher-Dateideskriptoren. Diese Datei wird für die Schemadefinition und die Nachrichtenserialisierung verwendet.
message_name
str
Der Name der Protocol Buffer-Nachricht, die für die Schemaextraktion und Datenkonvertierung verwendet werden soll.
producer_config_updates
map[str, str]
Eine Liste von Schlüssel/Wert-Paaren, die als Konfigurationsparameter für Kafka-Produzenten dienen. Die meisten dieser Konfigurationen sind nicht erforderlich. Wenn Sie Ihren Kafka-Produzenten jedoch anpassen müssen, können Sie diese verwenden. Eine detaillierte Liste finden Sie unter https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html.
schema
str
–
Nächste Schritte
Weitere Informationen und Codebeispiele finden Sie unter den folgenden Themen:
[[["Leicht verständlich","easyToUnderstand","thumb-up"],["Mein Problem wurde gelöst","solvedMyProblem","thumb-up"],["Sonstiges","otherUp","thumb-up"]],[["Schwer verständlich","hardToUnderstand","thumb-down"],["Informationen oder Beispielcode falsch","incorrectInformationOrSampleCode","thumb-down"],["Benötigte Informationen/Beispiele nicht gefunden","missingTheInformationSamplesINeed","thumb-down"],["Problem mit der Übersetzung","translationIssue","thumb-down"],["Sonstiges","otherDown","thumb-down"]],["Zuletzt aktualisiert: 2025-09-10 (UTC)."],[[["\u003cp\u003eManaged I/O supports reading and writing data to and from Apache Kafka, requiring Apache Beam SDK for Java version 2.58.0 or later.\u003c/p\u003e\n"],["\u003cp\u003eConfiguration for Kafka operations requires specifying \u003ccode\u003ebootstrap_servers\u003c/code\u003e and \u003ccode\u003etopic\u003c/code\u003e, and supports various data formats such as \u003ccode\u003e"AVRO"\u003c/code\u003e, \u003ccode\u003e"JSON"\u003c/code\u003e, \u003ccode\u003e"PROTO"\u003c/code\u003e, and \u003ccode\u003e"RAW"\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eSchemas for Avro, JSON, or Protocol Buffer messages can be provided directly via the \u003ccode\u003eschema\u003c/code\u003e parameter or through a Confluent schema registry using \u003ccode\u003econfluent_schema_registry_url\u003c/code\u003e and \u003ccode\u003econfluent_schema_registry_subject\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eKafka consumer and producer configurations can be customized using \u003ccode\u003econsumer_config_updates\u003c/code\u003e and \u003ccode\u003eproducer_config_updates\u003c/code\u003e respectively, allowing for fine-tuning of the Kafka interaction.\u003c/p\u003e\n"],["\u003cp\u003e\u003ccode\u003eauto_offset_reset_config\u003c/code\u003e is responsible for managing offsets and can be set to \u003ccode\u003e"earliest"\u003c/code\u003e or \u003ccode\u003e"latest"\u003c/code\u003e to handle situations where no offset exists.\u003c/p\u003e\n"]]],[],null,["[Managed I/O](/dataflow/docs/guides/managed-io) supports reading and writing to\nApache Kafka.\n\nRequirements\n\nThe following SDKs support managed I/O for Apache Kafka:\n\n- Apache Beam SDK for Java version 2.58.0 or later\n- Apache Beam SDK for Python version 2.61.0 or later\n\nConfiguration\n\nManaged I/O for BigQuery supports the following configuration\nparameters:\n\n`KAFKA` Read \n\n| Configuration | Type | Description |\n|-----------------------------------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|\n| **bootstrap_servers** | `str` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping---this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form \\`host1:port1,host2:port2,...\\` |\n| **topic** | `str` | n/a |\n| confluent_schema_registry_subject | `str` | n/a |\n| confluent_schema_registry_url | `str` | n/a |\n| consumer_config_updates | `map[`str`, `str`]` | A list of key-value pairs that act as configuration parameters for Kafka consumers. Most of these configurations will not be needed, but if you need to customize your Kafka consumer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html |\n| file_descriptor_path | `str` | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |\n| format | `str` | The encoding format for the data stored in Kafka. Valid options are: RAW,STRING,AVRO,JSON,PROTO |\n| message_name | `str` | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |\n| schema | `str` | The schema in which the data is encoded in the Kafka topic. For AVRO data, this is a schema defined with AVRO schema syntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas). For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/). If a URL to Confluent Schema Registry is provided, then this field is ignored, and the schema is fetched from Confluent Schema Registry. |\n\n\u003cbr /\u003e\n\n`KAFKA` Write \n\n| Configuration | Type | Description |\n|-------------------------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|\n| **bootstrap_servers** | `str` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping---this list only impacts the initial hosts used to discover the full set of servers. \\| Format: host1:port1,host2:port2,... |\n| **format** | `str` | The encoding format for the data stored in Kafka. Valid options are: RAW,JSON,AVRO,PROTO |\n| **topic** | `str` | n/a |\n| file_descriptor_path | `str` | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |\n| message_name | `str` | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |\n| producer_config_updates | `map[`str`, `str`]` | A list of key-value pairs that act as configuration parameters for Kafka producers. Most of these configurations will not be needed, but if you need to customize your Kafka producer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html |\n| schema | `str` | n/a |\n\n\u003cbr /\u003e\n\nWhat's next\n\nFor more information and code examples, see the following topics:\n\n- [Read from Apache Kafka](/dataflow/docs/guides/read-from-kafka)\n- [Write to Apache Kafka](/dataflow/docs/guides/write-to-kafka)"]]