Apache Iceberg-Tabellen mit Dataproc Metastore verwenden

Auf dieser Seite wird erläutert, wie Sie Apache Iceberg-Tabellen mit einem Dataproc Metastore-Dienst verwenden, der mit einem Dataproc-Cluster verbunden ist. Apache Iceberg ist ein offenes Tabellenformat für große analytische Datensätze.

Kompatibilitäten

Iceberg-Tabellen unterstützen die folgenden Funktionen:

Erfolgsfaktoren Auswählen Einfügen Tabelle erstellen
Spark
Hive
Presto

Hinweise

Iceberg-Tabelle mit Spark verwenden

Im folgenden Beispiel wird gezeigt, wie Sie Eisbergtabellen mit Spark verwenden sollten.

Iceberg-Tabellen unterstützen Lese- und Schreibvorgänge. Weitere Informationen finden Sie unter Apache Iceberg – Spark.

Spark-Konfigurationen

Starten Sie zuerst die Spark-Shell und verwenden Sie einen Cloud Storage-Bucket, um Daten zu speichern. Wenn Sie Iceberg in die Spark-Installation aufnehmen möchten, fügen Sie die JAR-Datei der Iceberg-Spark-Laufzeit dem JARs-Ordner von Spark hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads. Mit dem folgenden Befehl wird die Spark-Shell mit Unterstützung für Apache Iceberg gestartet:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Iceberg-Tabellen mit dem Hive-Katalog erstellen

  1. Richten Sie Hive-Katalogkonfigurationen ein, um Iceberg-Tabellen in Spark Scala zu erstellen:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Partitionsstrategie basierend auf Spalte id angeben:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Fügen Sie den Iceberg Storage Handler und SerDe als Tabelleneigenschaft hinzu:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Daten lesen:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle auf und fügen Sie eine neue Spalte grade hinzu:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Snapshots ansehen:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Angenommen, Sie haben beim Hinzufügen der Zeile mit dem Wert id=6 einen Fehler gemacht und möchten zurückgehen, um eine korrekte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten.

Iceberg-Tabellen mit Hadoop-Tabellen erstellen

  1. Richten Sie Hadoop-Tabellenkonfigurationen ein, um Iceberg-Tabellen in Spark Scala zu erstellen:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Partitionsstrategie basierend auf Spalte id angeben:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Daten lesen:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle auf und fügen Sie eine neue Spalte grade hinzu:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Snapshots ansehen:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Kehren Sie zurück, um eine bestimmte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten, und fügen Sie am Ende "L" hinzu. Beispiel: "3943776515926014142L"

Iceberg-Tabelle in Hive verwenden

Iceberg unterstützt Tabellen, die mit Hive gelesen werden, mithilfe einer StorageHandler. Beachten Sie, dass nur Hive 2.x und 3.1.2 unterstützt werden. Weitere Informationen finden Sie unter Apache Iceberg – Hive. Fügen Sie außerdem die JAR-Datei der Iceberg Hive Runtime zum Hive-Klassenpfad hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads.

Wenn Sie eine Hive-Tabelle über eine Iceberg-Tabelle legen möchten, müssen Sie die Iceberg-Tabelle entweder mit einem Hive-Katalog oder einer Hadoop-Tabelle erstellen. Außerdem müssen Sie Hive so konfigurieren, dass Daten aus der Iceberg-Tabelle gelesen werden können.

Iceberg-Tabelle (Hive-Katalog) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und -daten lesen Folgendes ist ein Beispiel.

    1. Prüfen Sie das Tabellenschema und ob das Tabellenformat „Iceberg“ ist:

      describe formatted example;
      
    2. Daten aus der Tabelle lesen:

      select * from example;
      

Iceberg-Tabelle (Hadoop-Tabelle) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und -daten lesen Folgendes ist ein Beispiel.

    1. Erstellen Sie eine externe Tabelle (legen Sie eine Hive-Tabelle über die Iceberg-Tabelle):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Prüfen Sie das Tabellenschema und ob das Tabellenformat „Iceberg“ ist:

      describe formatted hadoop_table;
      
    3. Daten aus der Tabelle lesen:

      select * from hadoop_table;
      

Iceberg-Tabelle in Presto verwenden

Bei Presto-Abfragen wird der Hive-Connector verwendet, um Partitionsspeicherorte abzurufen. Sie müssen Presto daher entsprechend konfigurieren, um Daten in der Iceberg-Tabelle zu lesen und zu schreiben. Weitere Informationen finden Sie unter Presto/Trino – Hive-Connector und Presto/Trino – Iceberg-Connector.

Presto-Konfigurationen

  1. Erstellen Sie unter jedem Dataproc-Clusterknoten eine Datei mit dem Namen iceberg.properties /etc/presto/conf/catalog/iceberg.properties und konfigurieren Sie die hive.metastore.uri so:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Ersetzen Sie example.net:9083 durch den korrekten Host und Port für den Thrift-Dienst von Hive-Metastore.

  2. Starten Sie den Presto-Dienst neu, um die Konfigurationen zu pushen:

    sudo systemctl restart presto.service
    

Iceberg-Tabelle in Presto erstellen

  1. Öffnen Sie den Presto-Client und verwenden Sie den Connector „Iceberg“, um den Metastore abzurufen:

    --catalog iceberg --schema default
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Beispieldaten einfügen:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Daten aus der Tabelle lesen:

      SELECT * FROM iceberg.default.example;
      
    4. Fügen Sie weitere neue Daten ein, um Snapshots zu prüfen:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Snapshots ansehen:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Wenn Sie den Befehl ORDER BY committed_at DESC LIMIT 1; hinzufügen, können Sie die ID des letzten Snapshots ermitteln.

    6. So führen Sie ein Rollback auf eine bestimmte Version der Tabelle durch:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten.

Nächste Schritte