Configurar metastore de BigLake

En este documento se explica cómo configurar el almacén de metadatos de BigLake con Dataproc o Google Cloud Serverless para Apache Spark para crear un almacén de metadatos único y compartido que funcione en motores de código abierto, como Apache Spark o Apache Flink.

Antes de empezar

  1. Habilita la facturación de tu Google Cloud proyecto. Consulta cómo comprobar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs BigQuery y Dataproc.

    Habilitar las APIs

  3. Opcional: Consulta cómo funciona el metastore de BigLake y por qué deberías usarlo.

Roles obligatorios

Para obtener los permisos que necesitas para configurar el metastore de BigLake, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos:

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Configurar el metastore con Dataproc

Puedes configurar BigLake Metastore con Dataproc mediante Spark o Flink:

Spark

  1. Configura un clúster nuevo. Para crear un clúster de Dataproc, ejecuta el siguiente gcloud dataproc clusters create comando, que contiene los ajustes que necesitas para usar el metastore de BigLake:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    Haz los cambios siguientes:

    • CLUSTER_NAME: nombre del clúster de Dataproc.
    • PROJECT_ID: el ID del Google Cloud proyecto en el que vas a crear el clúster.
    • LOCATION: la región de Compute Engine en la que vas a crear el clúster.
  2. Envía una tarea de Spark con uno de los siguientes métodos:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del Google Cloud proyecto que contiene el clúster de Dataproc.
    • CLUSTER_NAME: el nombre del clúster de Dataproc que usas para ejecutar el trabajo de Spark SQL.
    • REGION: la región de Compute Engine en la que se encuentra tu clúster.
    • LOCATION: la ubicación de los recursos de BigQuery.
    • CATALOG_NAME: el nombre del catálogo de Spark que se va a usar con tu tarea de SQL.
    • WAREHOUSE_DIRECTORY: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza por gs://.
    • SPARK_SQL_COMMAND: la consulta de Spark SQL que quieres ejecutar. Esta consulta incluye los comandos para crear tus recursos. Por ejemplo, para crear un espacio de nombres y una tabla.

    CLI de spark-sql

    1. En la consola de Google Cloud , ve a la página Instancias de VM.

      Ir a Instancias de VM

    2. Para conectarte a una instancia de VM de Dataproc, haz clic en SSH en la fila que muestra el nombre de la VM principal del clúster de Dataproc, que es el nombre del clúster seguido del sufijo -m. El resultado debería ser similar al siguiente:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. En el terminal, ejecuta el siguiente comando de inicialización del metastore de BigLake:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      Haz los cambios siguientes:

      • CATALOG_NAME: el nombre del catálogo de Spark que estás usando con tu trabajo de SQL.
      • PROJECT_ID: el ID del proyecto Google Cloud del catálogo de metastore de BigLake con el que se vincula tu catálogo de Spark.
      • LOCATION: la Google Cloud ubicación del metastore de BigLake.
      • WAREHOUSE_DIRECTORY: la carpeta de Cloud Storage que contiene su almacén de datos. Este valor empieza por gs://.

      Una vez que te hayas conectado correctamente al clúster, el terminal de Spark mostrará el prompt spark-sql, que puedes usar para enviar trabajos de Spark.

      spark-sql (default)>
      
  1. Crea un clúster de Dataproc con el componente opcional Flink habilitado y asegúrate de usar Dataproc 2.2 o una versión posterior.
  2. En la consola de Google Cloud , ve a la página Instancias de VM.

    Ir a instancias de VM

  3. En la lista de instancias de máquina virtual, haz clic en SSH para conectarte a la instancia de máquina virtual del clúster principal de Dataproc, que aparece con el nombre del clúster seguido del sufijo -m.

  4. Configura el complemento de catálogo personalizado de Iceberg para BigLake Metastore:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Inicia la sesión de Flink en YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Crear un catálogo en Flink:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    Haz los cambios siguientes:

    • CATALOG_NAME: el identificador del catálogo de Flink, que está vinculado a un catálogo del metastore de BigLake.
    • WAREHOUSE_DIRECTORY: la ruta base del directorio del almacén (la carpeta de Cloud Storage en la que Flink crea archivos). Este valor empieza por gs://.
    • PROJECT_ID: el ID del proyecto del catálogo de metastore de BigLake al que se vincula el catálogo de Flink.
    • LOCATION: la ubicación de los recursos de BigQuery.

Tu sesión de Flink ahora está conectada al metastore de BigLake y puedes ejecutar comandos de Flink SQL.

Ahora que te has conectado a BigLake Metastore, puedes crear y ver recursos basados en los metadatos almacenados en BigLake Metastore.

Por ejemplo, prueba a ejecutar los siguientes comandos en tu sesión interactiva de Flink SQL para crear una base de datos y una tabla de Iceberg.

  1. Usa el catálogo personalizado de Iceberg:

    USE CATALOG CATALOG_NAME;

    Sustituye CATALOG_NAME por el identificador de catálogo de Flink.

  2. Crea una base de datos, lo que crea un conjunto de datos en BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Sustituye DATABASE_NAME por el nombre de tu nueva base de datos.

  3. Usa la base de datos que has creado:

    USE DATABASE_NAME;
  4. Crea una tabla Iceberg. Con el siguiente comando se crea una tabla de ventas de ejemplo:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    Sustituye ICEBERG_TABLE_NAME por el nombre de la nueva tabla.

  5. Para ver los metadatos de una tabla, sigue estos pasos:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Mostrar las tablas de la base de datos:

    SHOW TABLES;

