Usar el metastore de BigLake con Dataproc

En este documento se explica cómo usar el almacén de metadatos de BigLake con Dataproc en Compute Engine. Esta conexión te proporciona un único almacén de metadatos compartido que funciona en motores de software de código abierto, como Apache Spark o Apache Flink.

Antes de empezar

  1. Habilita la facturación de tu Google Cloud proyecto. Consulta cómo comprobar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs BigQuery y Dataproc.

    Habilitar las APIs

  3. Opcional: Consulta cómo funciona el metastore de BigLake y por qué deberías usarlo.

Roles obligatorios

Para obtener los permisos que necesitas para usar Spark o Flink y Dataproc con BigLake Metastore como almacén de metadatos, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos:

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Flujo de trabajo general

Para usar Dataproc en Compute Engine con BigLake Metastore, sigue estos pasos generales:

  1. Crea un clúster de Dataproc o configura uno que ya tengas.
  2. Conéctate al motor de software de código abierto que prefieras, como Spark o Flink.
  3. Usa un archivo JAR para instalar el complemento de catálogo de Apache Iceberg en el clúster.
  4. Crea y gestiona tus recursos de metaalmacén de BigLake según sea necesario, en función del motor de software de código abierto que estés usando.
  5. En BigQuery, accede a tus recursos de metastore de BigLake y úsalos.

Conectar BigLake Metastore a Spark

En las siguientes instrucciones se muestra cómo conectar Dataproc a BigLake Metastore mediante Spark SQL interactivo.

Descargar el complemento de catálogo de Iceberg

Para conectar BigLake Metastore con Dataproc y Spark, debes usar el archivo JAR del complemento del catálogo de Iceberg de BigLake Metastore.

Este archivo se incluye de forma predeterminada en la versión 2.2 de la imagen de Dataproc. Si tus clústeres de Dataproc no tienen acceso directo a Internet, debes descargar el complemento y subirlo a un segmento de Cloud Storage al que pueda acceder tu clúster de Dataproc.

Descarga el complemento del catálogo de Iceberg de metastore de BigLake.

Configurar un clúster de Dataproc

Antes de conectarte a BigLake Metastore, debes configurar un clúster de Dataproc.

Para ello, puedes crear un clúster o usar uno que ya tengas. Después, puedes usar este clúster para ejecutar Spark SQL interactivo y gestionar tus recursos de metastore de BigLake.

  • La subred de la región en la que se crea el clúster debe tener habilitada la opción Acceso privado de Google (PGA). De forma predeterminada, las VMs de clúster de Dataproc creadas con la versión de imagen 2.2 (predeterminada) o posterior solo tienen direcciones IP internas. Para permitir que las VMs del clúster se comuniquen con las APIs de Google, habilita el acceso privado de Google en la subred de la red default (o el nombre de la red especificado por el usuario, si procede) de la región en la que se crea el clúster.

  • Si quieres ejecutar el ejemplo de interfaz web de Zeppelin de esta guía, debes usar o crear un clúster de Dataproc con el componente opcional de Zeppelin habilitado.

Agrupación nueva

Para crear un clúster de Dataproc, ejecuta el siguiente comando gcloud dataproc clusters create. Esta configuración contiene los ajustes que necesitas para usar BigLake Metastore.

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

Haz los cambios siguientes:

  • CLUSTER_NAME: nombre del clúster de Dataproc.
  • PROJECT_ID: el ID del Google Cloud proyecto en el que vas a crear el clúster.
  • LOCATION: la Google Cloud región en la que vas a crear el clúster.

Clúster disponible

Para configurar un clúster, añade el siguiente tiempo de ejecución de Iceberg Spark al clúster.

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

Puedes añadir el tiempo de ejecución de una de las siguientes formas:

Enviar una tarea de Spark

Para enviar un trabajo de Spark, utilice uno de los siguientes métodos:

CLI de gcloud

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

Haz los cambios siguientes:

  • PROJECT_ID: el ID del Google Cloud proyecto que contiene el clúster de Dataproc.
  • CLUSTER_NAME: el nombre del clúster de Dataproc que usas para ejecutar el trabajo de Spark SQL.
  • REGION: la región de Compute Engine en la que se encuentra tu clúster.
  • LOCATION: la ubicación de los recursos de BigQuery.
  • CATALOG_NAME: el nombre del catálogo de Spark que estás usando con tu trabajo de SQL.
  • WAREHOUSE_DIRECTORY: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza por gs://.
  • SPARK_SQL_COMMAND: la consulta de Spark SQL que quieres ejecutar. Esta consulta incluye los comandos para crear tus recursos. Por ejemplo, para crear un espacio de nombres y una tabla.

