Iceberg-Unterstützung in Dataproc Metastore

Auf dieser Seite wird erläutert, wie Sie Apache Iceberg in Dataproc verwenden, indem Sie den Hive-Metastore in Dataproc Metastore hosten. Sie enthält Informationen zur Verwendung der Iceberg-Tabelle über Spark, Hive und Presto.

Features

Apache Iceberg ist ein offenes Tabellenformat für große analytische Datasets. Iceberg verbessert die Leistung erheblich und bietet die folgenden erweiterten Features:

  • Atomarität: Tabellenänderungen sind entweder vollständig oder fehlgeschlagen. Es gibt kein teilweises Commit von Tabellenänderungen.

  • Snapshot-Isolation: Lesevorgänge verwenden nur einen Snapshot einer Tabelle, ohne Sperre.

  • Mehrere gleichzeitige Autoren: Verwendet optimistische Nebenläufigkeitserkennung und Wiederholungsversuche, um sicherzustellen, dass kompatible Aktualisierungen auch bei Schreibvorgängen erfolgreich sind.

  • Schemaentwicklung: Die Spalten werden nach ID verfolgt, um das Hinzufügen, Löschen, Aktualisieren und Umbenennen zu unterstützen.

  • Zeitreisen: Reproduzierbare Abfragen können dieselbe Tabelle oder denselben Snapshot verwenden; Sie können Änderungen leicht überprüfen.

  • Verteilte Planung: Dateibereinigung und Prädikat-Push-down werden auf Jobs verteilt, wodurch der Metastore als Engpass entfernt wird.

  • Versionsverlauf und Rollback: Um Probleme zu korrigieren können Sie Tabellen auf einen früheren Zustand zurücksetzen.

  • Ausgeblendete Partitionierung: Verhindert Nutzerfehler, die zu stillschweigenden Fehlergebnissen oder extrem langsamen Abfragen führen.

  • Entwicklung des Partitionslayouts: Kann das Layout einer Tabelle aktualisieren, wenn sich Datenvolumen oder Abfragemuster ändern.

  • Scanplanung und Dateifilterung: Findet die für eine Abfrage erforderlichen Dateien durch Bereinigen nicht benötigter Metadatendateien und das Filtern von Datendateien ohne übereinstimmende Daten.

Kompatibilitäten

Iceberg funktioniert gut mit Dataproc und Dataproc Metastore. Tabellen mit Hochleistungsformat können zu Spark und Presto hinzugefügt werden, die wie SQL-Tabellen funktionieren. Iceberg verwendet einen Zeiger auf die neueste Version eines Snapshots und benötigt einen Mechanismus, um beim Wechsel der Versionen die Atomarität sicherzustellen. Es stehen zwei Optionen zur Verfügung: Hive-Katalog und Hadoop-Tabellen.

Zu den unterstützten Funktionen gehören:

Erfolgsfaktoren Auswählen Einfügen Tabelle erstellen
Spark
Hive
Presto

Hinweis

Erstellen Sie zuerst einen Dataproc-Cluster und verwenden Sie den Dataproc Metastore-Dienst als Hive-Metastore. Weitere Informationen finden Sie unter Dataproc-Cluster erstellen. Nachdem Sie den Cluster erstellt haben, stellen Sie eine SSH-Verbindung zum Cluster über einen Browser oder die Befehlszeile her.

Eisberg-Tisch mit Spark

Iceberg-Tabellen unterstützen Lese- und Schreibvorgänge. Weitere Informationen finden Sie unter Apache Iceberg – Spark.

Spark-Konfigurationen

Starten Sie zuerst die Spark-Shell und verwenden Sie einen Cloud Storage-Bucket zum Speichern von Daten. Wenn Sie Iceberg in die Spark-Installation einbinden möchten, fügen Sie die JAR-Datei der Iceberg Spark-Laufzeit in den JAR-Ordner des Sparks ein. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg Downloads. Der folgende Befehl startet die Spark-Shell mit Unterstützung für Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Mit Hive-Katalog Iceberg-Tabellen erstellen

  1. Richten Sie Hive-Katalogkonfigurationen ein, um Iceberg-Tabellen in der Spark-Skala zu erstellen:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
      val name = TableIdentifier.of("default","example");
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Geben Sie die Partitionsstrategie basierend auf der Spalte id an:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Fügen Sie den Iceberg Storage Handler und SerDe als Tabellenattribut hinzu:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Lesen Sie die Daten:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte grade hinzu:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie mehr Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Rufen Sie die Snapshots auf:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Angenommen, Sie haben beim Hinzufügen der Zeile mit dem Wert id=6 einen Fehler gemacht und möchten zurückgehen, um eine korrekte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten.

