Halaman ini menjelaskan cara menggunakan tabel Apache Iceberg dengan layanan Dataproc Metastore yang terhubung ke cluster Dataproc. Apache Iceberg adalah format tabel terbuka untuk set data analisis berukuran besar.
Kompatibilitas
Tabel gunung es mendukung fitur berikut.
Pendorong | Pilih | Sisipkan | Buat Tabel |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Sebelum memulai
- Buat layanan Dataproc Metastore.
- Lampirkan Dataproc Metastore ke cluster Dataproc.
Menggunakan tabel Iceberg dengan Spark
Contoh berikut menunjukkan bahwa Anda harus menggunakan tabel Iceberg dengan Spark.
Tabel gunung es mendukung operasi baca dan tulis. Untuk informasi selengkapnya, lihat Apache Iceberg - Spark.
Konfigurasi Spark
Pertama, mulai shell Spark dan gunakan bucket Cloud Storage untuk menyimpan data. Untuk menyertakan Iceberg dalam penginstalan Spark, tambahkan file JAR Iceberg Spark Runtime ke folder JARs Spark. Untuk mendownload file JAR, lihat Download Apache Iceberg. Perintah berikut memulai Spark shell dengan dukungan untuk Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Menggunakan Katalog Hive untuk membuat tabel Iceberg
Siapkan konfigurasi Hive Catalog untuk membuat tabel Iceberg di spark scala:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.
Buat tabel bernama
example
pada databasedefault
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Sisipkan data sampel:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Tentukan strategi partisi berdasarkan kolom
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Buat tabel:
val table=catalog.createTable(name,df1_schema,partition_spec);
Tambahkan Iceberg Storage Handler dan SerDe sebagai properti tabel:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Tulis data ke tabel:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Baca datanya:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Ubah skema tabel. Berikut adalah contohnya.
Dapatkan tabel dan tambahkan kolom baru
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Periksa skema tabel baru:
table.schema.toString;
Menyisipkan lebih banyak data dan melihat evolusi skema. Berikut adalah contohnya.
Tambahkan data baru ke tabel:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Periksa data baru yang disisipkan:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Lihat histori tabel:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Lihat snapshot:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Lihat file manifes:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Lihat file data:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Asumsikan Anda membuat kesalahan dengan menambahkan baris dengan nilai
id=6
dan ingin kembali untuk melihat versi tabel yang benar:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Ganti
snapshot-id
dengan versi yang ingin Anda gunakan kembali.
Menggunakan Tabel Hadoop untuk membuat tabel Iceberg
Siapkan konfigurasi Hadoop Table untuk membuat tabel Iceberg di spark scala:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.
Buat tabel bernama
example
pada databasedefault
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Sisipkan data sampel:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Tentukan strategi partisi berdasarkan kolom
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Buat tabel:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Tulis data ke tabel:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Baca datanya:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Ubah skema tabel. Berikut adalah contohnya.
Dapatkan tabel dan tambahkan kolom baru
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Periksa skema tabel baru:
table.schema.toString;
Menyisipkan lebih banyak data dan melihat evolusi skema. Berikut adalah contohnya.
Tambahkan data baru ke tabel:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Periksa data baru yang disisipkan:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Lihat histori tabel:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Lihat snapshot:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Lihat file manifes:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Lihat file data:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Kembali untuk melihat versi tabel tertentu:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Ganti
snapshot-id
dengan versi yang ingin Anda gunakan kembali dan tambahkan"L"
ke bagian akhir. Misalnya,"3943776515926014142L"
.
Menggunakan tabel Iceberg di Hive
Iceberg mendukung pembacaan tabel menggunakan Hive menggunakan StorageHandler
. Perhatikan
bahwa hanya versi Hive 2.x dan 3.1.2 yang didukung. Untuk informasi selengkapnya, lihat
Apache Iceberg - Hive. Selain itu, tambahkan file Iceberg Hive Runtime JAR ke classpath Hive. Untuk
mendownload file JAR, lihat Download Apache Iceberg.
Untuk menempatkan tabel Hive di atas tabel Iceberg, Anda harus membuat tabel Iceberg menggunakan Katalog Hive atau Tabel Hadoop. Selain itu, Anda harus mengonfigurasi Hive agar dapat membaca data dari tabel Iceberg.
Baca tabel Iceberg (Katalog Hive) di Hive
Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg pada sesi klien Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Membaca data dan skema tabel. Berikut adalah contohnya.
Periksa skema tabel dan apakah format tabelnya adalah Iceberg:
describe formatted example;
Baca data dari tabel:
select * from example;
Baca tabel Iceberg (Tabel Hadoop) di Hive
Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg pada sesi klien Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Membaca data dan skema tabel. Berikut adalah contohnya.
Buat tabel eksternal (hamparkan tabel Hive di atas tabel Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Periksa skema tabel dan apakah format tabelnya adalah Iceberg:
describe formatted hadoop_table;
Baca data dari tabel:
select * from hadoop_table;
Menggunakan tabel Iceberg di Presto
Kueri Presto menggunakan konektor Hive untuk mendapatkan lokasi partisi, sehingga Anda harus mengonfigurasi Presto sebagaimana mestinya untuk membaca dan menulis data pada tabel Iceberg. Untuk mengetahui informasi selengkapnya, lihat Presto/Trino - Konektor Hive dan Presto/Trino - Konektor Iceberg.
Konfigurasi Presto
Di setiap node cluster Dataproc, buat file bernama
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
dan konfigurasikanhive.metastore.uri
sebagai berikut:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Ganti
example.net:9083
dengan host dan port yang benar untuk layanan metastore Thrift Hive.Mulai ulang layanan Presto untuk mengirim konfigurasi:
sudo systemctl restart presto.service
Buat tabel Iceberg di Presto
Buka klien Presto dan gunakan konektor "Iceberg" untuk mendapatkan metastore:
--catalog iceberg --schema default
Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.
Buat tabel bernama
example
pada databasedefault
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Sisipkan data sampel:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Membaca data dari tabel:
SELECT * FROM iceberg.default.example;
Sisipkan lebih banyak data baru untuk memeriksa snapshot:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Lihat snapshot:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
Dengan menambahkan perintah
ORDER BY committed_at DESC LIMIT 1;
, Anda dapat menemukan ID snapshot terbaru.Melakukan roll back ke versi tabel tertentu:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Ganti
snapshot-id
dengan versi yang ingin Anda gunakan kembali.