Von Dataflow in Apache Iceberg schreiben

Verwenden Sie den verwalteten E/A-Connector, um von Dataflow in Apache Iceberg zu schreiben.

Verwaltete E/A unterstützt die folgenden Funktionen für Apache Iceberg:

Kataloge
  • Hadoop
  • Hive
  • REST-basierte Kataloge
  • BigQuery-Metastore (nach der Veröffentlichung von Beam 2.63.0)
Lesefunktionen Batchlesevorgang
Schreibfunktionen
  • Batch-Schreiben
  • Streaming-Schreibvorgang
  • Dynamische Ziele
  • Dynamische Tabellen erstellen

Verwenden Sie für BigQuery-Tabellen für Apache Iceberg den BigQueryIO-Connector mit der BigQuery Storage API. Die Tabelle muss bereits vorhanden sein. Das Erstellen dynamischer Tabellen wird nicht unterstützt.

Abhängigkeiten

Fügen Sie Ihrem Projekt die folgenden Abhängigkeiten hinzu:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Konfiguration

Für die verwaltete E/A werden die folgenden Konfigurationsparameter für Apache Iceberg verwendet:

Konfiguration lesen und schreiben Datentyp Beschreibung
table String Die Kennung der Apache Iceberg-Tabelle. Beispiel: "db.table1".
catalog_name String Der Name des Katalogs. Beispiel: "local".
catalog_properties Karte Eine Zuordnung von Konfigurationseigenschaften für den Apache Iceberg-Katalog. Welche Properties erforderlich sind, hängt vom Katalog ab. Weitere Informationen finden Sie in der Apache Iceberg-Dokumentation unter CatalogUtil.
config_properties Karte Optionale Hadoop-Konfigurationseigenschaften. Weitere Informationen finden Sie in der Apache Iceberg-Dokumentation unter CatalogUtil.
Schreibkonfiguration Datentyp Beschreibung
triggering_frequency_seconds integer Für Streaming-Schreibpipelines die Häufigkeit, mit der die Senke versucht, Snapshots zu erstellen, in Sekunden.

Dynamische Ziele

Verwaltete E/A für Apache Iceberg unterstützt dynamische Ziele. Anstatt in eine einzelne feste Tabelle zu schreiben, kann der Connector basierend auf den Feldwerten in den eingehenden Datensätzen dynamisch eine Zieltabelle auswählen.

Wenn Sie dynamische Ziele verwenden möchten, müssen Sie eine Vorlage für den Konfigurationsparameter table angeben. Weitere Informationen finden Sie unter Dynamische Ziele.

Beispiel

Im folgenden Beispiel werden JSON-Daten im Arbeitsspeicher in eine Apache Iceberg-Tabelle geschrieben.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

Nächste Schritte