Apache Iceberg-Tabellen mit Dataproc Metastore verwenden

Auf dieser Seite wird erläutert, wie Sie Apache Iceberg-Tabellen mit einem Dataproc Metastore-Dienst verwenden, der an einen Dataproc-Cluster angehängt ist. Apache Iceberg ist ein offenes Tabellenformat für große analytische Datasets.

Kompatibilitäten

Iceberg-Tabellen unterstützen die folgenden Funktionen.

Erfolgsfaktoren Auswählen Einfügen Tabelle erstellen
Spark
Hive
Presto

Hinweise

Iceberg-Tabelle mit Spark verwenden

Im folgenden Beispiel sehen Sie, wie Sie Iceberg-Tabellen mit Spark verwenden sollten.

Iceberg-Tabellen unterstützen Lese- und Schreibvorgänge. Weitere Informationen finden Sie unter Apache Iceberg – Spark.

Spark-Konfigurationen

Starten Sie zuerst die Spark-Shell und verwenden Sie einen Cloud Storage-Bucket zum Speichern von Daten. Um Iceberg in die Spark-Installation aufzunehmen, fügen Sie die JAR-Datei für die Spark-Laufzeit in den JAR-Ordner von Spark hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads. Mit dem folgenden Befehl wird die Spark-Shell mit Unterstützung für Apache Iceberg gestartet:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Hive-Katalog zum Erstellen von Iceberg-Tabellen verwenden

  1. Richten Sie Hive-Katalogkonfigurationen ein, um Iceberg-Tabellen in der Spark-Scala zu erstellen:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Geben Sie eine Partitionsstrategie basierend auf der Spalte id an:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Fügen Sie den Iceberg Storage-Handler und SerDe als Tabellenattribut hinzu:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Daten in die Tabelle schreiben:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Lesen Sie die Daten:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie die neue Spalte grade hinzu:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie mehr Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. So fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Sehen Sie sich die Snapshots an:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Angenommen, Sie haben beim Hinzufügen der Zeile mit dem Wert id=6 einen Fehler gemacht und möchten zurückgehen, um eine korrekte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Ersetzen Sie snapshot-id durch die Version, die Sie wiederherstellen möchten.

Hadoop Tables zum Erstellen von Iceberg-Tabellen verwenden

  1. Richten Sie Hadoop-Tabellenkonfigurationen ein, um Iceberg-Tabellen in der Spark-Scala zu erstellen:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Geben Sie eine Partitionsstrategie basierend auf der Spalte id an:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Daten in die Tabelle schreiben:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Lesen Sie die Daten:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie die neue Spalte grade hinzu:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie mehr Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. So fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Sehen Sie sich die Snapshots an:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Kehren Sie zurück, um eine bestimmte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten, und fügen Sie am Ende "L" hinzu. Beispiel: "3943776515926014142L"

Iceberg-Tabelle für Hive verwenden

Iceberg unterstützt Tabellen, die mit Hive gelesen werden, mithilfe eines StorageHandler. Beachten Sie, dass nur Hive 2.x und 3.1.2 unterstützt werden. Weitere Informationen finden Sie unter Apache Iceberg – Hive. Fügen Sie außerdem dem Hive-Klassenpfad die JAR-Datei für die Iceberg Hive-Laufzeit hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads.

Um eine Hive-Tabelle über eine Iceberg-Tabelle zu legen, müssen Sie die Eisberg-Tabelle mit einem Hive-Katalog oder einer Hadoop-Tabelle erstellen. Darüber hinaus müssen Sie Hive entsprechend konfigurieren, damit Daten aus der Iceberg-Tabelle gelesen werden können.

Iceberg-Tabelle (Hive-Katalog) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und Daten lesen. Folgendes ist ein Beispiel.

    1. Prüfen Sie das Tabellenschema und prüfen Sie, ob das Tabellenformat Iceberg ist:

      describe formatted example;
      
    2. Lesen Sie die Daten aus der Tabelle:

      select * from example;
      

Iceberg-Tabelle (Hadoop-Tabelle) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und Daten lesen. Folgendes ist ein Beispiel.

    1. Erstellen Sie eine externe Tabelle (legen Sie eine Hive-Tabelle über die Iceberg-Tabelle):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Prüfen Sie das Tabellenschema und prüfen Sie, ob das Tabellenformat Iceberg ist:

      describe formatted hadoop_table;
      
    3. Lesen Sie die Daten aus der Tabelle:

      select * from hadoop_table;
      

Iceberg-Tabelle in Presto verwenden

Presto-Abfragen verwenden den Hive-Connector zum Abrufen von Partitionsspeicherorten. Daher müssen Sie Presto entsprechend konfigurieren, um Daten in die Iceberg-Tabelle zu lesen und zu schreiben. Weitere Informationen finden Sie unter Presto/Trino – Hive Connector und Presto/Trino – Iceberg Connector.

Presto-Konfigurationen

  1. Erstellen Sie unter jedem Dataproc-Clusterknoten eine Datei mit dem Namen iceberg.properties /etc/presto/conf/catalog/iceberg.properties und konfigurieren Sie hive.metastore.uri so:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Ersetzen Sie example.net:9083 durch den korrekten Host und Port für den Thrift-Dienst von Hive-Metastore.

  2. Starten Sie den Presto-Dienst neu, um die Konfigurationen zu übertragen:

    sudo systemctl restart presto.service
    

Iceberg-Tabelle in Presto erstellen

  1. Öffnen Sie den Presto-Client und verwenden Sie den Connector „Iceberg“, um den Metastore abzurufen:

    --catalog iceberg --schema default
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Beispieldaten einfügen:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Daten aus der Tabelle lesen:

      SELECT * FROM iceberg.default.example;
      
    4. Fügen Sie weitere neue Daten ein, um Snapshots zu prüfen:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Sehen Sie sich die Snapshots an:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Mit dem Befehl ORDER BY committed_at DESC LIMIT 1; können Sie die neueste Snapshot-ID ermitteln.

    6. So führen Sie ein Rollback auf eine bestimmte Version der Tabelle durch:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Ersetzen Sie snapshot-id durch die Version, die Sie wiederherstellen möchten.

Nächste Schritte