Supporto di Iceberg su Dataproc Metastore

Questa pagina spiega come utilizzare Apache Iceberg su Dataproc ospitando il metastore Hive in Dataproc Metastore. Include le informazioni su come utilizzare la tabella Iceberg tramite Spark, Hive e Presto.

Funzionalità

Apache Iceberg è un formato a tabella aperta per grandi set di dati analitici. Iceberg migliora notevolmente le prestazioni e offre le seguenti funzionalità avanzate:

  • Atomicità: le modifiche alla tabella sono state completate o non riuscite. Non esiste un impegno parziale di modifiche alla tabella.

  • Isolamento snapshot: le letture utilizzano solo un'istantanea di una tabella senza tenere un blocco.

  • Writer simultanei multipli: utilizza la contemporaneità e i tentativi ripetuti di ottimizzazione per garantire che gli aggiornamenti compatibili abbiano esito positivo, anche quando i dati sono in conflitto.

  • Evoluzione dello schema: le colonne vengono monitorate in base all'ID per supportare l'aggiunta, il rilascio, l'aggiornamento e la ridenominazione.

  • Viaggio nel tempo: le query riproducibili possono utilizzare la stessa tabella o snapshot; puoi esaminare facilmente le modifiche.

  • Pianificazione distribuita: l'eliminazione dei file e il push-down nel predicato vengono distribuiti ai job e il metastore viene rimosso come collo di bottiglia.

  • Cronologia delle versioni e rollback: correggi i problemi reimpostando le tabelle su uno stato precedente.

  • Partizione nascosta: impedisce gli errori utente che causano risultati erroneamente silenziosi o query estremamente lente.

  • Evoluzione del layout delle partizioni: può aggiornare il layout di una tabella quando cambiano il volume di dati o i pattern di query.

  • Scansione e pianificazione dei file: consente di trovare i file necessari per una query eliminando i file di metadati non necessari e filtrando i file che non contengono dati corrispondenti.

Compatibilità

Iceberg funziona bene con Dataproc e Dataproc Metastore. Può aggiungere tabelle con formato ad alte prestazioni a Spark e Presto, che funzionano come tabelle SQL. Iceberg utilizza un puntatore per aggiornare la versione più recente di un'istantanea e necessita di un meccanismo per garantire l'atomia nella migrazione delle versioni. Offre due opzioni, le tabelle Hive Catalog e Hadoop, per monitorare le tabelle.

Le funzionalità supportate includono:

Driver Seleziona Inserisci Crea tabella
Spark
Hive
Presto

Prima di iniziare

Per iniziare, crea un cluster Dataproc e utilizza il servizio Dataproc Metastore come metastore Hive. Per ulteriori informazioni, consulta la sezione Creare un cluster Dataproc. Dopo aver creato il cluster, accedi al cluster tramite SSH da un browser o dalla riga di comando.

Tavola Iceberg con Spark

Le tabelle Iceberg supportano le operazioni di lettura e scrittura. Per maggiori informazioni, consulta Apache Iceberg - Spark.

Configurazioni Spark

Innanzitutto, avvia la shell Spark e utilizza un bucket Cloud Storage per archiviare i dati. Per includere Iceberg nell'installazione di Spark, aggiungi il file JAR Iceberg Spark Runtime alla cartella JAR di Spark. Per scaricare il file JAR, consulta la sezione Download di Apache Iceberg. Il comando seguente avvia la shell Spark con il supporto per Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Usa il catalogo Hive per creare tabelle Iceberg

  1. Imposta le configurazioni Hive Catalog per creare tabelle Iceberg nella spark scala:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crea una tabella per inserire e aggiornare i dati. Di seguito è riportato un esempio.

    1. Crea una tabella denominata example nel database default:

      val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
      val name = TableIdentifier.of("default","example");
      
    2. Inserisci dati di esempio:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Specifica la strategia di partizione in base alla colonna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crea la tabella:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Aggiungi Iceberg Storage Handler e SerDe come proprietà della tabella:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Scrivi i dati nella tabella:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Leggere i dati:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Modifica lo schema della tabella. Di seguito è riportato un esempio.

    1. Scegli la tabella e aggiungi una nuova colonna grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Controlla il nuovo schema della tabella:

      table.schema.toString;
      
  4. Inserisci più dati e visualizza l'evoluzione dello schema. Di seguito è riportato un esempio.

    1. Aggiungi nuovi dati alla tabella:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Controlla i nuovi dati inseriti:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Visualizzare la cronologia della tabella:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Visualizza gli snapshot:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Visualizza i file manifest:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Visualizzare i file di dati:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Supponiamo che tu abbia commesso un errore aggiungendo la riga con il valore id=6 e che voglia tornare indietro per visualizzare una versione corretta della tabella:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Sostituisci snapshot-id con la versione su cui vuoi tornare.

