Utiliser des tables Apache Iceberg avec Dataproc Metastore

Cette page explique comment utiliser des tables Apache Iceberg avec un service Dataproc Metastore associé à un cluster Dataproc. Apache Iceberg est un format de table ouvert pour les grands ensembles de données analytiques.

Compatibilités

Les tables Iceberg prennent en charge les fonctionnalités suivantes.

Facteurs Sélectionner Insérer Créer une table
Spark
Hive
Presto

Avant de commencer

Utiliser une table Iceberg avec Spark

L'exemple suivant montre que vous devez utiliser des tables Iceberg avec Spark.

Les tables Iceberg sont compatibles avec les opérations de lecture et d'écriture. Pour en savoir plus, consultez Apache Iceberg - Spark.

Configurations Spark

Tout d'abord, démarrez l'interface système Spark et utilisez un bucket Cloud Storage pour stocker des données. Pour inclure Iceberg dans l'installation de Spark, ajoutez le fichier JAR Iceberg Spark Runtime au dossier JAR Spark. Pour télécharger le fichier JAR, consultez la page Téléchargements Apache Iceberg. La commande suivante démarre l'interface système Spark compatible avec Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Utiliser le catalogue Hive pour créer des tables Iceberg

  1. Configurez des configurations de catalogue Hive pour créer des tables Iceberg dans le Scala Spark :

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Créez une table pour insérer et mettre à jour des données. Consultez l'exemple suivant.

    1. Créez une table example dans la base de données default:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Insérer des exemples de données

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Spécifiez la stratégie de partition basée sur la colonne id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Créez la table :

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Ajoutez le gestionnaire de stockage Iceberg et SerDe en tant que propriété de table:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Écrivez les données dans la table:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Lire les données:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Modifiez le schéma de la table. Consultez l'exemple suivant.

    1. Obtenez la table et ajoutez une colonne grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Vérifiez le schéma de la nouvelle table:

      table.schema.toString;
      
  4. Insérez des données supplémentaires et observez l'évolution du schéma. Consultez l'exemple suivant.

    1. Ajoutez des données à la table:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Vérifiez les nouvelles données insérées:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Affichez l'historique du tableau:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Affichez les instantanés:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Affichez les fichiers manifestes:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Affichez les fichiers de données:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Supposons que vous ayez commis une erreur en ajoutant la ligne avec la valeur de id=6 et que vous souhaitiez revenir à une version correcte de la table:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Remplacez snapshot-id par la version vers laquelle vous souhaitez revenir.

Utiliser des tables Hadoop pour créer des tables Iceberg

  1. Définissez les configurations de table Hadoop pour créer des tables Iceberg dans la Scala Spark :

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Créez une table pour insérer et mettre à jour des données. Consultez l'exemple suivant.

    1. Créez une table example dans la base de données default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Insérer des exemples de données

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Spécifiez la stratégie de partition basée sur la colonne id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Créez la table :

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Écrivez les données dans la table:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Lire les données:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Modifiez le schéma de la table. Consultez l'exemple suivant.

    1. Obtenez la table et ajoutez une colonne grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Vérifiez le schéma de la nouvelle table:

      table.schema.toString;
      
  4. Insérez des données supplémentaires et observez l'évolution du schéma. Consultez l'exemple suivant.

    1. Ajoutez des données à la table:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Vérifiez les nouvelles données insérées:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Affichez l'historique du tableau:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Affichez les instantanés:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Affichez les fichiers manifestes:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Affichez les fichiers de données:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Revenez à une version spécifique de la table:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Remplacez snapshot-id par la version à laquelle vous souhaitez revenir et ajoutez "L" à la fin. Exemple : "3943776515926014142L".

Utiliser la table Iceberg sur Hive

Iceberg prend en charge les tables lues à l'aide de Hive à l'aide d'un StorageHandler. Notez que seules les versions Hive 2.x et 3.1.2 sont compatibles. Pour en savoir plus, consultez Apache Iceberg – Hive. En outre, ajoutez le fichier JAR Iceberg Hive Runtime au paramètre classpath Hive. Pour télécharger le fichier JAR, consultez la page Téléchargements d'Apache Iceberg.

Pour superposer une table Hive sur une table Iceberg, vous devez créer la table Iceberg à l'aide d'un catalogue Hive ou d'une table Hadoop. De plus, vous devez configurer Hive en conséquence pour lire les données de la table Iceberg.

Lire la table Iceberg (Catalogue Hive) sur Hive

  1. Ouvrez le client Hive et définissez des configurations pour lire des tables Iceberg sur la session client Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Lire le schéma et les données de la table Consultez l'exemple suivant.

    1. Vérifiez le schéma de la table et vérifiez si le format de la table est Iceberg:

      describe formatted example;
      
    2. Lisez les données de la table:

      select * from example;
      

Lire la table Iceberg (table Hadoop) sur Hive

  1. Ouvrez le client Hive et définissez des configurations pour lire des tables Iceberg sur la session client Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Lire le schéma et les données de la table Consultez l'exemple suivant.

    1. Créez une table externe (superposez une table Hive sur la table Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Vérifiez le schéma de la table et vérifiez si le format de la table est Iceberg:

      describe formatted hadoop_table;
      
    3. Lisez les données de la table:

      select * from hadoop_table;
      

Utiliser la table Iceberg sur Presto

Les requêtes Presto utilisent le connecteur Hive pour obtenir les emplacements de partition. Vous devez donc configurer Presto en conséquence pour lire et écrire des données dans la table Iceberg Pour en savoir plus, consultez les pages Presto/Trino – Hive Connector et Presto/Trino – Iceberg Connector.

Configurations de Presto

  1. Sous chaque nœud de cluster Dataproc, créez un fichier nommé iceberg.properties /etc/presto/conf/catalog/iceberg.properties et configurez le fichier hive.metastore.uri comme suit:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Remplacez example.net:9083 par l'hôte et le port appropriés pour votre service Thrift du métastore Hive.

  2. Redémarrez le service Presto pour transférer les configurations:

    sudo systemctl restart presto.service
    

Créer une table Iceberg sur Presto

  1. Ouvrez le client Presto et utilisez le connecteur "Iceberg" pour obtenir le métastore:

    --catalog iceberg --schema default
    
  2. Créez une table pour insérer et mettre à jour des données. Consultez l'exemple suivant.

    1. Créez une table example dans la base de données default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Insérer des exemples de données

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Lisez les données de la table:

      SELECT * FROM iceberg.default.example;
      
    4. Insérez des données supplémentaires pour vérifier les instantanés:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Affichez les instantanés:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      En ajoutant la commande ORDER BY committed_at DESC LIMIT 1;, vous trouverez le dernier ID d'instantané.

    6. Effectuez un rollback vers une version spécifique de la table:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Remplacez snapshot-id par la version vers laquelle vous souhaitez revenir.

Étapes suivantes