Suporte do Iceberg no Metastore do Dataproc

Nesta página, explicamos como usar o Apache Iceberg no Dataproc por meio da hospedagem do metastore Hive no Metastore do Dataproc. Ele contém informações sobre como usar a tabela do Iceberg pelo Spark, Hive e Presto.

Recursos

Apache Iceberg é um formato de tabela aberta para grandes conjuntos de dados analíticos. O Iceberg melhora muito o desempenho e fornece os seguintes recursos avançados:

  • Atomicidade: alterações de tabela concluídas ou com falha. Não há confirmação parcial de alterações na tabela.

  • Isolamento de snapshot: as leituras usam apenas um snapshot de uma tabela sem manter um bloqueio.

  • Vários gravadores simultâneos: usa simultaneidade otimista e tenta novamente para garantir que as atualizações compatíveis sejam bem-sucedidas, mesmo quando há conflito nas gravações.

  • Evolução do esquema: as colunas são rastreadas por código para compatibilidade com adição, ação de soltar, atualização e renomeação.

  • Viagem no tempo: consultas reproduzíveis podem usar a mesma tabela ou snapshot você pode examinar facilmente as alterações.

  • Planejamento distribuído: a remoção de arquivo e push-down de predicado são distribuídos para jobs, removendo o metastore como um gargalo.

  • Histórico de versões e reversão: corrija problemas redefinindo tabelas para um estado anterior.

  • Particionamento oculto: evita erros do usuário que causam resultados silenciosamente incorretos ou consultas extremamente lentas.

  • Evolução do layout da partição: pode atualizar o layout de uma tabela conforme os padrões de volume de dados ou consulta mudam.

  • Planejamento de verificação e filtragem de arquivos: localiza os arquivos necessários para uma consulta removendo arquivos de metadados desnecessários e filtrando arquivos de dados que não contêm dados correspondentes.

Compatibilidades

O Iceberg funciona bem com o Dataproc e o metastore do Dataproc. Podem ser adicionadas tabelas com formato de alto desempenho ao Spark e ao Presto, que funcionam como tabelas SQL. O Iceberg usa um ponteiro para a versão mais recente de um snapshot e precisa de um mecanismo para garantir atomicidade ao alternar versões. Ele fornece duas opções, como o Hive Catalog e o Hadoop, para rastrear tabelas.

Os recursos compatíveis incluem:

Impulsionadores Seleção Inserir Criar tabela
Spark
Hive
Presto

Antes de começar

Para começar, crie um cluster do Dataproc e use o serviço metastore do Dataproc como metastore Hive. Para mais informações, consulte Criar um cluster do Dataproc. Depois de criar o cluster, conecte-se via SSH ao cluster a partir de um navegador ou da linha de comando.

Tabela Iceberg com o Spark

As tabelas de Iceberg são compatíveis com operações de leitura e gravação. Para mais informações, consulte Apache Iceberg - Spark.

Configurações do Spark

Primeiro, inicie o shell do Spark e use um bucket do Cloud Storage para armazenar dados. Para incluir o Iceberg na instalação do Spark, adicione o arquivo JAR de tempo de execução do Iceberg Spark à pasta JARs do Spark. Para fazer o download do arquivo JAR, consulte Downloads do Apache Iceberg. O comando a seguir inicia o shell do Spark com suporte para Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Usar o Hive Catalog para criar tabelas de Iceberg

  1. Defina as configurações do Hive Catalog para criar tabelas de Iceberg no Spark Spark:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crie uma tabela para inserir e atualizar dados. Veja um exemplo.

    1. Crie uma tabela chamada example no banco de dados default:

      val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
      val name = TableIdentifier.of("default","example");
      
    2. Inserir dados de amostra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifique a estratégia de particionamento com base na coluna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crie a tabela:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Adicione o gerenciador de armazenamento Iceberg e o SerDe como a propriedade da tabela:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Grave os dados na tabela:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Leia os dados:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Altere o esquema da tabela. Veja um exemplo.

    1. Receba a tabela e adicione uma nova coluna grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifique o novo esquema da tabela:

      table.schema.toString;
      
  4. Insira mais dados e veja a evolução do esquema. Veja um exemplo.

    1. Adicione novos dados à tabela:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Verifique os novos dados inseridos:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Veja o histórico da tabela:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Veja os snapshots:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Veja os arquivos de manifesto:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Veja os arquivos de dados:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Suponha que você cometeu um erro ao adicionar a linha com o valor de id=6 e queira voltar para uma versão correta da tabela:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Substitua snapshot-id pela versão que você quer consultar.

