Compatibilité d'Iceberg avec Dataproc Metastore

Cette page explique comment utiliser Apache Iceberg sur Dataproc en hébergeant un métastore Hive dans Dataproc Metastore. Vous y trouverez des informations sur l'utilisation de la table Iceberg via Spark, Hive et Presto.

Fonctionnalités

Apache Iceberg est un format de table ouvert pour les ensembles de données analytiques volumineux. Iceberg améliore considérablement les performances et fournit les fonctionnalités avancées suivantes:

  • Atomicité: les modifications de la table sont terminées ou échouent. Les modifications de la table ne sont pas partiellement validées.

  • Isolation d'instantané: les lectures n'utilisent qu'un seul instantané d'une table sans préservation à titre conservatoire.

  • Plusieurs rédacteurs simultanés: utilisent la simultanéité positive et les nouvelles tentatives pour garantir la réussite des mises à jour compatibles, même en cas d'écriture en conflit.

  • Évolution du schéma: les colonnes sont suivies par ID pour permettre l'ajout, la suppression, la mise à jour et le changement de nom.

  • Déplacements: les requêtes reproductibles peuvent utiliser la même table ou le même instantané. vous pouvez facilement examiner les modifications.

  • Planification distribuée: l'élimination des fichiers et l'envoi de prédicat sont distribués aux tâches, supprimant le métastore comme un goulot d'étranglement.

  • Historique des versions et rollback: corrigez les problèmes en rétablissant l'état précédent des tables.

  • Partitionnement masqué: empêche les erreurs utilisateur qui génèrent des résultats silencieusement incorrects ou des requêtes extrêmement lentes.

  • Évolution de la disposition de partition: peut mettre à jour la disposition d'une table lorsque le volume de données ou les modèles de requête changent.

  • Planification d'analyse et filtrage de fichier: recherche les fichiers nécessaires à une requête en supprimant définitivement les fichiers de métadonnées inutiles et en filtrant les fichiers de données ne contenant pas de données correspondantes.

Compatibilités

Iceberg fonctionne bien avec Dataproc et Dataproc Metastore. Il peut ajouter des tables au format hautes performances à Spark et Presto qui fonctionnent comme des tables SQL. Iceberg utilise un pointeur vers la dernière version d'un instantané et a besoin d'un mécanisme pour garantir l'atomicité lors du changement de version. Il offre deux options, Hive Catalog et les tables Hadoop, pour suivre les tables.

Les fonctionnalités suivantes sont acceptées :

Facteurs Sélectionner Insérer Créer une table
Spark
Hive
Presto

Avant de commencer

Pour commencer, créez un cluster Dataproc et utilisez le service Dataproc Metastore comme métastore Hive. Pour en savoir plus, consultez la page Créer un cluster Dataproc. Une fois le cluster créé, connectez-vous au cluster en SSH depuis un navigateur ou depuis la ligne de commande.

Table Iceberg avec Spark

Les tables Iceberg sont compatibles avec les opérations de lecture et d'écriture. Pour en savoir plus, consultez Apache Iceberg - Spark.

Configurations Spark

Tout d'abord, démarrez l'interface système Spark et utilisez un bucket Cloud Storage pour stocker des données. Pour inclure Iceberg dans l'installation de Spark, ajoutez le fichier JAR Iceberg Spark Runtime au dossier JAR Spark. Pour télécharger le fichier JAR, consultez la page Téléchargements Apache Iceberg. La commande suivante démarre l'interface système Spark compatible avec Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Utiliser le catalogue Hive pour créer des tables Iceberg

  1. Configurez des configurations de catalogue Hive pour créer des tables Iceberg dans le Scala Spark :

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Créez une table pour insérer et mettre à jour des données. Consultez l'exemple suivant.

    1. Créez une table example dans la base de données default:

      val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
      val name = TableIdentifier.of("default","example");
      
    2. Insérer des exemples de données

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Spécifiez la stratégie de partition basée sur la colonne id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Créez la table :

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Ajoutez le gestionnaire de stockage Iceberg et SerDe en tant que propriété de table:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Écrivez les données dans la table:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Lire les données:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Modifiez le schéma de la table. Consultez l'exemple suivant.

    1. Obtenez la table et ajoutez une colonne grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Vérifiez le schéma de la nouvelle table:

      table.schema.toString;
      
  4. Insérez des données supplémentaires et observez l'évolution du schéma. Consultez l'exemple suivant.

    1. Ajoutez des données à la table:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Vérifiez les nouvelles données insérées:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Affichez l'historique du tableau:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Affichez les instantanés:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Affichez les fichiers manifestes:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Affichez les fichiers de données:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Supposons que vous ayez commis une erreur en ajoutant la ligne avec la valeur de id=6 et que vous souhaitiez revenir à une version correcte de la table:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Remplacez snapshot-id par la version vers laquelle vous souhaitez revenir.