Ingerir datos en una tabla

Después de crear una tabla Iceberg en la sección anterior, puedes usar Flink DataGen como fuente de datos para ingerir datos en tiempo real en tu tabla. Los pasos siguientes son un ejemplo de este flujo de trabajo:

  1. Para crear una tabla temporal con DataGen, sigue estos pasos:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Haz los cambios siguientes:

    • DATABASE_NAME: el nombre de la base de datos en la que se almacenará tu tabla temporal.
    • TEMP_TABLE_NAME: un nombre para la tabla temporal.
    • ICEBERG_TABLE_NAME: el nombre de la tabla Iceberg que has creado en la sección anterior.
  2. Define el paralelismo en 1:

    SET 'parallelism.default' = '1';
  3. Define el intervalo del punto de control:

    SET 'execution.checkpointing.interval' = '10second';
  4. Define el punto de control:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Inicia la tarea de streaming en tiempo real:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    El resultado debería ser similar al siguiente:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Para comprobar el estado del trabajo de streaming, haz lo siguiente:

    1. En la Google Cloud consola, ve a la página Clusters.

      Ir a Clústeres

    2. Selecciona tu clúster.

    3. Haga clic en la pestaña Interfaces web.

    4. Haz clic en el enlace YARN ResourceManager.

    5. En la interfaz de YARN ResourceManager, busca tu sesión de Flink y haz clic en el enlace ApplicationMaster de Tracking UI.

    6. En la columna Estado, comprueba que el estado de tu trabajo sea En ejecución.

  7. Consulta datos de streaming en el cliente de Flink SQL:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Consultar datos de streaming en BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Finaliza el trabajo de streaming en el cliente de Flink SQL:

    STOP JOB 'JOB_ID';

    Sustituye JOB_ID por el ID de trabajo que se mostró en la salida cuando creaste el trabajo de streaming.

Configurar el metastore con Serverless para Apache Spark

Puedes configurar BigLake Metastore con Serverless para Apache Spark mediante Spark SQL o PySpark.

Spark SQL

  1. Crea un archivo SQL con los comandos de Spark SQL que quieras ejecutar en el almacén de metadatos de BigLake. Por ejemplo, este comando crea un espacio de nombres y una tabla:

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Haz los cambios siguientes:

    • CATALOG_NAME: el nombre del catálogo que hace referencia a tu tabla de Spark.
    • NAMESPACE_NAME: el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • TABLE_NAME: el nombre de la tabla de Spark.
    • WAREHOUSE_DIRECTORY: el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
  2. Envía una tarea por lotes de Spark SQL ejecutando el siguiente comando gcloud dataproc batches submit spark-sql:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Haz los cambios siguientes:

    • SQL_SCRIPT_PATH: la ruta al archivo SQL que usa el trabajo por lotes.
    • PROJECT_ID: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.
    • REGION: la región en la que se ejecuta tu carga de trabajo.
    • SUBNET_NAME (opcional): el nombre de una subred de VPC en REGION que cumpla los requisitos de la subred de sesión.
    • BUCKET_PATH: la ubicación del segmento de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. El WAREHOUSE_DIRECTORY se encuentra en este contenedor. No es necesario incluir el prefijo de URI gs:// del bucket. Puedes especificar la ruta o el nombre del cubo; por ejemplo, mybucketname1.
    • LOCATION: la ubicación en la que se ejecutará el trabajo por lotes.

    Para obtener más información sobre cómo enviar trabajos por lotes de Spark, consulta Ejecutar una carga de trabajo por lotes de Spark.

PySpark

  1. Crea un archivo de Python con los comandos de PySpark que quieras ejecutar en el metastore de BigLake.

    Por ejemplo, el siguiente comando configura un entorno de Spark para interactuar con tablas de Iceberg almacenadas en BigLake Metastore. A continuación, el comando crea un nuevo espacio de nombres y una tabla Iceberg en ese espacio de nombres.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.
    • LOCATION: la ubicación en la que se encuentran los recursos de BigQuery.
    • CATALOG_NAME: el nombre del catálogo que hace referencia a tu tabla de Spark.
    • TABLE_NAME: el nombre de la tabla de Spark.
    • WAREHOUSE_DIRECTORY: el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
    • NAMESPACE_NAME: el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
  2. Envía el trabajo por lotes con el siguiente comando gcloud dataproc batches submit pyspark:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Haz los cambios siguientes:

    • PYTHON_SCRIPT_PATH: la ruta a la secuencia de comandos de Python que usa el trabajo por lotes.
    • PROJECT_ID: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.
    • REGION: la región en la que se ejecuta tu carga de trabajo.
    • BUCKET_PATH: la ubicación del segmento de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. No es necesario incluir el prefijo de URI gs:// del bucket. Puede especificar la ruta o el nombre del segmento, por ejemplo, mybucketname1.

    Para obtener más información sobre cómo enviar trabajos por lotes de PySpark, consulta la referencia de gcloud de PySpark.

Siguientes pasos