This page shows you how to use the Spark Spanner Connector to read data from Spanner using Apache Spark.
Calculating costs
In this document, you use the following billable components of Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
To generate a cost estimate based on your projected usage, use the pricing calculator.
Before you begin
Before running the tutorial, make sure you know the connector version and get a connector URI.
How to specify the connector JAR file URI
Spark Spanner connector versions are listed in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.
Specify the connector JAR file by substituting the connector version
information in the following URI string:
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
The connector is available for Spark versions 3.1 and later.
gcloud CLI example:
gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
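As a quick sanity check, you can also list the contents of the public bucket to see which connector JAR files are available, assuming your credentials allow listing it:
gsutil ls gs://spark-lib/spanner/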
Prepare Spanner database
If you don't have a Spanner table, you can follow the
tutorial to create one. After that, you will have an instance ID,
a database ID, and a table named Singers.
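If you prefer to set up the table from the command line, the following sketch shows one way to do it with the gcloud CLI. The instance name, configuration, and DDL here are illustrative assumptions that match the Singers schema shown in the output later on this page, not the exact commands from the tutorial:
# Create a small Spanner instance (name and config are assumptions).
gcloud spanner instances create test-instance \
    --config=regional-us-central1 \
    --description="Spark connector demo" \
    --nodes=1

# Create a database whose Singers table matches the schema used below.
gcloud spanner databases create example-db \
    --instance=test-instance \
    --ddl="CREATE TABLE Singers (SingerId INT64 NOT NULL, FirstName STRING(1024), LastName STRING(1024), BirthDate DATE, LastUpdated TIMESTAMP) PRIMARY KEY (SingerId)"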
Create Dataproc cluster
Any Dataproc cluster that uses the connector needs the spanner
or cloud-platform
scope. Dataproc clusters have the default scope cloud-platform
for image version 2.1 or later. If you use an older image version, you can use the Google Cloud console, the Google Cloud CLI, or the Dataproc API to create a Dataproc cluster with the required scope.
Console
- In the Google Cloud console, open the Dataproc Create a cluster page
- On the "Manage security" tab, click the "Enables the cloud-platform scope for this cluster" under the "Project access" section.
- Complete filling in or confirming the other cluster creation fields, then click "Create".
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
You can specify GceClusterConfig.serviceAccountScopes as part of a clusters.create request. For example:
"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
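For context, here is a minimal sketch of where that field sits in a clusters.create request body; CLUSTER_NAME is a placeholder:
{
  "clusterName": "CLUSTER_NAME",
  "config": {
    "gceClusterConfig": {
      "serviceAccountScopes": [
        "https://www.googleapis.com/auth/cloud-platform"
      ]
    }
  }
}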
You must also make sure that the corresponding Spanner permissions are assigned to the Dataproc VM service account. If you use Data Boost in the tutorial, refer to the Data Boost IAM permissions.
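As an illustration, a read-only grant might look like the following sketch. The project ID and service account email are placeholders, and you should confirm the exact role your workload needs; Data Boost additionally requires the spanner.databases.useDataBoost permission:
# Grant read access to Spanner databases (role choice is an assumption).
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
    --role="roles/spanner.databaseReader"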
Read data from Spanner
You can use Scala or Python to read data from Spanner into a Spark DataFrame using the Spark data source API.
Scala
- Examine the code and replace the [projectId], [instanceId], [databaseId], and [table] placeholders with
the project ID, instance ID, database ID, and table you created earlier. The enableDataBoost option enables the Spanner Data Boost feature, which has
near-zero impact on the main Spanner instance.
object singers {
  def main(): Unit = {
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-spanner-demo")
      .getOrCreate()
    */

    // Load data in from Spanner. See
    // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
    // for option information.
    val singersDF =
      (spark.read.format("cloud-spanner")
        .option("projectId", "[projectId]")
        .option("instanceId", "[instanceId]")
        .option("databaseId", "[databaseId]")
        .option("enableDataBoost", true)
        .option("table", "[table]")
        .load()
        .cache())

    singersDF.createOrReplaceTempView("Singers")

    // Load the Singers table.
    val result = spark.sql("SELECT * FROM Singers")
    result.show()
    result.printSchema()
  }
}
- Run the code on your cluster
- Use SSH to connect to the Dataproc cluster master node
- Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster
- On the Cluster details page, select the VM Instances tab. Then, click
SSH
to the right of the name of the cluster master node.
A browser window opens at your home directory on the master node.
Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$
- Create singers.scala with the pre-installed vi, vim, or nano text editor, then paste in the Scala code from the Scala code listing.
nano singers.scala
- Launch the spark-shell REPL.
$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
- Run singers.scala with the :load singers.scala command to create the Spanner Singers table. The output listing displays examples from the Singers output.
> :load singers.scala
Loading singers.scala...
defined object singers
> singers.main()
...
+--------+---------+--------+---------+-----------+
|SingerId|FirstName|LastName|BirthDate|LastUpdated|
+--------+---------+--------+---------+-----------+
|       1|     Marc|Richards|     null|       null|
|       2| Catalina|   Smith|     null|       null|
|       3|    Alice| Trentor|     null|       null|
+--------+---------+--------+---------+-----------+

root
 |-- SingerId: long (nullable = false)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- BirthDate: date (nullable = true)
 |-- LastUpdated: timestamp (nullable = true)
PySpark
- Examine the code and replace the [projectId], [instanceId], [databaseId], and [table] placeholders with
the project ID, instance ID, database ID, and table you created earlier. The enableDataBoost option enables the Spanner Data Boost feature, which has
near-zero impact on the main Spanner instance.
#!/usr/bin/env python

"""Spanner PySpark read example."""

from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-spanner-demo') \
  .getOrCreate()

# Load data from Spanner.
singers = spark.read.format('cloud-spanner') \
  .option("projectId", "[projectId]") \
  .option("instanceId", "[instanceId]") \
  .option("databaseId", "[databaseId]") \
  .option("enableDataBoost", "true") \
  .option("table", "[table]") \
  .load()
singers.createOrReplaceTempView('Singers')

# Read from Singers
result = spark.sql('SELECT * FROM Singers')
result.show()
result.printSchema()
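Once the table is loaded, you can also query it through the DataFrame API directly instead of Spark SQL. A minimal sketch, assuming the Singers schema shown in the output below:
# Equivalent filtering with the DataFrame API instead of Spark SQL.
from pyspark.sql import functions as F

smiths = singers.filter(F.col('LastName') == 'Smith')
smiths.select('SingerId', 'FirstName', 'LastName').show()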
- Run the code on your cluster
- Use SSH to connect to the Dataproc cluster master node
- Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster
- On the Cluster details page, select the VM Instances tab. Then, click
SSH
to the right of the name of the cluster master node.
A browser window opens at your home directory on the master node.
Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$
- Create singers.py with the pre-installed vi, vim, or nano text editor, then paste in the PySpark code from the PySpark code listing.
nano singers.py
- Run singers.py with spark-submit to create the Spanner Singers table.
spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
The output is:
...
+--------+---------+--------+---------+-----------+
|SingerId|FirstName|LastName|BirthDate|LastUpdated|
+--------+---------+--------+---------+-----------+
|       1|     Marc|Richards|     null|       null|
|       2| Catalina|   Smith|     null|       null|
|       3|    Alice| Trentor|     null|       null|
+--------+---------+--------+---------+-----------+

root
 |-- SingerId: long (nullable = false)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- BirthDate: date (nullable = true)
 |-- LastUpdated: timestamp (nullable = true)

only showing top 20 rows
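Cloud Storage is one of the billable components used on this page, and a common next step is to persist results there. A minimal sketch that writes the DataFrame as Parquet; the bucket name is a placeholder:
# Write the query results to Cloud Storage as Parquet (bucket is a placeholder).
result.write.mode('overwrite').parquet('gs://YOUR_BUCKET/singers-output/')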
Cleanup
To clean up and avoid incurring ongoing charges to your Google Cloud account for the resources created in this walkthrough, follow these steps:
gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME
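If you created a Spanner instance just for this walkthrough, deleting it avoids further Spanner charges as well. The instance name below assumes the test-instance from the setup sketch earlier; substitute your own instance ID:
gcloud spanner instances delete test-instance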