Von Dataflow verwaltete E/A

Der verwaltete E/A-Connector ist eine Apache Beam-Transformation, die eine gemeinsame API zum Erstellen von Quellen und Senken bereitstellt.

Übersicht

Der verwaltete E/A-Connector wird wie jeder andere E/A-Connector mit Apache Beam-Code erstellt. Sie geben eine Quelle oder Senke an, um eine Reihe von Konfigurationsparametern zu instanziieren und zu übergeben. Sie können die Konfigurationsparameter auch in eine YAML-Datei einfügen und eine URL zur Datei angeben.

Auf dem Back-End behandelt Dataflow den verwalteten E/A-Connector als Dienst, sodass Dataflow Laufzeitvorgänge für den Connector verwalten kann. Statt diese Details zu verwalten, können Sie sich dann auf die Geschäftslogik in Ihrer Pipeline konzentrieren.

Weitere Informationen zur verwalteten E/A API finden Sie unter Managed in der Dokumentation zum Apache Beam Java SDK.

Apache Iceberg

Für Apache Iceberg verwendet die verwaltete E/A die folgenden Konfigurationsparameter:

Name Datentyp Beschreibung
table String Die Kennung der Apache Iceberg-Tabelle. Beispiel: "db.table1".
catalog_name String Der Name des Katalogs. Beispiel: "local".
catalog_properties Karte Eine Zuordnung von Konfigurationseigenschaften für den Apache Iceberg-Katalog. Welche Properties erforderlich sind, hängt vom Katalog ab. Weitere Informationen finden Sie in der Apache Iceberg-Dokumentation unter CatalogUtil.
config_properties Karte Optionale Hadoop-Konfigurationseigenschaften. Weitere Informationen finden Sie in der Apache Iceberg-Dokumentation unter CatalogUtil.
triggering_frequency_seconds Ganzzahl Für Streaming-Schreibpipelines die Häufigkeit, mit der die Senke versucht, Snapshots zu erstellen, in Sekunden.

Weitere Informationen, einschließlich Codebeispielen, finden Sie unter folgenden Links:

Apache Kafka

Für Apache Kafka verwendet die verwaltete E/A die folgenden Konfigurationsparameter.

Konfiguration lesen und schreiben Datentyp Beschreibung
bootstrap_servers String Erforderlich. Eine durch Kommas getrennte Liste von Kafka-Bootstrap-Servern. Beispiel: localhost:9092.
topic String Erforderlich. Das Kafka-Thema, aus dem gelesen werden soll.
file_descriptor_path String Der Pfad zu einem Satz von Protokollzwischenspeicher-Dateideskriptoren. Gilt nur, wenn data_format "PROTO" ist.
data_format String Das Format der Nachrichten. Unterstützte Werte: "AVRO", "JSON", "PROTO", "RAW". Der Standardwert ist "RAW". Dabei werden die Rohbytes der Nachrichtennutzlast gelesen oder geschrieben.
message_name String Der Name der Protokollzwischenspeicher-Nachricht. Erforderlich, wenn data_format "PROTO" ist.
schema String

Das Kafka-Nachrichtenschema. Der erwartete Schematyp hängt vom Datenformat ab:

Bei Lesepipelines wird dieser Parameter ignoriert, wenn confluent_schema_registry_url festgelegt ist.

Konfiguration lesen
auto_offset_reset_config String

Gibt das Verhalten an, wenn kein Anfangsoffset vorhanden ist oder der aktuelle Offset nicht mehr auf dem Kafka-Server vorhanden ist. Folgende Werte werden unterstützt:

  • "earliest": Der Offset wird auf den frühesten Offset zurückgesetzt.
  • "latest": Setzen Sie den Offset auf den letzten Offset zurück.

Der Standardwert ist "latest".

confluent_schema_registry_subject String Das Subjekt einer Confluent-Schemaregistrierung. Erforderlich, wenn confluent_schema_registry_url angegeben ist
confluent_schema_registry_url String Die URL einer Confluent Schema Registry. Andernfalls wird der Parameter schema ignoriert.
consumer_config_updates Karte Hiermit werden Konfigurationsparameter für den Kafka-Nutzer festgelegt. Weitere Informationen finden Sie in der Kafka-Dokumentation unter Nutzerkonfigurationen. Mit diesem Parameter können Sie den Kafka-Nutzer anpassen.
max_read_time_seconds int Die maximale Lesezeit in Sekunden. Mit dieser Option wird eine begrenzte PCollection generiert. Sie ist hauptsächlich für Tests oder andere nicht produktionsbezogene Szenarien gedacht.
Konfiguration schreiben
producer_config_updates Karte Konfigurationsparameter für den Kafka-Produzenten festlegen Weitere Informationen finden Sie in der Kafka-Dokumentation unter Produzentenkonfigurationen. Mit diesem Parameter können Sie den Kafka-Produzenten anpassen.

Wenn Sie Avro- oder JSON-Nachrichten lesen möchten, müssen Sie ein Nachrichtenschema angeben. Wenn Sie ein Schema direkt festlegen möchten, verwenden Sie den Parameter schema. Wenn Sie das Schema über eine Confluent-Schema-Registry bereitstellen möchten, legen Sie die Parameter confluent_schema_registry_url und confluent_schema_registry_subject fest.

Wenn Sie Protokollzwischenspeicher-Nachrichten lesen oder schreiben möchten, geben Sie entweder ein Nachrichtenschema an oder legen den file_descriptor_path-Parameter fest.