Work with stored procedures for Apache Spark

This document is intended for data analysts and data engineers who want to create and call stored procedures for Spark in BigQuery.

Using BigQuery, you can create and run Spark stored procedures that are written in Python, Java, or Scala. You can then run these stored procedures in BigQuery by using a GoogleSQL query, similar to running SQL stored procedures.

To enroll in this preview, complete the enrollment form.

Before you begin

To create a stored procedure for Spark, ask your administrator to create a Spark connection and share it with you. Your administrator must also grant the service account associated with the connection the required Identity and Access Management (IAM) permissions.

Required roles

To get the permissions that you need to perform the tasks in this document, ask your administrator to grant you the following IAM roles:

For more information about granting roles, see Manage access.

These predefined roles contain the permissions required to perform the tasks in this document. The exact permissions that are required are listed in the following section:

Required permissions

The following permissions are required to perform the tasks in this document:

  • Create a connection:
    • bigquery.connections.create
    • bigquery.connections.list
  • Create a stored procedure for Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Call a stored procedure for Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

You might also be able to get these permissions with custom roles or other predefined roles.
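If you script access checks, you can keep the permission list above as a lookup table. The following minimal Python sketch does this. REQUIRED_PERMISSIONS, the task keys, and missing_permissions are hypothetical names, and the sketch assumes that you already have the set of permissions granted to the caller from some other source. It doesn't call any IAM API.

```python
from typing import Iterable, Set

# Permissions per task, taken from the Required permissions list above.
# The task keys are hypothetical labels for the three tasks.
REQUIRED_PERMISSIONS = {
    "create_connection": {
        "bigquery.connections.create",
        "bigquery.connections.list",
    },
    "create_spark_procedure": {
        "bigquery.routines.create",
        "bigquery.connections.delegate",
        "bigquery.jobs.create",
    },
    "call_spark_procedure": {
        "bigquery.routines.get",
        "bigquery.connections.use",
        "bigquery.jobs.create",
    },
}


def missing_permissions(task: str, granted: Iterable[str]) -> Set[str]:
    """Return the permissions that `task` requires but `granted` lacks."""
    return REQUIRED_PERMISSIONS[task] - set(granted)


granted = {"bigquery.routines.get", "bigquery.jobs.create"}
print(sorted(missing_permissions("call_spark_procedure", granted)))
```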

Location consideration

You must create a stored procedure for Spark in the same location as your connection because the stored procedure runs in the same location as the connection. For example, to create a stored procedure in the US multi-region, you use a connection located in the US multi-region.
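Because the procedure runs in the connection's location, a deployment script might check locations before it creates the procedure. The following Python sketch assumes the PROJECT.LOCATION.CONNECTION form that this document uses for connection names (for example, myproject.us.mysparkconnection), and it assumes that location names compare case-insensitively. connection_location and same_location are hypothetical helpers.

```python
def connection_location(connection_id: str) -> str:
    """Return LOCATION from a PROJECT.LOCATION.CONNECTION identifier."""
    # rsplit keeps any dots in the project part (for example, in a
    # domain-scoped project ID) intact.
    parts = connection_id.rsplit(".", 2)
    if len(parts) != 3:
        raise ValueError(f"expected PROJECT.LOCATION.CONNECTION, got {connection_id!r}")
    return parts[1]


def same_location(dataset_location: str, connection_id: str) -> bool:
    """Check that the procedure's dataset and the connection share a location."""
    # Assumption: location names compare case-insensitively ("US" == "us").
    return dataset_location.lower() == connection_location(connection_id).lower()


print(same_location("US", "my-project-id.us.my-connection"))
```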

Create a stored procedure for Spark

You must create the stored procedure in the same location as the connection that you use.

If the body of your stored procedure is more than 1 MB, then we recommend that you put your stored procedure in a file in a Cloud Storage bucket instead of using inline code. BigQuery provides two methods to create a stored procedure for Spark using Python:

Use Python with query editor

To create a stored procedure for Spark in the query editor, follow these steps:

  1. Go to the BigQuery page.

    Go to BigQuery

  2. In the query editor, enter a CREATE PROCEDURE statement based on the following sample.

    Alternatively, in the Explorer pane, click the connection in the project that you used to create the connection resource. Then, to create a stored procedure for Spark, click Create stored procedure.

    To create a stored procedure for Spark in Python, use the following sample:

    CREATE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_NAME`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri="MAIN_PYTHON_FILE_URI")
     LANGUAGE PYTHON [AS PYSPARK_CODE]

    Replace the following:

    • PROJECT_ID: the project in which you want to create the stored procedure—for example, myproject
    • DATASET: the dataset in which you want to create the stored procedure—for example, mydataset
    • PROCEDURE_NAME: the name of the stored procedure that you want to run in BigQuery—for example, mysparkprocedure
    • PROCEDURE_ARGUMENT: the arguments of the stored procedure

      In this parameter, specify the following fields:

      • ARGUMENT_MODE: the mode of the argument

        Valid values are IN, OUT, and INOUT. The default value is IN. Stored procedures for Spark support IN arguments in Python, Java, and Scala; OUT and INOUT arguments are supported only in Python.

      • ARGUMENT_NAME: the name of the argument
      • ARGUMENT_TYPE: the type of the argument

      For example: myproject.mydataset.mysparkproc(num INT64)

      For more information, see Pass a value as an input parameter and Pass values as OUT and INOUT parameters in this document.

    • CONNECTION_NAME: the name of the Spark connection—for example, myproject.us.mysparkconnection
    • RUNTIME_VERSION: the runtime version of Spark—for example, 1.1
    • MAIN_PYTHON_FILE_URI: the path to a PySpark file—for example, gs://mybucket/mypysparkmain.py

      Alternatively, if you want to add the body of the stored procedure in the CREATE PROCEDURE statement, then add PYSPARK_CODE after LANGUAGE PYTHON AS as shown in the example in Use inline code in this document.

    • PYSPARK_CODE: the definition of a PySpark application in the CREATE PROCEDURE statement if you want to pass the body of the procedure inline.

      The value is a string literal. If the code includes quotation marks or backslashes, either escape them or represent the code as a raw string. For example, the code return "\n"; can be represented as one of the following:

      • Quoted string: "return \"\\n\";". Both quotation marks and backslashes are escaped.
      • Triple-quoted string: """return "\\n";""". Backslashes are escaped while quotation marks are not.
      • Raw string: r"""return "\n";""". No escaping is needed.

      To learn how to add inline PySpark code, see Use inline code.

    For additional options that you can specify in OPTIONS, see the procedure option list.
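GoogleSQL string literals closely follow Python's quoting rules for the three forms listed above, so you can check those examples in Python. The following sketch also shows one way to generate an inline-code CREATE PROCEDURE statement from a script. build_inline_procedure_ddl is a hypothetical helper. It wraps the code in a raw triple-quoted literal, so it rejects code that contains three consecutive double quotation marks, because a raw string can't escape its own delimiter.

```python
# The three Python literals below mirror the quoted, triple-quoted, and raw
# string examples above; each one evaluates to the text: return "\n";
quoted = "return \"\\n\";"
triple = """return "\\n";"""
raw = r"""return "\n";"""
assert quoted == triple == raw


def build_inline_procedure_ddl(procedure: str, connection: str,
                               runtime_version: str, pyspark_code: str) -> str:
    """Return a CREATE PROCEDURE statement that embeds `pyspark_code` inline."""
    # Three consecutive double quotation marks inside the code would end
    # the raw literal early, and raw strings can't escape them.
    if '"""' in pyspark_code:
        raise ValueError('pyspark_code must not contain """')
    # Newlines around the code keep a trailing " in the code from merging
    # with the closing delimiter.
    return (
        f"CREATE PROCEDURE {procedure}()\n"
        f"WITH CONNECTION `{connection}`\n"
        f'OPTIONS(engine="SPARK", runtime_version="{runtime_version}")\n'
        f'LANGUAGE PYTHON AS r"""\n{pyspark_code}\n"""'
    )


print(build_inline_procedure_ddl(
    "my_bq_project.my_dataset.spark_proc",
    "my-project-id.us.my-connection",
    "1.1",
    'print("hello from Spark")',
))
```

Putting the code on its own lines inside the literal matches the layout of the examples in Use inline code.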

Use Python with PySpark editor

To create a stored procedure for Spark in the PySpark editor, follow these steps:

  1. Go to the BigQuery page.

    Go to BigQuery

  2. If you want to add the PySpark code, open the PySpark editor. To open the PySpark editor, click the menu next to Compose new query, and then select Compose new PySpark Procedure.

  3. To set options for a stored procedure for Spark written in Python, click More > PySpark Options, and then do the following:

    1. Specify the location where you want to run the PySpark code.

    2. In the Connections field, specify the Spark connection.

    3. In the Stored procedure invocation section, specify the dataset in which you want to store the temporary stored procedures that are generated. You can either set a specific dataset or allow for the use of a temporary dataset to invoke the PySpark code.

      The temporary dataset is created in the location specified in the preceding step. If you specify a dataset name, ensure that the dataset and the Spark connection are in the same location.

    4. In the Input parameters section, define input parameters for the stored procedure. The value of the parameter is only used during in-session runs of the PySpark code, but the declaration itself is stored in the procedure.

    5. In the Advanced options section, specify the procedure options. For a detailed list of the procedure options, see the procedure option list.

    6. In the Properties section, add the key-value pairs to configure the job.

    7. Click Save.

Save a stored procedure for Spark

After you create the stored procedure by using the PySpark editor, you can save the stored procedure. To do so, follow these steps:

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. Create a stored procedure for Spark by using Python with the PySpark editor.

  3. Click Save > Save procedure.

  4. In the Save stored procedure dialog, specify the dataset name where you want to store the stored procedure and the name of the stored procedure.

  5. Click Save.

Use Java or Scala

To create a stored procedure for Spark using Java or Scala, follow these steps:

  1. Go to the BigQuery page.

    Go to BigQuery

  2. In the query editor, enter a CREATE PROCEDURE statement based on one of the following samples.

    To create a stored procedure for Spark in Java or Scala with the main_file_uri option, use the following sample:

    # Create procedure with main_file_uri option
    CREATE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_NAME`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri="MAIN_JAR_URI")
     LANGUAGE JAVA|SCALA

    To create a stored procedure for Spark in Java or Scala with the main_class and jar_uris options, use the following sample:

    # Create procedure with main_class and jar_uris options
    CREATE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_NAME`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class="CLASS_NAME",
         jar_uris=["URI"])
     LANGUAGE JAVA|SCALA

    Replace the following:

    • MAIN_JAR_URI: the path to a JAR file that contains the main class, for example, gs://mybucket/my_main.jar

    • CLASS_NAME: the fully qualified name of the main class in a JAR file set with the jar_uris option, for example, com.example.wordcount

    • URI: the path to a JAR file that contains the class specified in the main_class option, for example, gs://mybucket/mypysparkmain.jar

    • RUNTIME_VERSION: the runtime version of Spark—for example, 1.1

    For the definitions of the other placeholders, see Use Python with query editor.

Call a stored procedure for Spark

After you create a stored procedure, you can call it by using one of the following options:

Console

  1. Go to the BigQuery page.

    Go to BigQuery

  2. In the Explorer pane, expand your project and select the stored procedure for Spark that you want to run.

  3. In the Stored procedure info window, click Invoke stored procedure. Alternatively, you can expand the View actions option and click Invoke.

  4. Click Run.

  5. In the All results section, click View results.

  6. Optional: In the Query results section, follow these steps:

    • If you want to view Spark driver logs, then click Execution details.

    • If you want to view logs in Cloud Logging, click Job information, and then in the Log field, click log.

    • If you want to get the Spark History Server endpoint, click Job information, and then click Spark history server.

SQL

To call a stored procedure, use the CALL statement:

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the query editor, enter the following statement:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
    

  3. Click Run.

For more information about how to run queries, see Running interactive queries.
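You can also run the CALL statement from a program. The following sketch assumes the google-cloud-bigquery client library and application default credentials; build_call_statement and call_spark_procedure are hypothetical helper names. Client.query() should run the CALL statement as a script job, and the returned job ID is the parent job ID that View log filters passes to bq ls --parent_job_id.

```python
from typing import Optional


def build_call_statement(project_id: str, dataset: str, procedure_name: str) -> str:
    """Return a CALL statement for a procedure that takes no arguments."""
    return f"CALL `{project_id}`.{dataset}.{procedure_name}()"


def call_spark_procedure(project_id: str, dataset: str, procedure_name: str,
                         location: Optional[str] = None) -> str:
    """Run the CALL statement, wait for it to finish, and return the job ID."""
    # Imported here so that build_call_statement works without the library.
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    job = client.query(
        build_call_statement(project_id, dataset, procedure_name),
        location=location,  # Optional; should match the connection's location.
    )
    job.result()  # Blocks until the script job finishes; raises if it failed.
    return job.job_id


print(build_call_statement("myproject", "mydataset", "mysparkprocedure"))
```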

Examples of stored procedures for Spark

This section shows examples of how you can create a stored procedure for Apache Spark.

Use a PySpark or a JAR file in Cloud Storage

The following example shows how to create a stored procedure for Spark by using the my-project-id.us.my-connection connection and a PySpark or a JAR file that's stored in a Cloud Storage bucket:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java or Scala

Use main_file_uri to create a stored procedure:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Use main_class to create a stored procedure:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Use inline code

The following example shows how to create a stored procedure for Spark by using the connection my-project-id.us.my-connection and inline PySpark code:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Pass a value as an input parameter

The following examples show two methods for passing a value as an input parameter in Python:

Method 1: Use environment variables

In the PySpark code, you can obtain the input parameters of the stored procedure for Spark through environment variables in the Spark driver and executors. The name of the environment variable has the format of BIGQUERY_PROC_PARAM.PARAMETER_NAME, where PARAMETER_NAME is the name of the input parameter. For example, if the name of the input parameter is var, the name of the corresponding environment variable is BIGQUERY_PROC_PARAM.var. The input parameters are JSON encoded. In your PySpark code, you can get the input parameter value in a JSON string from the environment variable and decode it to a Python variable.

The following example shows how to get the value of an input parameter of type INT64 into your PySpark code:

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

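When you develop the PySpark code locally, you can simulate the environment variable that BigQuery sets. The following sketch is an assumption-based local test harness, not part of the BigQuery runtime: get_proc_param is a hypothetical helper, and the value "42" stands in for the JSON-encoded INT64 value that BigQuery would provide. As in the example above, the int() conversion also handles a value that arrives as a JSON string.

```python
import json
import os
from typing import Any

PARAM_PREFIX = "BIGQUERY_PROC_PARAM."


def get_proc_param(name: str) -> Any:
    """Return the JSON-decoded value of input parameter `name`."""
    return json.loads(os.environ[PARAM_PREFIX + name])


# Local testing only: simulate the variable that BigQuery sets for `num`.
# setdefault leaves a real value untouched when the code runs in BigQuery.
os.environ.setdefault(PARAM_PREFIX + "num", "42")

num = int(get_proc_param("num"))
print("num =", num)
```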
Method 2: Use a built-in library

In the PySpark code, you can import a built-in library and use it to read input parameters of any supported type. To pass the parameters to executors, read the parameters in the Spark driver as Python variables and then pass the values to the executors. The built-in library supports most BigQuery data types, except INTERVAL, GEOGRAPHY, NUMERIC, and BIGNUMERIC, as shown in the following table:

BigQuery data type   Python data type
------------------   ----------------
BOOL                 bool
STRING               str
FLOAT64              float
INT64                int
BYTES                bytes
DATE                 datetime.date
TIMESTAMP            datetime.datetime
TIME                 datetime.time
DATETIME             datetime.datetime
ARRAY                Array
STRUCT               Struct
JSON                 Object
NUMERIC              Unsupported
BIGNUMERIC           Unsupported
INTERVAL             Unsupported
GEOGRAPHY            Unsupported
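You can express the scalar rows of the table as a lookup, for example, to check a value before you assign it to an OUT parameter. This Python sketch covers only the scalar types. ARRAY, STRUCT, and JSON are left out because the table doesn't name exact Python classes for them. PYTHON_TYPE_FOR and python_type_for are hypothetical names.

```python
import datetime

# Scalar rows of the table above: BigQuery data type -> Python data type.
PYTHON_TYPE_FOR = {
    "BOOL": bool,
    "STRING": str,
    "FLOAT64": float,
    "INT64": int,
    "BYTES": bytes,
    "DATE": datetime.date,
    "TIMESTAMP": datetime.datetime,
    "TIME": datetime.time,
    "DATETIME": datetime.datetime,
}

# Types that the built-in library doesn't support.
UNSUPPORTED_TYPES = {"NUMERIC", "BIGNUMERIC", "INTERVAL", "GEOGRAPHY"}


def python_type_for(bq_type: str) -> type:
    """Return the Python type for a scalar BigQuery type listed in the table."""
    name = bq_type.upper()
    if name in UNSUPPORTED_TYPES:
        raise TypeError(f"{name} isn't supported by the built-in library")
    return PYTHON_TYPE_FOR[name]  # KeyError for types not covered here


# Check a value before assigning it to an OUT parameter of type DATE.
# datetime.datetime is a subclass of datetime.date, so this check also
# accepts datetime values.
value = datetime.date(1985, 4, 12)
print(isinstance(value, python_type_for("DATE")))
```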

The following example shows how to import the built-in library and use it to populate an input parameter of type INT64 and an input parameter of type ARRAY<STRUCT<a INT64, b STRING>> into your PySpark code:

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc = spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter values to executors: info is distributed as an RDD,
  # and num is captured by the lambda that runs on each executor
  rdd = sc.parallelize(info)
  value = rdd.map(lambda x: check_in_param(x, num)).sum()

main()
"""

In the Java or Scala code, you can obtain the input parameters of the stored procedure for Spark through environment variables in the Spark driver and executors. The name of the environment variable has the format of BIGQUERY_PROC_PARAM.PARAMETER_NAME, where PARAMETER_NAME is the name of the input parameter. For example, if the name of the input parameter is var, the name of the corresponding environment variable is BIGQUERY_PROC_PARAM.var. In your Java or Scala code, you can get the input parameter value from the environment variable.

The following example shows how to get the value of an input parameter from environment variables into your Scala code:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

The following example shows how to get the value of an input parameter from environment variables into your Java code:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Pass values as OUT and INOUT parameters

OUT parameters return a value from the Spark procedure, whereas INOUT parameters accept a value for the procedure and also return a value from it. To use OUT and INOUT parameters, add the OUT or INOUT keyword before the parameter name when you create the Spark procedure. In the PySpark code, use the built-in library to return a value as an OUT or INOUT parameter. As with input parameters, the built-in library supports most BigQuery data types, except INTERVAL, GEOGRAPHY, NUMERIC, and BIGNUMERIC. TIME and DATETIME values are converted to the UTC time zone when they are returned as OUT or INOUT parameters.

The following example shows how to read IN and INOUT parameters and how to return values as OUT and INOUT parameters:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME, OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

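# Reading the IN and INOUT parameter values. The IN parameter is named int,
# so store it in int_value to avoid shadowing Python's built-in int().
int_value = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int_value, ", INOUT parameter value: ", dt)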

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

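Because TIME and DATETIME values are converted to UTC when they are returned, you might normalize a datetime yourself before assigning it, so that the returned value is the one you expect. In this sketch, to_utc is a hypothetical helper, and treating a naive datetime as already being in UTC is an assumption of the sketch, not documented BigQuery behavior.

```python
import datetime


def to_utc(value: datetime.datetime) -> datetime.datetime:
    """Return `value` as an aware datetime in UTC."""
    if value.tzinfo is None:
        # Assumption of this sketch: naive datetimes already represent UTC.
        return value.replace(tzinfo=datetime.timezone.utc)
    return value.astimezone(datetime.timezone.utc)


# 12:00 at UTC+02:00 is 10:00 UTC.
local = datetime.datetime(
    2024, 5, 1, 12, 0, tzinfo=datetime.timezone(datetime.timedelta(hours=2))
)
print(to_utc(local))  # 2024-05-01 10:00:00+00:00
```

Inside the procedure, you would then assign the result, for example, spark_proc_param_context.datetime = to_utc(local).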
Read from a Hive Metastore table and write results to BigQuery

The following example shows how to transform a Hive Metastore table and write the results to BigQuery:

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

View log filters

After you call a stored procedure for Spark, you can view its log information. To obtain the Cloud Logging filter information and the Spark History Server endpoint, use the bq show command. The filter information is available under the sparkStatistics field of the child job. To get log filters, follow these steps:

  1. Go to the BigQuery page.

    Go to BigQuery

  2. By using the bq command-line tool (for example, in Cloud Shell), list the child jobs of the stored procedure's script job:

    bq ls -j --parent_job_id=$parent_job_id
    

    To learn how to get the job ID, see View job details.

    The output is similar to the following:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
    
  3. Identify the jobId for your stored procedure and use the bq show command to view details of the job:

    bq show --format=prettyjson --job $child_job_id
    

    Copy the sparkStatistics field because you need it in another step.

    The output is similar to the following:

    {
    "configuration": {...}
    …
    "statistics": {
     …
      "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
        …
      }
    }
    }
    

  4. For Logging, generate log filters with the sparkStatistics fields:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation
    
    

    The logs are written in the bigquery.googleapis.com/SparkJob monitored resource. The logs are labeled by the INFO and DRIVER components. To filter logs from the Spark driver, add labels.component = "DRIVER" to the log filters.
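Steps 3 and 4 can be scripted. The following Python sketch reads the JSON that bq show --format=prettyjson prints, takes the sparkStatistics fields from the path shown in the sample output, and builds the filter from step 4 with each value quoted. spark_log_filter is a hypothetical helper. In the Cloud Logging query language, comparisons on separate lines are joined by AND, so you can paste the result into the Logs Explorer query field.

```python
import json


def spark_log_filter(job_json: str, driver_only: bool = False) -> str:
    """Build a Cloud Logging filter from `bq show --format=prettyjson` output."""
    stats = json.loads(job_json)["statistics"]["query"]["sparkStatistics"]
    logging_info = stats["loggingInfo"]
    clauses = [
        f'resource.type = "{logging_info["resourceType"]}"',
        f'resource.labels.resource_container = "{logging_info["projectId"]}"',
        f'resource.labels.spark_job_id = "{stats["sparkJobId"]}"',
        f'resource.labels.location = "{stats["sparkJobLocation"]}"',
    ]
    if driver_only:
        # Keep only the logs from the Spark driver.
        clauses.append('labels.component = "DRIVER"')
    # One comparison per line; Cloud Logging joins them with AND.
    return "\n".join(clauses)
```

For example, you could save the bq show output to a file (the name job.json is hypothetical) and call spark_log_filter(open("job.json").read(), driver_only=True).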

Limitations

Quotas and limits

For information about quotas and limits, see stored procedures for Spark quotas and limits.

What's next