Usar tabelas do Apache Iceberg com o Dataproc Metastore

Nesta página, explicamos como usar tabelas do Apache Iceberg com um serviço do metastore do Dataproc anexado a um cluster do Dataproc. O Apache Iceberg é um formato de tabela aberta para grandes conjuntos de dados analíticos.

Compatibilidades

As tabelas Iceberg oferecem suporte aos seguintes recursos:

Impulsionadores Seleção Inserir Criar tabela
Spark
Hive
Presto

Antes de começar

Usar a tabela Iceberg com o Spark

O exemplo a seguir mostra como usar tabelas Iceberg com o Spark.

As tabelas de Iceberg são compatíveis com operações de leitura e gravação. Para mais informações, consulte Apache Iceberg - Spark.

Configurações do Spark

Primeiro, inicie o shell do Spark e use um bucket do Cloud Storage para armazenar dados. Para incluir o Iceberg na instalação do Spark, adicione o arquivo JAR de tempo de execução do Iceberg Spark à pasta JARs do Spark. Para fazer o download do arquivo JAR, consulte Downloads do Apache Iceberg. O comando a seguir inicia o shell do Spark com suporte para Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Usar o Hive Catalog para criar tabelas de Iceberg

  1. Defina as configurações do Hive Catalog para criar tabelas de Iceberg no Spark Spark:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Crie uma tabela para inserir e atualizar dados. Veja um exemplo.

    1. Crie uma tabela chamada example no banco de dados default:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Inserir dados de amostra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifique a estratégia de particionamento com base na coluna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crie a tabela:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Adicione o gerenciador de armazenamento Iceberg e o SerDe como a propriedade da tabela:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Grave os dados na tabela:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Leia os dados:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Altere o esquema da tabela. Veja um exemplo.

    1. Receba a tabela e adicione uma nova coluna grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifique o novo esquema da tabela:

      table.schema.toString;
      
  4. Insira mais dados e veja a evolução do esquema. Veja um exemplo.

    1. Adicione novos dados à tabela:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Verifique os novos dados inseridos:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Veja o histórico da tabela:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Veja os snapshots:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Veja os arquivos de manifesto:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Veja os arquivos de dados:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Suponha que você cometeu um erro ao adicionar a linha com o valor de id=6 e queira voltar para uma versão correta da tabela:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Substitua snapshot-id pela versão que você quer consultar.

Usar tabelas do Hadoop para criar tabelas de Iceberg

  1. Defina as configurações da tabela do Hadoop para criar tabelas de Iceberg no Spark Spark:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crie uma tabela para inserir e atualizar dados. Veja um exemplo.

    1. Crie uma tabela chamada example no banco de dados default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Inserir dados de amostra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifique a estratégia de particionamento com base na coluna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crie a tabela:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Grave os dados na tabela:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Leia os dados:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Altere o esquema da tabela. Veja um exemplo.

    1. Receba a tabela e adicione uma nova coluna grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifique o novo esquema da tabela:

      table.schema.toString;
      
  4. Insira mais dados e veja a evolução do esquema. Veja um exemplo.

    1. Adicione novos dados à tabela:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Verifique os novos dados inseridos:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Veja o histórico da tabela:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Veja os snapshots:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Veja os arquivos de manifesto:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Veja os arquivos de dados:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Volte para ver uma versão específica da tabela:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Substitua snapshot-id pela versão que você quer voltar e adicione "L" ao final. Por exemplo, "3943776515926014142L".

Usar uma tabela do Iceberg no Hive

O Iceberg é compatível com tabelas lidas pelo Hive usando um StorageHandler. Observe que somente as versões Hive 2.x e 3.1.2 são compatíveis. Para mais informações, consulte Apache Iceberg - Hive. Além disso, inclua o arquivo JAR do ambiente de execução Hive do Iceberg no caminho de classe do Hive. Para fazer o download do arquivo JAR, consulte Downloads do Apache Iceberg.

Para sobrepor uma tabela do Hive sobre uma tabela de Iceberg, é necessário criar uma tabela do Iceberg com um catálogo do Hive ou uma tabela do Hadoop. Além disso, configure o Hive de acordo com isso para ler dados da tabela do Iceberg.

Ler tabela do Iceberg (catálogo do Hive) no Hive

  1. Abra o cliente Hive e defina configurações para ler as tabelas de Iceberg na sessão do cliente Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Ler esquema e dados de tabela. Veja um exemplo.

    1. Verifique o esquema da tabela e se o formato da tabela é Iceberg:

      describe formatted example;
      
    2. Leia os dados da tabela:

      select * from example;
      

Ler tabela do Iceberg (tabela do Hadoop) no Hive

  1. Abra o cliente Hive e defina configurações para ler as tabelas de Iceberg na sessão do cliente Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Ler esquema e dados de tabela. Veja um exemplo.

    1. Crie uma tabela externa (sobreponha uma tabela do Hive na tabela do Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Verifique o esquema da tabela e se o formato da tabela é Iceberg:

      describe formatted hadoop_table;
      
    3. Leia os dados da tabela:

      select * from hadoop_table;
      

Usar uma tabela de gelo no Presto

As consultas do Presto usam o conector Hive para receber locais de partição. Portanto, você precisa configurar o Presto corretamente para ler e gravar dados na tabela do Iceberg. Para mais informações, consulte Presto/Trino - Hive Connector e Presto/Trino - Iceberg Connector.

Configurações do Presto

  1. Em cada nó de cluster do Dataproc, crie um arquivo chamado iceberg.properties /etc/presto/conf/catalog/iceberg.properties e configure o hive.metastore.uri da seguinte maneira:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Substitua example.net:9083 pelo host e pela porta corretos do serviço Hive Thrift.

  2. Reinicie o serviço Presto para enviar as configurações:

    sudo systemctl restart presto.service
    

Criar tabela de Iceberg em Presto

  1. Abra o cliente Presto e use o conector "Iceberg" para receber o metastore:

    --catalog iceberg --schema default
    
  2. Crie uma tabela para inserir e atualizar dados. Veja um exemplo.

    1. Crie uma tabela chamada example no banco de dados default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Inserir dados de amostra:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Leia os dados da tabela:

      SELECT * FROM iceberg.default.example;
      
    4. Insira mais dados para verificar os instantâneos:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Veja os snapshots:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Ao adicionar o comando ORDER BY committed_at DESC LIMIT 1;, é possível encontrar o ID do snapshot mais recente.

    6. Reverter para uma versão específica da tabela:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Substitua snapshot-id pela versão que você quer consultar.

A seguir