将 Apache Iceberg 表与 Dataproc Metastore 搭配使用

本页面介绍了如何将 Apache Iceberg 表与附加到 Dataproc 集群的 Dataproc Metastore 服务结合使用。Apache Iceberg 是一种适用于大型分析数据集的开放表格式。

兼容性

Iceberg 表支持以下功能。

推动因素 选择 插入 创建表
Spark
Hive
Presto

准备工作

将 Iceberg 表与 Spark 搭配使用

以下示例显示应在 Spark 中使用 Iceberg 表。

Iceberg 表支持读取和写入操作。如需了解详情,请参阅 Apache Iceberg - Spark

Spark 配置

首先,启动 Spark shell 并使用 Cloud Storage 存储桶来存储数据。 如需在 Spark 安装中添加 Iceberg,请将 Iceberg Spark 运行时 JAR 文件添加到 Spark 的 JAR 文件夹中。如需下载 JAR 文件,请参阅 Apache Iceberg 下载。以下命令将启动支持 Apache Iceberg 的 Spark shell:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

使用 Hive Catalog 创建 Iceberg 表

  1. 设置 Hive Catalog 配置以在 spark scala 中创建 Iceberg 表:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. 创建表以插入和更新数据。下面给出了一个示例。

    1. default 数据库下创建名为 example 的表:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. 插入示例数据:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. 根据列 id 指定分区策略:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 创建表:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. 将 Iceberg Storage 处理程序和 SerDe 添加为表属性:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. 将数据写入表中:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. 读取数据:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. 更改表架构。下面给出了一个示例。

    1. 获取表并添加新列 grade

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 检查新表架构:

      table.schema.toString;
      
  4. 插入更多数据并查看架构演变。下面给出了一个示例。

    1. 向表中添加新数据:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. 检查插入的新数据:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. 查看表历史记录:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. 查看快照:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. 查看清单文件:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. 查看数据文件:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. 假设您添加值为 id=6 的行并想返回以查看表的正确版本,但遇到错误:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      snapshot-id 替换为您要返回到的版本。

使用 Hadoop 表创建 Iceberg 表

  1. 设置 Hadoop 表配置以在 Spark Scala 中创建 Iceberg 表:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. 创建表以插入和更新数据。下面给出了一个示例。

    1. default 数据库下创建名为 example 的表:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. 插入示例数据:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. 根据列 id 指定分区策略:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 创建表:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. 将数据写入表中:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. 读取数据:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. 更改表架构。下面给出了一个示例。

    1. 获取表并添加新列 grade

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 检查新表架构:

      table.schema.toString;
      
  4. 插入更多数据并查看架构演变。下面给出了一个示例。

    1. 向表中添加新数据:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. 检查插入的新数据:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. 查看表历史记录:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. 查看快照:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. 查看清单文件:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. 查看数据文件:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. 返回以查看该表的特定版本:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      snapshot-id 替换为您要返回到的版本,并将 "L" 添加到末尾。例如 "3943776515926014142L"

在 Hive 上使用 Iceberg 表

Iceberg 支持通过 StorageHandler 使用 Hive 读取的表。请注意,仅支持 Hive 2.x 和 3.1.2 版本。如需了解详情,请参阅 Apache Iceberg - Hive。此外,将 Iceberg Hive 运行时 JAR 文件添加到 Hive 类路径。如需下载 JAR 文件,请参阅 Apache Iceberg 下载

如需在 Iceberg 表之上叠加 Hive 表,您必须通过 Hive Catalog 或 Hadoop 表创建 Iceberg 表。此外,您还必须相应地配置 Hive 以便读取 Iceberg 表中的数据。

在 Hive 上读取 Iceberg 表 (Hive Catalog)

  1. 打开 Hive 客户端并设置相关配置,以在 Hive 客户端会话上读取 Iceberg 表:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. 读取表架构和数据。下面给出了一个示例。

    1. 检查表架构以及表格式是否为 Iceberg:

      describe formatted example;
      
    2. 从表中读取数据:

      select * from example;
      

在 Hive 上读取 Iceberg 表(Hadoop 表)

  1. 打开 Hive 客户端并设置相关配置,以在 Hive 客户端会话上读取 Iceberg 表:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. 读取表架构和数据。下面给出了一个示例。

    1. 创建外部表(在 Iceberg 表上叠加 Hive 表):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. 检查表架构以及表格式是否为 Iceberg:

      describe formatted hadoop_table;
      
    3. 从表中读取数据:

      select * from hadoop_table;
      

在 Presto 上使用 Iceberg 表

Presto 查询使用 Hive 连接器获取分区位置,因此您必须相应地配置 Presto 以在 Iceberg 表上读取和写入数据。如需了解详情,请参阅 Presto/Trino - Hive 连接器Presto/Trino - Iceberg 连接器

Presto 配置

  1. 在每个 Dataproc 集群节点下,创建一个名为 iceberg.properties /etc/presto/conf/catalog/iceberg.properties 的文件,并按如下方式配置 hive.metastore.uri

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    example.net:9083 替换为 Hive Metastore Thrift 服务的正确主机和端口。

  2. 重启 Presto 服务以推送配置:

    sudo systemctl restart presto.service
    

在 Presto 上创建 Iceberg 表

  1. 打开 Presto 客户端并使用“Iceberg”连接器获取 Metastore:

    --catalog iceberg --schema default
    
  2. 创建表以插入和更新数据。下面给出了一个示例。

    1. default 数据库下创建名为 example 的表:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. 插入示例数据:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. 从表中读取数据:

      SELECT * FROM iceberg.default.example;
      
    4. 插入更多新数据以检查快照:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. 查看快照:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      通过添加 ORDER BY committed_at DESC LIMIT 1; 命令,您可以找到最新的快照 ID。

    6. 回滚到表的特定版本:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      snapshot-id 替换为您要返回到的版本。

后续步骤