Lee desde Apache Iceberg hasta Dataflow

Para leer de Apache Iceberg a Dataflow, usa el conector de E/S administrado.

La E/S administrada admite las siguientes funciones para Apache Iceberg:

Catálogos
Cómo leer las capacidades Lectura por lotes
Funciones de escritura

Para las tablas de BigQuery para Apache Iceberg, usa el conector BigQueryIO con la API de BigQuery Storage. La tabla ya debe existir. No se admite la creación de tablas dinámicas.

Dependencias

Agrega las siguientes dependencias a tu proyecto:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Ejemplo

En el siguiente ejemplo, se leen desde una tabla de Apache Iceberg y se escriben los datos en archivos de texto.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ApacheIcebergRead {

  static final String CATALOG_TYPE = "hadoop";

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("Path to write the output file")
    String getOutputPath();

    void setOutputPath(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME --outputPath=$OUTPUT_FILE
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config))
        .getSinglePCollection()
        // Format each record as a string with the format 'id:name'.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              return String.format("%d:%s",
                  row.getInt64("id"),
                  row.getString("name"));
            })))
        // Write to a text file.
        .apply(
            TextIO.write()
                .to(options.getOutputPath())
                .withNumShards(1)
                .withSuffix(".txt"));

    pipeline.run().waitUntilFinish();
  }
}

¿Qué sigue?