Compatibilidad con Iceberg en Dataproc Metastore

En esta página, se explica cómo usar Apache Iceberg en Dataproc mediante el hosting del almacén de metadatos de Hive en Dataproc Metastore. Incluye información sobre cómo usar la tabla Iceberg a través de Spark, Hive y Presto.

Funciones

Apache Ibergberg es un formato de tabla abierto para grandes conjuntos de datos de estadísticas. Iceberg mejora en gran medida el rendimiento y proporciona las siguientes funciones avanzadas:

  • Atomicity: los cambios de tabla se completaron o fallar. No hay confirmación de cambios parcial en la tabla.

  • Aislamiento de instantánea: Las lecturas usan solo una instantánea de una tabla sin mantener un bloqueo.

  • Varios escritores simultáneos: usa la simultaneidad optimista y los reintentos para garantizar que las actualizaciones compatibles se ejecuten de forma correcta, incluso cuando haya conflictos de escritura.

  • Evolución del esquema: Se realiza un seguimiento de las columnas por ID para admitir las acciones de agregar, soltar, actualizar y cambiar el nombre.

  • Viaje en el tiempo: Las consultas que se pueden reproducir pueden usar la misma tabla o instantánea. puedes examinar los cambios con facilidad.

  • Planificación distribuida: la reducción de archivos y el rechazo de predicados se distribuyen a los trabajos, lo que quita el almacén de metadatos como un cuello de botella.

  • Historial de versiones y reversión: Soluciona problemas mediante el restablecimiento de las tablas a un estado anterior.

  • Partición oculta: evita los errores de los usuarios que causan consultas incorrectas y silenciosas, o consultas muy lentas.

  • Evolución del diseño de partición: puede actualizar el diseño de una tabla a medida que cambian los volúmenes de datos o los patrones de consulta.

  • Planificación de análisis y filtrado de archivos: encuentra los archivos necesarios para una consulta mediante la reducción de archivos de metadatos innecesarios y el filtrado de archivos de datos que no contienen datos coincidentes.

Compatibilidades

Iceberg funciona bien con Dataproc y Dataproc Metastore. Puede agregar tablas con un formato de alto rendimiento a Spark y Presto que funcionen como una tabla de SQL. Iceberg usa un puntero a la última versión de una instantánea y necesita un mecanismo para garantizar la atomicidad al cambiar de versión. Proporciona dos opciones, Catálogo de Hive y tablas de Hadoop, para hacer un seguimiento de las tablas.

Entre las funciones compatibles, se incluyen las siguientes:

Controladores Selecciona Insertar Crear tabla
Spark ✓ ✓ ✓
Hive ✓ ✓
Presto ✓ ✓ ✓

Requisitos previos

Para comenzar, crea un clúster de Dataproc y usa el servicio de Dataproc Metastore como su almacén de metadatos de Hive. Para obtener más información, consulta Crea un clúster de Dataproc. Después de crear el clúster, conéctate a él desde un navegador o desde la línea de comandos.

Usa tablas de Iceberg con Spark

Las tablas de Iceberg admiten operaciones de lectura y escritura. Para obtener más información, consulta Apache Iceberg: Spark.

Configuración de Spark

Primero, inicia la shell de Spark y usa un bucket de Cloud Storage para almacenar datos. Para incluir Iceberg en la instalación de Spark, agrega el archivo JAR del entorno de ejecución de Sparkberg a la carpeta JAR de Spark. Para descargar el archivo JAR, consulta Descargas de Apache Iceberg. El siguiente comando inicia la shell de Spark compatible con Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Usa el catálogo de Hive para crear tablas de Iceberg

  1. Establezca las configuraciones del catálogo de Hive para crear tablas de Iceberg en la escala de Spark:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.

    1. Crea una tabla llamada example en la base de datos default:

      val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
      val name = TableIdentifier.of("default","example");
      
    2. Inserta datos de muestra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifica la estrategia de partición según la columna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crea la tabla:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Agrega el controlador de almacenamiento de Iceberg y SerDe como la propiedad de la tabla:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Escribe los datos en la tabla:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Lee los datos:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Cambia el esquema de la tabla. A continuación, se muestra un ejemplo.

    1. Obtén la tabla y agrega una columna nueva grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifica el esquema de la tabla nuevo:

      table.schema.toString;
      
  4. Inserta más datos y observa la evolución del esquema. A continuación, se muestra un ejemplo.

    1. Agrega datos nuevos a la tabla:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Verifica los datos nuevos que se insertaron:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Visualiza el historial de tablas:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Observa las instantáneas:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Observa los archivos de manifiesto:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Visualiza los archivos de datos:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Supongamos que cometiste un error cuando agregaste la fila con el valor de id=6 y deseas regresar para ver la versión correcta de la tabla:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Reemplaza snapshot-id por la versión a la que quieres volver.

Usar tablas de Hadoop para crear tablas de Iceberg

  1. Establezca las configuraciones de la tabla de Hadoop para crear tablas de Iceberg en la escala de Spark:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.

    1. Crea una tabla llamada example en la base de datos default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Inserta datos de muestra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifica la estrategia de partición según la columna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crea la tabla:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Escribe los datos en la tabla:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Lee los datos:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Cambia el esquema de la tabla. A continuación, se muestra un ejemplo.

    1. Obtén la tabla y agrega una columna nueva grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifica el esquema de la tabla nuevo:

      table.schema.toString;
      
  4. Inserta más datos y observa la evolución del esquema. A continuación, se muestra un ejemplo.

    1. Agrega datos nuevos a la tabla:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Verifica los datos nuevos que se insertaron:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Visualiza el historial de tablas:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Observa las instantáneas:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Observa los archivos de manifiesto:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Visualiza los archivos de datos:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Regrese para ver una versión específica de la tabla:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Reemplaza snapshot-id por la versión a la que deseas volver y agrega "L" al final. Por ejemplo, "3943776515926014142L".

Usa tablas de Iceberg en Hive

Iceberg admite las tablas leídas a través de Hive con un StorageHandler. Ten en cuenta que solo se admiten las versiones de Hive 2.x y 3.1.2. Para obtener más información, consulta Apache Iceberg: Hive. Además, agrega el archivo JAR del entorno de ejecución de Hiveberg a la ruta de clase de Hive. Para descargar el archivo JAR, consulta Descargas de Apache Iceberg.

Para superponer una tabla de Hive sobre una tabla de Iceberg, debes crear la tabla de Iceberg mediante un catálogo de Hive o una tabla de Hadoop. Además, debes configurar Hive en consecuencia para leer datos de la tabla de Iceberg.

Cómo leer tablas de Iceberg (catálogo de Hive) en Hive

  1. Abre el cliente de Hive y establece la configuración para leer las tablas del cliente de Iceberg en la sesión de cliente de Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Leer el esquema y los datos de la tabla A continuación, se muestra un ejemplo.

    1. Verifica el esquema de la tabla y si el formato de la tabla es Iceberg:

      describe formatted example;
      
    2. Lee los datos de la tabla:

      select * from example;
      

Cómo leer tablas de Iceberg (tablas de Hadoop) en Hive

  1. Abre el cliente de Hive y establece la configuración para leer las tablas del cliente de Iceberg en la sesión de cliente de Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Leer el esquema y los datos de la tabla A continuación, se muestra un ejemplo.

    1. Crea una tabla externa (superpón una tabla de Hive sobre la tabla de Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Verifica el esquema de la tabla y si el formato de la tabla es Iceberg:

      describe formatted hadoop_table;
      
    3. Lee los datos de la tabla:

      select * from hadoop_table;
      

Cómo usar la tabla de Iceberg en Presto

Las consultas de Presto usan el conector de Hive para obtener ubicaciones de partición, por lo que debes configurar Presto de forma correcta a fin de leer y escribir datos en la tabla Iceberg. Para obtener más información, consulta Presto/Trino - Hive Connector y Presto/Trino - Iceberg Connector.

Configuraciones de Presto

  1. En cada nodo del clúster de Dataproc, crea un archivo llamado iceberg.properties /etc/presto/conf/catalog/iceberg.properties y configura hive.metastore.uri de la siguiente manera:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Reemplaza example.net:9083 por el host y el puerto correctos para el servicio de Hive Metastore.

  2. Reinicia el servicio de Presto para enviar la configuración:

    sudo systemctl restart presto.service
    

Crea tablas de Iceberg en Presto

  1. Abre el cliente de Presto y usa el conector “Iceberg” para obtener el almacén de metadatos:

    --catalog iceberg --schema default
    
  2. Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.

    1. Crea una tabla llamada example en la base de datos default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Inserta datos de muestra:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Lee los datos de la tabla:

      SELECT * FROM iceberg.default.example;
      
    4. Inserta más datos nuevos para verificar instantáneas:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Observa las instantáneas:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Agrega el comando ORDER BY committed_at DESC LIMIT 1; para encontrar el ID de la instantánea más reciente.

    6. Revierte a una versión específica de la tabla:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Reemplaza snapshot-id por la versión a la que quieres volver.

¿Qué sigue?