Apache Iceberg-Tabellen mit Dataproc Metastore verwenden

Auf dieser Seite wird erläutert, wie Sie Apache Iceberg-Tabellen mit einem Dataproc Metastore-Dienst verwenden, der mit einem Dataproc-Cluster verbunden ist. Apache Iceberg ist ein offenes Tabellenformat für große analytische Datensätze.

Kompatibilitäten

Iceberg-Tabellen unterstützen die folgenden Funktionen.

Erfolgsfaktoren Auswählen Einfügen Tabelle erstellen
Spark
Hive
Presto

Hinweise

Iceberg-Tabelle mit Spark verwenden

Im folgenden Beispiel wird gezeigt, wie Sie Eisbergtabellen mit Spark verwenden sollten.

Eisbergtabellen unterstützen Lese- und Schreibvorgänge. Weitere Informationen finden Sie unter Apache Iceberg – Spark

Spark-Konfigurationen

Starten Sie zuerst die Spark-Shell und verwenden Sie einen Cloud Storage-Bucket, um Daten zu speichern. Wenn Sie Iceberg in die Spark-Installation aufnehmen möchten, fügen Sie die JAR-Datei der Iceberg-Spark-Laufzeit dem JARs-Ordner von Spark hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads. Mit dem folgenden Befehl wird die Spark-Shell mit Unterstützung für Apache Iceberg gestartet:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Hive-Katalog zum Erstellen von Iceberg-Tabellen verwenden

  1. Richten Sie Hive-Katalogkonfigurationen ein, um Iceberg-Tabellen in der Spark-Scala zu erstellen:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Partitionsstrategie basierend auf Spalte id angeben:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Fügen Sie den Iceberg Storage Handler und SerDe als Tabelleneigenschaft hinzu:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Lesen Sie die Daten:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte grade hinzu:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Sehen Sie sich die Snapshots an:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Angenommen, Sie haben beim Hinzufügen der Zeile mit dem Wert id=6 einen Fehler gemacht und möchten zurückgehen, um eine korrekte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten.

Iceberg-Tabellen mit Hadoop-Tabellen erstellen

  1. Richten Sie Hadoop-Tabellenkonfigurationen ein, um Iceberg-Tabellen in der Spark-Scala zu erstellen:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Partitionsstrategie basierend auf Spalte id angeben:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Lesen Sie die Daten:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte grade hinzu:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Sehen Sie sich die Snapshots an:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Kehren Sie zurück, um eine bestimmte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten, und fügen Sie "L" hinzu bis zum Ende. Beispiel: "3943776515926014142L"

Iceberg-Tabelle in Hive verwenden

Iceberg unterstützt Tabellen, die mit Hive gelesen werden, mithilfe einer StorageHandler. Hinweis dass nur die Versionen Hive 2.x und 3.1.2 unterstützt werden. Weitere Informationen finden Sie unter Apache Iceberg – Hive. In Fügen Sie dem Hive-Klassenpfad außerdem die JAR-Datei für die Iceberg Hive-Laufzeit hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads.

Um eine Hive-Tabelle über eine Iceberg-Tabelle zu legen, müssen Sie Iceberg-Tabelle mit einem Hive-Katalog- oder einer Hadoop-Tabelle Darüber hinaus können Sie muss Hive entsprechend konfigurieren, um Daten aus der Iceberg-Tabelle zu lesen.

Iceberg-Tabelle (Hive-Katalog) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und -daten lesen Folgendes ist ein Beispiel.

    1. Prüfen Sie das Tabellenschema und ob das Tabellenformat Iceberg ist:

      describe formatted example;
      
    2. Lesen Sie die Daten aus der Tabelle:

      select * from example;
      

Iceberg-Tabelle (Hadoop-Tabelle) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und Daten lesen. Folgendes ist ein Beispiel.

    1. Erstellen Sie eine externe Tabelle (legen Sie eine Hive-Tabelle über die Iceberg-Tabelle):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Prüfen Sie das Tabellenschema und ob das Tabellenformat Iceberg ist:

      describe formatted hadoop_table;
      
    3. Daten aus der Tabelle lesen:

      select * from hadoop_table;
      

Iceberg-Tabelle in Presto verwenden

Presto-Abfragen verwenden den Hive-Connector zum Abrufen von Partitionsstandorten. Daher müssen Sie konfigurieren Sie Presto so, dass Daten aus der Iceberg-Tabelle gelesen und geschrieben werden. Weitere Informationen finden Sie unter Presto/Trino – Hive-Connector und Presto/Trino – Iceberg-Connector.

Presto-Konfigurationen

  1. Erstellen Sie unter jedem Dataproc-Clusterknoten eine Datei mit dem Namen iceberg.properties /etc/presto/conf/catalog/iceberg.properties und Konfigurieren Sie hive.metastore.uri so:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Ersetzen Sie example.net:9083 durch den korrekten Host und Port für den Thrift-Dienst von Hive-Metastore.

  2. Starten Sie den Presto-Dienst neu, um die Konfigurationen per Push zu übertragen:

    sudo systemctl restart presto.service
    

Iceberg-Tabelle in Presto erstellen

  1. Öffnen Sie den Presto-Client und verwenden Sie den Connector „Iceberg“, um den Metastore abzurufen:

    --catalog iceberg --schema default
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Beispieldaten einfügen:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Daten aus der Tabelle lesen:

      SELECT * FROM iceberg.default.example;
      
    4. Fügen Sie weitere neue Daten ein, um Snapshots zu prüfen:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Snapshots ansehen:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Durch Hinzufügen des Befehls ORDER BY committed_at DESC LIMIT 1; können Sie die aktuelle Snapshot-ID ermitteln.

    6. Führen Sie ein Rollback auf eine bestimmte Version der Tabelle durch:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten.

Nächste Schritte