Esta página explica como usar tabelas do Apache Iceberg com um serviço Dataproc Metastore anexado a um cluster do Dataproc. O Apache Iceberg é um formato de tabela aberta para grandes conjuntos de dados analíticos.
Compatibilidades
As tabelas Iceberg suportam as seguintes funcionalidades.
Impulsionadores | Selecionar | Inserir | Criar tabela |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Antes de começar
- Crie um serviço Dataproc Metastore.
- Anexe o Dataproc Metastore a um cluster do Dataproc.
Use a tabela Iceberg com o Spark
O exemplo seguinte mostra como usar tabelas Iceberg com o Spark.
As tabelas Iceberg suportam operações de leitura e escrita. Para mais informações, consulte o artigo Apache Iceberg - Spark.
Configurações do Spark
Primeiro, inicie o shell do Spark e use um contentor do Cloud Storage para armazenar dados. Para incluir o Iceberg na instalação do Spark, adicione o ficheiro JAR do Iceberg Spark Runtime à pasta JARs do Spark. Para transferir o ficheiro JAR, consulte a página Transferências do Apache Iceberg. O comando seguinte inicia o shell do Spark com suporte para o Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Use o catálogo do Hive para criar tabelas Iceberg
Configure as configurações do catálogo do Hive para criar tabelas Iceberg no spark scala:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Crie uma tabela para inserir e atualizar dados. Segue-se um exemplo.
Cria uma tabela denominada
example
na base de dadosdefault
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Inserir dados de amostra:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Especifique a estratégia de partição com base na coluna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crie a tabela:
val table=catalog.createTable(name,df1_schema,partition_spec);
Adicione o controlador de armazenamento e o SerDe do Iceberg como propriedade da tabela:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Escreva os dados na tabela:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Leia os dados:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Altere o esquema da tabela. Segue-se um exemplo.
Obtenha a tabela e adicione uma nova coluna
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Verifique o novo esquema da tabela:
table.schema.toString;
Inserir mais dados e ver a evolução do esquema. Segue-se um exemplo.
Adicione novos dados à tabela:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Verifique os novos dados inseridos:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Veja o histórico da tabela:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Veja os resumos:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Veja os ficheiros de manifesto:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Veja os ficheiros de dados:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Suponha que cometeu um erro ao adicionar a linha com o valor
id=6
e quer voltar atrás para ver uma versão correta da tabela:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Substitua
snapshot-id
pela versão à qual quer reverter.
Use tabelas Hadoop para criar tabelas Iceberg
Configure as configurações da tabela Hadoop para criar tabelas Iceberg no spark scala:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Crie uma tabela para inserir e atualizar dados. Segue-se um exemplo.
Cria uma tabela denominada
example
na base de dadosdefault
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Inserir dados de amostra:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Especifique a estratégia de partição com base na coluna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crie a tabela:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Escreva os dados na tabela:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Leia os dados:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Altere o esquema da tabela. Segue-se um exemplo.
Obtenha a tabela e adicione uma nova coluna
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Verifique o novo esquema da tabela:
table.schema.toString;
Inserir mais dados e ver a evolução do esquema. Segue-se um exemplo.
Adicione novos dados à tabela:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Verifique os novos dados inseridos:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Veja o histórico da tabela:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Veja os resumos:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Veja os ficheiros de manifesto:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Veja os ficheiros de dados:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Volte para ver uma versão específica da tabela:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Substitua
snapshot-id
pela versão para a qual quer reverter e adicione"L"
no final. Por exemplo,"3943776515926014142L"
.
Use a tabela Iceberg no Hive
O Iceberg suporta tabelas lidas através do Hive com um StorageHandler
. Tenha em atenção que apenas são suportadas as versões 2.x e 3.1.2 do Hive. Para mais informações, consulte o artigo
Apache Iceberg - Hive. Além disso, adicione o ficheiro JAR do tempo de execução do Iceberg Hive ao caminho de classe do Hive. Para transferir o ficheiro JAR, consulte Transferências do Apache Iceberg.
Para sobrepor uma tabela do Hive a uma tabela do Iceberg, tem de criar a tabela do Iceberg através de um catálogo do Hive ou de uma tabela do Hadoop. Além disso, tem de configurar o Hive em conformidade para ler dados da tabela Iceberg.
Ler tabela Iceberg (catálogo Hive) no Hive
Abra o cliente do Hive e configure as configurações para ler tabelas do Iceberg na sessão do cliente do Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Ler o esquema e os dados da tabela. Segue-se um exemplo.
Verifique o esquema da tabela e se o formato da tabela é Iceberg:
describe formatted example;
Ler os dados da tabela:
select * from example;
Ler tabela Iceberg (tabela Hadoop) no Hive
Abra o cliente do Hive e configure as configurações para ler tabelas do Iceberg na sessão do cliente do Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Ler o esquema e os dados da tabela. Segue-se um exemplo.
Crie uma tabela externa (sobreponha uma tabela do Hive a uma tabela do Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Verifique o esquema da tabela e se o formato da tabela é Iceberg:
describe formatted hadoop_table;
Ler os dados da tabela:
select * from hadoop_table;
Use a tabela Iceberg no Presto
As consultas Presto usam o conetor Hive para obter localizações de partições, pelo que tem de configurar o Presto em conformidade para ler e escrever dados na tabela Iceberg. Para mais informações, consulte os artigos Presto/Trino – Conetor do Hive e Presto/Trino – Conetor do Iceberg.
Configurações do Presto
Em cada nó do cluster do Dataproc, crie um ficheiro com o nome
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
e configure ohive.metastore.uri
da seguinte forma:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Substitua
example.net:9083
pelo anfitrião e pela porta corretos do seu serviço Thrift do Hive Metastore.Reinicie o serviço Presto para enviar as configurações:
sudo systemctl restart presto.service
Crie uma tabela Iceberg no Presto
Abra o cliente Presto e use o conetor "Iceberg" para obter o metastore:
--catalog iceberg --schema default
Crie uma tabela para inserir e atualizar dados. Segue-se um exemplo.
Cria uma tabela denominada
example
na base de dadosdefault
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Inserir dados de amostra:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Ler dados da tabela:
SELECT * FROM iceberg.default.example;
Insira mais dados novos para verificar os instantâneos:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Veja os resumos:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
Adicionando o comando
ORDER BY committed_at DESC LIMIT 1;
, pode encontrar o ID da imagem instantânea mais recente.Reverter para uma versão específica da tabela:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Substitua
snapshot-id
pela versão à qual quer reverter.