Halaman ini menjelaskan cara menggunakan tabel Apache Iceberg dengan layanan Dataproc Metastore yang dilampirkan ke cluster Dataproc. Apache Iceberg adalah format tabel terbuka untuk set data analisis yang besar.
Kompatibilitas
Tabel Iceberg mendukung fitur berikut.
Pendorong | Pilih | Sisipkan | Buat Tabel |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Sebelum memulai
- Buat layanan Metastore Dataproc.
- Lampirkan Metastore Dataproc ke cluster Dataproc.
Menggunakan tabel Iceberg dengan Spark
Contoh berikut menunjukkan bahwa Anda harus menggunakan tabel Iceberg dengan Spark.
Tabel Iceberg mendukung operasi baca dan tulis. Untuk informasi selengkapnya, lihat Apache Iceberg - Spark.
Konfigurasi Spark
Pertama, mulai shell Spark dan gunakan bucket Cloud Storage untuk menyimpan data. Untuk menyertakan Iceberg dalam penginstalan Spark, tambahkan file JAR Runtime Spark Iceberg ke folder JAR Spark. Untuk mendownload file JAR, lihat Download Apache Iceberg. Perintah berikut memulai shell Spark dengan dukungan untuk Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Menggunakan Hive Catalog untuk membuat tabel Iceberg
Siapkan konfigurasi Katalog Hive untuk membuat tabel Iceberg di spark scala:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.
Buat tabel bernama
example
di databasedefault
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Masukkan data sampel:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Tentukan strategi partisi berdasarkan kolom
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Buat tabel:
val table=catalog.createTable(name,df1_schema,partition_spec);
Tambahkan Pengendali Penyimpanan dan SerDe Iceberg sebagai properti tabel:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Tulis data ke tabel:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Baca data:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Mengubah skema tabel. Berikut adalah contohnya.
Dapatkan tabel dan tambahkan kolom baru
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Periksa skema tabel baru:
table.schema.toString;
Sisipkan lebih banyak data dan lihat evolusi skema. Berikut adalah contohnya.
Tambahkan data baru ke tabel:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Periksa data baru yang dimasukkan:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Lihat histori tabel:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Lihat ringkasan:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Lihat file manifes:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Lihat file data:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Anggap Anda melakukan kesalahan dengan menambahkan baris dengan nilai
id=6
dan ingin kembali untuk melihat versi tabel yang benar:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Ganti
snapshot-id
dengan versi yang ingin Anda kembalikan.
Menggunakan Tabel Hadoop untuk membuat tabel Iceberg
Siapkan konfigurasi Tabel Hadoop untuk membuat tabel Iceberg di spark scala:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.
Buat tabel bernama
example
di databasedefault
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Masukkan data sampel:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Tentukan strategi partisi berdasarkan kolom
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Buat tabel:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Tulis data ke tabel:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Baca data:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Mengubah skema tabel. Berikut adalah contohnya.
Dapatkan tabel dan tambahkan kolom baru
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Periksa skema tabel baru:
table.schema.toString;
Sisipkan lebih banyak data dan lihat evolusi skema. Berikut adalah contohnya.
Tambahkan data baru ke tabel:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Periksa data baru yang dimasukkan:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Lihat histori tabel:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Lihat ringkasan:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Lihat file manifes:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Lihat file data:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Kembali untuk melihat versi tabel tertentu:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Ganti
snapshot-id
dengan versi yang ingin Anda kembalikan dan tambahkan"L"
di bagian akhir. Misalnya,"3943776515926014142L"
.
Menggunakan tabel Iceberg di Hive
Iceberg mendukung tabel yang dibaca menggunakan Hive dengan menggunakan StorageHandler
. Perhatikan bahwa hanya versi Hive 2.x dan 3.1.2 yang didukung. Untuk informasi selengkapnya, lihat
Apache Iceberg - Hive. Selain itu, tambahkan file JAR Runtime Iceberg Hive ke classpath Hive. Untuk mendownload file JAR, lihat Download Apache Iceberg.
Untuk menempatkan tabel Hive di atas tabel Iceberg, Anda harus membuat tabel Iceberg menggunakan Katalog Hive atau Tabel Hadoop. Selain itu, Anda harus mengonfigurasi Hive dengan benar untuk membaca data dari tabel Iceberg.
Membaca tabel Iceberg (Katalog Hive) di Hive
Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg di sesi klien Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Membaca skema dan data tabel. Berikut adalah contohnya.
Periksa skema tabel dan apakah format tabelnya adalah Iceberg:
describe formatted example;
Baca data dari tabel:
select * from example;
Membaca tabel Iceberg (Tabel Hadoop) di Hive
Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg di sesi klien Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Membaca skema dan data tabel. Berikut adalah contohnya.
Buat tabel eksternal (tumpang-tindih tabel Hive di atas tabel Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Periksa skema tabel dan apakah format tabelnya adalah Iceberg:
describe formatted hadoop_table;
Baca data dari tabel:
select * from hadoop_table;
Menggunakan tabel Iceberg di Presto
Kueri Presto menggunakan konektor Hive untuk mendapatkan lokasi partisi, sehingga Anda harus mengonfigurasi Presto dengan benar untuk membaca dan menulis data di tabel Iceberg. Untuk mengetahui informasi selengkapnya, lihat Presto/Trino - Konektor Hive dan Presto/Trino - Konektor Iceberg.
Konfigurasi Presto
Di setiap node cluster Dataproc, buat file bernama
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
dan konfigurasihive.metastore.uri
sebagai berikut:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Ganti
example.net:9083
dengan host dan port yang benar untuk layanan Thrift metastore Hive Anda.Mulai ulang layanan Presto untuk mendorong konfigurasi:
sudo systemctl restart presto.service
Membuat tabel Iceberg di Presto
Buka klien Presto dan gunakan konektor "Iceberg" untuk mendapatkan metastore:
--catalog iceberg --schema default
Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.
Buat tabel bernama
example
di databasedefault
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Masukkan data sampel:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Baca data dari tabel:
SELECT * FROM iceberg.default.example;
Masukkan lebih banyak data baru untuk memeriksa snapshot:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Lihat ringkasan:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
Dengan menambahkan perintah
ORDER BY committed_at DESC LIMIT 1;
, Anda dapat menemukan ID snapshot terbaru.Melakukan roll back ke versi tabel tertentu:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Ganti
snapshot-id
dengan versi yang ingin Anda kembalikan.