Escribe desde Dataflow hasta Apache Iceberg

Para escribir desde Dataflow en Apache Iceberg, usa el conector de E/S administrado.

Dependencias

Agrega las siguientes dependencias a tu proyecto:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.56.0</version>
</dependency>

Configuración

El conector de Apache Iceberg usa los siguientes parámetros de configuración:

  • table (cadena). El nombre de Apache Iceberg. Ejemplo: "db.table1".
  • catalog_config (mapa). La configuración del catálogo. Contiene los siguientes campos:
    • catalog_name (cadena). Es el nombre del catálogo. Ejemplo: "local".
    • catalog_type (cadena). El tipo de catálogo. Valores compatibles: "hadoop", "hive", "rest".
    • warehouse_location (cadena). La ubicación del almacén. Ejemplo: file://path/to/warehouse.

Ejemplo

En el siguiente ejemplo, se escriben datos JSON en la memoria en una tabla de Apache Iceberg.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("catalog_name", options.getCatalogName())
        .put("warehouse_location", options.getWarehouseLocation())
        .put("catalog_type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_config", catalogConfig)
        .build();

    // Build the pipeline.
    var input = pipeline
        .apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA));

    PCollectionRowTuple.of("input", input).apply(
        Managed.write(Managed.ICEBERG)
            .withConfig(config)
    );

    pipeline.run().waitUntilFinish();
  }
}