Iceberg support on Dataproc Metastore

This page explains how to use Apache Iceberg on Dataproc by hosting the Hive metastore in Dataproc Metastore. It includes information on how to use Iceberg tables through Spark, Hive, and Presto.

Features

Apache Iceberg is an open table format for large analytical datasets. Iceberg greatly improves performance and provides the following advanced features:

  • Atomicity: Table changes either complete or fail. There's no partial commit of table changes.

  • Snapshot isolation: Reads use only one snapshot of a table without holding a lock.

  • Multiple concurrent writers: Uses optimistic concurrency and retries to ensure that compatible updates succeed, even when writes conflict.

  • Schema evolution: Columns are tracked by ID to support add, drop, update, and rename.

  • Time travel: Reproducible queries can use the same table snapshot, and you can easily examine changes.

  • Distributed planning: File pruning and predicate push-down are distributed to jobs, removing the metastore as a bottleneck.

  • Version history and rollback: Correct problems by resetting tables to a previous state.

  • Hidden partitioning: Prevents user mistakes that cause silently incorrect results or extremely slow queries.

  • Partition layout evolution: The layout of a table can be updated as data volume or query patterns change.

  • Scan planning and file filtering: Finds the files needed for a query by pruning unneeded metadata files and filtering out data files that don't contain matching data.

Compatibilities

Iceberg works well with Dataproc and Dataproc Metastore. It adds tables that work like SQL tables to Spark and Presto, using a high-performance format. Iceberg uses a pointer to the latest version of a snapshot and needs a mechanism to guarantee atomicity when switching versions. It provides two options for tracking tables: Hive Catalog and Hadoop tables.
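
To make the two options concrete, the following is a minimal Scala sketch of how each one locates a table, using the same classes as the walkthroughs later on this page (the bucket path is a placeholder):

import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.catalog.TableIdentifier

// Hive Catalog: the pointer to the current table metadata is kept in the Hive metastore.
val hiveCatalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration)
val tableFromCatalog = hiveCatalog.loadTable(TableIdentifier.of("default", "example"))

// Hadoop tables: the pointer is kept in the table's own directory on the file system.
val hadoopTables = new HadoopTables(spark.sparkContext.hadoopConfiguration)
val tableFromPath = hadoopTables.load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>")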

Supported features include:

Drivers   Select   Insert   Create Table
Spark     ✓        ✓        ✓
Hive      ✓
Presto    ✓        ✓        ✓

Prerequisites

To get started, create a Dataproc cluster and use a Dataproc Metastore service as its Hive metastore. For more information, see Creating a Dataproc cluster. After creating the cluster, SSH into it from a browser or from the command line.
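
If you use the gcloud CLI, the create command takes roughly the following shape; this is a sketch, and CLUSTER_NAME, REGION, and the Dataproc Metastore service path are placeholders to replace with your own values:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --dataproc-metastore=projects/PROJECT_ID/locations/LOCATION/services/SERVICE_NAME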

Using Iceberg Table with Spark

Iceberg tables support read and write operations. For more information, see Apache Iceberg - Spark.

Spark Configurations

First, start the Spark shell and use a Cloud Storage bucket to store data. To include Iceberg in the Spark installation, add the Iceberg Spark runtime JAR file to Spark's JARs folder, or pass it to the shell with the --jars flag as shown below. To download the JAR file, see Apache Iceberg Downloads. The following command starts the Spark shell with support for Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Using Hive Catalog to Create Iceberg Tables

  1. Set up Hive Catalog configurations to create Iceberg tables in the Spark Scala shell:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Create a table to insert and update data. The following is an example.

    1. Create the catalog and a table identifier for a table called example under the default database:

      val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
      val name = TableIdentifier.of("default","example");
      
    2. Insert sample data:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Specify partition strategy based on column id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Create the table:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Add the Iceberg Storage Handler and SerDe as the table property:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Write the data to the table:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Read the data:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Change the table schema. The following is an example.

    1. Get the table and add a new column grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Check the new table schema:

      table.schema.toString;
      
  4. Insert more data and view the schema evolution. The following is an example.

    1. Add new data to the table:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Check the inserted new data:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. View the table history:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. View the snapshots:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. View the manifest files:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. View the data files:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Assume you made a mistake by adding the row with the value of id=6 and want to go back to see a correct version of the table:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Replace the snapshot-id value with the ID of the version you want to go back to.
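
      To find a snapshot ID to pass to the snapshot-id option, you can query the snapshots metadata table shown earlier; a minimal sketch:

      spark.read.format("iceberg").load("default.example.snapshots").select("committed_at", "snapshot_id").orderBy("committed_at").show(truncate = false);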

Using Hadoop Tables to Create Iceberg Tables

  1. Set up Hadoop Table configurations to create Iceberg tables in the Spark Scala shell:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Create a table to insert and update data. The following is an example.

    1. Create a HadoopTables instance, which manages tables by file system location:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Insert sample data:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Specify partition strategy based on column id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Create the table:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Write the data to the table:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Read the data:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Change the table schema. The following is an example.

    1. Get the table and add a new column grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Check the new table schema:

      table.schema.toString;
      
  4. Insert more data and view the schema evolution. The following is an example.

    1. Add new data to the table:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Check the inserted new data:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. View the table history:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. View the snapshots:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. View the manifest files:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. View the data files:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Go back to see a specific version of the table:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142").load(table_location).show;
      

      Replace the snapshot-id value with the ID of the version you want to go back to.
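
      As in the Hive Catalog example, you can list the available snapshot IDs by reading the #snapshots metadata location; a minimal sketch:

      spark.read.format("iceberg").load(table_location + "#snapshots").select("committed_at", "snapshot_id").orderBy("committed_at").show(truncate = false);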

Using Iceberg Table on Hive

Iceberg supports reading tables through Hive by using a StorageHandler. Note that only Hive 2.x and 3.1.2 are supported. For more information, see Apache Iceberg - Hive. In addition, add the Iceberg Hive Runtime JAR file to the Hive classpath. To download the JAR file, see Apache Iceberg Downloads.

To overlay a Hive table on top of an Iceberg table, you must create the Iceberg table using either a Hive Catalog or a Hadoop Table. In addition, you must configure Hive accordingly to read data from the Iceberg table.

Reading Iceberg Table (Hive Catalog) on Hive

  1. Open the Hive client and set the following configurations for the Hive client session to read Iceberg tables:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Read table schema and data. The following is an example.

    1. Check the table schema and whether the table format is Iceberg:

      describe formatted example;
      
    2. Read the data from the table:

      select * from example;
      

Reading Iceberg Table (Hadoop Table) on Hive

  1. Open the Hive client and set the following configurations for the Hive client session to read Iceberg tables:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Read table schema and data. The following is an example.

    1. Create an external table (overlay a Hive table on top of the Iceberg table):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Check the table schema and whether the table format is Iceberg:

      describe formatted hadoop_table;
      
    3. Read the data from the table:

      select * from hadoop_table;
      

Using Iceberg Table on Presto

Presto queries use the Hive connector to get partition locations, so you must configure Presto accordingly to read and write data on the Iceberg table. For more information, see Presto/Trino - Hive Connector and Presto/Trino - Iceberg Connector.

Presto Configurations

  1. On each Dataproc cluster node, create a file named iceberg.properties under /etc/presto/conf/catalog/ and configure hive.metastore.uri as follows:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Replace example.net:9083 with the correct host and port for your Hive metastore Thrift service. On the cluster, this value is typically set as the hive.metastore.uris property in /etc/hive/conf/hive-site.xml.

  2. Restart the Presto service to apply the configuration:

    sudo systemctl restart presto.service
    

Creating Iceberg Table on Presto

  1. Open the Presto client and connect using the iceberg catalog and the default schema:

    presto --catalog iceberg --schema default
    
  2. Create a table to insert and update data. The following is an example.

    1. Create a table called example under the default database:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Insert sample data:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Read data from the table:

      SELECT * FROM iceberg.default.example;
      
    4. Insert more data to create new snapshots:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. View the snapshots:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      By adding the clause ORDER BY committed_at DESC LIMIT 1, you can find the latest snapshot ID.
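
      For example, the full query looks like this:

      SELECT snapshot_id FROM iceberg.default."example$snapshots" ORDER BY committed_at DESC LIMIT 1;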

    6. Roll back to a specific version of the table:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Replace 8424394414541782448 with the snapshot ID of the version you want to roll back to.

What's next