Questa pagina spiega come utilizzare le tabelle Apache Iceberg con un servizio Dataproc Metastore collegato a un cluster Dataproc. Apache Iceberg è un formato a tabella aperta per set di dati analitici di grandi dimensioni.
Compatibilità
Le tabelle di iceberg supportano le seguenti funzionalità.
Driver | Seleziona | Inserisci | Crea tabella |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Prima di iniziare
- Creare un servizio Dataproc Metastore.
- Collega Dataproc Metastore a un cluster Dataproc.
Utilizzo della tabella Iceberg con Spark
L'esempio seguente mostra che devi utilizzare le tabelle Iceberg con Spark.
Le tabelle iceberg supportano le operazioni di lettura e scrittura. Per ulteriori informazioni, consulta Apache Iceberg - Spark.
Configurazioni Spark
Per prima cosa, avvia la shell di Spark e utilizza un bucket Cloud Storage per archiviare i dati. Per includere Iceberg nell'installazione di Spark, aggiungi il file JAR di Iceberg Spark Runtime alla cartella JAR di Spark. Per scaricare il file JAR, consulta la sezione Download di Apache Iceberg. Il seguente comando avvia la shell Spark con supporto per Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Utilizzare il catalogo Hive per creare tabelle Iceberg
Imposta le configurazioni del catalogo Hive per creare tabelle Iceberg nella scala Spark:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Crea una tabella per inserire e aggiornare i dati. Di seguito è riportato un esempio.
Crea una tabella denominata
example
nel databasedefault
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Inserisci dati di esempio:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Specifica la strategia di partizione in base alla colonna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crea la tabella:
val table=catalog.createTable(name,df1_schema,partition_spec);
Aggiungi il gestore dell'archiviazione Iceberg e SerDe come proprietà della tabella:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Scrivi i dati nella tabella:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Leggi i dati:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Modificare lo schema della tabella. Di seguito è riportato un esempio.
Recupera la tabella e aggiungi una nuova colonna
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Controlla il nuovo schema della tabella:
table.schema.toString;
Inserisci altri dati e visualizza l'evoluzione dello schema. Di seguito è riportato un esempio.
Aggiungi nuovi dati alla tabella:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Controlla i nuovi dati inseriti:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Visualizzare la cronologia della tabella:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Visualizza gli snapshot:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Visualizza i file manifest:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Visualizza i file di dati:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Supponi di aver commesso un errore aggiungendo la riga con il valore
id=6
e di voler tornare indietro e visualizzare una versione corretta della tabella:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Sostituisci
snapshot-id
con la versione a cui vuoi tornare.
Utilizzare le tabelle Hadoop per creare tabelle Iceberg
Configura le configurazioni delle tabelle Hadoop per creare tabelle Iceberg nella scala Spark:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Crea una tabella per inserire e aggiornare i dati. Di seguito è riportato un esempio.
Crea una tabella denominata
example
nel databasedefault
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Inserisci dati di esempio:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Specifica la strategia di partizione in base alla colonna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crea la tabella:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Scrivi i dati nella tabella:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Leggi i dati:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Modificare lo schema della tabella. Di seguito è riportato un esempio.
Recupera la tabella e aggiungi una nuova colonna
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Controlla il nuovo schema della tabella:
table.schema.toString;
Inserisci altri dati e visualizza l'evoluzione dello schema. Di seguito è riportato un esempio.
Aggiungi nuovi dati alla tabella:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Controlla i nuovi dati inseriti:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Visualizzare la cronologia della tabella:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Visualizza gli snapshot:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Visualizza i file manifest:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Visualizza i file di dati:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Torna indietro per visualizzare una versione specifica della tabella:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Sostituisci
snapshot-id
con la versione a cui vuoi tornare e aggiungi"L"
alla fine. Ad esempio:"3943776515926014142L"
.
Utilizzo della tabella Iceberg su Hive
Iceberg supporta la lettura delle tabelle mediante Hive mediante un StorageHandler
. Tieni presente che sono supportate solo le versioni Hive 2.x e 3.1.2. Per ulteriori informazioni, consulta
Apache Iceberg - Hive. Inoltre, aggiungi il file JAR di runtime Iceberg Hive al percorso della classe Hive. Per scaricare il file JAR, consulta la sezione Download di Apache Iceberg.
Per sovrapporre una tabella Hive in cima a una tabella Iceberg, devi creare la tabella Iceberg utilizzando un catalogo Hive o una tabella Hadoop. Inoltre, devi configurare Hive di conseguenza per leggere i dati dalla tabella Iceberg.
Lettura della tabella Iceberg (catalogo Hive) su Hive
Apri il client Hive e imposta le configurazioni per leggere le tabelle Iceberg nella sessione del client Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Leggi lo schema e i dati della tabella. Di seguito è riportato un esempio.
Controlla lo schema della tabella e verifica che il formato della tabella sia Iceberg:
describe formatted example;
Leggi i dati dalla tabella:
select * from example;
Lettura tabella Iceberg (tabella Hadoop) su Hive
Apri il client Hive e imposta le configurazioni per leggere le tabelle Iceberg nella sessione del client Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Leggi lo schema e i dati della tabella. Di seguito è riportato un esempio.
Crea una tabella esterna (sovrapponi una tabella Hive sopra la tabella Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Controlla lo schema della tabella e verifica che il formato della tabella sia Iceberg:
describe formatted hadoop_table;
Leggi i dati dalla tabella:
select * from hadoop_table;
Utilizzare la tabella Iceberg su Presto
Le query Presto utilizzano il connettore Hive per ottenere le posizioni delle partizioni, quindi devi configurare Presto di conseguenza per leggere e scrivere dati nella tabella Iceberg. Per ulteriori informazioni, consulta Connettore Presto/Trino - Hive e Connettore Presto/Trino - Iceberg.
Configurazioni Presto
Sotto ogni nodo del cluster Dataproc, crea un file denominato
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
e configurahive.metastore.uri
come segue:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Sostituisci
example.net:9083
con l'host e la porta corretti per il servizio Hive Metastore Thrift.Riavvia il servizio Presto per eseguire il push delle configurazioni:
sudo systemctl restart presto.service
Crea tabella Iceberg su Presto
Apri il client Presto e utilizza il connettore "Iceberg" per ottenere il metastore:
--catalog iceberg --schema default
Crea una tabella per inserire e aggiornare i dati. Di seguito è riportato un esempio.
Crea una tabella denominata
example
nel databasedefault
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Inserisci dati di esempio:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Leggi i dati dalla tabella:
SELECT * FROM iceberg.default.example;
Inserisci altri nuovi dati per controllare gli snapshot:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Visualizza gli snapshot:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
Aggiungendo il comando
ORDER BY committed_at DESC LIMIT 1;
, puoi trovare l'ID snapshot più recente.Esegui il rollback a una versione specifica della tabella:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Sostituisci
snapshot-id
con la versione a cui vuoi tornare.