Use tabelas do Apache Iceberg com o Dataproc Metastore

Esta página explica como usar tabelas do Apache Iceberg com um serviço Dataproc Metastore anexado a um cluster do Dataproc. O Apache Iceberg é um formato de tabela aberta para grandes conjuntos de dados analíticos.

Compatibilidades

As tabelas Iceberg suportam as seguintes funcionalidades.

Impulsionadores Selecionar Inserir Criar tabela
Spark
Hive
Presto

Antes de começar

Use a tabela Iceberg com o Spark

O exemplo seguinte mostra como usar tabelas Iceberg com o Spark.

As tabelas Iceberg suportam operações de leitura e escrita. Para mais informações, consulte o artigo Apache Iceberg - Spark.

Configurações do Spark

Primeiro, inicie o shell do Spark e use um contentor do Cloud Storage para armazenar dados. Para incluir o Iceberg na instalação do Spark, adicione o ficheiro JAR do Iceberg Spark Runtime à pasta JARs do Spark. Para transferir o ficheiro JAR, consulte a página Transferências do Apache Iceberg. O comando seguinte inicia o shell do Spark com suporte para o Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Use o catálogo do Hive para criar tabelas Iceberg

  1. Configure as configurações do catálogo do Hive para criar tabelas Iceberg no spark scala:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Crie uma tabela para inserir e atualizar dados. Segue-se um exemplo.

    1. Cria uma tabela denominada example na base de dados default:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Inserir dados de amostra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifique a estratégia de partição com base na coluna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crie a tabela:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Adicione o controlador de armazenamento e o SerDe do Iceberg como propriedade da tabela:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Escreva os dados na tabela:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Leia os dados:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Altere o esquema da tabela. Segue-se um exemplo.

    1. Obtenha a tabela e adicione uma nova coluna grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifique o novo esquema da tabela:

      table.schema.toString;
      
  4. Inserir mais dados e ver a evolução do esquema. Segue-se um exemplo.

    1. Adicione novos dados à tabela:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Verifique os novos dados inseridos:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Veja o histórico da tabela:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Veja os resumos:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Veja os ficheiros de manifesto:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Veja os ficheiros de dados:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Suponha que cometeu um erro ao adicionar a linha com o valor id=6 e quer voltar atrás para ver uma versão correta da tabela:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Substitua snapshot-id pela versão à qual quer reverter.

Use tabelas Hadoop para criar tabelas Iceberg

  1. Configure as configurações da tabela Hadoop para criar tabelas Iceberg no spark scala:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crie uma tabela para inserir e atualizar dados. Segue-se um exemplo.

    1. Cria uma tabela denominada example na base de dados default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Inserir dados de amostra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifique a estratégia de partição com base na coluna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crie a tabela:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Escreva os dados na tabela:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Leia os dados:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Altere o esquema da tabela. Segue-se um exemplo.

    1. Obtenha a tabela e adicione uma nova coluna grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifique o novo esquema da tabela:

      table.schema.toString;
      
  4. Inserir mais dados e ver a evolução do esquema. Segue-se um exemplo.

    1. Adicione novos dados à tabela:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Verifique os novos dados inseridos:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Veja o histórico da tabela:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Veja os resumos:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Veja os ficheiros de manifesto:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Veja os ficheiros de dados:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Volte para ver uma versão específica da tabela:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Substitua snapshot-id pela versão para a qual quer reverter e adicione "L" no final. Por exemplo, "3943776515926014142L".

Use a tabela Iceberg no Hive

O Iceberg suporta tabelas lidas através do Hive com um StorageHandler. Tenha em atenção que apenas são suportadas as versões 2.x e 3.1.2 do Hive. Para mais informações, consulte o artigo Apache Iceberg - Hive. Além disso, adicione o ficheiro JAR do tempo de execução do Iceberg Hive ao caminho de classe do Hive. Para transferir o ficheiro JAR, consulte Transferências do Apache Iceberg.

Para sobrepor uma tabela do Hive a uma tabela do Iceberg, tem de criar a tabela do Iceberg através de um catálogo do Hive ou de uma tabela do Hadoop. Além disso, tem de configurar o Hive em conformidade para ler dados da tabela Iceberg.

Ler tabela Iceberg (catálogo Hive) no Hive

  1. Abra o cliente do Hive e configure as configurações para ler tabelas do Iceberg na sessão do cliente do Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Ler o esquema e os dados da tabela. Segue-se um exemplo.

    1. Verifique o esquema da tabela e se o formato da tabela é Iceberg:

      describe formatted example;
      
    2. Ler os dados da tabela:

      select * from example;
      

Ler tabela Iceberg (tabela Hadoop) no Hive

  1. Abra o cliente do Hive e configure as configurações para ler tabelas do Iceberg na sessão do cliente do Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Ler o esquema e os dados da tabela. Segue-se um exemplo.

    1. Crie uma tabela externa (sobreponha uma tabela do Hive a uma tabela do Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Verifique o esquema da tabela e se o formato da tabela é Iceberg:

      describe formatted hadoop_table;
      
    3. Ler os dados da tabela:

      select * from hadoop_table;
      

Use a tabela Iceberg no Presto

As consultas Presto usam o conetor Hive para obter localizações de partições, pelo que tem de configurar o Presto em conformidade para ler e escrever dados na tabela Iceberg. Para mais informações, consulte os artigos Presto/Trino – Conetor do Hive e Presto/Trino – Conetor do Iceberg.

Configurações do Presto

  1. Em cada nó do cluster do Dataproc, crie um ficheiro com o nome iceberg.properties /etc/presto/conf/catalog/iceberg.properties e configure o hive.metastore.uri da seguinte forma:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Substitua example.net:9083 pelo anfitrião e pela porta corretos do seu serviço Thrift do Hive Metastore.

  2. Reinicie o serviço Presto para enviar as configurações:

    sudo systemctl restart presto.service
    

Crie uma tabela Iceberg no Presto

  1. Abra o cliente Presto e use o conetor "Iceberg" para obter o metastore:

    --catalog iceberg --schema default
    
  2. Crie uma tabela para inserir e atualizar dados. Segue-se um exemplo.

    1. Cria uma tabela denominada example na base de dados default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Inserir dados de amostra:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Ler dados da tabela:

      SELECT * FROM iceberg.default.example;
      
    4. Insira mais dados novos para verificar os instantâneos:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Veja os resumos:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Adicionando o comando ORDER BY committed_at DESC LIMIT 1;, pode encontrar o ID da imagem instantânea mais recente.

    6. Reverter para uma versão específica da tabela:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Substitua snapshot-id pela versão à qual quer reverter.

O que se segue?