Use the Spanner connector with Spark

This page shows you how to use the Spark Spanner Connector to read data from Spanner using Apache Spark.

Calculating costs

In this document, you use the following billable components of Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

Before running the tutorial, make sure you know the connector version and get a connector URI.

How to specify the connector JAR file URI

Spark Spanner connector versions are listed in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.

Specify the connector JAR file by substituting the connector version information in the following URI string:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

The connector is available for Spark versions 3.1 and later.
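As a small illustration, the version substitution described above can be written as a Python helper. This is only a sketch: the function name `connector_jar_uri` is hypothetical, while the URI template and the example version 1.0.0 are taken from this page.

```python
# Hypothetical helper: fills CONNECTOR_VERSION into the URI template shown above.
URI_TEMPLATE = "gs://spark-lib/spanner/spark-3.1-spanner-{version}.jar"


def connector_jar_uri(version: str) -> str:
    """Return the connector JAR URI for a given connector version string."""
    version = version.strip()
    if not version:
        raise ValueError("connector version must not be empty")
    return URI_TEMPLATE.format(version=version)


if __name__ == "__main__":
    # 1.0.0 is the version used in the gcloud CLI example on this page.
    print(connector_jar_uri("1.0.0"))
```

The resulting string can be passed to the --jars flag of gcloud, spark-shell, or spark-submit.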

gcloud CLI example:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Prepare Spanner database

If you don't have a Spanner table, follow the tutorial to create one. Afterward, you will have an instance ID, a database ID, and a table named Singers.

Create Dataproc cluster

Any Dataproc cluster that uses the connector needs the spanner or cloud-platform scope. Dataproc clusters created with image version 2.1 or higher have the cloud-platform scope by default. If you use an older image version, set the scope when you create the cluster with the Google Cloud console, the Google Cloud CLI, or the Dataproc API.

Console

  1. In the Google Cloud console, open the Dataproc Create a cluster page.
  2. On the "Manage security" tab, select "Enables the cloud-platform scope for this cluster" under the "Project access" section.
  3. Fill in or confirm the other cluster creation fields, then click "Create".

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

You can specify the GceClusterConfig.serviceAccountScopes as part of a clusters.create request. For example:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Make sure that the Dataproc VM service account has the corresponding Spanner permissions. If you use Data Boost in this tutorial, refer to the Data Boost IAM Permission.
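To show where the serviceAccountScopes field from the API example might sit in a full request, here is a minimal Python sketch that builds a clusters.create request body. The nesting under config.gceClusterConfig reflects the Dataproc Cluster resource as I understand it. The helper name and the PROJECT_ID placeholder are assumptions, and a real request may need more fields than shown here.

```python
import json

CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"


def cluster_request_body(project_id: str, cluster_name: str) -> dict:
    """Build a minimal clusters.create body that sets the cloud-platform scope.

    Hypothetical helper for illustration; it only assembles a dict and
    does not call the Dataproc API.
    """
    return {
        "projectId": project_id,
        "clusterName": cluster_name,
        "config": {
            "gceClusterConfig": {
                "serviceAccountScopes": [CLOUD_PLATFORM_SCOPE],
            },
        },
    }


if __name__ == "__main__":
    # PROJECT_ID and CLUSTER_NAME are placeholders, not real resources.
    print(json.dumps(cluster_request_body("PROJECT_ID", "CLUSTER_NAME"), indent=2))
```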

Read data from Spanner

You can use Scala or Python to read data from Spanner into a Spark DataFrame using the Spark data source API.

Scala

  1. Examine the code and replace the [projectId], [instanceId], [databaseId], and [table] placeholders with the project ID, instance ID, database ID, and table you created earlier. The enableDataBoost option enables the Spanner Data Boost feature, which has near-zero impact on the main Spanner instance.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. Run the code on your cluster.
    1. Use SSH to connect to the Dataproc cluster master node.
      1. Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
      2. On the Cluster details page, select the VM Instances tab. Then, click SSH to the right of the name of the cluster master node.

        A browser window opens at your home directory on the master node.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Create singers.scala with the pre-installed vi, vim, or nano text editor, then paste in the Scala code from the Scala code listing.
      nano singers.scala
        
    3. Launch the spark-shell REPL.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Load singers.scala with the :load singers.scala command, then call singers.main() to read the Spanner Singers table. The output listing displays example rows from the Singers table.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Examine the code and replace the [projectId], [instanceId], [databaseId], and [table] placeholders with the project ID, instance ID, database ID, and table you created earlier. The enableDataBoost option enables the Spanner Data Boost feature, which has near-zero impact on the main Spanner instance.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. Run the code on your cluster.
    1. Use SSH to connect to the Dataproc cluster master node.
      1. Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
      2. On the Cluster details page, select the VM Instances tab. Then, click SSH to the right of the name of the cluster master node.

        A browser window opens at your home directory on the master node.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Create singers.py with the pre-installed vi, vim, or nano text editor, then paste in the PySpark code from the PySpark code listing.
      nano singers.py
      
    3. Run singers.py with spark-submit to read the Spanner Singers table.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      The output is:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
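Both listings above pass the same set of options to the connector. One way to avoid submitting a job with an unreplaced placeholder is to collect the options in a dict and check them first. The sketch below assumes only the option names used on this page. The helper name `spanner_read_options` and the sample IDs are hypothetical.

```python
REQUIRED_OPTIONS = ("projectId", "instanceId", "databaseId", "table")


def spanner_read_options(project_id, instance_id, database_id, table,
                         enable_data_boost=True):
    """Return the connector options used in the listings above as a dict."""
    options = {
        "projectId": project_id,
        "instanceId": instance_id,
        "databaseId": database_id,
        "table": table,
        # Passed as a string, matching the PySpark listing.
        "enableDataBoost": "true" if enable_data_boost else "false",
    }
    # Reject empty values and unreplaced [placeholder] values early.
    for key in REQUIRED_OPTIONS:
        value = options[key]
        if not value or (value.startswith("[") and value.endswith("]")):
            raise ValueError(f"option {key!r} still needs a real value: {value!r}")
    return options


if __name__ == "__main__":
    # The IDs below are made-up examples, not real resources.
    opts = spanner_read_options("my-project", "my-instance", "my-database", "Singers")
    print(opts)
    # With a SparkSession named `spark` and the connector JAR on the classpath:
    # singers = spark.read.format("cloud-spanner").options(**opts).load()
```

Keeping the options in one dict also makes it easy to reuse the same settings for several tables by changing only the table value.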
      

Clean up

To clean up and avoid incurring ongoing charges to your Google Cloud account for the resources created in this walkthrough, follow these steps.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

For more information