Apache Iceberg-Tabellen mit Dataproc Metastore verwenden

Auf dieser Seite wird erläutert, wie Sie Apache Iceberg-Tabellen mit einem Dataproc Metastore-Dienst verwenden, der an einen Dataproc-Cluster angehängt ist. Apache Iceberg ist ein offenes Tabellenformat für große analytische Datasets.

Kompatibilitäten

Iceberg-Tabellen unterstützen die folgenden Funktionen.

Erfolgsfaktoren Auswählen Einfügen Tabelle erstellen
Spark
Hive
Presto

Hinweise

Iceberg-Tabelle mit Spark verwenden

Im folgenden Beispiel wird gezeigt, wie Iceberg-Tabellen mit Spark verwendet werden.

Iceberg-Tabellen unterstützen Lese- und Schreibvorgänge. Weitere Informationen finden Sie unter Apache Iceberg – Spark.

Spark-Konfigurationen

Starten Sie zuerst die Spark-Shell und verwenden Sie einen Cloud Storage-Bucket zum Speichern von Daten. Wenn Sie Iceberg in die Spark-Installation einbeziehen möchten, fügen Sie die Iceberg Spark Runtime-JAR-Datei dem JAR-Ordner von Spark hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads. Mit dem folgenden Befehl wird die Spark-Shell mit Unterstützung für Apache Iceberg gestartet:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Iceberg-Tabellen mit dem Hive-Katalog erstellen

  1. Richten Sie Hive-Katalogkonfigurationen ein, um Iceberg-Tabellen in Spark Scala zu erstellen:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Partitionsstrategie basierend auf Spalte id angeben:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Fügen Sie den Iceberg-Speicher-Handler und SerDe als Tabelleneigenschaft hinzu:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Daten lesen:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte grade hinzu:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Sehen Sie sich das neue Tabellenschema an:

      table.schema.toString;
      
  4. Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. So rufen Sie die Snapshots auf:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Angenommen, Sie haben beim Hinzufügen der Zeile mit dem Wert id=6 einen Fehler gemacht und möchten zurückgehen, um eine korrekte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten.

Iceberg-Tabellen mit Hadoop-Tabellen erstellen

  1. Richten Sie Hadoop-Tabellenkonfigurationen ein, um Iceberg-Tabellen in Spark Scala zu erstellen:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Partitionsstrategie basierend auf Spalte id angeben:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Daten lesen:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte grade hinzu:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Sehen Sie sich das neue Tabellenschema an:

      table.schema.toString;
      
  4. Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. So rufen Sie die Snapshots auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Kehren Sie zurück, um eine bestimmte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten, und fügen Sie "L" am Ende hinzu. Beispiel: "3943776515926014142L"

Iceberg-Tabelle in Hive verwenden

Iceberg unterstützt Tabellen, die mit Hive gelesen werden, mithilfe eines StorageHandler. Beachten Sie, dass nur Hive-Versionen 2.x und 3.1.2 unterstützt werden. Weitere Informationen finden Sie unter Apache Iceberg – Hive. Fügen Sie außerdem die Iceberg Hive Runtime-JAR-Datei dem Hive-Klassenpfad hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads.

Wenn Sie eine Hive-Tabelle über eine Iceberg-Tabelle legen möchten, müssen Sie die Iceberg-Tabelle entweder mit einem Hive-Katalog oder einer Hadoop-Tabelle erstellen. Außerdem müssen Sie Hive entsprechend konfigurieren, um Daten aus der Iceberg-Tabelle lesen zu können.

Iceberg-Tabelle (Hive-Katalog) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und -daten lesen. Folgendes ist ein Beispiel.

    1. Prüfen Sie das Tabellenschema und ob das Tabellenformat Iceberg ist:

      describe formatted example;
      
    2. Daten aus der Tabelle lesen:

      select * from example;
      

Iceberg-Tabelle (Hadoop-Tabelle) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und -daten lesen. Folgendes ist ein Beispiel.

    1. Erstellen Sie eine externe Tabelle (legen Sie eine Hive-Tabelle über die Iceberg-Tabelle):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Prüfen Sie das Tabellenschema und ob das Tabellenformat Iceberg ist:

      describe formatted hadoop_table;
      
    3. Daten aus der Tabelle lesen:

      select * from hadoop_table;
      

Iceberg-Tabelle in Presto verwenden

Presto-Abfragen verwenden den Hive-Connector, um Partitionsstandorte abzurufen. Sie müssen Presto also entsprechend konfigurieren, um Daten in die Iceberg-Tabelle zu lesen und zu schreiben. Weitere Informationen finden Sie unter Presto/Trino – Hive Connector und Presto/Trino – Iceberg Connector.

Presto-Konfigurationen

  1. Erstellen Sie auf jedem Dataproc-Clusterknoten eine Datei mit dem Namen iceberg.properties /etc/presto/conf/catalog/iceberg.properties und konfigurieren Sie hive.metastore.uri so:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Ersetzen Sie example.net:9083 durch den korrekten Host und Port für den Thrift-Dienst von Hive-Metastore.

  2. Starten Sie den Presto-Dienst neu, um die Konfigurationen zu übertragen:

    sudo systemctl restart presto.service
    

Iceberg-Tabelle in Presto erstellen

  1. Öffnen Sie den Presto-Client und verwenden Sie den Connector „Iceberg“, um den Metastore abzurufen:

    --catalog iceberg --schema default
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Beispieldaten einfügen:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Daten aus der Tabelle lesen:

      SELECT * FROM iceberg.default.example;
      
    4. Fügen Sie weitere neue Daten ein, um Snapshots zu prüfen:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. So rufen Sie die Snapshots auf:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Wenn Sie den Befehl ORDER BY committed_at DESC LIMIT 1; hinzufügen, können Sie die ID des letzten Snapshots finden.

    6. So führen Sie einen Rollback zu einer bestimmten Version der Tabelle durch:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten.

Nächste Schritte