Escribir desde Dataflow en Apache Iceberg

Para escribir datos de Dataflow en Apache Iceberg, usa el conector de E/S gestionado.

La E/gestionada admite las siguientes funciones de Apache Iceberg:

Catálogos
  • Hadoop
  • Hive
  • Catálogos basados en REST
  • Metastore de BigQuery (requiere el SDK de Apache Beam 2.62.0 o una versión posterior si no se usa Runner v2)
Funciones de lectura Lectura por lotes
Funciones de escritura

En el caso de las tablas de BigQuery para Apache Iceberg, usa el conector BigQueryIO con la API Storage de BigQuery. La tabla ya debe existir. No se admite la creación de tablas dinámicas.

Dependencias

Añade las siguientes dependencias a tu proyecto:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Destinos dinámicos

La E/gestionada de Apache Iceberg admite destinos dinámicos. En lugar de escribir en una sola tabla fija, el conector puede seleccionar de forma dinámica una tabla de destino en función de los valores de campo de los registros entrantes.

Para usar destinos dinámicos, proporciona una plantilla para el parámetro de configuración table. Para obtener más información, consulta Destinos dinámicos.

Ejemplos

En los siguientes ejemplos se muestra cómo usar la entrada/salida gestionada para escribir en Apache Iceberg.

Escribir en una tabla de Apache Iceberg

En el siguiente ejemplo se escriben datos JSON en memoria en una tabla de Apache Iceberg.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

Escribir con destinos dinámicos

En el siguiente ejemplo se escribe en diferentes tablas de Apache Iceberg en función de un campo de los datos de entrada.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;

public class ApacheIcebergDynamicDestinations {

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addInt64Field("id")
      .addStringField("name")
      .addStringField("airport")
      .build();

  // The data to write to table, formatted as JSON strings.
  static final List<String> TABLE_ROWS = List.of(
      "{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }",
      "{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }",
      "{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }"
  );

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);
  }

  // Write JSON data to Apache Iceberg, using dynamic destinations to determine the Iceberg table
  // where Dataflow writes each record. The JSON data contains a field named "airport". The
  // Dataflow pipeline writes to Iceberg tables with the naming pattern "flights-{airport}".
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", "hadoop")
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        // Route the incoming records based on the value of the "airport" field.
        .put("table", "flights-{airport}")
        // Specify which fields to keep from the input data.
        .put("keep", Arrays.asList("name", "id"))
        .build();

    // Build the pipeline.
    pipeline
        // Read in-memory JSON data.
        .apply(Create.of(TABLE_ROWS))
        // Convert the JSON records to Row objects.
        .apply(JsonToRow.withSchema(SCHEMA))
        // Write each Row to Apache Iceberg.
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    // Run the pipeline.
    pipeline.run().waitUntilFinish();
  }
}

Siguientes pasos