Work with stored procedures for Apache Spark
This document is intended for data analysts and data engineers who want to create and call stored procedures for Spark in BigQuery.
Using BigQuery, you can create and run Spark stored procedures that are written in Python, Java, and Scala. You can then run these stored procedures in BigQuery using a GoogleSQL query, similar to running SQL stored procedures.
To enroll in this preview, complete the enrollment form.
Before you begin
To create a stored procedure for Spark, ask your administrator to create a Spark connection and share it with you. Your administrator must also grant the service account associated with the connection the required Identity and Access Management (IAM) permissions.
Required roles
To get the permissions that you need to perform the tasks in this document, ask your administrator to grant you the following IAM roles:
- Create a stored procedure for Spark:
  - BigQuery Data Editor (roles/bigquery.dataEditor) on the dataset where you create the stored procedure
  - BigQuery Connection Admin (roles/bigquery.connectionAdmin) on the connection that the stored procedure uses
  - BigQuery Job User (roles/bigquery.jobUser) on your project
- Call a stored procedure for Spark:
  - BigQuery Metadata Viewer (roles/bigquery.metadataViewer) on the dataset where the stored procedure is stored
  - BigQuery Connection User (roles/bigquery.connectionUser) on the connection
  - BigQuery Job User (roles/bigquery.jobUser) on your project
For more information about granting roles, see Manage access.
These predefined roles contain the permissions required to perform the tasks in this document. To see the exact permissions that are required, expand the Required permissions section:
Required permissions
The following permissions are required to perform the tasks in this document:
- Create a connection:
  - bigquery.connections.create
  - bigquery.connections.list
- Create a stored procedure for Spark:
  - bigquery.routines.create
  - bigquery.connections.delegate
  - bigquery.jobs.create
- Call a stored procedure for Spark:
  - bigquery.routines.get
  - bigquery.connections.use
  - bigquery.jobs.create
You might also be able to get these permissions with custom roles or other predefined roles.
Location consideration
You must create a stored procedure for Spark in the same location as your connection because the stored procedure runs in the same location as the connection. For example, to create a stored procedure in the US multi-region, you use a connection located in the US multi-region.
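As a minimal sketch, assume a connection my-project-id.us.my-connection in the US multi-region and a dataset us_dataset that is also in the US multi-region (both names are illustrative). The stored procedure is then created in a dataset whose location matches the connection:

-- The dataset that stores the procedure and the connection are both in the US multi-region.
CREATE PROCEDURE `my-project-id`.us_dataset.my_spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS (engine="SPARK", runtime_version="1.1",
    main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON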
Create a stored procedure for Spark
You must create the stored procedure in the same location as the connection that you use.
If the body of your stored procedure is more than 1 MB, then we recommend that you put your stored procedure in a file in a Cloud Storage bucket instead of using inline code. BigQuery provides two methods to create a stored procedure for Spark using Python:
- If you want to use the CREATE PROCEDURE statement, use the query editor.
- If you want to add the Python code, use the PySpark editor. After you create the stored procedure by using the PySpark editor, you can save the stored procedure.
Use Python with query editor
To create a stored procedure for Spark in the query editor, follow these steps:
Go to the BigQuery page.
In the query editor, add the sample code for the CREATE PROCEDURE statement that appears.

Alternatively, in the Explorer pane, click the connection in the project that you used to create the connection resource. Then, to create a stored procedure for Spark, click Create stored procedure.

To create a stored procedure for Spark in Python, use the following sample:
CREATE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
WITH CONNECTION `CONNECTION_NAME`
OPTIONS (
    engine="SPARK",
    runtime_version="RUNTIME_VERSION",
    main_file_uri=["MAIN_PYTHON_FILE_URI"])
LANGUAGE PYTHON [AS PYSPARK_CODE]
Replace the following:
- PROJECT_ID: the project in which you want to create the stored procedure—for example, myproject
- DATASET: the dataset in which you want to create the stored procedure—for example, mydataset
- PROCEDURE_NAME: the name of the stored procedure that you want to run in BigQuery—for example, mysparkprocedure
- PROCEDURE_ARGUMENT: a parameter to enter the input arguments

  In this parameter, specify the following fields:

  - ARGUMENT_MODE: the mode of the argument

    Valid values include IN, OUT, and INOUT. By default, the value is IN. Stored procedures for Spark support the OUT and INOUT modes only for Python, and support the IN mode for Python, Java, and Scala.
  - ARGUMENT_NAME: the name of the argument
  - ARGUMENT_TYPE: the type of the argument

  For example: myproject.mydataset.mysparkproc(num INT64)

  For more information, see pass a value as an IN parameter or the OUT and INOUT parameters in this document.
- CONNECTION_NAME: the name of the Spark connection—for example, myproject.us.mysparkconnection
- RUNTIME_VERSION: the runtime version of Spark—for example, 1.1
- MAIN_PYTHON_FILE_URI: the path to a PySpark file—for example, gs://mybucket/mypysparkmain.py

  Alternatively, if you want to add the body of the stored procedure in the CREATE PROCEDURE statement, then add PYSPARK_CODE after LANGUAGE PYTHON AS, as shown in the example in Use inline code in this document.
- PYSPARK_CODE: the definition of a PySpark application in the CREATE PROCEDURE statement if you want to pass the body of the procedure inline.

  The value is a string literal. If the code includes quotation marks and backslashes, those must be either escaped or represented as a raw string. For example, the code return "\n"; can be represented as one of the following:

  - Quoted string: "return \"\\n\";". Both quotation marks and backslashes are escaped.
  - Triple-quoted string: """return "\\n";""". Backslashes are escaped while quotation marks are not.
  - Raw string: r"""return "\n";""". No escaping is needed.

For additional options that you can specify in OPTIONS, see the procedure option list.
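With the sample values from this list filled in, the preceding template might look like the following (the names are the illustrative values above, not required names):

CREATE PROCEDURE `myproject`.mydataset.mysparkprocedure(num INT64)
WITH CONNECTION `myproject.us.mysparkconnection`
OPTIONS (
    engine="SPARK",
    runtime_version="1.1",
    main_file_uri="gs://mybucket/mypysparkmain.py")
LANGUAGE PYTHON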
Use Python with PySpark editor
To create a stored procedure for Spark in the PySpark editor, follow these steps:
Go to the BigQuery page.
If you want to add the PySpark code, open the PySpark editor. To open the PySpark editor, click the menu next to Compose new query, and then select Compose new PySpark Procedure.

To set options for a stored procedure for Spark written in Python, click More > PySpark Options, and then do the following:
Specify the location where you want to run the PySpark code.
In the Connections field, specify the Spark connection.
In the Stored procedure invocation section, specify the dataset in which you want to store the temporary stored procedures that are generated. You can either set a specific dataset or allow for the use of a temporary dataset to invoke the PySpark code.
The temporary dataset is generated in the location that you specified in the preceding step. If you specify a dataset name, ensure that the dataset and the Spark connection are in the same location.
In the Input parameters section, define input parameters for the stored procedure. The value of the parameter is only used during in-session runs of the PySpark code, but the declaration itself is stored in the procedure.
In the Advanced options section, specify the procedure options. For a detailed list of the procedure options, see the procedure option list.
In the Properties section, add the key-value pairs to configure the job.
Click Save.
Save a stored procedure for Spark
After you create the stored procedure by using the PySpark editor, you can save the stored procedure. To do so, follow these steps:
In the Google Cloud console, go to the BigQuery page.
In the query editor, create a stored procedure for Spark using Python with PySpark editor.
Click
Save > Save procedure.
In the Save stored procedure dialog, specify the dataset name where you want to store the stored procedure and the name of the stored procedure.
Click Save.
Use Java or Scala
To create a stored procedure for Spark using Java or Scala, follow these steps:
Go to the BigQuery page.
In the query editor, add the sample code for the CREATE PROCEDURE statement that appears.

To create a stored procedure for Spark in Java or Scala with the main_file_uri option, use the following sample:

# Create procedure with main_file_uri option
CREATE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
WITH CONNECTION `CONNECTION_NAME`
OPTIONS (
    engine="SPARK",
    runtime_version="RUNTIME_VERSION",
    main_file_uri=["MAIN_JAR_URI"])
LANGUAGE JAVA|SCALA

To create a stored procedure for Spark in Java or Scala with the main_class and jar_uris options, use the following sample:

# Create procedure with main_class and jar_uris option
CREATE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
WITH CONNECTION `CONNECTION_NAME`
OPTIONS (
    engine="SPARK",
    runtime_version="RUNTIME_VERSION",
    main_class=["CLASS_NAME"],
    jar_uris=["URI"])
LANGUAGE JAVA|SCALA
Replace the following:
- MAIN_JAR_URI: the path to a JAR file that contains the main class—for example, gs://mybucket/my_main.jar
- CLASS_NAME: the fully qualified name of a class in a JAR set with the jar_uris option—for example, com.example.wordcount
- URI: the path to a JAR file that has a class specified in the main class—for example, gs://mybucket/mypysparkmain.jar
- RUNTIME_VERSION: the runtime version of Spark—for example, 1.1
For the definition of other variables, see Use Python.
Call a stored procedure for Spark
After you create a stored procedure, you can call it by using one of the following options:
Console
Go to the BigQuery page.
In the Explorer pane, expand your project and select the stored procedure for Spark that you want to run.
In the Stored procedure info window, click Invoke stored procedure. Alternatively, you can expand the View actions option and click Invoke.
Click Run.
In the All results section, click View results.
Optional: In the Query results section, follow these steps:
If you want to view Spark driver logs, then click Execution details.
If you want to view logs in Cloud Logging, click Job information, and then in the Log field, click log.
If you want to get the Spark History Server endpoint, click Job information, and then click Spark history server.
SQL
To call a stored procedure, use the CALL PROCEDURE
statement:
In the Google Cloud console, go to the BigQuery page.
In the query editor, enter the following statement:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Click
Run.
For more information about how to run queries, see Running interactive queries.
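If the procedure declares parameters, you can also pass arguments in the CALL statement and, in a script, bind OUT parameters to declared variables. The following sketch assumes a hypothetical procedure my_spark_proc that has one IN INT64 parameter and one OUT BOOL parameter:

-- Declare a variable to receive the OUT parameter value.
DECLARE result_value BOOL;

-- Call the hypothetical procedure with an IN argument and the OUT variable.
CALL `PROJECT_ID`.DATASET.my_spark_proc(42, result_value);

-- Inspect the returned value.
SELECT result_value;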
Examples of stored procedures for Spark
This section shows examples of how you can create a stored procedure for Apache Spark.
Use a PySpark or a JAR file in Cloud Storage
The following example shows how to create a stored procedure for Spark by
using the my-project-id.us.my-connection
connection and a PySpark or a JAR
file that's stored in a Cloud Storage bucket:
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON
Java or Scala
Use main_file_uri
to create a stored procedure:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA
Use main_class
to create a stored procedure:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA
Use inline code
The following example shows how to create a stored procedure
for Spark by using the connection my-project-id.us.my-connection
and
inline PySpark code:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""
Pass a value as an input parameter
The following examples show two methods for passing a value as an input parameter in Python:
Method 1: Use environment variables
In the PySpark code, you can obtain the input parameters of the stored procedure
for Spark through environment variables in the Spark driver and
executors. The name of the environment variable has the format of
BIGQUERY_PROC_PARAM.PARAMETER_NAME
,
where PARAMETER_NAME
is the name of the input parameter. For
example, if the name of the input parameter is var
, the name of the
corresponding environment variable is BIGQUERY_PROC_PARAM.var
. The input
parameters are JSON encoded.
In your PySpark code, you can get the input parameter value in a JSON
string from the environment variable and decode it to a Python variable.
The following example shows how to get the value of an input parameter of type
INT64
into your PySpark code:
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))
"""
Method 2: Use a built-in library
In the PySpark code, you can simply import a built-in library and use it to
populate all types of parameters. To pass the parameters to executors, populate
the parameters in a Spark driver as Python variables and pass the values to
executors. The built-in library supports most of the BigQuery
data types except INTERVAL
, GEOGRAPHY
, NUMERIC
, and BIGNUMERIC
.
| BigQuery data type | Python data type |
|---|---|
| BOOL | bool |
| STRING | str |
| FLOAT64 | float |
| INT64 | int |
| BYTES | bytes |
| DATE | datetime.date |
| TIMESTAMP | datetime.datetime |
| TIME | datetime.time |
| DATETIME | datetime.datetime |
| Array | Array |
| Struct | Struct |
| JSON | Object |
| NUMERIC | Unsupported |
| BIGNUMERIC | Unsupported |
| INTERVAL | Unsupported |
| GEOGRAPHY | Unsupported |
The following example shows how to import the built-in library and use it to populate an input parameter of type INT64 and an input parameter of type ARRAY<STRUCT<a INT64, b STRING>> into your PySpark code:
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc = spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""
Java or Scala

In the Java or Scala code, you can obtain the input parameters of the stored procedure for Spark through environment variables in the Spark driver and executors. The name of the environment variable has the format BIGQUERY_PROC_PARAM.PARAMETER_NAME, where PARAMETER_NAME is the name of the input parameter. For example, if the name of the input parameter is var, the name of the corresponding environment variable is BIGQUERY_PROC_PARAM.var.
In your Java or Scala code, you can get the input parameter value from the
environment variable.
The following example shows how to get the value of an input parameter from environment variables into your Scala code:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
The following example shows how to get input parameters from environment variables into your Java code:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
Pass values as OUT and INOUT parameters

Output parameters return a value from the Spark procedure, whereas an INOUT parameter both accepts a value for the procedure and returns a value from the procedure.
To use OUT and INOUT parameters, add the OUT or INOUT keyword before the parameter name when you create the Spark procedure. In the PySpark code, you use the built-in library to return a value as an OUT or an INOUT parameter. As with input parameters, the built-in library supports most of the BigQuery data types except INTERVAL, GEOGRAPHY, NUMERIC, and BIGNUMERIC. The TIME and DATETIME type values are converted to the UTC timezone when they are returned as OUT or INOUT parameters.

The following example shows how to return values as OUT and INOUT parameters:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(
  IN int INT64, INOUT datetime DATETIME, OUT b BOOL,
  OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64,
  OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";
Read from a Hive Metastore table and write results to BigQuery
The following example shows how to transform a Hive Metastore table and write the results to BigQuery:
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
  .enableHiveSupport() \
  .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")
spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")
spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""
View log filters
After you call a stored procedure for Spark, you can view the log information. To obtain the Cloud Logging filter information and the Spark History Server endpoint, use the bq show command. The filter information is available under the SparkStatistics field of the child job. To get log filters, follow these steps:
Go to the BigQuery page.
List the child jobs of the stored procedure's script job:
bq ls -j --parent_job_id=$parent_job_id
To learn how to get the job ID, see View job details.
The output is similar to the following:
                     jobId                      Job Type   State      Start Time       Duration
 ---------------------------------------------- ---------- --------- ----------------- ----------------
 script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
Identify the jobId for your stored procedure and use the bq show command to view details of the job:

bq show --format=prettyjson --job $child_job_id
Copy the sparkStatistics field because you need it in another step.

The output is similar to the following:
{
  "configuration": {...}
  …
  "statistics": {
    …
    "query": {
      "sparkStatistics": {
        "loggingInfo": {
          "projectId": "myproject",
          "resourceType": "myresource"
        },
        "sparkJobId": "script-job-90f0",
        "sparkJobLocation": "us-central1"
      },
      …
    }
  }
}
For Logging, generate log filters with the SparkStatistics fields:

resource.type = sparkStatistics.loggingInfo.resourceType
resource.labels.resource_container = sparkStatistics.loggingInfo.projectId
resource.labels.spark_job_id = sparkStatistics.sparkJobId
resource.labels.location = sparkStatistics.sparkJobLocation
The logs are written to the bigquery.googleapis.com/SparkJob monitored resource. The logs are labeled by the INFO and DRIVER components. To filter logs from the Spark driver, add the labels.component = "DRIVER" component to the log filters.
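For example, with the sample sparkStatistics values shown earlier plugged into the generated filters, a Logging filter that returns only Spark driver logs might look like the following (the values come from the illustrative output above, not from your project):

resource.type = "myresource"
resource.labels.resource_container = "myproject"
resource.labels.spark_job_id = "script-job-90f0"
resource.labels.location = "us-central1"
labels.component = "DRIVER"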
Limitations
- You can only use the gRPC endpoint protocol to connect to Dataproc Metastore. Other types of Hive Metastore are not yet supported.
- VPC Service Controls is not supported.
- Customer-managed encryption keys (CMEK) are not supported.
- Spark executor logs are not supported, but Spark driver logs are supported.
- Passing output parameters is only supported for PySpark.
- If the dataset associated with the stored procedure for Spark is replicated to a destination region through cross-region dataset replication, the stored procedure can only be queried in the region that it was created in.
Quotas and limits
For information about quotas and limits, see stored procedures for Spark quotas and limits.
What's next
- Learn how to view a stored procedure.
- Learn how to delete a stored procedure.
- Learn how to work with a SQL stored procedure.