Usar BigLake Metastore con Serverless para Apache Spark

En este documento se explica cómo usar BigLake Metastore con Serverless para Apache Spark.

Antes de empezar

  1. Habilita la facturación de tu Google Cloud proyecto. Consulta cómo comprobar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs BigQuery y Dataproc.

    Habilitar las APIs

  3. Opcional: Consulta cómo funciona el metastore de BigLake y por qué deberías usarlo.

Roles obligatorios

Para obtener los permisos que necesitas para usar Spark y Serverless para Apache Spark con BigLake Metastore como almacén de metadatos, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos:

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Flujo de trabajo general

Para usar BigQuery con Serverless para Apache Spark, sigue estos pasos generales:

  1. Crea un archivo con los comandos que quieras ejecutar en BigLake Metastore.
  2. Conéctate al motor de software de código abierto que prefieras.
  3. Envía una tarea por lotes con el método que prefieras, como Spark SQL o PySpark.

Conectar BigLake Metastore con Spark

En las siguientes instrucciones se muestra cómo conectar Serverless para Apache Spark a BigLake Metastore:

SparkSQL

Para enviar una tarea por lotes de Spark SQL, sigue estos pasos.

  1. Crea un archivo SQL con los comandos de Spark SQL que quieras ejecutar en el almacén de metadatos de BigLake. Por ejemplo, este comando crea un espacio de nombres y una tabla.

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Haz los cambios siguientes:

    • CATALOG_NAME: el nombre del catálogo que hace referencia a tu tabla de Spark.
    • NAMESPACE_NAME: el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • TABLE_NAME: el nombre de la tabla de Spark.
    • WAREHOUSE_DIRECTORY: el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
  2. Envía una tarea de Spark SQL por lotes ejecutando el siguiente comando de gcloud dataproc batches submit spark-sql de la CLI de gcloud:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
    --project=PROJECT_ID \
    --region=REGION \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
    --deps-bucket=BUCKET_PATH \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Haz los cambios siguientes:

    • SQL_SCRIPT_PATH: la ruta al archivo SQL que usa el trabajo por lotes.
    • PROJECT_ID: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.
    • REGION: la región en la que se ejecuta tu carga de trabajo.
    • SUBNET_NAME: opcional: nombre de una subred de VPC en REGION que tenga Acceso privado de Google habilitado y cumpla otros requisitos de subred de sesión.
    • LOCATION: la ubicación en la que se ejecutará el trabajo por lotes.
    • BUCKET_PATH: la ubicación del segmento de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. El WAREHOUSE_FOLDER se encuentra en este contenedor. No es necesario incluir el prefijo de URI gs:// del bucket. Puedes especificar la ruta o el nombre del cubo; por ejemplo, mybucketname1.

    Para obtener más información sobre cómo enviar trabajos por lotes de Spark, consulta Ejecutar una carga de trabajo por lotes de Spark.

PySpark

Para enviar un trabajo por lotes de PySpark, sigue estos pasos.

  1. Crea un archivo de Python con los comandos de PySpark que quieras ejecutar en el metastore de BigLake.

    Por ejemplo, el siguiente comando configura un entorno de Spark para interactuar con tablas de Iceberg almacenadas en BigLake Metastore. A continuación, el comando crea un nuevo espacio de nombres y una tabla Iceberg en ese espacio de nombres.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.
    • LOCATION: la ubicación en la que se encuentran los recursos de BigQuery.
    • CATALOG_NAME: el nombre del catálogo que hace referencia a tu tabla de Spark.
    • TABLE_NAME: el nombre de la tabla de Spark.
    • WAREHOUSE_DIRECTORY: el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
    • NAMESPACE_NAME: el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
  2. Envía el trabajo por lotes con el siguiente comando gcloud dataproc batches submit pyspark.

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
     --version=2.2 \
     --project=PROJECT_ID \
     --region=REGION \
     --deps-bucket=BUCKET_PATH \
     --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Haz los cambios siguientes:

    • PYTHON_SCRIPT_PATH: la ruta a la secuencia de comandos de Python que usa el trabajo por lotes.
    • PROJECT_ID: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.
    • REGION: la región en la que se ejecuta tu carga de trabajo.
    • BUCKET_PATH: la ubicación del segmento de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. No es necesario incluir el prefijo de URI gs:// del bucket. Puede especificar la ruta o el nombre del segmento, por ejemplo, mybucketname1.

    Para obtener más información sobre cómo enviar trabajos por lotes de PySpark, consulta la referencia de gcloud de PySpark.

Siguientes pasos