Usa el metastore de BigQuery con Spark en BigQuery Studio

En este documento, se explica cómo usar el metastore de BigQuery con Spark en BigQuery Studio.

Puedes usar Spark en BigQuery Studio para crear una tabla de Iceberg con Apache Spark en BigQuery Studio. Después de crear la tabla, puedes consultar los datos desde Spark. También puedes consultar los mismos datos desde la consola de BigQuery con SQL.

Antes de comenzar

  1. Solicita acceso a Spark en BigQuery Studio a través del siguiente formulario de registro.
  2. Habilita la facturación para tu proyecto de Google Cloud . Obtén información sobre cómo verificar si la facturación está habilitada en un proyecto.
  3. Habilita las APIs de BigQuery y Dataflow.

    Habilitar las API

  4. Opcional: Comprende cómo funciona el metastore de BigQuery y por qué deberías usarlo.

Roles obligatorios

Para obtener los permisos que necesitas para usar notebooks de Spark en BigQuery Studio, pídele a tu administrador que te otorgue los siguientes roles de IAM:

  • Crear tablas de metastore de BigQuery Studio en Spark: BigQuery Data Editor (roles/bigquery.dataEditor) en el proyecto
  • Crea una sesión de Spark a partir de las tablas de metastore del notebook en Spark: Trabajador de Dataproc (roles/dataproc.serverlessEditor) en la cuenta de usuario

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Cómo conectarse con una notebook

En el siguiente ejemplo, se muestra cómo configurar un notebook de Spark para interactuar con las tablas de Iceberg almacenadas en el metastore de BigQuery.

En este ejemplo, configurarás una sesión de Spark, crearás un espacio de nombres y una tabla, agregarás algunos datos a la tabla y, luego, consultarás los datos en BigQuery Studio.

  1. Crea un notebook de Spark en BigQuery Studio.

  2. En el notebook de Apache Spark, incluye las importaciones necesarias de Apache Spark:

    from dataproc_spark_session.session.spark.connect import DataprocSparkSession
    from google.cloud.dataproc_v1 import Session
    from pyspark.sql import SparkSession
  3. Define un catálogo, un espacio de nombres y un directorio de almacén.

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es un nombre de catálogo para hacer referencia a tu tabla de Spark.
    • NAMESPACE_NAME: Es una etiqueta de espacio de nombres para hacer referencia a tu tabla de Spark.
    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
  4. Inicializa una sesión de Spark.

    session.environment_config.execution_config.network_uri = NETWORK_NAME
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION"
    session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir
    
    spark = (
     DataprocSparkSession.builder
     .appName("BigQuery metastore Iceberg table example")
     .dataprocConfig(session)
     .getOrCreate())

    Reemplaza lo siguiente:

    • NETWORK_NAME: El nombre o URI de la red que ejecuta el código de Spark. Si no se especifica, se usa la red default.
    • PROJECT_ID: El ID del proyecto de Google Cloud que ejecuta el código de Spark.
    • LOCATION: Es la ubicación en la que se ejecutará la tarea de Spark.
  5. Crea un catálogo y un espacio de nombres.

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. Crear una tabla

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    Reemplaza lo siguiente:

    • TABLE_NAME: Un nombre para tu tabla de Iceberg.
  7. Ejecuta un lenguaje de manipulación de datos (DML) desde Spark.

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. Ejecuta un lenguaje de definición de datos (DDL) desde Spark.

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. Inserta datos en la tabla.

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. Consulta la tabla desde Spark.

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. Consulta la tabla desde la consola de Google Cloud en un conjunto de datos nuevo.

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100

¿Qué sigue?