Nesta página, explicamos como usar tabelas do Apache Iceberg com um serviço do metastore do Dataproc anexado a um cluster do Dataproc. O Apache Iceberg é um formato de tabela aberta para grandes conjuntos de dados analíticos.
Compatibilidades
As tabelas Iceberg oferecem suporte aos seguintes recursos:
Impulsionadores | Seleção | Inserir | Criar tabela |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Antes de começar
Usar a tabela Iceberg com o Spark
O exemplo a seguir mostra como usar tabelas Iceberg com o Spark.
As tabelas de Iceberg são compatíveis com operações de leitura e gravação. Para mais informações, consulte Apache Iceberg - Spark.
Configurações do Spark
Primeiro, inicie o shell do Spark e use um bucket do Cloud Storage para armazenar dados. Para incluir o Iceberg na instalação do Spark, adicione o arquivo JAR de tempo de execução do Iceberg Spark à pasta JARs do Spark. Para fazer o download do arquivo JAR, consulte Downloads do Apache Iceberg. O comando a seguir inicia o shell do Spark com suporte para Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Usar o Hive Catalog para criar tabelas de Iceberg
Defina as configurações do Hive Catalog para criar tabelas de Iceberg no Spark Spark:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Crie uma tabela para inserir e atualizar dados. Veja um exemplo.
Crie uma tabela chamada
example
no banco de dadosdefault
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Inserir dados de amostra:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Especifique a estratégia de particionamento com base na coluna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crie a tabela:
val table=catalog.createTable(name,df1_schema,partition_spec);
Adicione o gerenciador de armazenamento Iceberg e o SerDe como a propriedade da tabela:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Grave os dados na tabela:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Leia os dados:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Altere o esquema da tabela. Veja um exemplo.
Receba a tabela e adicione uma nova coluna
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Verifique o novo esquema da tabela:
table.schema.toString;
Insira mais dados e veja a evolução do esquema. Veja um exemplo.
Adicione novos dados à tabela:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Verifique os novos dados inseridos:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Veja o histórico da tabela:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Veja os snapshots:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Veja os arquivos de manifesto:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Veja os arquivos de dados:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Suponha que você cometeu um erro ao adicionar a linha com o valor de
id=6
e queira voltar para uma versão correta da tabela:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Substitua
snapshot-id
pela versão que você quer consultar.
Usar tabelas do Hadoop para criar tabelas de Iceberg
Defina as configurações da tabela do Hadoop para criar tabelas de Iceberg no Spark Spark:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Crie uma tabela para inserir e atualizar dados. Veja um exemplo.
Crie uma tabela chamada
example
no banco de dadosdefault
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Inserir dados de amostra:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Especifique a estratégia de particionamento com base na coluna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crie a tabela:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Grave os dados na tabela:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Leia os dados:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Altere o esquema da tabela. Veja um exemplo.
Receba a tabela e adicione uma nova coluna
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Verifique o novo esquema da tabela:
table.schema.toString;
Insira mais dados e veja a evolução do esquema. Veja um exemplo.
Adicione novos dados à tabela:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Verifique os novos dados inseridos:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Veja o histórico da tabela:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Veja os snapshots:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Veja os arquivos de manifesto:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Veja os arquivos de dados:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Volte para ver uma versão específica da tabela:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Substitua
snapshot-id
pela versão que você quer voltar e adicione"L"
ao final. Por exemplo,"3943776515926014142L"
.
Usar uma tabela do Iceberg no Hive
O Iceberg é compatível com tabelas lidas pelo Hive usando um StorageHandler
. Observe que
somente as versões Hive 2.x e 3.1.2 são compatíveis. Para mais informações, consulte
Apache Iceberg - Hive. Além disso, inclua o arquivo JAR do ambiente de execução Hive do Iceberg no caminho de classe do Hive. Para
fazer o download do arquivo JAR, consulte Downloads do Apache Iceberg.
Para sobrepor uma tabela do Hive sobre uma tabela de Iceberg, é necessário criar uma tabela do Iceberg com um catálogo do Hive ou uma tabela do Hadoop. Além disso, configure o Hive de acordo com isso para ler dados da tabela do Iceberg.
Ler tabela do Iceberg (catálogo do Hive) no Hive
Abra o cliente Hive e defina configurações para ler as tabelas de Iceberg na sessão do cliente Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Ler esquema e dados de tabela. Veja um exemplo.
Verifique o esquema da tabela e se o formato da tabela é Iceberg:
describe formatted example;
Leia os dados da tabela:
select * from example;
Ler tabela do Iceberg (tabela do Hadoop) no Hive
Abra o cliente Hive e defina configurações para ler as tabelas de Iceberg na sessão do cliente Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Ler esquema e dados de tabela. Veja um exemplo.
Crie uma tabela externa (sobreponha uma tabela do Hive na tabela do Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Verifique o esquema da tabela e se o formato da tabela é Iceberg:
describe formatted hadoop_table;
Leia os dados da tabela:
select * from hadoop_table;
Usar uma tabela de gelo no Presto
As consultas do Presto usam o conector Hive para receber locais de partição. Portanto, você precisa configurar o Presto corretamente para ler e gravar dados na tabela do Iceberg. Para mais informações, consulte Presto/Trino - Hive Connector e Presto/Trino - Iceberg Connector.
Configurações do Presto
Em cada nó de cluster do Dataproc, crie um arquivo chamado
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
e configure ohive.metastore.uri
da seguinte maneira:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Substitua
example.net:9083
pelo host e pela porta corretos do serviço Hive Thrift.Reinicie o serviço Presto para enviar as configurações:
sudo systemctl restart presto.service
Criar tabela de Iceberg em Presto
Abra o cliente Presto e use o conector "Iceberg" para receber o metastore:
--catalog iceberg --schema default
Crie uma tabela para inserir e atualizar dados. Veja um exemplo.
Crie uma tabela chamada
example
no banco de dadosdefault
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Inserir dados de amostra:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Leia os dados da tabela:
SELECT * FROM iceberg.default.example;
Insira mais dados para verificar os instantâneos:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Veja os snapshots:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
Ao adicionar o comando
ORDER BY committed_at DESC LIMIT 1;
, é possível encontrar o ID do snapshot mais recente.Reverter para uma versão específica da tabela:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Substitua
snapshot-id
pela versão que você quer consultar.