Ler do Apache Iceberg para o Dataflow

Para ler do Apache Iceberg para o Dataflow, use a conector de E/S gerenciado.

A E/S gerenciada oferece suporte aos seguintes recursos do Apache Iceberg:

  • Hadoop
  • Hive
  • Catálogos baseados em REST
  • Metastore do BigQuery: requer o SDK do Apache Beam 2.62.0 ou mais recente se você não estiver usando o Runner v2.
Capacidades de leitura Leitura em lote
Recursos de gravação

Para tabelas do BigQuery para o Apache Iceberg, use o conector BigQueryIO com a API BigQuery Storage. A tabela já precisa existir. A criação de tabelas dinâmicas não é compatível.


Adicione estas dependências ao projeto:





O exemplo a seguir lê uma tabela do Apache Iceberg e grava os dados no arquivos de texto.


Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ApacheIcebergRead {

  static final String CATALOG_TYPE = "hadoop";

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("Path to write the output file")
    String getOutputPath();

    void setOutputPath(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME --outputPath=$OUTPUT_FILE
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)

    // Build the pipeline.
        // Format each record as a string with the format 'id:name'.
            .via((row -> {
              return String.format("%d:%s",
        // Write to a text file.


A seguir