Iceberg-Unterstützung in Dataproc Metastore

Auf dieser Seite wird erläutert, wie Sie Apache Iceberg in Dataproc verwenden. Dazu hosten Sie den Hive-Metaspeicher im Dataproc Metastore. Er enthält Informationen zur Verwendung der Iceberg-Tabelle über Spark, Hive und Presto.

Features

Apache Iceberg ist ein offenes Tabellenformat für große analytische Datasets. Iceberg verbessert die Leistung erheblich und bietet folgende erweiterte Features:

  • Atomarität: Tabellenänderungen werden entweder abgeschlossen oder fehlschlagen. Tabellenänderungen werden nicht teilweise übernommen.

  • Snapshot-Isolation: Lesevorgänge verwenden nur einen Snapshot einer Tabelle, ohne eine Sperre beizubehalten.

  • Mehrere gleichzeitige Autoren: Verwendet optimistische Gleichzeitigkeit und Wiederholungsversuche, um sicherzustellen, dass kompatible Aktualisierungen auch bei Schreibkonflikten erfolgreich sind.

  • Schema Evolution: Spalten werden nach ID verfolgt, um das Hinzufügen, Löschen, Aktualisieren und Umbenennen zu unterstützen.

  • Zeitreisen: Reproduzierbare Abfragen können dieselbe Tabelle oder denselben Snapshot verwenden. können Sie Änderungen problemlos untersuchen.

  • Verteilte Planung: Die Beschneidung und das Prädikat-Push-down werden an Jobs verteilt und der Metaspeicher als Engpass entfernt.

  • Versionsverlauf und Rollback: Probleme werden dadurch behoben, dass Tabellen auf einen vorherigen Status zurückgesetzt werden.

  • Ausgeblendete Partitionierung: Verhindert Nutzerfehler, die unbemerkt falsche Ergebnisse oder extrem langsame Abfragen verursachen.

  • Partitionslayout Evolution: kann das Layout einer Tabelle aktualisieren, wenn sich Daten- oder Abfragemuster ändern.

  • Planplanung und Dateifilterung: Ermittelt die für eine Abfrage erforderlichen Dateien, indem nicht benötigte Metadatendateien und Datendateien mit übereinstimmenden Daten bereinigt werden.

Kompatibilitäten

Iceberg funktioniert gut mit Dataproc und Dataproc Metastore. Sie können Tabellen und Tabellen mit einem leistungsstarken Format in Spark und Presto einfügen, die wie eine SQL-Tabelle funktionieren. Iceberg verwendet einen Zeiger auf die neueste Version eines Snapshots und benötigt einen Mechanismus, um bei dem Versionswechsel die Unteilbarkeit zu gewährleisten. Es bietet zwei Optionen: Hive-Kataloge und Hadoop-Tabellen.

Zu den unterstützten Funktionen gehören:

Erfolgsfaktoren Auswählen Einfügen Tabelle erstellen
spark ✓ ✓ ✓
Hive ✓ ✓
Presto ✓ ✓ ✓

Vorbereitung

Erstellen Sie zuerst einen Dataproc-Cluster und verwenden Sie den Dataproc Metastore-Dienst als Hive-Metaspeicher. Weitere Informationen finden Sie unter Dataproc-Cluster erstellen. Nachdem Sie den Cluster erstellt haben, stellen Sie eine SSH-Verbindung zum Cluster über einen Browser oder die Befehlszeile her.

Iceberg Table mit Spark verwenden

Iceberg-Tabellen unterstützen Lese- und Schreibvorgänge. Weitere Informationen finden Sie unter Apache Iceberg – Spark.

Spark-Konfigurationen

Starten Sie zuerst die Spark-Shell und verwenden Sie einen Cloud Storage-Bucket zum Speichern der Daten. Damit Iceberg in die Spark-Installation aufgenommen wird, fügen Sie die Iceberg-Spark-Laufzeit-JAR-Datei zum Spark-Ordner JARs hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads. Mit dem folgenden Befehl wird die Spark-Shell mit Unterstützung für Apache Iceberg gestartet:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Hive-Tabellen zum Erstellen von Iceberg-Tabellen verwenden

  1. Richten Sie Hive-Katalog-Konfigurationen ein, um Iceberg-Tabellen in der Spark-Skala zu erstellen:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Erstellen Sie eine Tabelle, um Daten einzufügen und zu aktualisieren. Folgendes ist ein Beispiel.

    1. Erstellen Sie eine Tabelle mit dem Namen example in der Datenbank default:

      val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
      val name = TableIdentifier.of("default","example");
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Geben Sie die Partitionsstrategie basierend auf der Spalte id an:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Fügen Sie den Iceberg Storage-Handler und SerDe als Tabellenattribut hinzu:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Lesen der Daten:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte grade hinzu:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. So rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Rufen Sie die Snapshots auf:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Angenommen, Sie haben einen Fehler gemacht, indem Sie die Zeile mit dem Wert id=6 eingefügt haben und zurück zur richtigen Version der Tabelle wechseln möchten:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Ersetzen Sie snapshot-id durch die Version, die Sie wiederherstellen möchten.

