Écrire depuis Dataflow vers Apache Iceberg

Pour écrire depuis Dataflow vers Apache Iceberg, utilisez le connecteur d'E/S géré.

Les E/S gérées sont compatibles avec les fonctionnalités suivantes pour Apache Iceberg:

Catalogues
Fonctionnalités de lecture Lecture par lots
Capacités d'écriture

Pour les tables BigQuery pour Apache Iceberg, utilisez le connecteur BigQueryIO avec l'API BigQuery Storage. La table doit déjà exister. La création de tables dynamiques n'est pas prise en charge.

Dépendances

Ajoutez les dépendances suivantes au projet :

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Configuration

Les E/S gérées utilisent les paramètres de configuration suivants pour Apache Iceberg:

Lire et écrire la configuration Type de données Description
table chaîne Identifiant de la table Apache Iceberg. Exemple : "db.table1".
catalog_name chaîne Nom du catalogue. Exemple : "local".
catalog_properties carte Mappage des propriétés de configuration pour le catalogue Apache Iceberg. Les propriétés requises dépendent du catalogue. Pour en savoir plus, consultez CatalogUtil dans la documentation Apache Iceberg.
config_properties carte Ensemble facultatif de propriétés de configuration Hadoop. Pour en savoir plus, consultez la page CatalogUtil dans la documentation Apache Iceberg.
Écrire la configuration Type de données Description
triggering_frequency_seconds entier Pour les pipelines d'écriture en streaming, fréquence à laquelle le récepteur tente de produire des instantanés, en secondes.

Destinations dynamiques

Les E/S gérées pour Apache Iceberg sont compatibles avec les destinations dynamiques. Au lieu d'écrire dans une seule table fixe, le connecteur peut sélectionner dynamiquement une table de destination en fonction des valeurs de champ dans les enregistrements entrants.

Pour utiliser des destinations dynamiques, fournissez un modèle pour le paramètre de configuration table. Pour en savoir plus, consultez la section Destinations dynamiques.

Exemple

L'exemple suivant écrit des données JSON en mémoire dans une table Apache Iceberg.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

Étape suivante