Usar el metastore de BigLake con tablas de BigQuery

En este documento se explica cómo usar el metastore de BigLake con tablas de BigQuery y Spark.

Con BigLake Metastore, puedes crear y usar tablas estándar (integradas), tablas de BigLake para Apache Iceberg en BigQuery y tablas externas de Apache Iceberg desde BigQuery.

Antes de empezar

  1. Habilita la facturación de tu Google Cloud proyecto. Consulta cómo comprobar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs BigQuery y Dataproc.

    Habilitar las APIs

  3. Opcional: Consulta cómo funciona el metastore de BigLake y por qué deberías usarlo.

Roles obligatorios

Para obtener los permisos que necesitas para usar Spark y Dataproc con el metastore de BigLake como almacén de metadatos, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos:

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Conectarse a una tabla

  1. Crea un conjunto de datos en la Google Cloud consola.

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto Google Cloud en el que se va a crear el conjunto de datos.
    • DATASET_NAME: un nombre para el conjunto de datos.
  2. Crea una conexión de recursos en la nube.

  3. Crea una tabla estándar de BigQuery.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    Haz los cambios siguientes:

    • TABLE_NAME: el nombre de la tabla.
  4. Insertar datos en la tabla estándar de BigQuery.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. Crea una tabla de BigLake para Apache Iceberg en BigQuery.

    Por ejemplo, para crear una tabla, ejecuta la siguiente instrucción CREATE.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    Haz los cambios siguientes:

    • ICEBERG_TABLE_NAME: nombre de la tabla de BigLake para Apache Iceberg en BigQuery. Por ejemplo, iceberg_managed_table.
    • CONNECTION_NAME: el nombre de tu conexión. Lo has creado en el paso anterior. Por ejemplo, myproject.us.myconnection.
    • STORAGE_URI: URI de Cloud Storage completo. Por ejemplo, gs://mybucket/table.
  6. Inserta datos en la tabla de BigLake de Apache Iceberg en BigQuery.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. Crea una tabla externa de Apache Iceberg.

    Por ejemplo, para crear una tabla externa de Iceberg, ejecuta la siguiente instrucción CREATE.

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    Haz los cambios siguientes:

    • READONLY_ICEBERG_TABLE_NAME: un nombre para tu tabla de solo lectura.
    • BUCKET_PATH: la ruta al segmento de Cloud Storage que contiene los datos de la tabla externa, con el formato ['gs://bucket_name/[folder_name/]file_name'].
  8. Desde PySpark, consulta la tabla estándar, la tabla de BigLake para Apache Iceberg en BigQuery y la tabla externa de Apache Iceberg.

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the blms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query the tables
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    Haz los cambios siguientes:

    • WAREHOUSE_DIRECTORY: el URI de la carpeta de Cloud Storage que está conectada a tu tabla Iceberg de BigLake en BigQuery y a tu tabla externa Iceberg.
    • CATALOG_NAME: el nombre del catálogo que estás usando.
    • MATERIALIZATION_NAMESPACE: el espacio de nombres para almacenar resultados temporales.
  9. Ejecuta la secuencia de comandos de PySpark con Spark sin servidor.

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \

    Haz los cambios siguientes:

    • SCRIPT_PATH: la ruta a la secuencia de comandos que usa el trabajo por lotes.
    • PROJECT_ID: el ID del Google Cloud proyecto en el que se ejecutará la tarea por lotes.
    • REGION: la región en la que se ejecuta tu carga de trabajo.
    • YOUR_BUCKET: la ubicación del segmento de Cloud Storage en el que se subirán las dependencias de la carga de trabajo. No es necesario incluir el prefijo de URI gs:// del bucket. Puede especificar la ruta o el nombre del segmento, por ejemplo, mybucketname1.

Siguientes pasos