
Work with stored procedures for Apache Spark

This document shows how to create and call stored procedures for Apache Spark in BigQuery.

Using BigQuery, you can create and run Apache Spark stored procedures that are written in Python. You can then run these stored procedures in BigQuery using a Google Standard SQL query, similar to running SQL stored procedures.

To enroll in this preview, complete the enrollment form.

Limitations

Quotas and limits

For information about quotas and limits, see stored procedures for Apache Spark quotas and limits.

Before you begin

Required roles

To get the permissions that you need to perform the tasks in this document, ask your administrator to grant you the following IAM roles:

  • Create a connection: BigQuery Connection Admin (roles/bigquery.connectionAdmin) on your project
  • Create a stored procedure for Apache Spark:
    • BigQuery Data Editor (roles/bigquery.dataEditor) on the dataset where you create the stored procedure
    • BigQuery Connection Admin (roles/bigquery.connectionAdmin) on the connection that the stored procedure uses
    • BigQuery Job User (roles/bigquery.jobUser) on your project
  • Call a stored procedure for Apache Spark:
    • BigQuery Metadata Viewer (roles/bigquery.metadataViewer) on the dataset where the stored procedure is stored
    • BigQuery Connection User (roles/bigquery.connectionUser) on the connection
    • BigQuery Job User (roles/bigquery.jobUser) on your project

For more information about granting roles, see Manage access.

These predefined roles contain the permissions required to perform the tasks in this document. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

  • Create a connection:
    • bigquery.connections.create
    • bigquery.connections.list
  • Create a stored procedure for Apache Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Call a stored procedure for Apache Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

You might also be able to get these permissions with custom roles or other predefined roles.

Location considerations

You can create a connection in regions and multi-regions that support BigQuery.

You must create a stored procedure for Apache Spark in the same location as your connection because the stored procedure runs in the same location as the connection. For example, to create a stored procedure in the US multi-region, you use a connection located in the US multi-region.

Multi-regions

You must specify Google Cloud resources located in the same large geographic area:

  • A connection in the BigQuery US multi-region can reference a Spark History Server or a Dataproc Metastore in any single region in the US geographic area, such as us-central1, us-east4, or us-west2.

  • A connection in the BigQuery EU multi-region can reference a Spark History Server or a Dataproc Metastore in member states of the European Union, such as europe-north1 or europe-west3.

Single regions

A connection in a single region can only reference Google Cloud resources in the same region. For example, a connection in the single region us-east4 can only reference a Spark History Server or a Dataproc Metastore in us-east4.

Create a connection

To create a connection, select one of the following options:

Console

  1. Go to the BigQuery page.

    Go to BigQuery

  2. To create a connection, click Add data, and then click Connections to external data sources.

  3. In the Connection type list, select Apache Spark.

  4. In the Connection ID field, enter a name for your connection—for example, spark_connection.

  5. In the Data location list, select a region.

    You can create a connection in regions and multi-regions that support BigQuery. For more information, see Location considerations.

  6. Optional: From the Metastore service list, select a Dataproc Metastore.

  7. Optional: In the History server cluster field, enter a Dataproc Persistent History Server.

  8. Click Create connection.

  9. Click Go to connection.

  10. In the Connection info pane, copy the service account ID because you need it in another step.

bq

  1. In a command-line environment, use the bq mk command to create a connection:

    bq mk --connection --connection_type='SPARK' \
     --properties=PROPERTIES \
     --project_id=PROJECT_ID \
     --location=LOCATION \
     CONNECTION_NAME
    

    Replace the following:

    • PROPERTIES: a key-value pair to include connection-specific parameters in JSON format

      For example:

      --properties='{
      "metastoreServiceConfig": {"metastoreService": "METASTORE_SERVICE_NAME"},
      "sparkHistoryServerConfig": {"dataprocCluster": "DATAPROC_CLUSTER_NAME"}
      }'
      

      Replace the following:

      • METASTORE_SERVICE_NAME: the name of your Dataproc Metastore service
      • DATAPROC_CLUSTER_NAME: the name of your Dataproc Persistent History Server cluster

    • PROJECT_ID: your Google Cloud project ID

    • LOCATION: the location where you want to store the connection—for example, US

    • CONNECTION_NAME: a name for the connection—for example, my-connection

  2. Retrieve and copy the service account ID because you need it in another step:

    bq show --location=LOCATION --connection PROJECT_ID.LOCATION.CONNECTION_NAME
    

    The output is similar to the following:

    Connection myproject.us.myconnection 
    
           name           type                    properties
    ---------------------- ------- ---------------------------------------------------
    myproject.us.myconnection  SPARK   {"serviceAccountId": "bqserver@example.iam.gserviceaccount.com"}
    

For information about how to manage connections, see Work with connections to external data sources.
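
If you manage connections programmatically, you can also retrieve the service account ID with the BigQuery Connection client library for Python. The following is a minimal sketch; the project, location, and connection names are placeholders:

from google.cloud import bigquery_connection_v1

client = bigquery_connection_v1.ConnectionServiceClient()

# Build the fully qualified resource name: projects/.../locations/.../connections/...
name = client.connection_path("my-project-id", "us", "my-connection")

connection = client.get_connection(name=name)

# For Spark connections, the associated service account is exposed on the Spark properties.
print(connection.spark.service_account_id)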

Set up access control for the connection

To let a stored procedure for Apache Spark access your Google Cloud resources, you need to grant the service account that's associated with the connection the necessary IAM permissions.

  • To read and write data in BigQuery, you need to give the service account the following IAM permissions:

    • bigquery.tables.* on your BigQuery tables
    • bigquery.readsessions.* on your project

    The roles/bigquery.admin IAM role includes the permissions that the service account needs to read and write data in BigQuery.

  • To read and write data in Cloud Storage, you need to give the service account the storage.objects.* permission on your Cloud Storage objects.

    The roles/storage.objectAdmin IAM role includes the permissions that the service account needs to read and write data in Cloud Storage.

  • If you specify Dataproc Metastore when you create a connection, then for BigQuery to retrieve details about the metastore configuration, you need to give the service account the metastore.services.get permission on your Dataproc Metastore.

    The predefined roles/metastore.metadataViewer role includes the permission that the service account needs in order to retrieve details about the metastore configuration.

    You also need to grant the service account the roles/storage.objectAdmin role on the Cloud Storage bucket so that your stored procedure can access the Hive warehouse directory of your Dataproc Metastore. If your stored procedure performs operations on the metastore, you might need to give additional permissions. For more information about IAM roles and permissions in Dataproc Metastore, see Dataproc Metastore predefined roles and permissions.

  • If you specify a Dataproc Persistent History Server when you create a connection, then you need to grant the service account the following roles:

    • The roles/dataproc.viewer role, which contains the dataproc.clusters.get permission, on your Dataproc Persistent History Server.
    • The roles/storage.objectAdmin role on the Cloud Storage bucket that you specify for the property spark:spark.history.fs.logDirectory when you create the Dataproc Persistent History Server.

    For more information, see Dataproc Persistent History Server and Dataproc roles and permissions.
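
For example, to grant the connection's service account the roles/storage.objectAdmin role on a Cloud Storage bucket, you can use the Cloud Storage client library for Python. The following is a minimal sketch; the bucket name and service account ID are placeholders:

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-bucket")

# Read the current bucket IAM policy and append a binding for the connection's service account.
policy = bucket.get_iam_policy(requested_policy_version=3)
policy.bindings.append({
    "role": "roles/storage.objectAdmin",
    "members": {"serviceAccount:bqserver@example.iam.gserviceaccount.com"},
})
bucket.set_iam_policy(policy)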

Create a stored procedure for Apache Spark

After you create a connection and give the required permissions to the service account that's associated with the connection, you can create a stored procedure for Apache Spark.

You must create the stored procedure in the same location as the connection that you use. For more information, see Location considerations.

If the body of your stored procedure is more than 1 MB, then we recommend that you put your stored procedure in a file in a Cloud Storage bucket instead of using inline code.
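
For example, you can upload the PySpark file to a Cloud Storage bucket with the Cloud Storage client library for Python before you reference it in main_file_uri. The following is a minimal sketch; the bucket and file names are placeholders:

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-bucket")

# Upload the local PySpark main file; reference it later as gs://my-bucket/my-pyspark-main.py.
bucket.blob("my-pyspark-main.py").upload_from_filename("my-pyspark-main.py")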

To create a stored procedure for Apache Spark, follow these steps:

  1. Go to the BigQuery page.

    Go to BigQuery

  2. In the Explorer pane, click the connection in the project that you used to create the connection resource.

  3. To create a stored procedure for Apache Spark, click Create stored procedure.

  4. In the query editor, you can modify the sample code for the CREATE PROCEDURE statement that appears:

    CREATE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_NAME`
     OPTIONS (
         engine="SPARK",
         main_file_uri=["URI"])
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Replace the following:

    • PROJECT_ID: the project in which you want to create the stored procedure—for example, myproject
    • DATASET: the dataset in which you want to create the stored procedure—for example, mydataset
    • PROCEDURE_NAME: the name of the stored procedure that you want to run in BigQuery—for example, mysparkprocedure
    • PROCEDURE_ARGUMENT: a parameter to enter the input arguments

      Stored procedures for Apache Spark only support IN parameters. In this parameter, specify the following fields:

      • ARGUMENT_NAME: the name of the argument
      • ARGUMENT_TYPE: the type of the argument

      For example: myproject.mydataset.mysparkproc(num INT64)

      For more information, see Pass a value as an input parameter in this document.

    • CONNECTION_NAME: the name of the Spark connection—for example, myproject.us.mysparkconnection
    • URI: the path to a PySpark file—for example, gs://mybucket/mypysparkmain.py

      Alternatively, if you want to add the body of the stored procedure in the CREATE PROCEDURE statement, then add the PYSPARK_CODE after LANGUAGE PYTHON AS as shown in the example in Use inline code in this document.

    • PYSPARK_CODE: the definition of a PySpark application in the CREATE PROCEDURE statement if you want to pass the body of the procedure inline

      The value is a string literal. If the code includes quotation marks and backslashes, those must be either escaped or represented as a raw string. For example, the code return "\n"; can be represented as one of the following:

      • Quoted string: "return \"\\n\";". Both quotation marks and backslashes are escaped.
      • Triple-quoted string: """return "\\n";""". Backslashes are escaped while quotation marks are not.
      • Raw string: r"""return "\n";""". No escaping is needed.
      To learn how to add inline PySpark code, see Use inline code.

    For additional options that you can specify in OPTIONS, see the procedure option list.
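
You can also run the CREATE PROCEDURE statement programmatically instead of in the query editor. The following is a minimal sketch that uses the BigQuery client library for Python; the project, dataset, connection, and file names are placeholders:

from google.cloud import bigquery

client = bigquery.Client(project="my-project-id")

ddl = """
CREATE OR REPLACE PROCEDURE `my-project-id`.my_dataset.my_spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS (engine="SPARK", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON
"""

# DDL statements run as regular query jobs; result() waits for completion.
client.query(ddl).result()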

Call a stored procedure for Apache Spark

After you create a stored procedure, you can call it by using one of the following options:

Console

  1. Go to the BigQuery page.

    Go to BigQuery

  2. In the Explorer pane, expand your project and select the stored procedure for Apache Spark that you want to run.

  3. In the Stored procedure info window, click Invoke stored procedure. Alternatively, you can expand the View actions option and click Invoke.

  4. Optional: In the Query results section, follow these steps:

    • If you want to view Spark driver logs, then click Execution details.

    • If you want to view logs in Cloud Logging, click Job information, and then in the Log field, click log.

    • If you want to get the Spark History Server endpoint, click Job information, and then click Spark history server.

SQL

To call a stored procedure, use the CALL PROCEDURE statement:

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the query editor, enter the following statement:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
    

  3. Click Run.

For more information about how to run queries, see Running interactive queries.
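
You can also call the stored procedure programmatically. The following is a minimal sketch that uses the BigQuery client library for Python; the project, dataset, and procedure names are placeholders:

from google.cloud import bigquery

client = bigquery.Client(project="my-project-id")

# CALL runs as a query job; result() blocks until the Spark job finishes.
job = client.query("CALL `my-project-id`.my_dataset.spark_proc()")
job.result()

print(f"Job {job.job_id} finished.")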

Examples of stored procedures for Apache Spark

This section shows examples of how you can create a stored procedure for Apache Spark.

Use a PySpark file in Cloud Storage

The following example shows how to create a stored procedure for Apache Spark by using the connection my-project-id.us.my-connection and a PySpark file (gs://my-bucket/my-pyspark-main.py) that's stored in a Cloud Storage bucket:

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON
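
The following is a minimal sketch of what the referenced my-pyspark-main.py file might contain; the application name and table are only illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my-pyspark-main").getOrCreate()

# Read a public BigQuery table through the Spark BigQuery connector and print a few rows.
df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:samples.shakespeare") \
    .load()
df.show()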

Use inline code

The following example shows how to create a stored procedure for Apache Spark by using the connection my-project-id.us.my-connection and inline PySpark code:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count')
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Pass a value as an input parameter

In the PySpark code, you can obtain the input parameters of the stored procedure for Apache Spark through environment variables in the Spark driver and executors. The name of the environment variable has the format BIGQUERY_PROC_PARAM.PARAMETER_NAME, where PARAMETER_NAME is the name of the input parameter. For example, if the name of the input parameter is var, the name of the corresponding environment variable is BIGQUERY_PROC_PARAM.var. The input parameters are JSON encoded. In your PySpark code, you can get the input parameter value in a JSON string from the environment variable and decode it to a Python variable or a Spark DataFrame.

The following example shows how to get the value of an input parameter of type INT64 and an input parameter of type STRUCT into your PySpark code:

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

# Get the input parameter info in JSON string and create a Spark DataFrame
df = spark.read.json(sc.parallelize([os.environ["BIGQUERY_PROC_PARAM.info"]]))
df.printSchema()
df.show(truncate=False)
"""

Read from a Hive Metastore table and write results to BigQuery

The following example shows how to transform a Hive Metastore table and write the results to BigQuery:

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

View log filters

After you call a stored procedure for Apache Spark, you can view the log information. To obtain the Cloud Logging filter information and the Spark History Server endpoint, use the bq show command. The filter information is available under the SparkStatistics field of the child job. To get log filters, follow these steps:

  1. Go to the BigQuery page.

    Go to BigQuery

  2. In a command-line environment, list the child jobs of the stored procedure's script job:

    bq ls -j --parent_job_id=$parent_job_id
    

    To learn how to get the job ID, see View job details.

    The output is similar to the following:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
    
  3. Identify the jobId for your stored procedure and use the bq show command to view details of the job:

    bq show --format=prettyjson --job $child_job_id
    

    Copy the sparkStatistics field because you need it in another step.

    The output is similar to the following:

    {
    "configuration": {...}
    …
    "statistics": {
     …
      "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
        …
      }
    }
    }
    

  4. For Logging, generate log filters with the SparkStatistics fields:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.project_id=sparkStatistics.loggingInfo.projectId
    resource.labels.batch_id=sparkStatistics.sparkJobId
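
You can then use the generated filter to read the Spark driver and executor logs, for example with the Cloud Logging client library for Python. The following is a minimal sketch; the values are the placeholders from the sparkStatistics output shown earlier:

from google.cloud import logging as cloud_logging

client = cloud_logging.Client(project="myproject")  # sparkStatistics.loggingInfo.projectId

# Build the filter from the sparkStatistics fields.
log_filter = (
    'resource.type="myresource" '                     # loggingInfo.resourceType
    'AND resource.labels.project_id="myproject" '
    'AND resource.labels.batch_id="script-job-90f0"'  # sparkJobId
)

for entry in client.list_entries(filter_=log_filter):
    print(entry.timestamp, entry.payload)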
    

What's next