Build change streams connections using Dataflow

This page demonstrates how to create Dataflow pipelines that consume and forward Spanner change data by using change streams. You can use the example code on this page to build custom pipelines.

Core concepts

The following are some core concepts for Dataflow pipelines for change streams.

Dataflow

Dataflow is a serverless, fast, and cost-effective service that supports both stream and batch processing. It provides portability with processing jobs written using the open source Apache Beam libraries and automates infrastructure provisioning and cluster management. Dataflow provides near real-time streaming when reading from change streams.

You can use Dataflow to consume Spanner change streams with the SpannerIO connector, which offers an abstraction over the Spanner API for querying change streams. With this connector, you don't have to manage the change streams partition lifecycle, which is necessary when you use the Spanner API directly. The connector provides you with a stream of data change records so that you are free to focus more on application logic, and less on specific API details and dynamic change stream partitioning. We recommend using the SpannerIO connector rather than the Spanner API in most circumstances where you need to read change stream data.

Dataflow templates are prebuilt Dataflow pipelines that implement common use cases. See Dataflow templates for an overview.

Dataflow pipeline

A Spanner change streams Dataflow pipeline is composed of four main parts:

  1. A Spanner database with a change stream
  2. The SpannerIO connector
  3. User-defined transforms and sinks
  4. An Apache Beam sink I/O writer

Spanner change stream

For details on how to create a change stream, see Create a change stream.

Apache Beam SpannerIO connector

This is the SpannerIO connector described in the earlier Dataflow section. It is a source I/O connector that emits a PCollection of data change records to later stages of the pipeline. The event time for each emitted data change record will be the commit timestamp. Note that the records emitted are unordered, and that the SpannerIO connector guarantees there will be no late records.

When working with change streams, Dataflow uses checkpointing. As a result, each worker might wait for up to the configured checkpoint interval, buffering changes, before sending them for further processing.

User-defined transforms

A user-defined transform lets you aggregate, transform, or modify data within a Dataflow pipeline. Common use cases include removing personally identifiable information, satisfying downstream data format requirements, and sorting. See the official Apache Beam documentation for the programming guide on transforms.

Apache Beam sink I/O writer

Apache Beam contains built-in I/O connectors that can be used to write from a Dataflow pipeline into a data sink like BigQuery. Most common data sinks are natively supported.

Dataflow templates

Dataflow templates provide a method to create Dataflow jobs from prebuilt Docker images for common use cases by using the Google Cloud console, the Google Cloud CLI, or REST API calls.

For Spanner change streams, we provide three Dataflow flex templates:

Set IAM Permissions for Dataflow templates

Before creating a Dataflow job with the three flex templates listed, ensure that you have the required IAM permissions for the following service accounts:

If you don't have the required IAM permissions, then you must specify a user-managed worker service account to create the Dataflow job. For more information, see Dataflow security and permissions.

If you try to run a job from a Dataflow flex template without all the required permissions, your job might fail with a failed to read the result file error or a permission denied on resource error. For more information, see Troubleshoot Flex Templates.

Build a Dataflow pipeline

This section covers the connector's initial configuration, and provides samples for common integrations with the Spanner change streams feature.

To follow these steps, you need a Java development environment for Dataflow. For more information, see Create a Dataflow pipeline using Java.

Create a change stream

For details about how to create a change stream, see Create a change stream. To continue with the next steps, you must have a Spanner database with a change stream configured.

Grant fine-grained access control privileges

If you expect any fine-grained access control users to run the Dataflow job, ensure that the users are granted access to a database role that has the SELECT privilege on the change stream and the EXECUTE privilege on the change stream's table-valued function. Also ensure that the principal specifies the database role in the SpannerIO configuration or in the Dataflow flex template.

For more information, see About fine-grained access control.

Add the SpannerIO connector as a dependency

The Apache Beam SpannerIO connector encapsulates the complexity of consuming the change streams directly using the Cloud Spanner API, emitting a PCollection of change stream data records to later stages of the pipeline.

These objects can be consumed in other stages of the user's Dataflow pipeline. The change stream integration is part of the SpannerIO connector. To use the connector, add the following dependency to your pom.xml file:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>
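
If you build with Gradle rather than Maven, the equivalent dependency declaration is shown below. The `beamVersion` property name is illustrative; it corresponds to the `beam-version` property in the Maven snippet above:

```groovy
dependencies {
    // SpannerIO change stream support is available from Beam 2.38.0 onward.
    implementation "org.apache.beam:beam-sdks-java-io-google-cloud-platform:${beamVersion}"
}
```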

Create a metadata database

The connector needs to keep track of each partition when running the Apache Beam pipeline. It keeps this metadata in a Spanner table created by the connector during initialization. You specify the database in which this table will be created when configuring the connector.

As described in Change streams best practices, we recommend creating a new database for this purpose, rather than allowing the connector to use your application's database to store its metadata table.

The owner of a Dataflow job that uses the SpannerIO connector must have the following IAM permissions on this metadata database:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configure the connector

The Spanner change streams connector can be configured as follows:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

The following are descriptions of the readChangeStream() options:

Spanner Config (Required)

Used for configuring the project, instance and database where the change stream was created and should be queried from. Also optionally specifies the database role to use when the IAM principal who is running the Dataflow job is a fine-grained access control user. The job assumes this database role for access to the change stream. For more information, see About fine-grained access control.

Change stream name (Required)

This name uniquely identifies the change stream. The name here must be the same as the one used when creating it.

Metadata instance ID (Optional)

This is the instance to store the metadata used by the connector to control the consumption of the change stream API data.

Metadata database ID (Required)

This is the database to store the metadata used by the connector to control the consumption of the change stream API data.

Metadata table name (Optional)

This should only be used when updating an existing pipeline.

This is the pre-existing metadata table name to be used by the connector. This is used by the connector to store the metadata to control the consumption of the change stream API data. If this option is omitted, Spanner creates a new table with a generated name on connector initialization.

RPC priority (Optional)

The request priority to be used for the change stream queries. If this parameter is omitted, high priority will be used.

InclusiveStartAt (Required)

Changes from the given timestamp are returned to the caller.

InclusiveEndAt (Optional)

Changes up to the given timestamp are returned to the caller. If this parameter is omitted, changes will be emitted indefinitely.

Add transforms and sinks to process change data

With the previous steps complete, the configured SpannerIO connector is ready to emit a PCollection of DataChangeRecord objects. See Example transforms and sinks for several sample pipeline configurations that process this streamed data in various ways.

Note that change stream records emitted by the SpannerIO connector are unordered. This is because PCollections don't provide any ordering guarantees. If you need an ordered stream, you must group and sort the records as transforms in your pipelines: see Sample: Order by key. You can extend this sample to sort the records based on any fields of the records, such as by transaction IDs.

Example transforms and sinks

You can define your own transforms and specify sinks to write the data to. The Apache Beam documentation provides many transforms that you can apply, as well as ready-to-use I/O connectors for writing data to external systems.

Sample: Order by key

This code sample emits data change records ordered by commit timestamp and grouped by primary keys using the Dataflow connector.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

This code sample uses states and timers to buffer records for each key, and sets the expiration time of the timer to some user-configured time T in the future (defined in the BufferKeyUntilOutputTimestamp function). When the Dataflow watermark passes time T, this code flushes all records in the buffer with timestamp less than T, orders these records by commit timestamp, and outputs a key-value pair where:

  • The key is the input key, that is, the primary key hashed into a bucket array of size 1000.
  • The value is the ordered data change records that were buffered for the key.
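
The bucketing described above can be illustrated outside of Beam with plain Java. This sketch (the key JSON string is a hypothetical example, not part of the connector) hashes a primary-key JSON string into one of 1000 buckets; Math.floorMod keeps the index non-negative even when hashCode() returns a negative value:

```java
public class KeyBucketing {
  private static final int NUMBER_OF_BUCKETS = 1000;

  // Maps a primary-key JSON string to a stable bucket index in [0, NUMBER_OF_BUCKETS).
  public static String bucketIndexFor(String keysJson) {
    return String.valueOf(Math.floorMod(keysJson.hashCode(), NUMBER_OF_BUCKETS));
  }

  public static void main(String[] args) {
    // The same key always lands in the same bucket, so all of its records
    // share one buffer and one timer downstream.
    String key = "{\"SingerId\":\"42\"}";
    System.out.println(bucketIndexFor(key).equals(bucketIndexFor(key))); // true
  }
}
```

Because the bucket index is deterministic for a given key, all records for one primary key are routed to the same buffer and timer in the later stages.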

For each key, we have the following guarantees:

  • Timers are guaranteed to fire in order of expiration timestamp.
  • Downstream stages are guaranteed to receive the elements in the same order that they were produced.

For example, for a key with the value 100, suppose the timer fires at T1 and again at T10, producing a bundle of data change records at each timestamp. Because the data change records outputted at T1 were produced before the data change records outputted at T10, the data change records outputted at T1 are also guaranteed to be received by the next stage before the data change records outputted at T10. This mechanism helps us guarantee strict commit timestamp ordering per primary key for downstream processing.

This process will repeat until the pipeline ends and all data change records have been processed (or it will repeat indefinitely if no end time is specified).

Note that this code sample uses states and timers, instead of windows, to perform ordering per key. The rationale is that windows aren't guaranteed to be processed in order. This means that older windows can be processed later than more recent windows, which could result in out of order processing.

BreakRecordByModFn

Each data change record may contain several mods. Each mod represents an insert, update, or delete to a single primary key value. This function breaks each data change record into separate data change records, one per mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

This function takes in a DataChangeRecord and outputs a DataChangeRecord keyed by the Spanner primary key hashed to an integer value.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers. Math.floorMod keeps the
    // bucket index non-negative even when hashCode() is negative.
    int hashCode = record.getMods().get(0).getKeysJson().hashCode();
    String bucketIndex = String.valueOf(Math.floorMod(hashCode, NUMBER_OF_BUCKETS));

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Timers and buffers are per-key. This function buffers each data change record until the watermark passes the timestamp at which we want to output the buffered data change records.

This code utilizes a looping timer to determine when to flush the buffer:

  1. When it sees a data change record for a key for the first time, it sets the timer to fire at the data change record's commit timestamp + incrementIntervalInSeconds (a user-configurable option).
  2. When the timer fires, it adds all data change records in the buffer with timestamp less than the timer's expiration time to recordsToOutput. If the buffer has data change records whose timestamp is greater than or equal to the timer's expiration time, it adds those data change records back into the buffer instead of outputting them. It then sets the next timer to the current timer's expiration time plus incrementIntervalInSeconds.
  3. If recordsToOutput is not empty, the function orders the data change records in recordsToOutput by commit timestamp and transaction ID and then outputs them.

private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (!buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info("Timer not being set since the buffer is empty");
      keyString.clear();
    }
  }
}
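
The flush logic in steps 2 and 3 can also be sketched independently of Beam state and timers. In this simplified sketch, BufferedRecord is a hypothetical stand-in for a buffered data change record with a millisecond commit timestamp; records before the expiration time are output in commit-timestamp order while the rest remain buffered:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FlushSketch {
  // A simplified stand-in for a buffered data change record.
  record BufferedRecord(long commitTimestampMillis, String value) {}

  // Returns the records to output (commit timestamp < expiration), sorted by
  // commit timestamp; records at or after the expiration stay in the buffer.
  public static List<BufferedRecord> flush(List<BufferedRecord> buffer, long expirationMillis) {
    List<BufferedRecord> recordsToOutput = new ArrayList<>();
    List<BufferedRecord> retained = new ArrayList<>();
    for (BufferedRecord r : buffer) {
      if (r.commitTimestampMillis() < expirationMillis) {
        recordsToOutput.add(r);
      } else {
        retained.add(r);
      }
    }
    recordsToOutput.sort(Comparator.comparingLong(BufferedRecord::commitTimestampMillis));
    buffer.clear();
    buffer.addAll(retained);
    return recordsToOutput;
  }

  public static void main(String[] args) {
    List<BufferedRecord> buffer = new ArrayList<>(List.of(
        new BufferedRecord(30, "c"),
        new BufferedRecord(10, "a"),
        new BufferedRecord(50, "e")));
    List<BufferedRecord> out = flush(buffer, 40);
    System.out.println(out);     // records at t=10 and t=30, in commit order
    System.out.println(buffer);  // record at t=50 remains buffered
  }
}
```

In the real DoFn, the sorting is done by the user-implemented DataChangeRecordComparator, and the retained records wait for the next timer expiration.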

Ordering transactions

This pipeline can be changed to order by transaction ID and commit timestamp. To do so, buffer records for each transaction ID / commit timestamp pair, instead of for each Spanner key. This requires modification of the code in KeyByIdFn.

Sample: Assemble transactions

This code sample reads data change records, assembles all data change records belonging to the same transaction into a single element, and outputs that element. Note that the transactions outputted by this sample code are not ordered by commit timestamp.

This code sample uses buffers to assemble transactions from data change records. Upon receiving a data change record belonging to a transaction for the first time, it reads the numberOfRecordsInTransaction field in the data change record, which describes the expected number of data change records belonging to that transaction. It buffers the data change records belonging to that transaction until the number of buffered records matches numberOfRecordsInTransaction, upon which it outputs the bundled data change records.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

This function takes in a DataChangeRecord and outputs a DataChangeRecord keyed by the transaction ID.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn receives key-value pairs of {TransactionId, DataChangeRecord} from KeyByTransactionIdFn and buffers them in groups based on TransactionId. When the number of buffered records equals the number of records contained in the entire transaction, this function sorts the DataChangeRecord objects in the group by record sequence and outputs a key-value pair of {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Here, we are assuming that SortKey is a user-defined class that represents a {CommitTimestamp, TransactionId} pair. For more information about the SortKey, see the sample implementation.

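As a rough illustration of that assumption, SortKey could be a Comparable that orders by commit timestamp and then by transaction ID. This sketch uses plain strings for both fields for simplicity; the sample implementation referenced above constructs the key from Spanner's Timestamp and the server transaction ID:

```java
import java.util.Comparator;
import java.util.Objects;

public class SortKey implements Comparable<SortKey> {
  // An RFC 3339 timestamp string sorts lexicographically in time order.
  private final String commitTimestamp;
  private final String transactionId;

  public SortKey(String commitTimestamp, String transactionId) {
    this.commitTimestamp = commitTimestamp;
    this.transactionId = transactionId;
  }

  @Override
  public int compareTo(SortKey other) {
    // Order by commit timestamp first, then break ties with the transaction ID.
    return Comparator
        .comparing((SortKey k) -> k.commitTimestamp)
        .thenComparing(k -> k.transactionId)
        .compare(this, other);
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SortKey)) {
      return false;
    }
    SortKey other = (SortKey) o;
    return commitTimestamp.equals(other.commitTimestamp)
        && transactionId.equals(other.transactionId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(commitTimestamp, transactionId);
  }
}
```

In a real pipeline, the key class must also be serializable and have a deterministic coder so that Beam can use it as a grouping key.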
private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Sample: Filter by transaction tag

When a transaction modifying user data is tagged, the corresponding tag and its type get stored as part of DataChangeRecord. These examples demonstrate how to filter change stream records based on user-defined transaction tags as well as system tags:

User-defined tag filtering for my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

System tag filtering/TTL auditing:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Sample: Fetch full row

This example works with a Spanner table named Singers that has the following definition:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Under the default OLD_AND_NEW_VALUES value capture mode of change streams, when there is an update to a Spanner row, the data change record received will contain only the columns that were changed. Tracked but unchanged columns will not be included in the record. The primary key of the mod can be used to do a Spanner snapshot read at the commit timestamp of the data change record to fetch the unchanged columns or even retrieve the full row.

Note that the database retention period might need to be changed to a value greater than or equal to the change stream retention period in order for the snapshot read to succeed.

Also note that using the NEW_ROW value capture type is the recommended and more efficient way to do this, since it returns all tracked columns of the row by default and does not require an extra snapshot read into Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

This transformation will perform a stale read at the commit timestamp of each record received, and map the full row to JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

This code creates a Spanner database client to perform the full row fetch. It configures the session pool with just a few sessions, because a single instance of ToFullRowJsonFn performs its reads sequentially. Dataflow spawns many instances of this function, each with its own client and session pool.

Sample: Spanner to Pub/Sub

In this scenario, the caller streams records to Pub/Sub as fast as possible, without any grouping or aggregation. This is a good fit for triggering downstream processing, such as streaming all new rows inserted into a Spanner table to Pub/Sub for further processing.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Note that the Pub/Sub sink can be configured to ensure exactly-once semantics.

Sample: Spanner to Cloud Storage

In this scenario, the caller groups all the records within a given window and saves the group in separate Cloud Storage files. This is a good fit for analytics and point-in-time archival, which is independent of Spanner's retention period.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Note that the Cloud Storage sink provides at-least-once semantics by default. With extra processing, it can be modified to have exactly-once semantics.

We also provide a Dataflow template for this use case: see Connect change streams to Cloud Storage.

Sample: Spanner to BigQuery (ledger table)

Here, the caller streams change records into BigQuery. Each data change record is reflected as one row in BigQuery. This is a good fit for analytics. This code uses the ToFullRowJsonFn function defined earlier, in the Fetch full row section, to retrieve the full row of the record and write it into BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        // Fail loudly rather than continuing with a null node.
        throw new RuntimeException(e);
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Note that the BigQuery sink provides at-least-once semantics by default. With extra processing, it can be modified to have exactly-once semantics.
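As one possible approach (not shown in the sample above), BigQueryIO's Storage Write API method supports exactly-once delivery. The following is a sketch of the relevant sink settings only; the triggering frequency and stream count are illustrative values, not recommendations:

```java
// Sketch: switch the BigQuery sink to the Storage Write API, which can
// provide exactly-once delivery. Values here are illustrative.
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
.withTriggeringFrequency(Duration.standardSeconds(5))
.withNumStorageWriteApiStreams(4)
```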

We also provide a Dataflow template for this use case; see Connect change streams to BigQuery.

Monitor a pipeline

There are two classes of metrics available to monitor a change stream Dataflow pipeline.

Standard Dataflow metrics

Dataflow provides several metrics to help you verify that your job is healthy, such as data freshness, system lag, job throughput, and worker CPU utilization. For more information, see Using Monitoring for Dataflow pipelines.

For change streams pipelines, two metrics deserve particular attention: system latency and data freshness.

System latency tells you the current maximum duration of time (in seconds) for which an item of data has been processed or has been awaiting processing.

Data freshness shows the amount of time between now (real time) and the output watermark. An output watermark of time T indicates that all elements with an event time (strictly) before T have been processed. In other words, data freshness measures how up to date the pipeline is with respect to processing the events it has received.

If the pipeline is under-resourced, that effect shows up in both metrics: system latency increases because items wait longer before being processed, and data freshness increases because the pipeline cannot keep up with the incoming volume of data.
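As a concrete illustration of data freshness (the timestamps are made up), the metric is simply the gap between the current time and the output watermark:

```java
import java.time.Duration;
import java.time.Instant;

public class FreshnessExample {
  public static void main(String[] args) {
    // Hypothetical values: real time versus the pipeline's output watermark.
    Instant now = Instant.parse("2024-01-01T12:00:30Z");
    Instant outputWatermark = Instant.parse("2024-01-01T12:00:00Z");

    // All elements with an event time before the watermark have been
    // processed; the pipeline is this far behind real time.
    Duration freshness = Duration.between(outputWatermark, now);
    System.out.println("Data freshness: " + freshness.getSeconds() + "s"); // prints "Data freshness: 30s"
  }
}
```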

Custom change stream metrics

These metrics are exposed in Cloud Monitoring and include:

  • Bucketed (histogram) latency between a record being committed in Spanner to it being emitted into a PCollection by the connector. This metric can be used to see any performance (latency) issues with the pipeline.
  • Total number of data records read. This is an overall indication of the number of records emitted by the connector. This number should be ever-increasing, mirroring the trend of writes in the underlying Spanner database.
  • Number of partitions that are being read. There should always be partitions being read. If this number is zero, it indicates that an error has occurred in the pipeline.
  • Total number of queries issued during the execution of the connector. This is an overall indication of change stream queries made to the Spanner instance throughout the execution of the pipeline. This can be used to get an estimate of the load from the connector to the Spanner database.

Update an existing pipeline

It is possible to update a running pipeline that uses the SpannerIO connector to process change streams if the job compatibility checks pass. To do this, you have to explicitly set the metadata table name parameter of the new job when updating it. Use the value of the metadataTable pipeline option from the job you are updating.

If you are using a Google-provided Dataflow template, set the table name using the parameter spannerMetadataTableName. You can also modify your existing job to explicitly use the metadata table with the method withMetadataTable(your-metadata-table-name) in the connector configuration. After that is done, you can follow the instructions in Launching your replacement job from the Dataflow documentation to update a running job.
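For example, the connector configuration of the replacement job might pin the metadata table explicitly. The table name below is illustrative; use the metadataTable value from the job you are updating:

```java
// Sketch: pin the metadata table name so the replacement job reuses the
// same table as the job being updated. "my-metadata-table" is illustrative.
SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-metadata-instance-id")
  .withMetadataDatabase("my-metadata-database-id")
  .withMetadataTable("my-metadata-table")
  .withInclusiveStartAt(Timestamp.now())
```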

Best practices for change streams and Dataflow

The following are some best practices for building change streams connections by using Dataflow.

Use a separate metadata database

We recommend creating a separate database for the SpannerIO connector to use for metadata storage, rather than configuring it to use your application database.

For more information, see Consider a separate metadata database.

Size your cluster

A rule of thumb for an initial number of workers in a Spanner change streams job is one worker per 1,000 writes per second. Note that this estimate can vary depending on several factors, such as the size of each transaction, how many change stream records are produced from a single transaction, and the other transforms, aggregations, or sinks used in the pipeline.
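A back-of-the-envelope sketch of this rule of thumb (the 4,500 writes/second load is hypothetical):

```java
public class WorkerEstimate {
  // Rule of thumb: one worker per 1,000 writes per second, minimum one.
  static int initialWorkers(int writesPerSecond) {
    return Math.max(1, (int) Math.ceil(writesPerSecond / 1000.0));
  }

  public static void main(String[] args) {
    // A hypothetical load of 4,500 writes/second suggests starting with
    // 5 workers, then adjusting based on the pipeline's metrics.
    System.out.println(initialWorkers(4500)); // prints 5
  }
}
```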

After this initial sizing, it is important to keep track of the metrics mentioned in Monitor a pipeline to ensure the pipeline remains healthy. We recommend starting with an initial worker pool size, monitoring how your pipeline handles the load, and increasing the number of workers if necessary. CPU utilization is a key metric for checking whether the load is appropriate and more workers are needed.

Known limitations

There are a few known limitations when using Spanner change streams with Dataflow:

Autoscaling

Autoscaling support for any pipelines that include SpannerIO.readChangeStream requires Apache Beam 2.39.0 or higher.

If you use an Apache Beam version prior to 2.39.0, pipelines that include SpannerIO.readChangeStream need to explicitly specify the autoscaling algorithm as NONE, as described in Horizontal autoscaling.
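With those earlier Beam versions, one way to set the algorithm to NONE, alongside the --autoscalingAlgorithm=NONE pipeline flag, is programmatically on the worker pool options. This is a fragment, assuming options is your pipeline's PipelineOptions:

```java
// Sketch: disable autoscaling for a pipeline on Apache Beam < 2.39.0.
// Equivalent to passing --autoscalingAlgorithm=NONE on the command line.
DataflowPipelineWorkerPoolOptions workerOptions =
    options.as(DataflowPipelineWorkerPoolOptions.class);
workerOptions.setAutoscalingAlgorithm(
    DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE);
```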

To manually scale a Dataflow pipeline instead of using autoscaling, see Manually scaling a streaming pipeline.

Runner V2

The Spanner change streams connector requires Dataflow Runner V2. Runner V2 must be specified explicitly when the pipeline is executed, or an error is thrown. You can specify Runner V2 by configuring your job with --experiments=use_unified_worker,use_runner_v2.
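The experiments can also be enabled programmatically when constructing the pipeline options, for example (assuming the usual DataflowPipelineOptions and PipelineOptionsFactory imports):

```java
// Sketch: enable Dataflow Runner V2 programmatically rather than via the
// --experiments command-line flag.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setExperiments(
    Arrays.asList("use_unified_worker", "use_runner_v2"));
```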

Snapshot

The Spanner change streams connector does not support Dataflow Snapshots.

Draining

The Spanner change streams connector does not support draining a job. It is only possible to cancel an existing job.

You can also update an existing pipeline without needing to stop it.

OpenCensus

To use OpenCensus to monitor your pipeline, specify version 0.28.3 or later.

NullPointerException on pipeline start

A bug in Apache Beam version 2.38.0 can cause a NullPointerException when starting the pipeline under certain conditions. This prevents your job from starting and displays the following error message instead:

java.lang.NullPointerException: null value in entry: GCS_PROJECT_ID=null

To address this issue, either use Apache Beam version 2.39.0 or later, or manually specify the version of beam-sdks-java-core as 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

More information