Verwaltete E/A unterstützt das Lesen und Schreiben in Apache Kafka.
Voraussetzungen
Erfordert das Apache Beam SDK für Java Version 2.58.0 oder höher.
Konfiguration
Bei verwalteten E/A werden die folgenden Konfigurationsparameter für Apache Kafka verwendet.
Konfiguration lesen und schreiben | Datentyp | Beschreibung |
---|---|---|
bootstrap_servers |
String | Erforderlich. Eine durch Kommas getrennte Liste von Kafka-Bootstrap-Servern.
Beispiel: localhost:9092 . |
topic |
String | Erforderlich. Das Kafka-Thema, das gelesen oder geschrieben werden soll. |
file_descriptor_path |
String | Der Pfad zu einem Satz von Protokollzwischenspeicher-Dateideskriptoren. Gilt nur, wenn data_format "PROTO" ist. |
data_format |
String | Das Format der Nachrichten. Unterstützte Werte: "AVRO" ,
"JSON" , "PROTO" , "RAW" . Der Standardwert ist "RAW" . Dabei werden die Rohbytes der Nachrichtennutzlast gelesen oder geschrieben. |
message_name |
String | Der Name der Protokollzwischenspeicher-Nachricht. Erforderlich, wenn data_format "PROTO" ist. |
schema |
String | Das Kafka-Nachrichtenschema. Der erwartete Schematyp hängt vom Datenformat ab:
Bei Lesepipelines wird dieser Parameter ignoriert, wenn |
Lesekonfiguration | Datentyp | Beschreibung |
auto_offset_reset_config |
String | Gibt das Verhalten an, wenn kein Anfangsoffset vorhanden ist oder der aktuelle Offset nicht mehr auf dem Kafka-Server vorhanden ist. Folgende Werte werden unterstützt:
Der Standardwert ist |
confluent_schema_registry_subject |
String | Das Subjekt einer Confluent-Schemaregistrierung. Erforderlich, wenn confluent_schema_registry_url angegeben ist |
confluent_schema_registry_url |
String | Die URL einer Confluent Schema Registry. Andernfalls wird der schema -Parameter ignoriert. |
consumer_config_updates |
Karte | Konfigurationsparameter für den Kafka-Nutzer festlegen. Weitere Informationen finden Sie in der Kafka-Dokumentation unter Nutzerkonfigurationen. Mit diesem Parameter können Sie den Kafka-Nutzer anpassen. |
max_read_time_seconds |
int | Die maximale Lesezeit in Sekunden. Mit dieser Option wird eine begrenzte PCollection generiert. Sie ist hauptsächlich für Tests oder andere nicht produktionsbezogene Szenarien gedacht. |
Schreibkonfiguration | Datentyp | Beschreibung |
producer_config_updates |
Karte | Konfigurationsparameter für den Kafka-Produzenten festlegen. Weitere Informationen finden Sie in der Kafka-Dokumentation unter Produzentenkonfigurationen. Mit diesem Parameter können Sie den Kafka-Produzenten anpassen. |
Wenn Sie Avro- oder JSON-Nachrichten lesen möchten, müssen Sie ein Nachrichtenschema angeben. Wenn Sie ein Schema direkt festlegen möchten, verwenden Sie den schema
-Parameter. Wenn Sie das Schema über eine Confluent-Schema-Registry bereitstellen möchten, legen Sie die Parameter confluent_schema_registry_url
und confluent_schema_registry_subject
fest.
Wenn Sie Protokollzwischenspeicher-Nachrichten lesen oder schreiben möchten, geben Sie entweder ein Nachrichtenschema an oder legen den file_descriptor_path
-Parameter fest.
Weitere Informationen und Codebeispiele finden Sie unter folgenden Themen: