
Work with stored procedures for Apache Spark

As a data analyst, you can create and call stored procedures for Apache Spark in BigQuery.

Using BigQuery, you can create and run Apache Spark stored procedures that are written in Python. You can then run these stored procedures in BigQuery using a GoogleSQL query, similar to running SQL stored procedures.

To enroll in this preview, complete the enrollment form.

Before you begin

To create a stored procedure for Apache Spark, ask your admin to create a Spark connection and share it with you. Your admin must also grant the service account associated with the connection the required Identity and Access Management (IAM) permissions.

Required roles

To get the permissions that you need to perform the tasks in this document, ask your administrator to grant you the following IAM roles:

For more information about granting roles, see Manage access.

These predefined roles contain the permissions required to perform the tasks in this document. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to perform the tasks in this document:

  • Create a connection:
    • bigquery.connections.create
    • bigquery.connections.list
  • Create a stored procedure for Apache Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Call a stored procedure for Apache Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

You might also be able to get these permissions with custom roles or other predefined roles.

Location consideration

You must create a stored procedure for Apache Spark in the same location as your connection because the stored procedure runs in the same location as the connection. For example, to create a stored procedure in the US multi-region, you use a connection located in the US multi-region.

Create a stored procedure for Apache Spark

You must create the stored procedure in the same location as the connection that you use.

If the body of your stored procedure is more than 1 MB, then we recommend that you put your stored procedure in a file in a Cloud Storage bucket instead of using inline code.

To create a stored procedure for Apache Spark, follow these steps:

  1. Go to the BigQuery page.

    Go to BigQuery

  2. In the Explorer pane, in the project that you used to create the connection resource, click the connection.

  3. To create a stored procedure for Apache Spark, click Create stored procedure.

  4. In the query editor, you can modify the sample code for the CREATE PROCEDURE statement that appears:

    CREATE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_NAME`
     OPTIONS (
         engine="SPARK",
         main_file_uri=["URI"])
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Replace the following:

    • PROJECT_ID: the project in which you want to create the stored procedure—for example, myproject
    • DATASET: the dataset in which you want to create the stored procedure—for example, mydataset
    • PROCEDURE_NAME: the name of the stored procedure that you want to run in BigQuery—for example, mysparkprocedure
    • PROCEDURE_ARGUMENT: a parameter to enter the input arguments

      Stored procedures for Apache Spark only support IN parameters. In this parameter, specify the following fields:

      • ARGUMENT_NAME: the name of the argument
      • ARGUMENT_TYPE: the type of the argument

      For example: myproject.mydataset.mysparkproc(num INT64)

      For more information, see Pass a value as an input parameter in this document.

    • CONNECTION_NAME: the name of the Spark connection—for example, myproject.us.mysparkconnection
    • URI: the path to a PySpark file—for example, gs://mybucket/mypysparkmain.py

      Alternatively, if you want to include the body of the stored procedure in the CREATE PROCEDURE statement itself, add PYSPARK_CODE after LANGUAGE PYTHON AS, as shown in the example in Use inline code in this document.

    • PYSPARK_CODE: the definition of a PySpark application in the CREATE PROCEDURE statement if you want to pass the body of the procedure inline

      The value is a string literal. If the code includes quotation marks or backslashes, they must be escaped or the code must be represented as a raw string. For example, the code return "\n"; can be represented as any of the following:

      • Quoted string: "return \"\\n\";". Both quotation marks and backslashes are escaped.
      • Triple-quoted string: """return "\\n";""". Backslashes are escaped while quotation marks are not.
      • Raw string: r"""return "\n";""". No escaping is needed.
      To learn how to add inline PySpark code, see Use inline code.

    For additional options that you can specify in OPTIONS, see the procedure option list.
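
    For illustration, substituting the example values listed above into the template produces a statement similar to the following (all names are placeholders):

    CREATE PROCEDURE `myproject`.mydataset.mysparkprocedure(num INT64)
     WITH CONNECTION `myproject.us.mysparkconnection`
     OPTIONS (
         engine="SPARK",
         main_file_uri="gs://mybucket/mypysparkmain.py")
     LANGUAGE PYTHON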

Call a stored procedure for Apache Spark

After you create a stored procedure, you can call it by using one of the following options:

Console

  1. Go to the BigQuery page.

    Go to BigQuery

  2. In the Explorer pane, expand your project and select the stored procedure for Apache Spark that you want to run.

  3. In the Stored procedure info window, click Invoke stored procedure. Alternatively, you can expand the View actions option and click Invoke.

  4. Optional: In the Query results section, follow these steps:

    • If you want to view Spark driver logs, then click Execution details.

    • If you want to view logs in Cloud Logging, click Job information, and then in the Log field, click log.

    • If you want to get the Spark History Server endpoint, click Job information, and then click Spark history server.

SQL

To call a stored procedure, use the CALL statement:

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the query editor, enter the following statement:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
    

  3. Click Run.

For more information about how to run queries, see Running interactive queries.

Examples of stored procedures for Apache Spark

This section shows examples of how you can create a stored procedure for Apache Spark.

Use a PySpark file in Cloud Storage

The following example shows how to create a stored procedure for Apache Spark by using the connection my-project-id.us.my-connection and a PySpark file (gs://my-bucket/my-pyspark-main.py) that's stored in a Cloud Storage bucket:

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON
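
The PySpark file referenced by main_file_uri is a regular Spark application. As a minimal, hypothetical sketch, gs://my-bucket/my-pyspark-main.py could contain something like the following (the table name is an example; the read options mirror the inline example in the next section):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my-pyspark-main").getOrCreate()

# Read a public BigQuery table with the Spark BigQuery connector and print a few rows.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.show(10)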

Use inline code

The following example shows how to create a stored procedure for Apache Spark by using the connection my-project-id.us.my-connection and inline PySpark code:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count') \
  .groupBy('word').sum('word_count') \
  .withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Pass a value as an input parameter

In your PySpark code, you can obtain the input parameters of a stored procedure for Apache Spark through environment variables in the Spark driver and executors. The name of the environment variable has the format BIGQUERY_PROC_PARAM.PARAMETER_NAME, where PARAMETER_NAME is the name of the input parameter. For example, if the input parameter is named var, the corresponding environment variable is named BIGQUERY_PROC_PARAM.var. The input parameters are JSON encoded, so in your PySpark code you can read the input parameter value from the environment variable as a JSON string and decode it into a Python variable or a Spark DataFrame.

The following example shows how to get the value of an input parameter of type INT64 and an input parameter of type STRUCT into your PySpark code:

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

# Get the input parameter info in JSON string and create a Spark DataFrame
df = spark.read.json(sc.parallelize([os.environ["BIGQUERY_PROC_PARAM.info"]]))
df.printSchema()
df.show(truncate=False)
"""

Read from a Hive Metastore table and write results to BigQuery

The following example shows how to transform a Hive Metastore table and write the results to BigQuery:

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

View log filters

After you call a stored procedure for Apache Spark, you can view the log information. To obtain the Cloud Logging filter information and the Spark History Server endpoint, use the bq show command. The filter information is available under the sparkStatistics field of the child job. To get log filters, follow these steps:

  1. Go to the BigQuery page.

    Go to BigQuery

  2. Using the bq command-line tool, list the child jobs of the stored procedure's script job:

    bq ls -j --parent_job_id=$parent_job_id
    

    To learn how to get the job ID, see View job details.

    The output is similar to the following:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
    
  3. Identify the jobId for your stored procedure and use the bq show command to view details of the job:

    bq show --format=prettyjson --job $child_job_id
    

    Copy the sparkStatistics field because you need it in another step.

    The output is similar to the following:

    {
    "configuration": {...}
    …
    "statistics": {
     …
      "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
        …
      }
    }
    }
    

  4. For Logging, generate log filters with the sparkStatistics fields:

    resource.type=sparkStatistics.loggingInfo.resourceType
    resource.labels.project_id=sparkStatistics.loggingInfo.projectId
    resource.labels.batch_id=sparkStatistics.sparkJobId
    

Limitations

Quotas and limits

For information about quotas and limits, see stored procedures for Apache Spark quotas and limits.

What's next