Configura BigLake Metastore

En este documento, se explica cómo configurar BigLake Metastore con Dataproc o Google Cloud Serverless para Apache Spark para crear un metastore único y compartido que funcione en motores de código abierto, como Apache Spark o Apache Flink.

Antes de comenzar

  1. Habilita la facturación para tu proyecto de Google Cloud . Obtén información para verificar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs de BigQuery y Dataproc.

    Habilitar las API

  3. Opcional: Comprende cómo funciona BigLake Metastore y por qué deberías usarlo.

Roles requeridos

Para obtener los permisos que necesitas para configurar el metastore de BigLake, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.

Configura tu almacén de metadatos con Dataproc

Puedes configurar BigLake Metastore con Dataproc usando Spark o Flink:

Spark

  1. Configura un clúster nuevo. Para crear un clúster de Dataproc nuevo, ejecuta el siguiente comando gcloud dataproc clusters create, que contiene la configuración que necesitas para usar BigLake Metastore:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    Reemplaza lo siguiente:

    • CLUSTER_NAME: Es un nombre para tu clúster de Dataproc.
    • PROJECT_ID: Es el ID del Google Cloud proyecto en el que crearás el clúster.
    • LOCATION: Es la región de Compute Engine en la que crearás el clúster.
  2. Envía un trabajo de Spark con uno de los siguientes métodos:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el ID del proyecto Google Cloud que contiene el clúster de Dataproc.
    • CLUSTER_NAME: Es el nombre del clúster de Dataproc que usas para ejecutar el trabajo de Spark SQL.
    • REGION: La región de Compute Engine en la que se encuentra tu clúster.
    • LOCATION: Es la ubicación de los recursos de BigQuery.
    • CATALOG_NAME: Es el nombre del catálogo de Spark que se usará con tu trabajo de SQL.
    • WAREHOUSE_DIRECTORY: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.
    • SPARK_SQL_COMMAND: Es la consulta de Spark SQL que deseas ejecutar. Esta consulta incluye los comandos para crear tus recursos. Por ejemplo, para crear un espacio de nombres y una tabla.

    CLI de spark-sql

    1. En la Google Cloud consola, ve a la página Instancias de VM.

      Ir a Instancias de VM

    2. Para conectarte a una instancia de VM de Dataproc, haz clic en SSH en la fila que enumera el nombre de la instancia de VM principal del clúster de Dataproc, que es el nombre del clúster seguido de un sufijo -m. El resultado es similar a este:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. En la terminal, ejecuta el siguiente comando de inicialización del metastore de BigLake:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      Reemplaza lo siguiente:

      • CATALOG_NAME: Es el nombre del catálogo de Spark que usas con tu trabajo de SQL.
      • PROJECT_ID: Es el ID del Google Cloud proyecto del catálogo de metastore de BigLake con el que se vincula tu catálogo de Spark.
      • LOCATION: Es la Google Cloud ubicación del metastore de BigLake.
      • WAREHOUSE_DIRECTORY: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.

      Después de conectarte correctamente al clúster, la terminal de Spark mostrará el mensaje spark-sql, que puedes usar para enviar trabajos de Spark.

      spark-sql (default)>
      
  1. Crea un clúster de Dataproc con el componente Flink opcional habilitado y asegúrate de usar Dataproc 2.2 o una versión posterior.
  2. En la Google Cloud consola, ve a la página Instancias de VM.

    Ir a Instancias de VM

  3. En la lista de instancias de máquina virtual, haz clic en SSH para conectarte a la instancia principal de la VM del clúster de Dataproc, que se muestra como el nombre del clúster seguido de un sufijo -m.

  4. Configura el complemento del catálogo personalizado de Iceberg para BigLake Metastore:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Inicia la sesión de Flink en YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Crea un catálogo en Flink:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es el identificador del catálogo de Flink, que está vinculado a un catálogo de metastore de BigLake.
    • WAREHOUSE_DIRECTORY: Es la ruta de acceso base para el directorio del almacén (la carpeta de Cloud Storage en la que Flink crea archivos). Este valor comienza con gs://.
    • PROJECT_ID: Es el ID del proyecto del catálogo de BigLake Metastore con el que se vincula el catálogo de Flink.
    • LOCATION: La ubicación de los recursos de BigQuery.

Tu sesión de Flink ahora está conectada a BigLake Metastore, y puedes ejecutar comandos de Flink SQL.

Ahora que te conectaste a BigLake Metastore, puedes crear y ver recursos basados en los metadatos almacenados en BigLake Metastore.

Por ejemplo, intenta ejecutar los siguientes comandos en tu sesión interactiva de Flink SQL para crear una base de datos y una tabla de Iceberg.

  1. Usa el catálogo de Iceberg personalizado:

    USE CATALOG CATALOG_NAME;

    Reemplaza CATALOG_NAME por el identificador de tu catálogo de Flink.

  2. Crea una base de datos, lo que crea un conjunto de datos en BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Reemplaza DATABASE_NAME por el nombre de tu nueva base de datos.

  3. Usa la base de datos que creaste:

    USE DATABASE_NAME;
  4. Crea una tabla de Iceberg. En el siguiente ejemplo, se crea una tabla de ventas:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    Reemplaza ICEBERG_TABLE_NAME por un nombre para tu tabla nueva.

  5. Sigue estos pasos para ver los metadatos de la tabla:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Enumera las tablas de la base de datos:

    SHOW TABLES;

