Von Dataflow verwaltete E/A für Apache Kafka

Verwaltete E/A unterstützt das Lesen und Schreiben in Apache Kafka.

Voraussetzungen

Erfordert das Apache Beam SDK für Java Version 2.58.0 oder höher.

Konfiguration

Bei verwalteten E/A werden die folgenden Konfigurationsparameter für Apache Kafka verwendet.

Konfiguration lesen und schreiben Datentyp Beschreibung
bootstrap_servers String Erforderlich. Eine durch Kommas getrennte Liste von Kafka-Bootstrap-Servern. Beispiel: localhost:9092.
topic String Erforderlich. Das Kafka-Thema, das gelesen oder geschrieben werden soll.
file_descriptor_path String Der Pfad zu einem Satz von Protokollzwischenspeicher-Dateideskriptoren. Gilt nur, wenn data_format "PROTO" ist.
data_format String Das Format der Nachrichten. Unterstützte Werte: "AVRO", "JSON", "PROTO", "RAW". Der Standardwert ist "RAW". Dabei werden die Rohbytes der Nachrichtennutzlast gelesen oder geschrieben.
message_name String Der Name der Protokollzwischenspeicher-Nachricht. Erforderlich, wenn data_format "PROTO" ist.
schema String

Das Kafka-Nachrichtenschema. Der erwartete Schematyp hängt vom Datenformat ab:

Bei Lesepipelines wird dieser Parameter ignoriert, wenn confluent_schema_registry_url festgelegt ist.

Lesekonfiguration Datentyp Beschreibung
auto_offset_reset_config String

Gibt das Verhalten an, wenn kein Anfangsoffset vorhanden ist oder der aktuelle Offset nicht mehr auf dem Kafka-Server vorhanden ist. Folgende Werte werden unterstützt:

  • "earliest": Der Offset wird auf den frühesten Offset zurückgesetzt.
  • "latest": Setzen Sie den Offset auf den letzten Offset zurück.

Der Standardwert ist "latest".

confluent_schema_registry_subject String Das Subjekt einer Confluent-Schemaregistrierung. Erforderlich, wenn confluent_schema_registry_url angegeben ist
confluent_schema_registry_url String Die URL einer Confluent Schema Registry. Andernfalls wird der schema-Parameter ignoriert.
consumer_config_updates Karte Konfigurationsparameter für den Kafka-Nutzer festlegen. Weitere Informationen finden Sie in der Kafka-Dokumentation unter Nutzerkonfigurationen. Mit diesem Parameter können Sie den Kafka-Nutzer anpassen.
max_read_time_seconds int Die maximale Lesezeit in Sekunden. Mit dieser Option wird eine begrenzte PCollection generiert. Sie ist hauptsächlich für Tests oder andere nicht produktionsbezogene Szenarien gedacht.
Schreibkonfiguration Datentyp Beschreibung
producer_config_updates Karte Konfigurationsparameter für den Kafka-Produzenten festlegen. Weitere Informationen finden Sie in der Kafka-Dokumentation unter Produzentenkonfigurationen. Mit diesem Parameter können Sie den Kafka-Produzenten anpassen.

Wenn Sie Avro- oder JSON-Nachrichten lesen möchten, müssen Sie ein Nachrichtenschema angeben. Wenn Sie ein Schema direkt festlegen möchten, verwenden Sie den schema-Parameter. Wenn Sie das Schema über eine Confluent-Schema-Registry bereitstellen möchten, legen Sie die Parameter confluent_schema_registry_url und confluent_schema_registry_subject fest.

Wenn Sie Protokollzwischenspeicher-Nachrichten lesen oder schreiben möchten, geben Sie entweder ein Nachrichtenschema an oder legen den file_descriptor_path-Parameter fest.

Weitere Informationen und Codebeispiele finden Sie unter folgenden Themen: