En esta página, se explica cómo usar las tablas de Apache Iceberg con un servicio de Dataproc Metastore conectado a un clúster de Dataproc. Apache Iceberg es un formato de tabla abierta para grandes conjuntos de datos analíticos.
Compatibilidad
Las tablas de Iceberg admiten las siguientes funciones.
Controladores | Selecciona | Insertar | Crear tabla |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Antes de comenzar
- Crea un servicio de Dataproc Metastore.
- Adjunta Dataproc Metastore a un clúster de Dataproc.
Usa la tabla de Iceberg con Spark
En el siguiente ejemplo, se muestra que debes usar tablas de Iceberg con Spark.
Las tablas de Iceberg admiten operaciones de lectura y escritura. Para obtener más información, consulta Apache Iceberg: Spark.
Configuraciones de Spark
Primero, inicia la shell de Spark y usa un bucket de Cloud Storage para almacenar datos. Para incluir Iceberg en la instalación de Spark, agrega el archivo JAR del entorno de ejecución de Iceberg Spark a la carpeta JARs de Spark. Para descargar el archivo JAR, consulta Descargas de Apache Iceberg. El siguiente comando inicia la shell de Spark compatible con Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Usar el catálogo de Hive para crear tablas de Iceberg
Establece la configuración del catálogo de Hive para crear tablas de Iceberg en Spark Scala:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.
Crea una tabla llamada
example
en la base de datosdefault
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Inserta datos de muestra:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Especifica la estrategia de partición según la columna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crea la tabla:
val table=catalog.createTable(name,df1_schema,partition_spec);
Agrega el controlador de almacenamiento de Iceberg y SerDe como la propiedad de la tabla:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Escribe los datos en la tabla:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Lee los datos:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Cambia el esquema de la tabla. A continuación, se muestra un ejemplo.
Obtén la tabla y agrega una columna nueva
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Verifica el nuevo esquema de la tabla:
table.schema.toString;
Inserta más datos y observa la evolución del esquema. A continuación, se muestra un ejemplo.
Agrega datos nuevos a la tabla:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Verifica los datos nuevos insertados:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Visualiza el historial de la tabla:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Visualiza las instantáneas:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Visualiza los archivos de manifiesto:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Visualiza los archivos de datos:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Supongamos que cometiste un error cuando agregas la fila con el valor de
id=6
y quieres volver para ver una versión correcta de la tabla:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Reemplaza
snapshot-id
por la versión a la que deseas volver.
Usar tablas de Hadoop para crear tablas de Iceberg
Establece la configuración de la tabla de Hadoop para crear tablas de Iceberg en la escala de Spark:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.
Crea una tabla llamada
example
en la base de datosdefault
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Inserta datos de muestra:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Especifica la estrategia de partición según la columna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crea la tabla:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Escribe los datos en la tabla:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Lee los datos:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Cambia el esquema de la tabla. A continuación, se muestra un ejemplo.
Obtén la tabla y agrega una columna nueva
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Verifica el nuevo esquema de la tabla:
table.schema.toString;
Inserta más datos y observa la evolución del esquema. A continuación, se muestra un ejemplo.
Agrega datos nuevos a la tabla:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Verifica los datos nuevos insertados:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Visualiza el historial de la tabla:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Visualiza las instantáneas:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Visualiza los archivos de manifiesto:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Visualiza los archivos de datos:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Regresa para ver una versión específica de la tabla:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Reemplaza
snapshot-id
por la versión a la que deseas volver y agrega"L"
al final. Por ejemplo,"3943776515926014142L"
.
Usar la tabla Iceberg en Hive
Iceberg admite la lectura de tablas con Hive mediante un StorageHandler
. Ten en cuenta que solo se admiten las versiones de Hive 2.x y 3.1.2. Para obtener más información, consulta Apache Iceberg: Hive. Además, agrega el archivo JAR del entorno de ejecución de Hive Iceberg a la ruta de clase de Hive. Para descargar el archivo JAR, consulta Descargas de Apache Iceberg.
Para superponer una tabla de Hive sobre una tabla de Iceberg, debes crear la tabla de Iceberg con un catálogo de Hive o una tabla de Hadoop. Además, debes configurar Hive según corresponda para leer datos de la tabla de Iceberg.
Leer la tabla de Iceberg (catálogo de Hive) en Hive
Abre el cliente de Hive y establece las configuraciones para leer las tablas de Iceberg en la sesión de cliente de Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Leer el esquema y los datos de la tabla A continuación, se muestra un ejemplo.
Verifica el esquema de la tabla y si el formato de la tabla es Iceberg:
describe formatted example;
Lee los datos de la tabla:
select * from example;
Leer la tabla de Iceberg (Hadoop Table) en Hive
Abre el cliente de Hive y establece las configuraciones para leer las tablas de Iceberg en la sesión de cliente de Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Leer el esquema y los datos de la tabla A continuación, se muestra un ejemplo.
Crea una tabla externa (superpón una tabla de Hive a la tabla de Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Verifica el esquema de la tabla y si el formato de la tabla es Iceberg:
describe formatted hadoop_table;
Lee los datos de la tabla:
select * from hadoop_table;
Usa la tabla de Iceberg en Presto
Las consultas de Presto usan el conector de Hive para obtener ubicaciones de las particiones, por lo que debes configurar Presto según corresponda para leer y escribir datos en la tabla de Iceberg. Para obtener más información, consulta Presto/Trino: Conector de Hive y Conector de Iceberg de Presto/Trino.
Configuraciones de Presto
En cada nodo del clúster de Dataproc, crea un archivo llamado
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
y configurahive.metastore.uri
de la siguiente manera:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Reemplaza
example.net:9083
por el host y el puerto correctos para el servicio de Thrift del almacén de metadatos de Hive.Reinicia el servicio de Presto para enviar los parámetros de configuración:
sudo systemctl restart presto.service
Crea una tabla de Iceberg en Presto
Abre el cliente de Presto y usa el conector “Iceberg” para obtener el almacén de metadatos:
--catalog iceberg --schema default
Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.
Crea una tabla llamada
example
en la base de datosdefault
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Inserta datos de muestra:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Lee los datos de la tabla:
SELECT * FROM iceberg.default.example;
Inserta más datos nuevos para verificar los resúmenes:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Visualiza las instantáneas:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
Si agregas el comando
ORDER BY committed_at DESC LIMIT 1;
, podrás encontrar el ID de la instantánea más reciente.Revierte a una versión específica de la tabla:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Reemplaza
snapshot-id
por la versión a la que deseas volver.