Dataflow is a managed service for transforming and enriching data. The Dataflow connector for Spanner lets you read data from and write data to Spanner in a Dataflow pipeline, optionally transforming or modifying the data. You can also create pipelines that transfer data between Spanner and other Google Cloud products.
The Dataflow connector is the recommended method for efficiently moving data into and out of Spanner in bulk, and for performing large transformations of a database that are not supported by Partitioned DML, such as table moves, bulk deletes that require a JOIN, and so on. When working with individual databases, there are other methods you can use to import and export data:
- Use the Google Cloud console to export an individual database from Spanner to Cloud Storage in Avro format.
- Use the Google Cloud console to import a database back into Spanner from files you exported to Cloud Storage.
- Use the REST API or Google Cloud CLI to run export or import jobs from Spanner to Cloud Storage and back (also using Avro format).
The Dataflow connector for Spanner is part of the Apache Beam Java SDK, and it provides an API for performing the above actions. For more information about some of the concepts discussed below, such as PCollection objects and transforms, see the Apache Beam programming guide.
Add the connector to your Maven project
To add the Google Cloud Dataflow connector to a Maven project, add the beam-sdks-java-io-google-cloud-platform Maven artifact to your pom.xml file as a dependency.
For example, assuming that your pom.xml file sets beam.version to the appropriate version number, you would add the following dependency:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>
Read data from Spanner
To read from Spanner, apply the SpannerIO.read() transform. Configure the read using the methods in the SpannerIO.Read class. Applying the transform returns a PCollection<Struct>, where each element in the collection represents an individual row returned by the read operation. You can read from Spanner with and without a specific SQL query, depending on your desired output.
Applying the SpannerIO.read() transform returns a consistent view of data by performing a strong read. Unless you specify otherwise, the result of the read is snapshotted at the time that you started the read. See reads for more information about the different types of reads Spanner can perform.
Read data using a query
To read a specific set of data from Spanner, configure the transform using the SpannerIO.Read.withQuery() method to specify a SQL query. For example:
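The following is a minimal sketch of such a read; the pipeline, instanceId, and databaseId variables and the Singers table are assumptions made for illustration:

// Assumes a Pipeline named "pipeline" and instanceId/databaseId strings defined earlier.
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT SingerId, FirstName, LastName FROM Singers"));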
Read data without specifying a query
To read from a database without using a query, you can specify a table name using the SpannerIO.Read.withTable() method, and specify a list of columns to read using the SpannerIO.Read.withColumns() method. For example:
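A minimal sketch, using an illustrative Singers table and column list:

// Read every row from the Singers table, returning only the listed columns.
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("SingerId", "FirstName", "LastName"));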
To limit the rows read, you can specify a set of primary keys to read using the SpannerIO.Read.withKeySet() method.
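For example, assuming the same illustrative Singers table, you could restrict the read to two specific primary keys (KeySet and Key are from com.google.cloud.spanner):

// Read only the rows whose primary key is 1 or 2.
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("SingerId", "FirstName", "LastName")
        .withKeySet(KeySet.newBuilder()
            .addKey(Key.of(1L))
            .addKey(Key.of(2L))
            .build()));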
You can also read a table using a specified secondary index. As with the readUsingIndex() API call, the index must contain all of the data that appears in the query results.
To do so, specify the table as shown in the previous example, and specify the index that contains the desired column values using the SpannerIO.Read.withIndex() method. The index must store all the columns that the transform needs to read. The base table's primary key is implicitly stored. For example, to read the Songs table using the index SongsBySongName, you use the following code:
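A sketch of what that read might look like, assuming SongsBySongName indexes the SongName column:

// Read the Songs table through the SongsBySongName secondary index.
PCollection<Struct> songs = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Songs")
        .withIndex("SongsBySongName")
        .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));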
Control the staleness of transaction data
A transform is guaranteed to be executed on a consistent snapshot of data. To control the staleness of data, use the SpannerIO.Read.withTimestampBound() method. See transactions for more information.
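For example, to allow the read to return data that is up to 15 seconds stale (the staleness value and table are illustrative; TimestampBound is from com.google.cloud.spanner):

// Allow Spanner to serve data that is at most 15 seconds old.
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("SingerId", "FirstName", "LastName")
        .withTimestampBound(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS)));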
Read from multiple tables in the same transaction
If you want to read data from multiple tables at the same point in time to ensure data consistency, perform all of the reads in a single transaction. To do so, apply a createTransaction() transform, creating a PCollectionView<Transaction> object which then creates a transaction. The resulting view can be passed to a read operation using SpannerIO.Read.withTransaction().
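A sketch of this pattern, assuming two illustrative tables named Singers and Albums and a SpannerConfig built from instanceId and databaseId:

// Build a shared configuration and create one read-only transaction for both reads.
SpannerConfig spannerConfig = SpannerConfig.create()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId);
PCollectionView<Transaction> tx = pipeline.apply(
    SpannerIO.createTransaction()
        .withSpannerConfig(spannerConfig)
        .withTimestampBound(TimestampBound.strong()));

// Both reads observe the same snapshot because they share the transaction view.
PCollection<Struct> singers = pipeline.apply(
    SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withQuery("SELECT SingerId, FirstName, LastName FROM Singers")
        .withTransaction(tx));
PCollection<Struct> albums = pipeline.apply(
    SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
        .withTransaction(tx));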
Read data from all available tables
You can read data from all available tables in a Spanner database.
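One way to sketch this is to list table names from the INFORMATION_SCHEMA and feed one ReadOperation per table into SpannerIO.readAll(); the spannerConfig variable and the schema filter are assumptions for a GoogleSQL database with tables in the default schema:

// List every table in the default schema, then read all rows from each one.
PCollection<Struct> allRecords = pipeline
    .apply(SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withBatching(false)
        .withQuery("SELECT t.table_name FROM information_schema.tables AS t "
            + "WHERE t.table_catalog = '' AND t.table_schema = ''"))
    .apply(MapElements.into(TypeDescriptor.of(ReadOperation.class))
        .via((SerializableFunction<Struct, ReadOperation>) row ->
            ReadOperation.create().withQuery("SELECT * FROM " + row.getString(0))))
    .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));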
Troubleshoot unsupported queries
The Dataflow connector only supports Spanner SQL queries where the first operator in the query execution plan is a Distributed Union. If you attempt to read data from Spanner using a query and you get an exception stating that the query does not have a DistributedUnion at the root, follow the steps in Understand how Spanner executes queries to retrieve an execution plan for your query using the Google Cloud console.
If your SQL query isn't supported, simplify it to a query that has a distributed union as the first operator in the query execution plan. Remove aggregate functions, table joins, and the operators DISTINCT, GROUP BY, and ORDER BY, as they are the operators most likely to prevent the query from working.
Create mutations for a write
Use the Mutation class's newInsertOrUpdateBuilder() method instead of the newInsertBuilder() method unless absolutely necessary for Java pipelines. For Python pipelines, use SpannerInsertOrUpdate() instead of SpannerInsert(). Dataflow provides at-least-once guarantees, meaning that the mutation might be written several times. As a result, INSERT-only mutations might generate com.google.cloud.spanner.SpannerException: ALREADY_EXISTS errors that cause the pipeline to fail. To prevent this error, use the INSERT_OR_UPDATE mutation instead, which adds a new row or updates column values if the row already exists. The INSERT_OR_UPDATE mutation can be applied more than once.
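For example, a Java mutation built this way is safe to apply repeatedly (the table and column names are illustrative; Mutation is from com.google.cloud.spanner):

// INSERT_OR_UPDATE is idempotent, so retried writes don't fail with ALREADY_EXISTS.
Mutation mutation = Mutation.newInsertOrUpdateBuilder("Singers")
    .set("SingerId").to(1L)
    .set("FirstName").to("Marc")
    .set("LastName").to("Richards")
    .build();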
Write to Spanner and transform data
You can write data to Spanner with the Dataflow connector by using a SpannerIO.write() transform to execute a collection of input row mutations. The Dataflow connector groups mutations into batches for efficiency.
The following example shows how to apply a write transform to a PCollection of mutations:
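A minimal sketch, assuming a PCollection<Mutation> named mutations produced by an earlier pipeline step:

// Write the mutations to Spanner; the connector batches them automatically.
mutations.apply("WriteToSpanner", SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId));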
If a transform unexpectedly stops before completion, mutations that have already been applied won't be rolled back.
Apply groups of mutations atomically
You can use the MutationGroup class to ensure that a group of mutations are applied together atomically. Mutations in a MutationGroup are guaranteed to be submitted in the same transaction, but the transaction might be retried.
Mutation groups perform best when they are used to group together mutations that affect data stored close together in the key space. Because Spanner interleaves parent and child table data together in the parent table, that data is always close together in the key space. We recommend that you either structure your mutation group so that it contains one mutation that is applied to a parent table and additional mutations that are applied to child tables, or so that all of its mutations modify data that is close together in the key space. For more information about how Spanner stores parent and child table data, see Schema and data model.
If you don't organize your mutation groups around the recommended table hierarchies, or if the data being accessed is not close together in the key space, Spanner might need to perform two-phase commits, which will result in slower performance. For more information, see Locality tradeoffs.
To use MutationGroup, build a SpannerIO.write() transform and call the SpannerIO.Write.grouped() method, which returns a transform that you can then apply to a PCollection of MutationGroup objects.
When creating a MutationGroup, the first mutation listed becomes the primary mutation. If your mutation group affects both a parent and a child table, the primary mutation should be a mutation to the parent table. Otherwise, you can use any mutation as the primary mutation. The Dataflow connector uses the primary mutation to determine partition boundaries in order to efficiently batch mutations together.
For example, imagine that your application monitors behavior and flags problematic user behavior for review. For each flagged behavior, you want to update the Users table to block the user's access to your application, and you also need to record the incident in the PendingReviews table. To make sure that both of the tables are updated atomically, use a MutationGroup:
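A sketch of that pattern; the column names, the userId and reviewId values, and the mutationGroups collection are assumptions made for illustration:

// Build the two mutations; the one listed first (userMutation) becomes the primary mutation.
Mutation userMutation = Mutation.newUpdateBuilder("Users")
    .set("UserId").to(userId)
    .set("State").to("BLOCKED")
    .build();
Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
    .set("ReviewId").to(reviewId)
    .set("UserId").to(userId)
    .set("Action").to("REVIEW ACCOUNT")
    .build();
MutationGroup mutationGroup = MutationGroup.create(userMutation, pendingReview);

// Apply a grouped write to a PCollection<MutationGroup> named mutationGroups.
mutationGroups.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());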
When creating a mutation group, the first mutation supplied as an argument becomes the primary mutation. In this case, the two tables are unrelated, so there is no clear primary mutation. We've selected userMutation as primary by placing it first. Applying the two mutations separately would be faster, but wouldn't guarantee atomicity, so the mutation group is the best choice in this situation.
What's next
- Learn more about designing an Apache Beam data pipeline.
- Export and import Spanner databases in the Google Cloud console using Dataflow.