Utilizzo di Hadoop Tables per creare tabelle Iceberg

  1. Imposta le configurazioni di Hadoop Table per creare tabelle Iceberg nella spark scala:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crea una tabella per inserire e aggiornare i dati. Di seguito è riportato un esempio.

    1. Crea una tabella denominata example nel database default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Inserisci dati di esempio:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Specifica la strategia di partizione in base alla colonna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crea la tabella:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Scrivi i dati nella tabella:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Leggere i dati:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Modifica lo schema della tabella. Di seguito è riportato un esempio.

    1. Scegli la tabella e aggiungi una nuova colonna grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Controlla il nuovo schema della tabella:

      table.schema.toString;
      
  4. Inserisci più dati e visualizza l'evoluzione dello schema. Di seguito è riportato un esempio.

    1. Aggiungi nuovi dati alla tabella:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Controlla i nuovi dati inseriti:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Visualizzare la cronologia della tabella:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Visualizza gli snapshot:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Visualizza i file manifest:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Visualizzare i file di dati:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Torna indietro per visualizzare una versione specifica della tabella:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Sostituisci snapshot-id con la versione su cui vuoi tornare indietro e aggiungi "L" alla fine. Ad esempio: "3943776515926014142L".

Usa la tabella Iceberg su Hive

Iceberg supporta le tabelle lette tramite Hive utilizzando un StorageStorage. Tieni presente che sono supportate solo le versioni Hive 2.x e 3.1.2. Per maggiori informazioni, consulta Apache Iceberg - Hive. Inoltre, aggiungi il file JAR Iceberg Hive Runtime al percorso classe Hive. Per scaricare il file JAR, consulta la sezione Download di Apache Iceberg.

Per sovrapporre una tabella Hive su una tabella Iceberg, devi crearne una utilizzando un catalogo Hive o una tabella Hadoop. Inoltre, devi configurare Hive di conseguenza per leggere i dati dalla tabella Iceberg.

Leggi la tabella Iceberg (catalogo Hive) su Hive

  1. Apri il client Hive e imposta le configurazioni per leggere le tabelle Iceberg nella sessione client Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Leggi lo schema e i dati della tabella. Di seguito è riportato un esempio.

    1. Controlla lo schema della tabella e se il formato della tabella è Iceberg:

      describe formatted example;
      
    2. Leggi i dati della tabella:

      select * from example;
      

Leggi la tabella Iceberg (Hadoop Table) su Hive

  1. Apri il client Hive e imposta le configurazioni per leggere le tabelle Iceberg nella sessione client Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Leggi lo schema e i dati della tabella. Di seguito è riportato un esempio.

    1. Creare una tabella esterna (sovrapporre una tabella Hive nella parte superiore della tabella Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Controlla lo schema della tabella e se il formato della tabella è Iceberg:

      describe formatted hadoop_table;
      
    3. Leggi i dati della tabella:

      select * from hadoop_table;
      

Usa la tabella Iceberg su Presto

Le query Presto utilizzano il connettore Hive per recuperare le posizioni delle partizioni, quindi devi configurare Presto di conseguenza per leggere e scrivere dati nella tabella Iceberg. Per ulteriori informazioni, vedi Connettore Presto/Trino - Hive e Connettore Presto/Trino - Iceberg.

Configurazioni di Presto

  1. Sotto ogni nodo del cluster Dataproc, crea un file denominato iceberg.properties/etc/presto/conf/catalog/iceberg.properties e configura hive.metastore.uri come segue:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Sostituisci example.net:9083 con l'host e la porta corretti per il servizio Hive Thrift.

  2. Riavvia il servizio Presto per eseguire il push delle configurazioni:

    sudo systemctl restart presto.service
    

Crea la tabella Iceberg su Presto

  1. Apri il client Presto e utilizza il connettore "Iceberg" per scaricare il metastore:

    --catalog iceberg --schema default
    
  2. Crea una tabella per inserire e aggiornare i dati. Di seguito è riportato un esempio.

    1. Crea una tabella denominata example nel database default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Inserisci dati di esempio:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Leggere i dati della tabella:

      SELECT * FROM iceberg.default.example;
      
    4. Inserisci nuovi dati per controllare gli snapshot:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Visualizza gli snapshot:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Se aggiungi il comando ORDER BY committed_at DESC LIMIT 1;, puoi trovare l'ID snapshot più recente.

    6. Esegui il rollback a una versione specifica della tabella:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Sostituisci snapshot-id con la versione su cui vuoi tornare.

Passaggi successivi