Auf dieser Seite wird erläutert, wie Sie Apache Iceberg-Tabellen mit einem Dataproc Metastore-Dienst verwenden, der mit einem Dataproc-Cluster verbunden ist. Apache Iceberg ist ein offenes Tabellenformat für große analytische Datensätze.
Kompatibilitäten
Iceberg-Tabellen unterstützen die folgenden Funktionen.
Erfolgsfaktoren | Auswählen | Einfügen | Tabelle erstellen |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Hinweise
- Erstellen Sie einen Dataproc Metastore-Dienst.
- Dataproc Metastore an einen Dataproc-Cluster anhängen
Iceberg-Tabelle mit Spark verwenden
Im folgenden Beispiel wird gezeigt, wie Sie Eisbergtabellen mit Spark verwenden sollten.
Eisbergtabellen unterstützen Lese- und Schreibvorgänge. Weitere Informationen finden Sie unter Apache Iceberg – Spark
Spark-Konfigurationen
Starten Sie zuerst die Spark-Shell und verwenden Sie einen Cloud Storage-Bucket, um Daten zu speichern. Wenn Sie Iceberg in die Spark-Installation aufnehmen möchten, fügen Sie die JAR-Datei der Iceberg-Spark-Laufzeit dem JARs-Ordner von Spark hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads. Mit dem folgenden Befehl wird die Spark-Shell mit Unterstützung für Apache Iceberg gestartet:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Hive-Katalog zum Erstellen von Iceberg-Tabellen verwenden
Richten Sie Hive-Katalogkonfigurationen ein, um Iceberg-Tabellen in der Spark-Scala zu erstellen:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.
Erstellen Sie unter der
default
-Datenbank eine Tabelle mit dem Namenexample
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Beispieldaten einfügen:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Partitionsstrategie basierend auf Spalte
id
angeben:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Erstellen Sie die Tabelle:
val table=catalog.createTable(name,df1_schema,partition_spec);
Fügen Sie den Iceberg Storage Handler und SerDe als Tabelleneigenschaft hinzu:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Schreiben Sie die Daten in die Tabelle:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Lesen Sie die Daten:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.
Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte
grade
hinzu:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Prüfen Sie das neue Tabellenschema:
table.schema.toString;
Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.
Fügen Sie der Tabelle neue Daten hinzu:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Prüfen Sie die eingefügten neuen Daten:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Rufen Sie den Tabellenverlauf auf:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Sehen Sie sich die Snapshots an:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Rufen Sie die Manifestdateien auf:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Rufen Sie die Datendateien auf:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Angenommen, Sie haben beim Hinzufügen der Zeile mit dem Wert
id=6
einen Fehler gemacht und möchten zurückgehen, um eine korrekte Version der Tabelle aufzurufen:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Ersetzen Sie
snapshot-id
durch die Version, zu der Sie zurückkehren möchten.
Iceberg-Tabellen mit Hadoop-Tabellen erstellen
Richten Sie Hadoop-Tabellenkonfigurationen ein, um Iceberg-Tabellen in der Spark-Scala zu erstellen:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.
Erstellen Sie unter der
default
-Datenbank eine Tabelle mit dem Namenexample
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Beispieldaten einfügen:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Partitionsstrategie basierend auf Spalte
id
angeben:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Erstellen Sie die Tabelle:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Schreiben Sie die Daten in die Tabelle:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Lesen Sie die Daten:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Ändern Sie das Tabellenschema. Folgendes ist ein Beispiel.
Rufen Sie die Tabelle ab und fügen Sie eine neue Spalte
grade
hinzu:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Prüfen Sie das neue Tabellenschema:
table.schema.toString;
Fügen Sie weitere Daten ein und sehen Sie sich die Schemaentwicklung an. Folgendes ist ein Beispiel.
Fügen Sie der Tabelle neue Daten hinzu:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Prüfen Sie die eingefügten neuen Daten:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Rufen Sie den Tabellenverlauf auf:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Sehen Sie sich die Snapshots an:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Rufen Sie die Manifestdateien auf:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Rufen Sie die Datendateien auf:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Kehren Sie zurück, um eine bestimmte Version der Tabelle aufzurufen:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Ersetzen Sie
snapshot-id
durch die Version, zu der Sie zurückkehren möchten, und fügen Sie"L"
hinzu bis zum Ende. Beispiel:"3943776515926014142L"
Iceberg-Tabelle in Hive verwenden
Iceberg unterstützt Tabellen, die mit Hive gelesen werden, mithilfe einer StorageHandler
. Hinweis
dass nur die Versionen Hive 2.x und 3.1.2 unterstützt werden. Weitere Informationen finden Sie unter Apache Iceberg – Hive. In
Fügen Sie dem Hive-Klassenpfad außerdem die JAR-Datei für die Iceberg Hive-Laufzeit hinzu. Informationen zum Herunterladen der JAR-Datei finden Sie unter Apache Iceberg-Downloads.
Um eine Hive-Tabelle über eine Iceberg-Tabelle zu legen, müssen Sie Iceberg-Tabelle mit einem Hive-Katalog- oder einer Hadoop-Tabelle Darüber hinaus können Sie muss Hive entsprechend konfigurieren, um Daten aus der Iceberg-Tabelle zu lesen.
Iceberg-Tabelle (Hive-Katalog) in Hive lesen
Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Tabellenschema und -daten lesen Folgendes ist ein Beispiel.
Prüfen Sie das Tabellenschema und ob das Tabellenformat Iceberg ist:
describe formatted example;
Lesen Sie die Daten aus der Tabelle:
select * from example;
Iceberg-Tabelle (Hadoop-Tabelle) in Hive lesen
Öffnen Sie den Hive-Client und richten Sie Konfigurationen zum Lesen von Iceberg-Tabellen in der Hive-Clientsitzung ein:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Tabellenschema und Daten lesen. Folgendes ist ein Beispiel.
Erstellen Sie eine externe Tabelle (legen Sie eine Hive-Tabelle über die Iceberg-Tabelle):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Prüfen Sie das Tabellenschema und ob das Tabellenformat Iceberg ist:
describe formatted hadoop_table;
Daten aus der Tabelle lesen:
select * from hadoop_table;
Iceberg-Tabelle in Presto verwenden
Presto-Abfragen verwenden den Hive-Connector zum Abrufen von Partitionsstandorten. Daher müssen Sie konfigurieren Sie Presto so, dass Daten aus der Iceberg-Tabelle gelesen und geschrieben werden. Weitere Informationen finden Sie unter Presto/Trino – Hive-Connector und Presto/Trino – Iceberg-Connector.
Presto-Konfigurationen
Erstellen Sie unter jedem Dataproc-Clusterknoten eine Datei mit dem Namen
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
und Konfigurieren Siehive.metastore.uri
so:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Ersetzen Sie
example.net:9083
durch den korrekten Host und Port für den Thrift-Dienst von Hive-Metastore.Starten Sie den Presto-Dienst neu, um die Konfigurationen per Push zu übertragen:
sudo systemctl restart presto.service
Iceberg-Tabelle in Presto erstellen
Öffnen Sie den Presto-Client und verwenden Sie den Connector „Iceberg“, um den Metastore abzurufen:
--catalog iceberg --schema default
Erstellen Sie eine Tabelle zum Einfügen und Aktualisieren von Daten. Folgendes ist ein Beispiel.
Erstellen Sie unter der
default
-Datenbank eine Tabelle mit dem Namenexample
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Beispieldaten einfügen:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Daten aus der Tabelle lesen:
SELECT * FROM iceberg.default.example;
Fügen Sie weitere neue Daten ein, um Snapshots zu prüfen:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Snapshots ansehen:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
Durch Hinzufügen des Befehls
ORDER BY committed_at DESC LIMIT 1;
können Sie die aktuelle Snapshot-ID ermitteln.Führen Sie ein Rollback auf eine bestimmte Version der Tabelle durch:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Ersetzen Sie
snapshot-id
durch die Version, zu der Sie zurückkehren möchten.