Ler do Apache Iceberg para o Dataflow

Para ler do Apache Iceberg para o Dataflow, use a conector de E/S gerenciado.

Dependências

Adicione estas dependências ao projeto:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.56.0</version>
</dependency>

Configuração

O conector do Apache Iceberg usa os seguintes parâmetros de configuração:

  • table (string). É o nome do Apache Iceberg. Exemplo: "db.table1".
  • catalog_config (mapeamento). A configuração do catálogo. Contém os seguintes campos:
    • catalog_name (string). O nome do catálogo. Exemplo: "local".
    • catalog_type (string). O tipo de catálogo. Valores aceitos: "hadoop", "hive" e "rest".
    • warehouse_location (string). A localização do warehouse. Por exemplo: file://path/to/warehouse.

Exemplo

O exemplo a seguir lê uma tabela do Apache Iceberg e grava os dados no arquivos de texto.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ApacheIcebergRead {

  static final String CATALOG_TYPE = "hadoop";

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("Path to write the output file")
    String getOutputPath();

    void setOutputPath(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME --outputPath=$OUTPUT_FILE
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("catalog_name", options.getCatalogName())
        .put("warehouse_location", options.getWarehouseLocation())
        .put("catalog_type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_config", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config))
        .getSinglePCollection()
        // Format each record as a string with the format 'id:name'.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              return String.format("%d:%s",
                  row.getInt64("id"),
                  row.getString("name"));
            })))
        // Write to a text file.
        .apply(
            TextIO.write()
                .to(options.getOutputPath())
                .withNumShards(1)
                .withSuffix(".txt"));

    pipeline.run().waitUntilFinish();
  }
}