Suporte ao Iceberg no metastore do Dataproc

Nesta página, explicamos como usar o Apache Iceberg no Dataproc hospedando o metastore do Hive no Metastore do Dataproc. Ele inclui informações sobre como usar a tabela de Iceberg pelo Spark, Hive e Presto.

Recursos

O Apache Iceberg é um formato de tabela aberta para grandes conjuntos de dados analíticos. O Iceberg melhora o desempenho e fornece os seguintes recursos avançados:

  • Atomicity: as alterações na tabela podem ser concluídas ou falhar. Não há confirmação parcial de mudanças na tabela.

  • Isolamento de snapshot: as leituras usam apenas um snapshot de uma tabela sem manter um bloqueio.

  • Vários gravadores simultâneos: usa simultaneidade otimista e tenta novamente para garantir que as atualizações compatíveis sejam bem-sucedidas, mesmo quando há conflitos entre as gravações.

  • Evolução do esquema: as colunas são rastreadas pelo código para aceitar a adição, a queda, a atualização e a renomeação.

  • Viagem no tempo: consultas reproduzíveis podem usar a mesma tabela ou snapshot. é possível examinar facilmente as mudanças.

  • Planejamento distribuído: a remoção de arquivo e o push-down de predicado são distribuídos para jobs, removendo o metastore como um gargalo.

  • Histórico de versões e reversão: corrija os problemas redefinindo as tabelas para um estado anterior.

  • Particionamento oculto: evita erros do usuário que causam resultados silenciosamente incorretos ou consultas extremamente lentas.

  • Evolução do layout da partição: pode atualizar o layout de uma tabela conforme o volume de dados ou os padrões de consulta mudam.

  • Planejamento de verificação e filtragem de arquivos: localiza os arquivos necessários para uma consulta removendo arquivos de metadados desnecessários e filtrando arquivos de dados que não contêm dados correspondentes.

Compatibilidades

O Iceberg funciona bem com o Dataproc e o metastore do Dataproc. Ele pode adicionar tabelas com um formato de alto desempenho ao Spark e ao Presto que funcionem como uma tabela SQL. O Iceberg usa um ponteiro para a versão mais recente de um snapshot e precisa de um mecanismo para garantir a atomicidade ao alternar entre versões. Ele fornece duas opções para rastrear tabelas do Hive Catalog e do Hadoop.

Os recursos compatíveis incluem:

Impulsionadores Seleção Inserir Criar tabela
Spark ✓ ✓ ✓
Hive ✓ ✓
Presto ✓ ✓ ✓

Pré-requisitos

Para começar, crie um cluster do Dataproc e use o serviço metastore do Dataproc como o metastore do Hive. Para mais informações, consulte Como criar um cluster do Dataproc. Depois de criar o cluster, use o SSH no navegador ou a partir da linha de comando.

Como usar a tabela de Iceberg com o Spark

As tabelas de Iceberg são compatíveis com operações de leitura e gravação. Para mais informações, consulte Apache Iceberg - Spark.

Configurações do Spark

Primeiro, inicie o shell do Spark e use um bucket do Cloud Storage para armazenar dados. Para incluir o Iceberg na instalação do Spark, adicione o arquivo JAR de tempo de execução do Iceberg Spark à pasta JARs do Spark. Para fazer o download do arquivo JAR, consulte Downloads do Apache Iceberg (em inglês). O comando a seguir inicia o shell do Spark com suporte para o Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Como usar o catálogo do Hive para criar tabelas de Iceberg

  1. Defina as configurações do Hive Catalog para criar tabelas de Iceberg no escalar do Spark:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crie uma tabela para inserir e atualizar dados. Veja um exemplo.

    1. Crie uma tabela chamada example no banco de dados default:

      val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
      val name = TableIdentifier.of("default","example");
      
    2. Inserir dados de amostra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifique a estratégia de partição com base na coluna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crie a tabela:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Adicione o gerenciador de armazenamento Iceberg e SerDe como a propriedade da tabela:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Grave os dados na tabela:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Leia os dados:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Altere o esquema da tabela. Veja um exemplo.

    1. Consiga a tabela e adicione uma nova coluna grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifique o novo esquema de tabela:

      table.schema.toString;
      
  4. Insira mais dados e veja a evolução do esquema. Veja um exemplo.

    1. Adicione novos dados à tabela:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Verifique os novos dados inseridos:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Veja o histórico da tabela:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Veja os snapshots:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Veja os arquivos de manifesto:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Veja os arquivos de dados:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Imagine que você cometeu um erro ao adicionar a linha com o valor de id=6 e quer ver uma versão correta da tabela:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Substitua snapshot-id pela versão que você quer recuperar.

