Cette page explique comment utiliser des tables Apache Iceberg avec un service Dataproc Metastore associé à un cluster Dataproc. Apache Iceberg est un format de table ouvert pour les ensembles de données analytiques volumineux.
Compatibilités
Les tables Iceberg sont compatibles avec les fonctionnalités suivantes.
Facteurs | Sélectionner | Insérer | Créer une table |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Avant de commencer
- Créez un service Dataproc Metastore.
- Associez Dataproc Metastore à un cluster Dataproc.
Utiliser la table Iceberg avec Spark
L'exemple suivant vous montre comment utiliser des tables Iceberg avec Spark.
Les tables Iceberg sont compatibles avec les opérations de lecture et d'écriture. Pour en savoir plus, consultez Apache Iceberg - Spark.
Configurations Spark
Tout d'abord, démarrez l'interface système Spark et utilisez un bucket Cloud Storage pour stocker des données. Pour inclure Iceberg dans l'installation de Spark, ajoutez le fichier JAR Iceberg Spark Runtime au dossier JAR Spark. Pour télécharger le fichier JAR, consultez la page Téléchargements Apache Iceberg. La commande suivante démarre l'interface système Spark compatible avec Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Utiliser le catalogue Hive pour créer des tables Iceberg
Configurez des configurations de catalogue Hive pour créer des tables Iceberg dans le Scala Spark :
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Créez une table pour insérer et mettre à jour des données. Consultez l'exemple suivant.
Créez une table
example
dans la base de donnéesdefault
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Insérer des exemples de données
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Spécifiez la stratégie de partition basée sur la colonne
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Créez la table :
val table=catalog.createTable(name,df1_schema,partition_spec);
Ajoutez le gestionnaire de stockage Iceberg et SerDe en tant que propriété de table:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Écrivez les données dans la table:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Lire les données:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Modifiez le schéma de la table. Consultez l'exemple suivant.
Obtenez la table et ajoutez une colonne
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Vérifiez le schéma de la nouvelle table:
table.schema.toString;
Insérez des données supplémentaires et observez l'évolution du schéma. Consultez l'exemple suivant.
Ajoutez des données à la table:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Vérifiez les nouvelles données insérées:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Affichez l'historique du tableau:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Affichez les instantanés:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Affichez les fichiers manifestes:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Affichez les fichiers de données:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Supposons que vous ayez commis une erreur en ajoutant la ligne avec la valeur de
id=6
et que vous souhaitiez revenir à une version correcte de la table:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Remplacez
snapshot-id
par la version vers laquelle vous souhaitez revenir.
Utiliser des tables Hadoop pour créer des tables Iceberg
Définissez les configurations de table Hadoop pour créer des tables Iceberg dans la Scala Spark :
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Créez une table pour insérer et mettre à jour des données. Consultez l'exemple suivant.
Créez une table
example
dans la base de donnéesdefault
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Insérer des exemples de données
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Spécifiez la stratégie de partition basée sur la colonne
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Créez la table :
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Écrivez les données dans la table:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Lire les données:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Modifiez le schéma de la table. Consultez l'exemple suivant.
Obtenez la table et ajoutez une colonne
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Vérifiez le schéma de la nouvelle table:
table.schema.toString;
Insérez des données supplémentaires et observez l'évolution du schéma. Consultez l'exemple suivant.
Ajoutez des données à la table:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Vérifiez les nouvelles données insérées:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Affichez l'historique du tableau:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Affichez les instantanés:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Affichez les fichiers manifestes:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Affichez les fichiers de données:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Revenez à une version spécifique de la table:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Remplacez
snapshot-id
par la version à laquelle vous souhaitez revenir et ajoutez"L"
à la fin. Exemple :"3943776515926014142L"
.
Utiliser la table Iceberg sur Hive
Iceberg accepte les tables lues à l'aide de Hive à l'aide d'un StorageHandler
. Notez que seules les versions Hive 2.x et 3.1.2 sont compatibles. Pour en savoir plus, consultez Apache Iceberg - Hive. En outre, ajoutez le fichier JAR Iceberg Hive Runtime au paramètre classpath Hive. Pour télécharger le fichier JAR, consultez la page Téléchargements Apache Iceberg.
Pour superposer une table Hive sur une table Iceberg, vous devez créer la table Iceberg à l'aide d'un catalogue Hive ou d'une table Hadoop. De plus, vous devez configurer Hive en conséquence pour lire les données de la table Iceberg.
Lire la table Iceberg (Catalogue Hive) sur Hive
Ouvrez le client Hive et définissez des configurations pour lire des tables Iceberg sur la session client Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Lire le schéma et les données de la table Consultez l'exemple suivant.
Vérifiez le schéma de la table et vérifiez si le format de la table est Iceberg:
describe formatted example;
Lisez les données de la table:
select * from example;
Lire la table Iceberg (table Hadoop) sur Hive
Ouvrez le client Hive et définissez des configurations pour lire des tables Iceberg sur la session client Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Lire le schéma et les données de la table Consultez l'exemple suivant.
Créez une table externe (superposez une table Hive sur la table Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Vérifiez le schéma de la table et vérifiez si le format de la table est Iceberg:
describe formatted hadoop_table;
Lisez les données de la table:
select * from hadoop_table;
Utiliser la table Iceberg sur Presto
Les requêtes Presto utilisent le connecteur Hive pour obtenir les emplacements de partition. Vous devez donc configurer Presto en conséquence pour lire et écrire des données dans la table Iceberg Pour en savoir plus, consultez les pages Presto/Trino – Hive Connector et Presto/Trino – Iceberg Connector.
Configurations de Presto
Sous chaque nœud de cluster Dataproc, créez un fichier nommé
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
et configurez le fichierhive.metastore.uri
comme suit:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Remplacez
example.net:9083
par l'hôte et le port appropriés pour votre service Thrift du métastore Hive.Redémarrez le service Presto pour transférer les configurations:
sudo systemctl restart presto.service
Créer une table Iceberg sur Presto
Ouvrez le client Presto et utilisez le connecteur "Iceberg" pour obtenir le métastore:
--catalog iceberg --schema default
Créez une table pour insérer et mettre à jour des données. Consultez l'exemple suivant.
Créez une table
example
dans la base de donnéesdefault
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Insérer des exemples de données
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Lisez les données de la table:
SELECT * FROM iceberg.default.example;
Insérez des données supplémentaires pour vérifier les instantanés:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Affichez les instantanés:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
En ajoutant la commande
ORDER BY committed_at DESC LIMIT 1;
, vous trouverez le dernier ID d'instantané.Effectuez un rollback vers une version spécifique de la table:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Remplacez
snapshot-id
par la version vers laquelle vous souhaitez revenir.