Scrivere da Dataflow ad Apache Iceberg

Per scrivere da Dataflow ad Apache Iceberg, utilizza il connettore I/O gestito.

I/O gestita supporta le seguenti funzionalità per Apache Iceberg:

Cataloghi
  • Hadoop
  • Hive
  • Cataloghi basati su REST
  • Metastore BigQuery (dopo il rilascio di Beam 2.63.0)
Funzionalità di lettura Lettura batch
Funzionalità di scrittura

Per le tabelle BigQuery per Apache Iceberg, utilizza il connettore BigQueryIO con l'API BigQuery Storage. La tabella deve già esistere; la creazione di tabelle dinamiche non è supportata.

Dipendenze

Aggiungi le seguenti dipendenze al progetto:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Configurazione

L'I/O gestita utilizza i seguenti parametri di configurazione per Apache Iceberg:

Lettura e scrittura della configurazione Tipo di dati Descrizione
table string L'identificatore della tabella Apache Iceberg. Esempio: "db.table1".
catalog_name string Il nome del catalogo. Esempio: "local".
catalog_properties mappa Una mappa delle proprietà di configurazione per il catalogo Apache Iceberg. Le proprietà richieste dipendono dal catalogo. Per ulteriori informazioni, consulta CatalogUtil nella documentazione di Apache Iceberg.
config_properties mappa Un insieme facoltativo di proprietà di configurazione Hadoop. Per ulteriori informazioni, consulta CatalogUtil nella documentazione di Apache Iceberg.
Scrittura configurazione Tipo di dati Descrizione
triggering_frequency_seconds integer Per le pipeline di scrittura in streaming, la frequenza con cui il sink tenta di produrre snapshot, in secondi.

Destinazioni dinamiche

I/O gestito per Apache Iceberg supporta le destinazioni dinamiche. Invece di scrivere in una singola tabella fissa, il connettore può selezionare dinamicamente una tabella di destinazione in base ai valori dei campi all'interno dei record in entrata.

Per utilizzare le destinazioni dinamiche, fornisci un modello per il parametro di configurazione table. Per ulteriori informazioni, consulta Destinazioni dinamiche.

Esempio

Il seguente esempio scrive i dati JSON in-memory in una tabella Apache Iceberg.

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

Passaggi successivi