Escribe desde Dataflow hasta Apache Iceberg

Para escribir desde Dataflow en Apache Iceberg, usa el conector de E/S administrado.

La E/S administrada admite las siguientes funciones para Apache Iceberg:

Catálogos
Cómo leer las capacidades Lectura por lotes
Funciones de escritura

Para las tablas de BigQuery para Apache Iceberg, usa el conector BigQueryIO con la API de BigQuery Storage. La tabla ya debe existir. No se admite la creación de tablas dinámicas.

Dependencias

Agrega las siguientes dependencias a tu proyecto:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Configuración

La E/S administrada usa los siguientes parámetros de configuración para Apache Iceberg:

Lee y escribe la configuración Tipo de datos Descripción
table string Es el identificador de la tabla de Apache Iceberg. Examplo: "db.table1".
catalog_name string Es el nombre del catálogo. Ejemplo: "local".
catalog_properties mapa Un mapa de propiedades de configuración para el catálogo de Apache Iceberg. Las propiedades obligatorias dependen del catálogo. Para obtener más información, consulta CatalogUtil en la documentación de Apache Iceberg.
config_properties mapa Un conjunto opcional de propiedades de configuración de Hadoop. Para obtener más información, consulta CatalogUtil en la documentación de Apache Iceberg.
Cómo escribir la configuración Tipo de datos Descripción
triggering_frequency_seconds integer Para las canalizaciones de escritura de transmisión, es la frecuencia con la que el receptor intenta producir instantáneas, en segundos.

Destinos dinámicos

La E/S administrada para Apache Iceberg admite destinos dinámicos. En lugar de escribir en una sola tabla fija, el conector puede seleccionar de forma dinámica una tabla de destino según los valores de campo dentro de los registros entrantes.

Para usar destinos dinámicos, proporciona una plantilla para el parámetro de configuración table. Para obtener más información, consulta Destinos dinámicos.

Ejemplo

En el siguiente ejemplo, se escriben datos JSON en la memoria en una tabla de Apache Iceberg.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

¿Qué sigue?