Scrivere da Dataflow in Apache Iceberg

Per scrivere da Dataflow in Apache Iceberg, utilizza il connettore I/O gestito.

Dipendenze

Aggiungi le seguenti dipendenze al progetto:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.56.0</version>
</dependency>

Configurazione

Il connettore Apache Iceberg utilizza i seguenti parametri di configurazione:

  • table (stringa). Il nome dell'Apache Iceberg. Esempio: "db.table1".
  • catalog_config (mappa). La configurazione del catalogo. Contiene i seguenti campi:
    • catalog_name (stringa). Il nome del catalogo. Esempio: "local".
    • catalog_type (stringa). Il tipo di catalogo. Valori supportati: "hadoop", "hive", "rest".
    • warehouse_location (stringa). La posizione del magazzino. Esempio: file://path/to/warehouse.

Esempio

L'esempio seguente scrive dati JSON in memoria in una tabella Apache Iceberg.

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("catalog_name", options.getCatalogName())
        .put("warehouse_location", options.getWarehouseLocation())
        .put("catalog_type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_config", catalogConfig)
        .build();

    // Build the pipeline.
    var input = pipeline
        .apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA));

    PCollectionRowTuple.of("input", input).apply(
        Managed.write(Managed.ICEBERG)
            .withConfig(config)
    );

    pipeline.run().waitUntilFinish();
  }
}