Chispa interactiva

Conéctate a Spark e instala el complemento de catálogo

Para instalar el complemento de catálogo de BigLake Metastore, conéctate a tu clúster de Dataproc mediante SSH.

  1. En la consola de Google Cloud , ve a la página Instancias de VM.
  2. Para conectarte a una instancia de VM de Dataproc, haz clic en SSH en la lista de instancias de máquinas virtuales. La salida es similar a la siguiente:

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. En el terminal, ejecuta el siguiente comando de inicialización del metastore de BigLake:

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Haz los cambios siguientes:

    • CATALOG_NAME: el nombre del catálogo de Spark que estás usando con tu trabajo de SQL.
    • PROJECT_ID: el ID del proyecto Google Cloud del catálogo de metastore de BigLake con el que se vincula tu catálogo de Spark.
    • LOCATION: la Google Cloud ubicación del metastore de BigLake.
    • WAREHOUSE_DIRECTORY: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza por gs://.

    Después de conectarte correctamente a un clúster, tu terminal de Spark mostrará la petición spark-sql.

    spark-sql (default)>
    

Gestionar recursos de metastore de BigLake

Ahora estás conectado a BigLake Metastore. Puede ver los recursos que ya tiene o crear otros nuevos a partir de los metadatos almacenados en BigLake Metastore.

Por ejemplo, prueba a ejecutar los siguientes comandos en la sesión interactiva de Spark SQL para crear un espacio de nombres y una tabla de Iceberg.

  • Usa el catálogo personalizado de Iceberg:

    USE `CATALOG_NAME`;
  • Crea un espacio de nombres:

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • Usa el espacio de nombres creado:

    USE NAMESPACE_NAME;
  • Crea una tabla de Iceberg:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • Insertar una fila de una tabla:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • Añadir una columna a una tabla:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • Para ver los metadatos de una tabla, sigue estos pasos:

    DESCRIBE EXTENDED TABLE_NAME;
  • Mostrar las tablas del espacio de nombres:

    SHOW TABLES;

Cuaderno de Zeppelin

  1. En la Google Cloud consola, ve a la página Clústeres de Dataproc.

    Ir a clústeres de Dataproc

  2. Haz clic en el nombre del clúster que quieras usar.

    Se abrirá la página Cluster Details (Detalles del clúster).

  3. En el menú de navegación, haga clic en Interfaces web.

  4. En Pasarela de componentes, haz clic en Zeppelin. Se abrirá la página del cuaderno de Zeppelin.

  5. En el menú de navegación, haz clic en Notebook (Cuaderno) y, a continuación, en +Create new note (+Crear nota).

  6. En el cuadro de diálogo, introduce el nombre del cuaderno. Deja Spark seleccionado como intérprete predeterminado.

  7. Haz clic en Crear. Se crea un cuaderno.

  8. En el cuaderno, haz clic en el menú de configuración y, a continuación, en Intérprete.

  9. En el campo Buscar intérpretes, busca Spark.

  10. Haz clic en Editar.

  11. En el campo Spark.jars, introduce el URI del archivo JAR de Spark.

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
    
  12. Haz clic en Guardar.

  13. Haz clic en Aceptar.

  14. Copia el siguiente código de PySpark en tu notebook de Zeppelin.

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    Haz los cambios siguientes:

    • CATALOG_NAME: nombre del catálogo de Spark que se va a usar en la tarea de SQL.
    • PROJECT_ID: el ID del Google Cloud proyecto que contiene el clúster de Dataproc.
    • WAREHOUSE_DIRECTORY: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza por gs://.
    • NAMESPACE_NAME: el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • WAREHOUSE_DIRECTORY: el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
    • TABLE_NAME: el nombre de la tabla de Spark.
  15. Haz clic en el icono de ejecución o pulsa Shift-Enter para ejecutar el código. Cuando se complete el trabajo, el mensaje de estado mostrará "Spark Job Finished" (Trabajo de Spark finalizado) y el resultado mostrará el contenido de la tabla:

En las siguientes instrucciones se muestra cómo conectar Dataproc al metastore de BigLake mediante el cliente de Flink SQL.

