Use BigQuery metastore with Spark stored procedures

This document explains how to use Apache Spark stored procedures with BigQuery metastore.

Before you begin

  1. Enable billing for your Google Cloud project. Learn how to check if billing is enabled on a project.
  2. Enable the BigQuery and Dataflow APIs.

  3. Optional: Learn more about the following:

Required roles

To use Spark stored procedures, review the required roles for stored procedures and grant the necessary roles.

To get the permissions that you need to use Spark and stored procedures with BigQuery metastore as a metadata store, ask your administrator to grant you the following IAM roles:

For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.

Create and run a stored procedure

The following example shows you how to create and run a stored procedure with BigQuery metastore.

  1. Go to the BigQuery page.

  2. In the query editor, enter the following sample code. The CREATE PROCEDURE statement defines the stored procedure, and the CALL statement runs it.

    CREATE OR REPLACE PROCEDURE
      `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`()
    WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID`
    OPTIONS (
      engine='SPARK',
      runtime_version='1.1',
      properties=[
        ("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY"),
        ("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION"),
        ("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID"),
        ("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog"),
        ("spark.sql.catalog.CATALOG_NAME.catalog-impl",
         "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
        ("spark.jars.packages",
         "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")],
      jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"])
    LANGUAGE python AS R"""
    from pyspark.sql import SparkSession

    # The catalog settings come from the procedure's properties option.
    spark = SparkSession \
        .builder \
        .appName("BigQuery metastore Iceberg") \
        .getOrCreate()

    # Select the Iceberg catalog, then create and select a namespace in it.
    spark.sql("USE CATALOG_NAME;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")

    # Create an Iceberg table, inspect its schema, and insert a row.
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'")
    # show() prints the result; without it, the returned DataFrame is discarded.
    spark.sql("DESCRIBE TABLE_NAME;").show()
    spark.sql("INSERT INTO TABLE_NAME VALUES (1, 'first row');")
    spark.sql("SELECT * FROM TABLE_NAME;").show()

    # Add a column and confirm the schema change.
    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);")
    spark.sql("DESCRIBE TABLE_NAME;").show()
    """;

    -- Run the stored procedure.
    CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();

    Replace the following:

    • PROJECT_ID: the ID of your Google Cloud project.
    • BQ_DATASET_ID: the ID of the dataset in BigQuery that contains the procedure.
    • PROCEDURE_NAME: the name of the procedure that you're creating or replacing.
    • REGION: the location of your Spark connection.
    • LOCATION: the location of your BigQuery resources.
    • SPARK_CONNECTION_ID: the ID of your Spark connection.
    • CATALOG_NAME: the name of the catalog that you're using.
    • WAREHOUSE_DIRECTORY: the URI of the Cloud Storage folder that contains your data warehouse.
    • NAMESPACE_NAME: the namespace that you're using.
    • TABLE_NAME: the name of the Iceberg table that the procedure creates.
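
Because CATALOG_NAME appears in every catalog property key, substituting the placeholders by hand is easy to get wrong. The following is a minimal sketch, not part of the BigQuery or Iceberg APIs, of one way to generate the catalog entries of the properties option from your own values. The function names `iceberg_catalog_properties` and `render_properties` and the sample values are hypothetical. The property keys and class names are copied from the sample above. The sketch does no escaping, so it assumes that your values contain no double quotes.

```python
def iceberg_catalog_properties(catalog_name, warehouse_directory, location, project_id):
    """Return the catalog properties from the sample as (key, value) pairs.

    Hypothetical helper: it only builds strings and doesn't call any service.
    """
    prefix = f"spark.sql.catalog.{catalog_name}"
    return [
        (f"{prefix}.warehouse", warehouse_directory),
        (f"{prefix}.gcp_location", location),
        (f"{prefix}.gcp_project", project_id),
        (prefix, "org.apache.iceberg.spark.SparkCatalog"),
        (f"{prefix}.catalog-impl",
         "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
    ]


def render_properties(pairs):
    """Format (key, value) pairs as a properties=[...] fragment.

    Assumes that keys and values contain no double quotes.
    """
    body = ",\n  ".join(f'("{key}", "{value}")' for key, value in pairs)
    return f"properties=[\n  {body}\n]"


if __name__ == "__main__":
    # Sample values are placeholders for illustration only.
    pairs = iceberg_catalog_properties(
        catalog_name="my_catalog",
        warehouse_directory="gs://my-bucket/warehouse",
        location="us-central1",
        project_id="my-project",
    )
    print(render_properties(pairs))
```

You can paste the printed fragment into the OPTIONS clause of the CREATE PROCEDURE statement and then add the `spark.jars.packages` entry from the sample, which doesn't depend on the catalog name.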

What's next