Gravar do Dataflow para o Apache Iceberg

Para gravar do Dataflow no Apache Iceberg, use a função conector de E/S gerenciado.

A E/S gerenciada oferece suporte aos seguintes recursos do Apache Iceberg:

Catálogos
Capacidades de leitura Leitura em lote
Recursos de gravação

Para tabelas do BigQuery para o Apache Iceberg, use o conector BigQueryIO com a API BigQuery Storage. A tabela já precisa existir. A criação de tabelas dinâmicas não é compatível.

Dependências

Adicione estas dependências ao projeto:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Configuração

A E/S gerenciada usa os seguintes parâmetros de configuração para o Apache Iceberg:

Ler e gravar configuração Tipo de dado Descrição
table string O identificador da tabela do Apache Iceberg. Exemplo: "db.table1".
catalog_name string O nome do catálogo. Exemplo: "local".
catalog_properties mapa Mapa das propriedades de configuração do Apache Iceberg no seu catálogo. As propriedades obrigatórias dependem do catálogo. Para mais informações, consulte CatalogUtil na documentação do Apache Iceberg.
config_properties mapa Um conjunto opcional de propriedades de configuração do Hadoop. Para mais informações, consulte CatalogUtil na documentação do Apache Iceberg.
Gravar configuração Tipo de dado Descrição
triggering_frequency_seconds integer Para pipelines de gravação de streaming, a frequência com que o sink tenta produzir snapshots, em segundos.

Destinos dinâmicos

A E/S gerenciada para o Apache Iceberg oferece suporte a destinos dinâmicos. Em vez de gravar em uma única tabela fixa, o conector pode selecionar dinamicamente uma tabela de destino com base nos valores de campo nos registros recebidos.

Para usar destinos dinâmicos, forneça um modelo para o parâmetro de configuração table. Para mais informações, consulte Destinos dinâmicos.

Exemplo

O exemplo a seguir grava dados JSON na memória em uma tabela do Apache Iceberg.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

A seguir