Use BigQuery metastore with Spark in BigQuery Studio

This document explains how to use BigQuery metastore with Spark in BigQuery Studio.

Spark in BigQuery Studio lets you create an Iceberg table with Apache Spark in a notebook. After you create the table, you can query the data from Spark. You can also query the same data from the BigQuery console using SQL.

Before you begin

  1. Request access to Spark in BigQuery Studio through the sign-up form.
  2. Enable billing for your Google Cloud project. Learn how to check if billing is enabled on a project.
  3. Enable the BigQuery and Dataflow APIs. If you prefer to enable them programmatically, see the sketch after this list.

  4. Optional: Understand how BigQuery metastore works and why you should use it.
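
As one option, the following is a minimal sketch that enables both APIs programmatically with the Service Usage client library (google-cloud-service-usage). This is an illustrative assumption rather than a required step; PROJECT_ID is a placeholder, and you can also enable the APIs in the Google Cloud console.

    # Hypothetical sketch: enable the BigQuery and Dataflow APIs with the
    # Service Usage client library. Assumes google-cloud-service-usage is
    # installed and that you have permission to enable services on the project.
    from google.cloud import service_usage_v1

    client = service_usage_v1.ServiceUsageClient()

    for service in ("bigquery.googleapis.com", "dataflow.googleapis.com"):
        operation = client.enable_service(
            request={"name": f"projects/PROJECT_ID/services/{service}"}
        )
        operation.result()  # Block until the service is enabled.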

Required roles

To get the permissions that you need to use Spark notebooks in BigQuery Studio, ask your administrator to grant you the following IAM roles:

  • Create BigQuery metastore tables in Spark: BigQuery Data Editor (roles/bigquery.dataEditor) on the project
  • Create a Spark session from the notebook: Dataproc Worker (roles/dataproc.serverlessEditor) on the user account

For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.
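
If you administer the project yourself, the following is a minimal sketch of granting one of these roles programmatically with the Resource Manager client library (google-cloud-resource-manager). PROJECT_ID and USER_EMAIL are placeholders introduced for illustration; in most cases an administrator grants these roles in the Google Cloud console instead.

    # Hypothetical sketch: grant the BigQuery Data Editor role on a project by
    # reading the current IAM policy, appending a binding, and writing it back.
    from google.cloud import resourcemanager_v3
    from google.iam.v1 import iam_policy_pb2

    client = resourcemanager_v3.ProjectsClient()
    resource = "projects/PROJECT_ID"

    policy = client.get_iam_policy(
        request=iam_policy_pb2.GetIamPolicyRequest(resource=resource)
    )
    policy.bindings.add(
        role="roles/bigquery.dataEditor",
        members=["user:USER_EMAIL"],
    )
    client.set_iam_policy(
        request=iam_policy_pb2.SetIamPolicyRequest(resource=resource, policy=policy)
    )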

Connect with a notebook

The following example shows you how to configure a Spark notebook to interact with Iceberg tables stored in BigQuery metastore.

In this example, you set up a Spark session, create a namespace and table, add some data to the table, and then query the data in BigQuery Studio.

  1. Create a Spark notebook in BigQuery Studio.

  2. In the Apache Spark notebook, include the necessary Apache Spark imports:

    from dataproc_spark_session.session.spark.connect import DataprocSparkSession
    from google.cloud.dataproc_v1 import Session
    from pyspark.sql import SparkSession
  3. Define a catalog, namespace, and a warehouse directory.

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    Replace the following:

    • CATALOG_NAME: a catalog name to reference your Spark table.
    • NAMESPACE_NAME: a namespace label to reference your Spark table.
    • WAREHOUSE_DIRECTORY: the URI of the Cloud Storage folder where your data warehouse is stored.
  4. Initialize a Spark session.

    # Create a Dataproc Serverless session template, then configure the Iceberg
    # catalog properties so that Spark uses BigQuery metastore as its catalog.
    session = Session()

    session.environment_config.execution_config.network_uri = NETWORK_NAME
    session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
    session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = "PROJECT_ID"
    session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = "LOCATION"
    session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = warehouse_dir
    
    spark = (
     DataprocSparkSession.builder
     .appName("BigQuery metastore Iceberg table example")
     .dataprocConfig(session)
     .getOrCreate())

    Replace the following:

    • NETWORK_NAME: the name or URI of the network running the Spark code. If unspecified, the default network is used.
    • PROJECT_ID: the ID of the Google Cloud project that is running the Spark code.
    • LOCATION: the location to run the Spark job in.
  5. Create a catalog and namespace.

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. Create a table.

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    Replace the following:

    • TABLE_NAME: a name for your Iceberg table.
  7. Run a Data Manipulation Language (DML) statement from Spark.

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. Run a Data Definition Language (DDL) statement from Spark.

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. Insert data into the table.

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. Query the table from Spark.

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. Query the table from the Google Cloud console by running the following SQL query. The Spark namespace appears as a dataset in BigQuery. For a programmatic alternative that uses the BigQuery Python client, see the sketch after these steps.

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100
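
As an alternative to the console, the following is a minimal sketch that runs the same query with the BigQuery Python client library (google-cloud-bigquery). PROJECT_ID, NAMESPACE_NAME, and TABLE_NAME are the same placeholders used in the steps above.

    # Minimal sketch: query the Iceberg table through BigQuery with the
    # google-cloud-bigquery client library. Replace the placeholders first.
    from google.cloud import bigquery

    client = bigquery.Client(project="PROJECT_ID")

    query = """
        SELECT *
        FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME`
        LIMIT 100
    """

    for row in client.query(query).result():
        print(dict(row))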

What's next