Hadoop-Tabellen mithilfe von Hadoop-Tabellen erstellen

  1. Richten Sie Hadoop-Tabellenkonfigurationen ein, um Iceberg-Tabellen in der Spark-Skala zu erstellen:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Beispieldaten einfügen:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Geben Sie die Partitionsstrategie basierend auf der Spalte id an:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Erstellen Sie die Tabelle:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Schreiben Sie die Daten in die Tabelle:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Lesen Sie die Daten:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.

    1. Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte grade hinzu:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Prüfen Sie das neue Tabellenschema:

      table.schema.toString;
      
  4. Fügen Sie mehr Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.

    1. Fügen Sie der Tabelle neue Daten hinzu:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Prüfen Sie die eingefügten neuen Daten:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Rufen Sie den Tabellenverlauf auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Rufen Sie die Snapshots auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Rufen Sie die Manifestdateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Rufen Sie die Datendateien auf:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Kehren Sie zurück, um eine bestimmte Version der Tabelle aufzurufen:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückgehen möchten, und fügen Sie am Ende "L" hinzu. Beispiel: "3943776515926014142L"

Iceberg-Tabelle auf Hive verwenden

Iceberg unterstützt Tabellen, die über Hive gelesen werden, indem ein StorageHandler verwendet wird. Es werden nur Hive 2.x- und 3.1.2-Versionen unterstützt. Weitere Informationen finden Sie unter Apache Iceberg – Hive. Fügen Sie außerdem die Hive-Laufzeit-JAR-Datei in den Hive-Klassenpfad ein. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg Downloads.

Wenn Sie eine Hive-Tabelle über einer Iceberg-Tabelle einblenden möchten, müssen Sie entweder die Iceberg Table mithilfe einer Hive-Tabelle oder einer Hadoop-Tabelle erstellen. Außerdem müssen Sie Hive so konfigurieren, dass Daten aus der Iceberg-Tabelle gelesen werden.

Iceberg-Tabelle (Hive-Katalog) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und Daten lesen. Folgendes ist ein Beispiel.

    1. Prüfen Sie das Tabellenschema und stellen Sie fest, ob das Tabellenformat Iceberg ist:

      describe formatted example;
      
    2. Lesen Sie die Daten aus der Tabelle:

      select * from example;
      

Iceberg-Tabelle (Hadoop-Tabelle) in Hive lesen

  1. Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Tabellenschema und Daten lesen. Folgendes ist ein Beispiel.

    1. Erstellen Sie eine externe Tabelle (legen Sie eine Hive-Tabelle über die Iceberg-Tabelle):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Prüfen Sie das Tabellenschema und stellen Sie fest, ob das Tabellenformat Iceberg ist:

      describe formatted hadoop_table;
      
    3. Lesen Sie die Daten aus der Tabelle:

      select * from hadoop_table;
      

Iceberg-Tabelle in Presto verwenden

Presto-Abfragen verwenden den Hive-Connector, um Partitionsstandorte abzurufen. Daher müssen Sie Presto entsprechend konfigurieren, um Daten auf der Iceberg-Tabelle zu lesen und zu schreiben. Weitere Informationen finden Sie unter Presto/Trino – Hive-Connector und Presto/Trino – Iceberg Connector.

Presto-Konfigurationen

  1. Erstellen Sie unter jedem Dataproc-Clusterknoten eine Datei mit dem Namen iceberg.properties /etc/presto/conf/catalog/iceberg.properties und konfigurieren Sie hive.metastore.uri so:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Ersetzen Sie example.net:9083 durch den korrekten Host und Port für den Thrift-Dienst von Hive-Metastore.

  2. Starten Sie den Presto-Dienst neu, um die Konfigurationen zu übertragen:

    sudo systemctl restart presto.service
    

Eisbergtabelle bei Presto erstellen

  1. Öffnen Sie den Presto-Client und verwenden Sie den Connector „Iceberg“, um den Metastore abzurufen:

    --catalog iceberg --schema default
    
  2. Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.

    1. Erstellen Sie unter der default-Datenbank eine Tabelle mit dem Namen example:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Beispieldaten einfügen:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Daten aus der Tabelle lesen:

      SELECT * FROM iceberg.default.example;
      
    4. Fügen Sie weitere neue Daten ein, um die Snapshots zu prüfen:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Rufen Sie die Snapshots auf:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Durch Hinzufügen des Befehls ORDER BY committed_at DESC LIMIT 1; können Sie die aktuelle Snapshot-ID ermitteln.

    6. So führen Sie ein Rollback auf eine bestimmte Version der Tabelle durch:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Ersetzen Sie snapshot-id durch die Version, zu der Sie zurückkehren möchten.

Nächste Schritte