Usa BigQuery Metastore con procedimientos almacenados de Spark

En este documento, se explica cómo usar los procedimientos almacenados de Apache Spark con el metastore de BigQuery.

Antes de comenzar

  1. Habilita la facturación para tu proyecto de Google Cloud . Obtén información sobre cómo verificar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs de BigQuery y Dataflow.

    Habilitar las API

  3. Opcional: Obtén más información sobre los siguientes temas:

Roles obligatorios

Para usar los procedimientos almacenados de Spark, revisa los roles necesarios para los procedimientos almacenados y otorga los roles necesarios.

Para obtener los permisos que necesitas para usar Spark y procedimientos almacenados con el metastore de BigQuery como un almacén de metadatos, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Crea y ejecuta un procedimiento almacenado

En el siguiente ejemplo, se muestra cómo crear y ejecutar un procedimiento almacenado con el metastore de BigQuery.

  1. Ve a la página de BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, agrega el siguiente código de muestra para la declaración CREATE PROCEDURE.

    CREATE OR REPLACE PROCEDURE
    `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`()
    WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK',
    runtime_version='1.1',
    properties=[("spark.sql.catalog.CATALOG_NAME.warehouse",
    "WAREHOUSE_DIRECTORY"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_location",
    "LOCATION"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_project",
    "PROJECT_ID"),
    ("spark.sql.catalog.CATALOG_NAME",
    "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.CATALOG_NAME.catalog-impl",
    "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
    ("spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")],
    jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"])
    LANGUAGE python AS R"""
    from pyspark.sql import SparkSession
    spark = SparkSession \
    .builder \
    .appName("BigQuery metastore Iceberg") \
    .getOrCreate()
    spark.sql("USE CATALOG_NAME;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'")
    spark.sql("DESCRIBE TABLE_NAME;")
    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");")
    spark.sql("SELECT * from TABLE_NAME;")
    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);")
    spark.sql("DESCRIBE TABLE_NAME;")
    """;
    CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID de tu proyecto de Google Cloud .
    • BQ_DATASET_ID: El ID del conjunto de datos de BigQuery que contiene el procedimiento.
    • PROCEDURE_NAME: Es el nombre del procedimiento que crearás o reemplazarás.
    • REGION: Es la ubicación de tu conexión a Spark.
    • LOCATION: la ubicación de tus recursos de BigQuery.
    • SPARK_CONNECTION_ID: Es el ID de tu conexión a Spark.
    • CATALOG_NAME: Es el nombre del catálogo que usas.
    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage que contiene tu almacén de datos.
    • NAMESPACE_NAME: Es el espacio de nombres que usas.

¿Qué sigue?