搭配 Dataproc Metastore 使用 Apache Iceberg 資料表

本頁說明如何搭配使用 Apache Iceberg 表格和 Dataproc Metastore 服務,並將該服務附加至 Dataproc 叢集。Apache Iceberg 是開放式資料表格式,適用於大型分析資料集。

相容性

Iceberg 資料表支援下列功能。

驅動因素 選取 插入 建立資料表
Spark
Hive
Presto

事前準備

搭配使用 Iceberg 資料表與 Spark

以下範例說明如何搭配 Spark 使用 Iceberg 資料表。

Iceberg 資料表支援讀取和寫入作業。詳情請參閱「Apache Iceberg - Spark」。

Spark 設定

首先,啟動 Spark Shell,並使用 Cloud Storage 值區儲存資料。如要在 Spark 安裝中加入 Iceberg,請將 Iceberg Spark 執行階段 JAR 檔案新增至 Spark 的 JAR 資料夾。如要下載 JAR 檔案,請參閱「Apache Iceberg Downloads」。下列指令會啟動 Spark Shell,並支援 Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

使用 Hive Catalog 建立 Iceberg 資料表

  1. 設定 Hive Catalog,在 Spark Scala 中建立 Iceberg 資料表:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. 建立資料表,以便插入及更新資料。範例如下:

    1. default 資料庫下方建立名為 example 的資料表:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. 插入範例資料:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. 根據資料欄 id 指定分割策略:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 建立資料表:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. 將 Iceberg 儲存空間處理常式和 SerDe 新增為資料表屬性:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. 將資料寫入資料表:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. 讀取資料:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. 變更資料表結構定義。範例如下:

    1. 取得表格並新增資料欄 grade

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 檢查新資料表結構定義:

      table.schema.toString;
      
  4. 插入更多資料,並查看結構定義演變。範例如下:

    1. 在表格中新增資料:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. 檢查插入的新資料:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. 查看資料表記錄:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. 查看快照:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. 查看資訊清單檔案:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. 查看資料檔案:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. 假設您新增資料列時輸入了 id=6 值,但發現有誤,想返回查看正確的表格版本:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      snapshot-id 替換為要還原的版本。

使用 Hadoop Tables 建立 Iceberg 資料表

  1. 設定 Hadoop 資料表,在 Spark Scala 中建立 Iceberg 資料表:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. 建立資料表,以便插入及更新資料。範例如下:

    1. default 資料庫下方建立名為 example 的資料表:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. 插入範例資料:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. 根據資料欄 id 指定分割策略:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 建立資料表:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. 將資料寫入資料表:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. 讀取資料:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. 變更資料表結構定義。範例如下:

    1. 取得表格並新增資料欄 grade

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 檢查新資料表結構定義:

      table.schema.toString;
      
  4. 插入更多資料,並查看結構定義演變。範例如下:

    1. 在表格中新增資料:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. 檢查插入的新資料:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. 查看資料表記錄:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. 查看快照:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. 查看資訊清單檔案:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. 查看資料檔案:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. 如要查看特定版本的表格,請按照下列步驟操作:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      snapshot-id 替換成要返回的版本,並在結尾加上 "L"。例如:"3943776515926014142L"

在 Hive 上使用 Iceberg 資料表

Iceberg 支援使用 StorageHandler 讀取 Hive 資料表。請注意,系統僅支援 Hive 2.x 和 3.1.2 版。詳情請參閱「Apache Iceberg - Hive」。此外,請將 Iceberg Hive 執行階段 JAR 檔案新增至 Hive 類別路徑。如要下載 JAR 檔案,請參閱 Apache Iceberg 下載頁面

如要在 Iceberg 資料表上疊加 Hive 資料表,您必須使用 Hive Catalog 或 Hadoop 資料表建立 Iceberg 資料表。此外,您也必須相應設定 Hive,才能從 Iceberg 資料表讀取資料。

在 Hive 上讀取 Iceberg 資料表 (Hive 目錄)

  1. 開啟 Hive 用戶端,並設定在 Hive 用戶端工作階段中讀取 Iceberg 資料表的設定:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. 讀取資料表結構定義和資料。範例如下:

    1. 檢查資料表結構定義,以及資料表格式是否為 Iceberg:

      describe formatted example;
      
    2. 從表格讀取資料:

      select * from example;
      

在 Hive 上讀取 Iceberg 資料表 (Hadoop 資料表)

  1. 開啟 Hive 用戶端,並設定在 Hive 用戶端工作階段中讀取 Iceberg 資料表的設定:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. 讀取資料表結構定義和資料。範例如下:

    1. 建立外部資料表 (在 Iceberg 資料表上疊加 Hive 資料表):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. 檢查資料表結構定義,以及資料表格式是否為 Iceberg:

      describe formatted hadoop_table;
      
    3. 從表格讀取資料:

      select * from hadoop_table;
      

在 Presto 上使用 Iceberg 資料表

Presto 查詢會使用 Hive 連接器取得分區位置,因此您必須相應設定 Presto,才能讀取及寫入 Iceberg 資料表中的資料。詳情請參閱「Presto/Trino - Hive 連接器」和「Presto/Trino - Iceberg 連接器」。

Presto 設定

  1. 在每個 Dataproc 叢集節點下,建立名為 iceberg.properties /etc/presto/conf/catalog/iceberg.properties 的檔案,並依下列方式設定 hive.metastore.uri

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    example.net:9083 替換為 Hive Metastore Thrift 服務的正確主機和通訊埠。

  2. 重新啟動 Presto 服務,以推送設定:

    sudo systemctl restart presto.service
    

在 Presto 上建立 Iceberg 資料表

  1. 開啟 Presto 用戶端,並使用「Iceberg」連接器取得 Metastore:

    --catalog iceberg --schema default
    
  2. 建立資料表,以便插入及更新資料。範例如下:

    1. default 資料庫下方建立名為 example 的資料表:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. 插入範例資料:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. 從表格讀取資料:

      SELECT * FROM iceberg.default.example;
      
    4. 插入更多新資料來查看快照:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. 查看快照:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      加入 ORDER BY committed_at DESC LIMIT 1; 指令即可找出最新的快照 ID。

    6. 復原為特定版本的表格:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      snapshot-id 替換為要還原的版本。

後續步驟