Utiliser des tables Hadoop pour créer des tables Iceberg

  1. Définissez les configurations de table Hadoop pour créer des tables Iceberg dans la Scala Spark :

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Créez une table pour insérer et mettre à jour des données. Consultez l'exemple suivant.

    1. Créez une table example dans la base de données default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Insérer des exemples de données

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Spécifiez la stratégie de partition basée sur la colonne id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Créez la table :

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Écrivez les données dans la table:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Lire les données:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Modifiez le schéma de la table. Consultez l'exemple suivant.

    1. Obtenez la table et ajoutez une colonne grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Vérifiez le schéma de la nouvelle table:

      table.schema.toString;
      
  4. Insérez des données supplémentaires et observez l'évolution du schéma. Consultez l'exemple suivant.

    1. Ajoutez des données à la table:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Vérifiez les nouvelles données insérées:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Affichez l'historique du tableau:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Affichez les instantanés:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Affichez les fichiers manifestes:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Affichez les fichiers de données:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Revenez à une version spécifique de la table:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Remplacez snapshot-id par la version à laquelle vous souhaitez revenir et ajoutez "L" à la fin. Exemple : "3943776515926014142L".

Utiliser la table Iceberg sur Hive

Iceberg accepte les tables lues via Hive à l'aide d'un objet StorageHandler. Notez que seules les versions Hive 2.x et 3.1.2 sont compatibles. Pour en savoir plus, consultez Apache Iceberg - Hive. En outre, ajoutez le fichier JAR Iceberg Hive Runtime au paramètre classpath Hive. Pour télécharger le fichier JAR, consultez la page Téléchargements Apache Iceberg.

Pour superposer une table Hive sur une table Iceberg, vous devez créer la table Iceberg à l'aide d'un catalogue Hive ou d'une table Hadoop. De plus, vous devez configurer Hive en conséquence pour lire les données de la table Iceberg.

Lire la table Iceberg (Catalogue Hive) sur Hive

  1. Ouvrez le client Hive et définissez des configurations pour lire des tables Iceberg sur la session client Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Lire le schéma et les données de la table Consultez l'exemple suivant.

    1. Vérifiez le schéma de la table et vérifiez si le format de la table est Iceberg:

      describe formatted example;
      
    2. Lisez les données de la table:

      select * from example;
      

Lire la table Iceberg (table Hadoop) sur Hive

  1. Ouvrez le client Hive et définissez des configurations pour lire des tables Iceberg sur la session client Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Lire le schéma et les données de la table Consultez l'exemple suivant.

    1. Créez une table externe (superposez une table Hive sur la table Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Vérifiez le schéma de la table et vérifiez si le format de la table est Iceberg:

      describe formatted hadoop_table;
      
    3. Lisez les données de la table:

      select * from hadoop_table;
      

Utiliser la table Iceberg sur Presto

Les requêtes Presto utilisent le connecteur Hive pour obtenir les emplacements de partition. Vous devez donc configurer Presto en conséquence pour lire et écrire des données dans la table Iceberg Pour en savoir plus, consultez les pages Presto/Trino – Hive Connector et Presto/Trino – Iceberg Connector.

Configurations de Presto

  1. Sous chaque nœud de cluster Dataproc, créez un fichier nommé iceberg.properties /etc/presto/conf/catalog/iceberg.properties et configurez le fichier hive.metastore.uri comme suit:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Remplacez example.net:9083 par l'hôte et le port appropriés pour votre service Thrift du métastore Hive.

  2. Redémarrez le service Presto pour transférer les configurations:

    sudo systemctl restart presto.service
    

Créer une table Iceberg sur Presto

  1. Ouvrez le client Presto et utilisez le connecteur "Iceberg" pour obtenir le métastore:

    --catalog iceberg --schema default
    
  2. Créez une table pour insérer et mettre à jour des données. Consultez l'exemple suivant.

    1. Créez une table example dans la base de données default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Insérer des exemples de données

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Lisez les données de la table:

      SELECT * FROM iceberg.default.example;
      
    4. Insérez des données supplémentaires pour vérifier les instantanés:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Affichez les instantanés:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      En ajoutant la commande ORDER BY committed_at DESC LIMIT 1;, vous trouverez le dernier ID d'instantané.

    6. Effectuez un rollback vers une version spécifique de la table:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Remplacez snapshot-id par la version vers laquelle vous souhaitez revenir.

Étape suivante