Como usar tabelas do Hadoop para criar tabelas de Iceberg

  1. Defina as configurações da tabela do Hadoop para criar tabelas de Iceberg no spark scala:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crie uma tabela para inserir e atualizar dados. Veja um exemplo.

    1. Crie uma tabela chamada example no banco de dados default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Inserir dados de amostra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifique a estratégia de partição com base na coluna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crie a tabela:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Grave os dados na tabela:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Leia os dados:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Altere o esquema da tabela. Veja um exemplo.

    1. Consiga a tabela e adicione uma nova coluna grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifique o novo esquema de tabela:

      table.schema.toString;
      
  4. Insira mais dados e veja a evolução do esquema. Veja um exemplo.

    1. Adicione novos dados à tabela:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Verifique os novos dados inseridos:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Veja o histórico da tabela:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Veja os snapshots:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Veja os arquivos de manifesto:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Veja os arquivos de dados:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Volte para ver uma versão específica da tabela:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Substitua snapshot-id pela versão que você quer acessar e adicione "L" ao final. Por exemplo, "3943776515926014142L".

Como usar a tabela de Iceberg no Hive

O Iceberg é compatível com tabelas lidas pelo Hive usando um StorageHandler. Somente as versões 2.x e 3.1.2 do Hive são compatíveis. Para mais informações, consulte Apache Iceberg - Hive. Além disso, adicione o arquivo JAR do ambiente de execução do Hive do Iceberg ao caminho de classe do Hive. Para fazer o download do arquivo JAR, consulte Downloads do Apache Iceberg (em inglês).

Para sobrepor uma tabela Hive sobre uma tabela de Iceberg, é preciso criá-la usando um Catálogo do Hive ou uma Tabela do Hadoop. Além disso, configure o Hive de maneira adequada para ler dados da tabela do Iceberg.

Como ler a tabela de Iceberg (catálogo do Hive) no Hive

  1. Abra o cliente Hive e defina as configurações para ler as tabelas do Iceberg na sessão do cliente Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Ler esquema e dados da tabela. Veja um exemplo.

    1. Verifique o esquema da tabela e se o formato da tabela é Iceberg:

      describe formatted example;
      
    2. Leia os dados da tabela:

      select * from example;
      

Como ler a tabela de Iceberg (tabela do Hadoop) no Hive

  1. Abra o cliente Hive e defina as configurações para ler as tabelas do Iceberg na sessão do cliente Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Ler esquema e dados da tabela. Veja um exemplo.

    1. Crie uma tabela externa (sobreponha uma tabela Hive sobre a tabela do Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Verifique o esquema da tabela e se o formato da tabela é Iceberg:

      describe formatted hadoop_table;
      
    3. Leia os dados da tabela:

      select * from hadoop_table;
      

Como usar a tabela de Iceberg no Presto

As consultas do Presto usam o conector Hive para conseguir os locais de partição. Portanto, você precisa configurar o Presto de acordo para ler e gravar dados na tabela do Iceberg. Para mais informações, consulte Presto/Trino - Conector do Hive e Conector Presto/Trino - Iceberg (links em inglês).

Configurações do Presto

  1. Em cada nó de cluster do Dataproc, crie um arquivo chamado iceberg.properties /etc/presto/conf/catalog/iceberg.properties e configure o hive.metastore.uri da seguinte maneira:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Substitua example.net:9083 pelo host e pela porta corretos para o serviço metastore do Thrift do Hive.

  2. Reinicie o serviço Presto para enviar as configurações:

    sudo systemctl restart presto.service
    

Como criar uma mesa de Iceberg no Presto

  1. Abra o cliente Presto e use o conector "Iceberg" para conseguir o metastore:

    --catalog iceberg --schema default
    
  2. Crie uma tabela para inserir e atualizar dados. Veja um exemplo.

    1. Crie uma tabela chamada example no banco de dados default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Inserir dados de amostra:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Leia os dados da tabela:

      SELECT * FROM iceberg.default.example;
      
    4. Insira mais dados novos para verificar os snapshots:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Veja os snapshots:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Adicione o comando ORDER BY committed_at DESC LIMIT 1; para encontrar o ID do snapshot mais recente.

    6. Reverta para uma versão específica da tabela:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Substitua snapshot-id pela versão que você quer recuperar.

A seguir