本頁說明如何搭配使用 Apache Iceberg 表格和 Dataproc Metastore 服務,並將該服務附加至 Dataproc 叢集。Apache Iceberg 是開放式資料表格式,適用於大型分析資料集。
相容性
Iceberg 資料表支援下列功能。
驅動因素 | 選取 | 插入 | 建立資料表 |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
事前準備
搭配使用 Iceberg 資料表與 Spark
以下範例說明如何搭配 Spark 使用 Iceberg 資料表。
Iceberg 資料表支援讀取和寫入作業。詳情請參閱「Apache Iceberg - Spark」。
Spark 設定
首先,啟動 Spark Shell,並使用 Cloud Storage 值區儲存資料。如要在 Spark 安裝中加入 Iceberg,請將 Iceberg Spark 執行階段 JAR 檔案新增至 Spark 的 JAR 資料夾。如要下載 JAR 檔案,請參閱「Apache Iceberg Downloads」。下列指令會啟動 Spark Shell,並支援 Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
使用 Hive Catalog 建立 Iceberg 資料表
設定 Hive Catalog,在 Spark Scala 中建立 Iceberg 資料表:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
建立資料表,以便插入及更新資料。範例如下:
在
default
資料庫下方建立名為example
的資料表:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
插入範例資料:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
根據資料欄
id
指定分割策略:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
建立資料表:
val table=catalog.createTable(name,df1_schema,partition_spec);
將 Iceberg 儲存空間處理常式和 SerDe 新增為資料表屬性:
table.updateProperties().set("engine.hive.enabled", "true").commit();
將資料寫入資料表:
df1.write.format("iceberg").mode("overwrite").save("default.example");
讀取資料:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
變更資料表結構定義。範例如下:
取得表格並新增資料欄
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
檢查新資料表結構定義:
table.schema.toString;
插入更多資料,並查看結構定義演變。範例如下:
在表格中新增資料:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
檢查插入的新資料:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
查看資料表記錄:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
查看快照:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
查看資訊清單檔案:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
查看資料檔案:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
假設您新增資料列時輸入了
id=6
值,但發現有誤,想返回查看正確的表格版本:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
將
snapshot-id
替換為要還原的版本。
使用 Hadoop Tables 建立 Iceberg 資料表
設定 Hadoop 資料表,在 Spark Scala 中建立 Iceberg 資料表:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
建立資料表,以便插入及更新資料。範例如下:
在
default
資料庫下方建立名為example
的資料表:val conf = new Configuration(); val tables = new HadoopTables(conf);
插入範例資料:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
根據資料欄
id
指定分割策略:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
建立資料表:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
將資料寫入資料表:
df1.write.format("iceberg").mode("overwrite").save(table_location);
讀取資料:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
變更資料表結構定義。範例如下:
取得表格並新增資料欄
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
檢查新資料表結構定義:
table.schema.toString;
插入更多資料,並查看結構定義演變。範例如下:
在表格中新增資料:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
檢查插入的新資料:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
查看資料表記錄:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
查看快照:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
查看資訊清單檔案:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
查看資料檔案:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
如要查看特定版本的表格,請按照下列步驟操作:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
將
snapshot-id
替換成要返回的版本,並在結尾加上"L"
。例如:"3943776515926014142L"
。
在 Hive 上使用 Iceberg 資料表
Iceberg 支援使用 StorageHandler
讀取 Hive 資料表。請注意,系統僅支援 Hive 2.x 和 3.1.2 版。詳情請參閱「Apache Iceberg - Hive」。此外,請將 Iceberg Hive 執行階段 JAR 檔案新增至 Hive 類別路徑。如要下載 JAR 檔案,請參閱 Apache Iceberg 下載頁面。
如要在 Iceberg 資料表上疊加 Hive 資料表,您必須使用 Hive Catalog 或 Hadoop 資料表建立 Iceberg 資料表。此外,您也必須相應設定 Hive,才能從 Iceberg 資料表讀取資料。
在 Hive 上讀取 Iceberg 資料表 (Hive 目錄)
開啟 Hive 用戶端,並設定在 Hive 用戶端工作階段中讀取 Iceberg 資料表的設定:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
讀取資料表結構定義和資料。範例如下:
檢查資料表結構定義,以及資料表格式是否為 Iceberg:
describe formatted example;
從表格讀取資料:
select * from example;
在 Hive 上讀取 Iceberg 資料表 (Hadoop 資料表)
開啟 Hive 用戶端,並設定在 Hive 用戶端工作階段中讀取 Iceberg 資料表的設定:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
讀取資料表結構定義和資料。範例如下:
建立外部資料表 (在 Iceberg 資料表上疊加 Hive 資料表):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
檢查資料表結構定義,以及資料表格式是否為 Iceberg:
describe formatted hadoop_table;
從表格讀取資料:
select * from hadoop_table;
在 Presto 上使用 Iceberg 資料表
Presto 查詢會使用 Hive 連接器取得分區位置,因此您必須相應設定 Presto,才能讀取及寫入 Iceberg 資料表中的資料。詳情請參閱「Presto/Trino - Hive 連接器」和「Presto/Trino - Iceberg 連接器」。
Presto 設定
在每個 Dataproc 叢集節點下,建立名為
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
的檔案,並依下列方式設定hive.metastore.uri
:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
將
example.net:9083
替換為 Hive Metastore Thrift 服務的正確主機和通訊埠。重新啟動 Presto 服務,以推送設定:
sudo systemctl restart presto.service
在 Presto 上建立 Iceberg 資料表
開啟 Presto 用戶端,並使用「Iceberg」連接器取得 Metastore:
--catalog iceberg --schema default
建立資料表,以便插入及更新資料。範例如下:
在
default
資料庫下方建立名為example
的資料表:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
插入範例資料:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
從表格讀取資料:
SELECT * FROM iceberg.default.example;
插入更多新資料來查看快照:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
查看快照:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
加入
ORDER BY committed_at DESC LIMIT 1;
指令即可找出最新的快照 ID。復原為特定版本的表格:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
將
snapshot-id
替換為要還原的版本。