Transfiere datos a tu tabla

Después de crear una tabla de Iceberg en la sección anterior, puedes usar Flink DataGen como fuente de datos para transferir datos en tiempo real a tu tabla. Los siguientes pasos son un ejemplo de este flujo de trabajo:

  1. Sigue estos pasos para crear una tabla temporal con DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Reemplaza lo siguiente:

    • DATABASE_NAME: Es el nombre de la base de datos en la que se almacenará tu tabla temporal.
    • TEMP_TABLE_NAME: Es un nombre para tu tabla temporal.
    • ICEBERG_TABLE_NAME: Es el nombre de la tabla de Iceberg que creaste en la sección anterior.
  2. Establece el paralelismo en 1:

    SET 'parallelism.default' = '1';
  3. Establece el intervalo del punto de control:

    SET 'execution.checkpointing.interval' = '10second';
  4. Establece el punto de control:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Inicia el trabajo de transmisión en tiempo real:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    El resultado es similar a este:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Para verificar el estado del trabajo de transmisión, haz lo siguiente:

    1. En la consola de Google Cloud , ve a la página Clústeres.

      Ir a los clústeres

    2. Selecciona tu clúster.

    3. Haz clic en la pestaña Interfaces web.

    4. Haz clic en el vínculo YARN ResourceManager.

    5. En la interfaz de YARN ResourceManager, busca tu sesión de Flink y haz clic en el vínculo ApplicationMaster en Tracking UI.

    6. En la columna Estado, confirma que el estado del trabajo sea En ejecución.

  7. Consulta datos de transmisión en el cliente de Flink SQL:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Consulta datos de transmisión en BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Finaliza el trabajo de transmisión en el cliente de Flink SQL:

    STOP JOB 'JOB_ID';

    Reemplaza JOB_ID por el ID de trabajo que se mostró en el resultado cuando creaste el trabajo de transmisión.

Configura tu metastore con Serverless for Apache Spark

Puedes configurar el metastore de BigLake con Serverless for Apache Spark usando Spark SQL o PySpark.

Spark SQL

  1. Crea un archivo SQL con los comandos de Spark SQL que deseas ejecutar en BigLake Metastore. Por ejemplo, este comando crea un espacio de nombres y una tabla:

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es el nombre del catálogo que hace referencia a tu tabla de Spark.
    • NAMESPACE_NAME: Es el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • TABLE_NAME: Es el nombre de la tabla de Spark.
    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage en la que se almacena tu data warehouse.
  2. Ejecuta el siguiente comando gcloud dataproc batches submit spark-sql para enviar un trabajo por lotes de Spark SQL:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Reemplaza lo siguiente:

    • SQL_SCRIPT_PATH: Es la ruta de acceso al archivo SQL que usa el trabajo por lotes.
    • PROJECT_ID: Es el ID del proyecto Google Cloud en el que se ejecutará el trabajo por lotes.
    • REGION: Es la región en la que se ejecuta tu carga de trabajo.
    • SUBNET_NAME (opcional): Es el nombre de una subred de VPC en REGION que cumple con los requisitos de subred de sesión.
    • BUCKET_PATH: Es la ubicación del bucket de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. WAREHOUSE_DIRECTORY se encuentra en este bucket. No se requiere el prefijo de URI gs:// del bucket. Puedes especificar la ruta de acceso o el nombre del bucket, por ejemplo, mybucketname1.
    • LOCATION: Es la ubicación en la que se ejecutará el trabajo por lotes.

    Para obtener más información sobre cómo enviar trabajos por lotes de Spark, consulta Ejecuta una carga de trabajo por lotes de Spark.

PySpark

  1. Crea un archivo de Python con los comandos de PySpark que deseas ejecutar en BigLake Metastore.

    Por ejemplo, el siguiente comando configura un entorno de Spark para interactuar con las tablas de Iceberg almacenadas en BigLake Metastore. Luego, el comando crea un espacio de nombres nuevo y una tabla de Iceberg dentro de ese espacio de nombres.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el ID del proyecto Google Cloud en el que se ejecutará el trabajo por lotes.
    • LOCATION: La ubicación en la que se encuentran los recursos de BigQuery.
    • CATALOG_NAME: Es el nombre del catálogo que hace referencia a tu tabla de Spark.
    • TABLE_NAME: Es el nombre de la tabla de Spark.
    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage en la que se almacena tu data warehouse.
    • NAMESPACE_NAME: Es el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
  2. Envía el trabajo por lotes con el siguiente comando gcloud dataproc batches submit pyspark:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Reemplaza lo siguiente:

    • PYTHON_SCRIPT_PATH: Es la ruta de acceso a la secuencia de comandos de Python que usa el trabajo por lotes.
    • PROJECT_ID: Es el ID del proyecto Google Cloud en el que se ejecutará el trabajo por lotes.
    • REGION: Es la región en la que se ejecuta tu carga de trabajo.
    • BUCKET_PATH: Es la ubicación del bucket de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. No se requiere el prefijo de URI gs:// del bucket. Puedes especificar la ruta de acceso o el nombre del bucket, por ejemplo, mybucketname1.

    Para obtener más información sobre el envío de trabajos por lotes de PySpark, consulta la referencia de gcloud de PySpark.

¿Qué sigue?