Using the Cloud Dataflow Connector

Cloud Dataflow is a managed service for transforming and enriching data. The Cloud Dataflow connector for Cloud Spanner lets you read data from and write data to Cloud Spanner in a Cloud Dataflow pipeline, optionally transforming or modifying the data. You can also create pipelines that transfer data between Cloud Spanner and other Google Cloud Platform products.

The Cloud Dataflow connector is the recommended method for efficiently moving data into and out of Cloud Spanner in bulk.

The Cloud Dataflow connector for Cloud Spanner is part of the Apache Beam Java SDK, and it provides an API for performing these read and write operations. See the Apache Beam Programming Guide for more information about some of the concepts discussed below, such as PCollection objects and transforms.

Adding the connector to your Maven project

To add the Cloud Dataflow connector to a Maven project, add the beam-sdks-java-io-google-cloud-platform Maven artifact to your pom.xml file as a dependency:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.4.0</version>
</dependency>

Reading data from Cloud Spanner

To read from Cloud Spanner, apply the SpannerIO.read() transform. Configure the read using the methods in the SpannerIO.Read class. Applying the transform returns a PCollection<Struct>, where each element in the collection represents an individual row returned by the read operation. You can read from Cloud Spanner with and without a specific SQL query, depending on your desired output.

Applying the SpannerIO.read() transform returns a consistent view of data by performing a strong read. Unless you specify otherwise, the result of the read is snapshotted at the time that you started the read. See reads for more information about the different types of reads Cloud Spanner can perform.

Reading using a query

To read a specific set of data from Cloud Spanner, configure the transform using the SpannerIO.Read.withQuery() method to specify a SQL query. For example:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Reading without specifying a query

To read from a database without using a query, you can specify a table name and a list of columns, or you can read using an index. To read from selected columns, specify a table name and a list of columns when you construct your transform using SpannerIO.read(). For example:

// Read all rows from the specified columns of the Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

You can also read from the table using a specific set of keys as index values. To do so, configure the transform with the SpannerIO.Read.withIndex() method to specify an index that contains the desired key values.
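The following minimal sketch is illustrative: the SingersByFirstLastName index name is assumed, and only columns covered by the index can be read this way.

// Read rows through a secondary index; only columns stored in the index
// can be read this way. The index name is assumed for illustration.
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withIndex("SingersByFirstLastName")
        .withColumns("firstName", "lastName"));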

Controlling the staleness of transaction data

A transform is guaranteed to be executed on a consistent snapshot of data. To control the staleness of data, use the SpannerIO.Read.withTimestampBound() method. See transactions for more information.
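For example, the following minimal sketch reads a snapshot of the Singers sample table with an exact staleness of 15 seconds. TimestampBound comes from the Cloud Spanner client library, and TimeUnit from java.util.concurrent.

// Read a snapshot of the data that is exactly 15 seconds old.
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM Singers")
        .withTimestampBound(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS)));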

Reading from multiple tables in the same transaction

If you want to read data from multiple tables at the same point in time to ensure data consistency, perform all of the reads in a single transaction. To do so, apply the SpannerIO.createTransaction() transform, which creates a transaction and returns a PCollectionView<Transaction> object. Pass the resulting view to each read operation using SpannerIO.Read.withTransaction().

// Create a read-only transaction that both reads will share.
SpannerConfig spannerConfig = SpannerConfig.create()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId);
PCollectionView<Transaction> tx = p.apply(
    SpannerIO.createTransaction()
        .withSpannerConfig(spannerConfig)
        .withTimestampBound(TimestampBound.strong()));

// Both reads use the same transaction, so they see the same snapshot of data.
PCollection<Struct> singers = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerId, FirstName, LastName FROM Singers")
    .withTransaction(tx));
PCollection<Struct> albums = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
    .withTransaction(tx));

Reading data from all available tables

You can read data from all available tables in a Cloud Spanner database:

// List every table in the default schema from information_schema.tables ...
PCollection<Struct> allRecords = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withQuery("SELECT t.table_name FROM information_schema.tables AS t "
        + "WHERE t.table_catalog = '' AND t.table_schema = ''"))
    // ... turn each table name into a ReadOperation ...
    .apply(MapElements.into(TypeDescriptor.of(ReadOperation.class))
        .via((SerializableFunction<Struct, ReadOperation>) input -> {
          String tableName = input.getString(0);
          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
        }))
    // ... and execute all of the reads.
    .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Troubleshooting unsupported queries

The Cloud Dataflow connector only supports Cloud Spanner SQL queries where the first operator in the query execution plan is a Distributed Union. If you attempt to read data from Cloud Spanner using a query and you get an exception stating that the query does not have a DistributedUnion at the root, follow the steps in Understanding how Cloud Spanner Executes Queries to retrieve an execution plan for your query using the GCP Console.

If your SQL query isn't supported, simplify it so that the first operator in the query execution plan is a distributed union. Remove aggregate functions and the DISTINCT, GROUP BY, and ORDER BY operators, as these are the most likely to prevent the query from working.
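For example, a query that counts albums per singer with GROUP BY might not have a distributed union at the root. One possible workaround, sketched below with the assumed Singers/Albums sample schema, is to read the raw rows and perform the aggregation in the pipeline instead:

// Read only the raw SingerId values, then count them in the pipeline with
// Beam's Count transform instead of using GROUP BY in the Spanner query.
PCollection<KV<Long, Long>> albumsPerSinger = p
    .apply(SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT SingerId FROM Albums"))
    .apply(MapElements.into(TypeDescriptors.longs())
        .via((SerializableFunction<Struct, Long>) row -> row.getLong(0)))
    .apply(Count.perElement());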

Creating mutations for a write

Use the Mutation class's newInsertOrUpdateBuilder() method instead of the newInsertBuilder() method unless absolutely necessary. Cloud Dataflow provides at-least-once guarantees, meaning that the mutation is likely to be written several times. As a result, insert mutations are likely to generate errors that cause the pipeline to fail. To prevent these errors, create insert-or-update mutations, which can be applied more than once.

Writing to Cloud Spanner and transforming data

You can write data to Cloud Spanner with the Cloud Dataflow connector by using a SpannerIO.write() transform to execute a collection of input row mutations. The Cloud Dataflow connector groups mutations into batches for efficiency.

The following example shows how to apply a write transform to a PCollection of mutations:

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

If a transform unexpectedly stops before completion, mutations that have already been applied will not be rolled back.

If you are importing data into a new Cloud Spanner table, you can achieve significantly faster write throughput by dropping the table's secondary indexes before doing the bulk upload of data. Restore the indexes after the upload finishes.
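The following sketch is illustrative only: it uses the Cloud Spanner DatabaseAdminClient (not part of the connector), and the SingersByFirstLastName index name and definition are assumed.

// Drop a secondary index before the bulk import; re-create it afterwards.
// The index name and definition are assumed for illustration.
Spanner spanner = SpannerOptions.newBuilder().build().getService();
DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient();

// Before running the import pipeline (returns an OperationFuture; wait for
// it to complete before starting the import):
adminClient.updateDatabaseDdl(instanceId, databaseId,
    Collections.singletonList("DROP INDEX SingersByFirstLastName"), null);

// ... run the Cloud Dataflow import pipeline ...

// After the import finishes, restore the index:
adminClient.updateDatabaseDdl(instanceId, databaseId,
    Collections.singletonList(
        "CREATE INDEX SingersByFirstLastName ON Singers(FirstName, LastName)"),
    null);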

Applying groups of mutations atomically

You can use the MutationGroup class to ensure that a group of mutations are applied together atomically. Mutations in a MutationGroup are guaranteed to be submitted in the same transaction, but the transaction might be retried.

Mutation groups perform best when they are used to group together mutations that affect data stored close together in the key space. Because Cloud Spanner interleaves parent and child table data together in the parent table, that data is always close together in the key space. We recommend that you either structure your mutation group so that it contains one mutation that is applied to a parent table and additional mutations that are applied to child tables, or so that all of its mutations modify data that is close together in the key space. For more information about how Cloud Spanner stores parent and child table data, see Schema and Data Model.

If you don't organize your mutation groups around the recommended table hierarchies, or if the data being accessed is not close together in the key space, Cloud Spanner might need to perform two-phase commits, which will result in slower performance. For more information, see Locality Tradeoffs.

To use MutationGroup, build a SpannerIO.write() transform and call the SpannerIO.Write.grouped() method, which returns a transform that you can then apply to a PCollection of MutationGroup objects.

When creating a MutationGroup, the first mutation listed becomes the primary mutation. If your mutation group affects both a parent and a child table, the primary mutation should be a mutation to the parent table. Otherwise, you can use any mutation as the primary mutation. The Cloud Dataflow connector uses the primary mutation to determine partition boundaries in order to efficiently batch mutations together.

For example, imagine that your application monitors behavior and flags problematic user behavior for review. For each flagged behavior, you want to update the Users table to block the user's access to your application, and you also need to record the incident in the PendingReviews table. To make sure that both of the tables are updated atomically, use a MutationGroup:

PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        // Deterministically generate an ID for the review entry. The
        // `timestamp` value is assumed to be captured earlier (not shown here).
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

As noted above, the first mutation supplied as an argument becomes the primary mutation. In this case, the two tables are unrelated, so there is no clear primary mutation; we selected userMutation as the primary by placing it first. Applying the two mutations separately would be faster, but wouldn't guarantee atomicity, so the mutation group is the best choice in this situation.

What's next

Learn more about designing an Apache Beam data pipeline.
