Usar BigLake Metastore con procedimientos almacenados de Spark

En este documento se explica cómo usar procedimientos almacenados de Apache Spark con BigLake Metastore.

Antes de empezar

  1. Habilita la facturación de tu Google Cloud proyecto. Consulta cómo comprobar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs BigQuery y Dataflow.

    Habilitar las APIs

  3. Opcional: Consulta más información sobre lo siguiente:

Roles obligatorios

Para usar procedimientos almacenados de Spark, consulta los roles necesarios para los procedimientos almacenados y asigna los roles necesarios.

Para obtener los permisos que necesitas para usar Spark y los procedimientos almacenados con BigLake Metastore como almacén de metadatos, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos:

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Crear y ejecutar un procedimiento almacenado

En el siguiente ejemplo se muestra cómo crear y ejecutar un procedimiento almacenado con el metastore de BigLake.

  1. Ve a la página BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, añade el siguiente código de muestra para la CREATE PROCEDURE sentencia.

    CREATE OR REPLACE PROCEDURE
    `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`()
    WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK',
    runtime_version='1.1',
    properties=[("spark.sql.catalog.CATALOG_NAME.warehouse",
    "WAREHOUSE_DIRECTORY"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_location",
    "LOCATION"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_project",
    "PROJECT_ID"),
    ("spark.sql.catalog.CATALOG_NAME",
    "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.CATALOG_NAME.catalog-impl",
    "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
    ("spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1")],
    jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar"])
    LANGUAGE python AS R"""
    from pyspark.sql import SparkSession
    spark = SparkSession \
    .builder \
    .appName("BigLake Metastore Iceberg") \
    .getOrCreate()
    spark.sql("USE CATALOG_NAME;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'")
    spark.sql("DESCRIBE TABLE_NAME;")
    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");")
    spark.sql("SELECT * from TABLE_NAME;")
    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);")
    spark.sql("DESCRIBE TABLE_NAME;")
    """;
    CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();

    Haz los cambios siguientes:

    • PROJECT_ID: el ID de tu proyecto de Google Cloud .
    • BQ_DATASET_ID: el ID del conjunto de datos de BigQuery que contiene el procedimiento.
    • PROCEDURE_NAME: el nombre del procedimiento que vas a crear o sustituir.
    • REGION: la ubicación de tu conexión Spark.
    • LOCATION: la ubicación de tus recursos de BigQuery.
    • SPARK_CONNECTION_ID: el ID de tu conexión de Spark.
    • CATALOG_NAME: el nombre del catálogo que estás usando.
    • WAREHOUSE_DIRECTORY: el URI de la carpeta de Cloud Storage que contiene tu almacén de datos.
    • NAMESPACE_NAME: el espacio de nombres que estás usando.

Siguientes pasos