Para conectar BigLake Metastore a Flink, haz lo siguiente:

  1. Crea un clúster de Dataproc con el componente opcional Flink habilitado y asegúrate de usar Dataproc 2.2 o una versión posterior.
  2. En la consola de Google Cloud , ve a la página Instancias de VM.

    Ir a instancias de VM

  3. En la lista de instancias de máquina virtual, haz clic en SSH para conectarte a una instancia de VM de Dataproc.

  4. Configura el complemento de catálogo personalizado de Iceberg para BigLake Metastore:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Inicia la sesión de Flink en YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
      -s yarn-session
  6. Crear un catálogo en Flink:

    CREATE CATALOG CATALOG_NAME WITH (
      'type'='iceberg',
      'warehouse'='WAREHOUSE_DIRECTORY',
      'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
      'gcp_project'='PROJECT_ID',
      'gcp_location'='LOCATION'
    );

    Haz los cambios siguientes:

    • CATALOG_NAME: el identificador del catálogo de Flink, que está vinculado a un catálogo del metastore de BigLake.
    • WAREHOUSE_DIRECTORY: la ruta base del directorio del almacén (la carpeta de Cloud Storage en la que Flink crea archivos). Este valor empieza por gs://.
    • PROJECT_ID: el ID del proyecto del catálogo de metastore de BigLake al que se vincula el catálogo de Flink.
    • LOCATION: la ubicación de los recursos de BigQuery.

Tu sesión de Flink ahora está conectada al metastore de BigLake y puedes ejecutar comandos de Flink SQL.

Ahora que te has conectado a BigLake Metastore, puedes crear y ver recursos basados en los metadatos almacenados en BigLake Metastore.

Por ejemplo, prueba a ejecutar los siguientes comandos en tu sesión interactiva de Flink SQL para crear una base de datos y una tabla de Iceberg.

  1. Usa el catálogo personalizado de Iceberg:

    USE CATALOG CATALOG_NAME;

    Sustituye CATALOG_NAME por el identificador de catálogo de Flink.

  2. Crea una base de datos, lo que crea un conjunto de datos en BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Sustituye DATABASE_NAME por el nombre de tu nueva base de datos.

  3. Usa la base de datos que has creado:

    USE DATABASE_NAME;
  4. Crea una tabla Iceberg. Con el siguiente comando se crea una tabla de ventas de ejemplo:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    );

    Sustituye ICEBERG_TABLE_NAME por el nombre de la nueva tabla.

  5. Para ver los metadatos de una tabla, sigue estos pasos:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Mostrar las tablas de la base de datos:

    SHOW TABLES;

Ingerir datos en una tabla

Después de crear una tabla Iceberg en la sección anterior, puedes usar Flink DataGen como fuente de datos para ingerir datos en tiempo real en tu tabla. Los pasos siguientes son un ejemplo de este flujo de trabajo:

  1. Para crear una tabla temporal con DataGen, sigue estos pasos:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '1000000',
        'fields.price.min' = '0',
        'fields.price.max' = '10000',
        'fields.buyer.first_name.length' = '10',
        'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Haz los cambios siguientes:

    • DATABASE_NAME: el nombre de la base de datos en la que se almacenará tu tabla temporal.
    • TEMP_TABLE_NAME: un nombre para la tabla temporal.
    • ICEBERG_TABLE_NAME: el nombre de la tabla Iceberg que has creado en la sección anterior.
  2. Define el paralelismo en 1:

    SET 'parallelism.default' = '1';
  3. Define el intervalo del punto de control:

    SET 'execution.checkpointing.interval' = '10second';
  4. Define el punto de control:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Inicia la tarea de streaming en tiempo real:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    El resultado debería ser similar al siguiente:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Para comprobar el estado del trabajo de streaming, haz lo siguiente:

    1. En la Google Cloud consola, ve a la página Clusters.

      Ir a Clústeres

    2. Selecciona tu clúster.

    3. Haga clic en la pestaña Interfaces web.

    4. Haz clic en el enlace YARN ResourceManager.

    5. En la interfaz de YARN ResourceManager, busca tu sesión de Flink y haz clic en el enlace ApplicationMaster de Tracking UI.

    6. En la columna Estado, comprueba que el estado de tu trabajo sea En ejecución.

  7. Consulta datos de streaming en el cliente de Flink SQL:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Consultar datos de streaming en BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Finaliza el trabajo de streaming en el cliente de Flink SQL:

    STOP JOB 'JOB_ID';

    Sustituye JOB_ID por el ID de trabajo que se mostró en la salida cuando creaste el trabajo de streaming.

Siguientes pasos