Usa el metastore de BigQuery con tablas en BigQuery

En este documento, se explica cómo usar el metastore de BigQuery con las tablas de BigQuery y Spark.

Con BigQuery Metastore, puedes crear y usar tablas estándar (integradas), tablas de BigQuery para Apache Spark Iceberg y tablas externas de BigLake desde BigQuery.

Antes de comenzar

  1. Habilita la facturación para tu proyecto de Google Cloud . Obtén información sobre cómo verificar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs de BigQuery y Dataproc.

    Habilitar las API

  3. Opcional: Comprende cómo funciona el metastore de BigQuery y por qué deberías usarlo.

Roles obligatorios

Para obtener los permisos que necesitas para usar Spark y Dataproc con el metastore de BigQuery como un almacén de metadatos, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Conectarse a una tabla

  1. Crea un conjunto de datos en la consola de Google Cloud .

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud para crear el conjunto de datos.
    • DATASET_NAME: Un nombre para tu conjunto de datos.
  2. Crea una conexión de recursos de Cloud.

  3. Crea una tabla estándar de BigQuery.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    Reemplaza lo siguiente:

    • TABLE_NAME: Es un nombre para tu tabla.
  4. Inserta datos en la tabla estándar de BigQuery.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. Crea una tabla de BigQuery para Apache Iceberg.

    Por ejemplo, para crear una tabla, ejecuta la siguiente sentencia CREATE.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    Reemplaza lo siguiente:

    • ICEBERG_TABLE_NAME: Es un nombre para tu tabla de iceberg. Por ejemplo, iceberg_managed_table
    • CONNECTION_NAME: Es el nombre de tu conexión. La creaste en el paso anterior. Por ejemplo, myproject.us.myconnection
    • STORAGE_URI: Es un URI de Cloud Storage completamente calificado. Por ejemplo, gs://mybucket/table
  6. Inserta datos en la tabla de BigQuery para Apache Iceberg.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. Crea una tabla de Iceberg de solo lectura.

    Por ejemplo, para crear una tabla de Iceberg de solo lectura, ejecuta la siguiente sentencia CREATE.

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    Reemplaza lo siguiente:

    • READONLY_ICEBERG_TABLE_NAME: Es un nombre para tu tabla de solo lectura.
    • BUCKET_PATH: La ruta de acceso al bucket de Cloud Storage que contiene los datos de la tabla externa, en el formato ['gs://bucket_name/[folder_name/]file_name'].
  8. Desde PySpark, consulta la tabla estándar, la tabla de Iceberg administrada y la tabla de Iceberg de solo lectura.

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the bqms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query a standard BigQuery table
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Managed Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    # Query a BigQuery Readonly Apache Iceberg table
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    Reemplaza lo siguiente:

    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage que contiene tu almacén de datos.
    • CATALOG_NAME: Es el nombre del catálogo que usas.
    • MATERIALIZATION_NAMESPACE: Es el espacio de nombres para almacenar los resultados temporales.
  9. Ejecuta la secuencia de comandos de PySpark con Spark sin servidores.

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \
      --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar

    Reemplaza lo siguiente:

    • SCRIPT_PATH: Es la ruta de acceso a la secuencia de comandos que usa el trabajo por lotes.
    • PROJECT_ID: El ID del proyecto de Google Cloud en el que se ejecutará el trabajo por lotes.
    • REGION: Es la región en la que se ejecuta tu carga de trabajo.
    • YOUR_BUCKET: Es la ubicación del bucket de Cloud Storage para subir dependencias de la carga de trabajo. No se requiere el prefijo de URI gs:// del bucket. Puedes especificar la ruta de acceso o el nombre del bucket, por ejemplo, mybucketname1.

¿Qué sigue?