Iceberg-Tabellen in Hadoop-Tabellen erstellen

  1. Richten Sie Hadoop-Tabellenkonfigurationen ein, um Iceberg-Tabellen in der Spark-Skala zu erstellen:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Erstellen Sie eine Tabelle, um Daten einzufügen und zu aktualisieren. Folgendes ist ein Beispiel.

    1. Erstellen Sie eine Tabelle mit dem Namen example in der Datenbank default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Geben Sie die Partitionsstrategie basierend auf der Spalte id an:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Lesen der Daten:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte grade hinzu:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. So rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Rufen Sie die Snapshots auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Gehen Sie zurück, um eine bestimmte Version der Tabelle zu sehen:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten, und fügen Sie am Ende "L" hinzu. Beispiel: "3943776515926014142L"

Iceberg Table auf Hive verwenden

Iceberg unterstützt Tabellen, die über Hive gelesen wurden, indem ein StorageHandler verwendet wird. Beachten Sie, dass nur Hive 2.x- und 3.1.2-Versionen unterstützt werden. Weitere Informationen finden Sie unter Apache Iceberg – Hive. Fügen Sie außerdem die Hive-Laufzeit-JAR-Datei von Iceberg zum Hive-Klassenpfad hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads.

Wenn Sie eine Hive-Tabelle über einer Iceberg-Tabelle einblenden möchten, müssen Sie die Iceberg-Tabelle entweder mit einem Hive-Katalog oder einer Hadoop-Tabelle erstellen. Darüber hinaus müssen Sie Hive entsprechend konfigurieren, um Daten aus der Iceberg-Tabelle zu lesen.

Iceberg-Tabelle (Hive-Katalog) mit Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg Tables in Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und Daten lesen. Folgendes ist ein Beispiel.

    1. Überprüfen Sie das Tabellenschema und prüfen Sie, ob das Tabellenformat Iceberg ist:

      describe formatted example;
      
    2. Lesen Sie die Daten aus der Tabelle:

      select * from example;
      

Iceberg-Tabelle (Hadoop-Tabelle) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg Tables in Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und Daten lesen. Folgendes ist ein Beispiel.

    1. Erstellen Sie eine externe Tabelle (überlegen Sie eine Hive-Tabelle über der Iceberg-Tabelle):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Überprüfen Sie das Tabellenschema und prüfen Sie, ob das Tabellenformat Iceberg ist:

      describe formatted hadoop_table;
      
    3. Lesen Sie die Daten aus der Tabelle:

      select * from hadoop_table;
      

Iceberg Table auf Presto verwenden

Presto-Abfragen verwenden den Hive-Connector, um Partitionsstandorte abzurufen. Daher müssen Sie Presto entsprechend konfigurieren, um Daten in der Iceberg-Tabelle zu lesen und zu schreiben. Weitere Informationen finden Sie unter Presto/Trino – Hive Connector und Presto/Trino – Iceberg Connector.

Presto-Konfigurationen

  1. Erstellen Sie unter jedem Dataproc-Clusterknoten eine Datei mit dem Namen iceberg.properties /etc/presto/conf/catalog/iceberg.properties und konfigurieren Sie hive.metastore.uri so:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Ersetzen Sie example.net:9083 durch den richtigen Host und den richtigen Port für den Hive-Metaspeicher-Trift-Dienst.

  2. Starten Sie den Presto-Dienst neu, um die Konfigurationen zu übertragen:

    sudo systemctl restart presto.service
    

Iceberg-Tabelle in Presto erstellen

  1. Öffnen Sie den Presto-Client und verwenden Sie den "Iceberg"-Connector, um den Metaspeicher abzurufen:

    --catalog iceberg --schema default
    
  2. Erstellen Sie eine Tabelle, um Daten einzufügen und zu aktualisieren. Folgendes ist ein Beispiel.

    1. Erstellen Sie eine Tabelle mit dem Namen example in der Datenbank default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Beispieldaten einfügen:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Lesen Sie Daten aus der Tabelle:

      SELECT * FROM iceberg.default.example;
      
    4. Fügen Sie neue Daten ein, um Snapshots zu prüfen:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Rufen Sie die Snapshots auf:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Durch Hinzufügen des Befehls ORDER BY committed_at DESC LIMIT 1; können Sie die neueste Snapshot-ID ermitteln.

    6. So führen Sie ein Rollback zu einer bestimmten Version der Tabelle aus:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Ersetzen Sie snapshot-id durch die Version, die Sie wiederherstellen möchten.

Nächste Schritte