Usa el metastore de BigQuery con Dataproc Serverless

En este documento, se explica cómo usar el metastore de BigQuery con Dataproc sin servidores.

Antes de comenzar

  1. Habilita la facturación para tu proyecto de Google Cloud . Obtén información sobre cómo verificar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs de BigQuery y Dataproc.

    Habilitar las API

  3. Opcional: Comprende cómo funciona el metastore de BigQuery y por qué deberías usarlo.

Roles obligatorios

Para obtener los permisos que necesitas para usar Spark y Dataproc sin servidor con el metastore de BigQuery como un almacén de metadatos, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Flujo de trabajo general

Para usar BigQuery con Dataproc Serverless, sigue estos pasos generales:

  1. Crea un archivo con los comandos que deseas ejecutar en el almacén de metadatos de BigQuery.
  2. Conéctate al motor de software de código abierto que elijas.
  3. Envía un trabajo por lotes con el método que elijas, como Spark SQL o PySpark.

Conecta el metastore de BigQuery con Spark

En las siguientes instrucciones, se muestra cómo conectar Dataproc Serverless al metastore de BigQuery:

SparkSQL

Para enviar un trabajo por lotes de Spark SQL, completa los siguientes pasos:

  1. Crea un archivo SQL con los comandos de Spark SQL que deseas ejecutar en el metastore de BigQuery. Por ejemplo, este comando crea un espacio de nombres y una tabla.

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es el nombre del catálogo que hace referencia a tu tabla de Spark.
    • NAMESPACE_NAME: Es el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • TABLE_NAME: Es un nombre de tabla para tu tabla de Spark.
    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
  2. Ejecuta el siguiente comando de gcloud CLI gcloud dataproc batches submit spark-sql para enviar un trabajo por lotes de Spark SQL:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
    --project=PROJECT_ID \
    --region=REGION \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
    --deps-bucket=BUCKET_PATH \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Reemplaza lo siguiente:

    • SQL_SCRIPT_PATH: Es la ruta de acceso al archivo SQL que usa el trabajo por lotes.
    • PROJECT_ID: El ID del proyecto de Google Cloud en el que se ejecutará el trabajo por lotes.
    • REGION: Es la región en la que se ejecuta tu carga de trabajo.
    • SUBNET_NAME: Opcional. Es el nombre de una subred de VPC en el REGION que tiene habilitado el Acceso privado a Google y cumple con otros requisitos de subred de sesión.
    • LOCATION: Es la ubicación en la que se ejecutará el trabajo por lotes.
    • BUCKET_PATH: Es la ubicación del bucket de Cloud Storage para subir dependencias de la carga de trabajo. WAREHOUSE_FOLDER se encuentra en este bucket. No se requiere el prefijo de URI gs:// del bucket. Puedes especificar la ruta de acceso o el nombre del bucket, por ejemplo, mybucketname1.

    Para obtener más información sobre cómo enviar trabajos por lotes de Spark, consulta Cómo ejecutar una carga de trabajo por lotes de Spark.

PySpark

Para enviar un trabajo por lotes de PySpark, completa los siguientes pasos.

  1. Crea un archivo Python con los comandos de PySpark que deseas ejecutar en el metastore de BigQuery.

    Por ejemplo, el siguiente comando configura un entorno de Spark para interactuar con las tablas de Iceberg almacenadas en el metastore de BigQuery. Luego, el comando crea un espacio de nombres nuevo y una tabla Iceberg dentro de ese espacio de nombres.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que se ejecutará el trabajo por lotes.
    • LOCATION: Es la ubicación en la que se encuentran los recursos de BigQuery.
    • CATALOG_NAME: Es el nombre del catálogo que hace referencia a tu tabla de Spark.
    • TABLE_NAME: Es un nombre de tabla para tu tabla de Spark.
    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
    • NAMESPACE_NAME: Es el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
  2. Envía el trabajo por lotes con el siguiente comando gcloud dataproc batches submit pyspark.

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
     --version=2.2 \
     --project=PROJECT_ID \
     --region=REGION \
     --deps-bucket=BUCKET_PATH \
     --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
     --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Reemplaza lo siguiente:

    • PYTHON_SCRIPT_PATH: Es la ruta de acceso a la secuencia de comandos de Python que usa el trabajo por lotes.
    • PROJECT_ID: El ID del proyecto de Google Cloud en el que se ejecutará el trabajo por lotes.
    • REGION: Es la región en la que se ejecuta tu carga de trabajo.
    • BUCKET_PATH: Es la ubicación del bucket de Cloud Storage para subir dependencias de la carga de trabajo. No se requiere el prefijo de URI gs:// del bucket. Puedes especificar la ruta de acceso o el nombre del bucket, por ejemplo, mybucketname1.

    Para obtener más información sobre cómo enviar trabajos por lotes de PySpark, consulta la referencia de gcloud de PySpark.

¿Qué sigue?