Work with stored procedures for Apache Spark
As a data analyst, you can create and call stored procedures for Apache Spark in BigQuery. These stored procedures are written in Python, and you run them in BigQuery with a GoogleSQL query, similar to running SQL stored procedures.
To enroll in this preview, complete the enrollment form.
Before you begin
To create a stored procedure for Apache Spark, ask your admin to create a Spark connection and share it with you. Your admin must also grant the service account associated with the connection the required Identity and Access Management (IAM) permissions.
Required roles
To get the permissions that you need to perform the tasks in this document, ask your administrator to grant you the following IAM roles:
- Create a stored procedure for Apache Spark:
  - BigQuery Data Editor (roles/bigquery.dataEditor) on the dataset where you create the stored procedure
  - BigQuery Connection Admin (roles/bigquery.connectionAdmin) on the connection that the stored procedure uses
  - BigQuery Job User (roles/bigquery.jobUser) on your project
- Call a stored procedure for Apache Spark:
  - BigQuery Metadata Viewer (roles/bigquery.metadataViewer) on the dataset where the stored procedure is stored
  - BigQuery Connection User (roles/bigquery.connectionUser) on the connection
  - BigQuery Job User (roles/bigquery.jobUser) on your project
For more information about granting roles, see Manage access.
These predefined roles contain the permissions required to perform the tasks in this document. The exact permissions that are required are listed in the following Required permissions section:
Required permissions
The following permissions are required to perform the tasks in this document:
- Create a connection:
  - bigquery.connections.create
  - bigquery.connections.list
- Create a stored procedure for Apache Spark:
  - bigquery.routines.create
  - bigquery.connections.delegate
  - bigquery.jobs.create
- Call a stored procedure for Apache Spark:
  - bigquery.routines.get
  - bigquery.connections.use
  - bigquery.jobs.create
You might also be able to get these permissions with custom roles or other predefined roles.
Location consideration
You must create a stored procedure for Apache Spark in the same location as your connection because the stored procedure runs in the same location as the connection. For example, to create a stored procedure in the US multi-region, you use a connection located in the US multi-region.
Create a stored procedure for Apache Spark
You must create the stored procedure in the same location as the connection that you use.
If the body of your stored procedure is more than 1 MB, then we recommend that you put your stored procedure in a file in a Cloud Storage bucket instead of using inline code.
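For example, you could upload the file with the Cloud Storage client library for Python. The following is a minimal sketch, not part of the product documentation; the bucket and file names are hypothetical placeholders:

# Upload a local PySpark file to Cloud Storage so that a stored procedure
# can reference it through main_file_uri. Assumes the google-cloud-storage
# library is installed and the caller has write access to the bucket.
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("mybucket")             # hypothetical bucket name
blob = bucket.blob("mypysparkmain.py")         # destination object path
blob.upload_from_filename("mypysparkmain.py")  # local PySpark file
print(f"Uploaded to gs://{bucket.name}/{blob.name}")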
To create a stored procedure for Apache Spark, follow these steps:
Go to the BigQuery page.
In the Explorer pane, click the connection in the project that you used to create the connection resource.
To create a stored procedure for Apache Spark, click Create stored procedure.

In the query editor, modify the sample code for the CREATE PROCEDURE statement that appears:

CREATE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
WITH CONNECTION `CONNECTION_NAME`
OPTIONS (
    engine="SPARK",
    main_file_uri=["URI"])
LANGUAGE PYTHON [AS PYSPARK_CODE]
Replace the following:
- PROJECT_ID: the project in which you want to create the stored procedure, for example, myproject.
- DATASET: the dataset in which you want to create the stored procedure, for example, mydataset.
- PROCEDURE_NAME: the name of the stored procedure that you want to run in BigQuery, for example, mysparkprocedure.
- PROCEDURE_ARGUMENT: a parameter to enter the input arguments. Stored procedures for Apache Spark support only IN parameters. In this parameter, specify the following fields:
  - ARGUMENT_NAME: the name of the argument
  - ARGUMENT_TYPE: the type of the argument

  For example: myproject.mydataset.mysparkproc(num INT64). For more information, see Pass a value as an input parameter in this document.
- CONNECTION_NAME: the name of the Spark connection, for example, myproject.us.mysparkconnection.
- URI: the path to a PySpark file, for example, gs://mybucket/mypysparkmain.py.

  Alternatively, if you want to add the body of the stored procedure in the CREATE PROCEDURE statement, add PYSPARK_CODE after LANGUAGE PYTHON AS, as shown in the example in Use inline code in this document.
- PYSPARK_CODE: the definition of a PySpark application in the CREATE PROCEDURE statement if you want to pass the body of the procedure inline.

  The value is a string literal. If the code includes quotation marks and backslashes, they must be either escaped or represented as a raw string. For example, the code return "\n"; can be represented as one of the following:
  - Quoted string: "return \"\\n\";". Both quotation marks and backslashes are escaped.
  - Triple-quoted string: """return "\\n";""". Backslashes are escaped while quotation marks are not.
  - Raw string: r"""return "\n";""". No escaping is needed.

For additional options that you can specify in OPTIONS, see the procedure option list.
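If you prefer to create the stored procedure programmatically rather than in the query editor, you can run the same CREATE PROCEDURE statement with the BigQuery client library for Python. The following is a minimal sketch; the project, dataset, connection, and Cloud Storage names are hypothetical placeholders:

# Run the CREATE PROCEDURE DDL as a regular query job. Assumes the
# google-cloud-bigquery library is installed and authenticated.
from google.cloud import bigquery

client = bigquery.Client(project="myproject")  # hypothetical project

ddl = """
CREATE OR REPLACE PROCEDURE `myproject`.mydataset.mysparkprocedure(num INT64)
WITH CONNECTION `myproject.us.mysparkconnection`
OPTIONS (engine="SPARK", main_file_uri="gs://mybucket/mypysparkmain.py")
LANGUAGE PYTHON
"""

client.query(ddl).result()  # DDL statements run as query jobs
print("Stored procedure created.")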
Call a stored procedure for Apache Spark
After you create a stored procedure, you can call it by using one of the following options:
Console
Go to the BigQuery page.
In the Explorer pane, expand your project and select the stored procedure for Apache Spark that you want to run.
In the Stored procedure info window, click Invoke stored procedure. Alternatively, you can expand the View actions option and click Invoke.
Optional: In the Query results section, follow these steps:
If you want to view Spark driver logs, then click Execution details.
If you want to view logs in Cloud Logging, click Job information, and then in the Log field, click log.
If you want to get the Spark History Server endpoint, click Job information, and then click Spark history server.
SQL
To call a stored procedure, use the CALL statement:
In the Google Cloud console, go to the BigQuery page.
In the query editor, enter the following statement:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Click Run.
For more information about how to run queries, see Running interactive queries.
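You can also issue the CALL statement from the BigQuery client library for Python. The following is a minimal sketch with hypothetical names:

# Call the stored procedure as a query job and wait for it to finish.
# Spark stored procedures can take a few minutes to start and run.
from google.cloud import bigquery

client = bigquery.Client(project="myproject")  # hypothetical project

query_job = client.query("CALL `myproject`.mydataset.mysparkprocedure(16)")
query_job.result()  # blocks until the procedure completes
print(f"Job {query_job.job_id} finished.")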
Examples of stored procedures for Apache Spark
This section shows examples of how you can create a stored procedure for Apache Spark.
Use a PySpark file in Cloud Storage
The following example shows how to create a stored
procedure for Apache Spark by using the connection my-project-id.us.my-connection
and a PySpark file (gs://my-bucket/my-pyspark-main.py
) that's stored in a
Cloud Storage bucket:
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK",
  main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON
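The contents of gs://my-bucket/my-pyspark-main.py are not shown in this document. As an illustration only, a minimal file might look like the following sketch, which mirrors the inline example in the next section:

# A hypothetical my-pyspark-main.py. The stored procedure runs this file
# as the main PySpark application.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my-pyspark-main").getOrCreate()

# Read a public BigQuery table through the Spark BigQuery connector.
words = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:samples.shakespeare") \
    .load()
words.show()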
Use inline code
The following example shows how to create a stored procedure
for Apache Spark by using the connection my-project-id.us.my-connection
and
inline PySpark code:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""
Pass a value as an input parameter
In the PySpark code, you can obtain the input parameters of the stored procedure for Apache Spark through environment variables in the Spark driver and executors. The name of the environment variable has the format BIGQUERY_PROC_PARAM.PARAMETER_NAME, where PARAMETER_NAME is the name of the input parameter. For example, if the name of the input parameter is var, the name of the corresponding environment variable is BIGQUERY_PROC_PARAM.var. The input parameters are JSON encoded.

In your PySpark code, you can get the input parameter value as a JSON string from the environment variable and decode it to a Python variable or a Spark DataFrame.
The following example shows how to get the value of an input parameter of type
INT64
and an input parameter of type STRUCT
into your PySpark code:
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

# Get the input parameter info in JSON string and create a Spark DataFrame
df = spark.read.json(sc.parallelize([os.environ["BIGQUERY_PROC_PARAM.info"]]))

df.printSchema()
df.show(truncate=False)
"""
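Because the parameters are plain JSON in environment variables, you can simulate the decoding locally before you deploy the procedure. The following sketch uses hypothetical values that match the signature above:

import json
import os

# Hypothetical values, JSON encoded as BigQuery would pass them.
os.environ["BIGQUERY_PROC_PARAM.num"] = "16"
os.environ["BIGQUERY_PROC_PARAM.info"] = '[{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]'

num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))
info = json.loads(os.environ["BIGQUERY_PROC_PARAM.info"])
print(num, info[0]["b"])  # prints: 16 x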
Read from a Hive Metastore table and write results to BigQuery
The following example shows how to transform a Hive Metastore table and write the results to BigQuery:
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
  .enableHiveSupport() \
  .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")
spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")
spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""
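After the procedure runs, you could confirm the write with a quick query against the output table from the example. This sketch assumes the records_dataset.student table now exists in your project:

from google.cloud import bigquery

client = bigquery.Client()
rows = client.query("SELECT eid, name, score FROM records_dataset.student").result()
for row in rows:
    print(row.eid, row.name, row.score)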
View log filters
After you call a stored procedure for Apache Spark, you can view the log information. To obtain the Cloud Logging filter information and the Spark History Server endpoint, use the bq show command. The filter information is available under the SparkStatistics field of the child job. To get log filters, follow these steps:
Go to the BigQuery page.
Using the bq command-line tool, list the child jobs of the stored procedure's script job:
bq ls -j --parent_job_id=$parent_job_id
To learn how to get the job ID, see View job details.
The output is similar to the following:
jobId                                           Job Type   State     Start Time        Duration
----------------------------------------------  ---------  --------  ----------------  ----------------
script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
Identify the jobId for your stored procedure and use the bq show command to view details of the job:

bq show --format=prettyjson --job $child_job_id
Copy the sparkStatistics field because you need it in another step. The output is similar to the following:
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
For Logging, generate log filters with the SparkStatistics fields:

resource.type = sparkStatistics.loggingInfo.resourceType
resource.labels.project_id = sparkStatistics.loggingInfo.projectId
resource.labels.batch_id = sparkStatistics.sparkJobId
Limitations
- You can use only the gRPC endpoint protocol to connect to Dataproc Metastore. Other types of Hive Metastore are not yet supported.
- VPC Service Controls is not supported.
- Customer-managed encryption keys (CMEK) are not supported.
- Spark executor logs are not supported, but Spark driver logs are supported.
- Passing output parameters is not supported, but passing input parameters is supported.
Quotas and limits
For information about quotas and limits, see stored procedures for Apache Spark quotas and limits.
What's next
- Learn how to view a stored procedure.
- Learn how to delete a stored procedure.
- Learn how to work with a SQL stored procedure.