Usar tabelas do Hadoop para criar tabelas de Iceberg

  1. Defina as configurações da tabela do Hadoop para criar tabelas de Iceberg no Spark Spark:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crie uma tabela para inserir e atualizar dados. Veja um exemplo.

    1. Crie uma tabela chamada example no banco de dados default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Inserir dados de amostra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifique a estratégia de particionamento com base na coluna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crie a tabela:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Grave os dados na tabela:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Leia os dados:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Altere o esquema da tabela. Veja um exemplo.

    1. Receba a tabela e adicione uma nova coluna grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifique o novo esquema da tabela:

      table.schema.toString;
      
  4. Insira mais dados e veja a evolução do esquema. Veja um exemplo.

    1. Adicione novos dados à tabela:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Verifique os novos dados inseridos:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Veja o histórico da tabela:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Veja os snapshots:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Veja os arquivos de manifesto:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Veja os arquivos de dados:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Volte para ver uma versão específica da tabela:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Substitua snapshot-id pela versão que você quer voltar e adicione "L" ao final. Por exemplo, "3943776515926014142L".

Usar uma tabela do Iceberg no Hive

O Iceberg é compatível com tabelas lidas pelo Hive usando um StorageHandler. Observe que somente as versões Hive 2.x e 3.1.2 são compatíveis. Para mais informações, consulte Apache Iceberg - Hive. Além disso, inclua o arquivo JAR do ambiente de execução Hive do Iceberg no caminho de classe do Hive. Para fazer o download do arquivo JAR, consulte Downloads do Apache Iceberg.

Para sobrepor uma tabela do Hive sobre uma tabela de Iceberg, é necessário criar uma tabela do Iceberg com um catálogo do Hive ou uma tabela do Hadoop. Além disso, configure o Hive de acordo com isso para ler dados da tabela do Iceberg.

Ler tabela do Iceberg (catálogo do Hive) no Hive

  1. Abra o cliente Hive e defina configurações para ler as tabelas de Iceberg na sessão do cliente Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Ler esquema e dados de tabela. Veja um exemplo.

    1. Verifique o esquema da tabela e se o formato da tabela é Iceberg:

      describe formatted example;
      
    2. Leia os dados da tabela:

      select * from example;
      

Ler tabela do Iceberg (tabela do Hadoop) no Hive

  1. Abra o cliente Hive e defina configurações para ler as tabelas de Iceberg na sessão do cliente Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Ler esquema e dados de tabela. Veja um exemplo.

    1. Crie uma tabela externa (sobreponha uma tabela do Hive na tabela do Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Verifique o esquema da tabela e se o formato da tabela é Iceberg:

      describe formatted hadoop_table;
      
    3. Leia os dados da tabela:

      select * from hadoop_table;
      

Usar uma tabela de gelo no Presto

As consultas do Presto usam o conector Hive para receber locais de partição. Portanto, você precisa configurar o Presto corretamente para ler e gravar dados na tabela do Iceberg. Para mais informações, consulte Presto/Trino - Hive Connector e Presto/Trino - Iceberg Connector.

Configurações do Presto

  1. Em cada nó de cluster do Dataproc, crie um arquivo chamado iceberg.properties /etc/presto/conf/catalog/iceberg.properties e configure o hive.metastore.uri da seguinte maneira:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Substitua example.net:9083 pelo host e pela porta corretos do serviço Hive Thrift.

  2. Reinicie o serviço Presto para enviar as configurações:

    sudo systemctl restart presto.service
    

Criar tabela de Iceberg em Presto

  1. Abra o cliente Presto e use o conector "Iceberg" para receber o metastore:

    --catalog iceberg --schema default
    
  2. Crie uma tabela para inserir e atualizar dados. Veja um exemplo.

    1. Crie uma tabela chamada example no banco de dados default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Inserir dados de amostra:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Leia os dados da tabela:

      SELECT * FROM iceberg.default.example;
      
    4. Insira mais dados para verificar os instantâneos:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Veja os snapshots:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Ao adicionar o comando ORDER BY committed_at DESC LIMIT 1;, é possível encontrar o ID do snapshot mais recente.

    6. Reverter para uma versão específica da tabela:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Substitua snapshot-id pela versão que você quer consultar.

A seguir