Spanner change streams to BigQuery template

The Spanner change streams to BigQuery template is a streaming pipeline that reads Spanner data change records and writes them to BigQuery tables by using Dataflow Runner V2.

All change stream watched columns are included in each BigQuery table row, regardless of whether they are modified by a Spanner transaction. Columns that aren't watched aren't included in the BigQuery row. Any Spanner change with a commit timestamp earlier than the Dataflow watermark is either successfully applied to the BigQuery tables or stored in the dead-letter queue for retry. BigQuery rows are inserted out of order compared to the original Spanner commit timestamp ordering.
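Because rows can arrive out of order, a consumer that needs commit order has to sort the changelog rows itself. The following is a minimal sketch, not part of the template. It assumes each row carries a commit timestamp, a transaction ID, and a record sequence as metadata columns; the three column names in ORDER_KEYS are assumptions for illustration, so check them against the schema of your changelog table.

```python
from operator import itemgetter

# Assumed metadata column names; verify them against your changelog table.
ORDER_KEYS = (
    "_metadata_spanner_commit_timestamp",
    "_metadata_spanner_server_transaction_id",
    "_metadata_spanner_record_sequence",
)


def in_commit_order(rows):
    """Return rows sorted by commit timestamp, then transaction, then sequence."""
    return sorted(rows, key=itemgetter(*ORDER_KEYS))


# Hypothetical rows, listed in the (out-of-order) sequence they were inserted.
rows = [
    {"id": "c", "_metadata_spanner_commit_timestamp": "2024-05-01T10:00:01.000000Z",
     "_metadata_spanner_server_transaction_id": "t2",
     "_metadata_spanner_record_sequence": "00000000"},
    {"id": "b", "_metadata_spanner_commit_timestamp": "2024-05-01T10:00:00.000000Z",
     "_metadata_spanner_server_transaction_id": "t1",
     "_metadata_spanner_record_sequence": "00000001"},
    {"id": "a", "_metadata_spanner_commit_timestamp": "2024-05-01T10:00:00.000000Z",
     "_metadata_spanner_server_transaction_id": "t1",
     "_metadata_spanner_record_sequence": "00000000"},
]

ordered_ids = [row["id"] for row in in_commit_order(rows)]
```

The sketch compares timestamps as strings, which only works when every timestamp uses the same fixed-width UTC format. In practice, the same ordering is usually expressed as an ORDER BY clause in a BigQuery query.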

If the necessary BigQuery tables don't exist, the pipeline creates them. Otherwise, existing BigQuery tables are used. The schema of existing BigQuery tables must contain the corresponding tracked columns of the Spanner tables and any additional metadata columns that aren't ignored explicitly by the ignoreFields option. See the description of the metadata fields in the following list. Each new BigQuery row includes all columns watched by the change stream from its corresponding row in your Spanner table at the timestamp of the change record.
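The schema requirement for existing tables can be checked before the job starts. The following is a minimal sketch under stated assumptions: the function name, the column names, and the metadata field list are hypothetical, and the sketch assumes that any field listed in ignoreFields (watched column or metadata field) doesn't need to exist in the table.

```python
def missing_columns(table_columns, watched_columns, metadata_fields, ignore_fields=()):
    """Return the required columns that an existing BigQuery table lacks.

    Names are compared exactly, because ignoreFields is case sensitive.
    """
    ignored = set(ignore_fields)
    required = (set(watched_columns) | set(metadata_fields)) - ignored
    return sorted(required - set(table_columns))


# Hypothetical example: the table lacks one watched column.
missing = missing_columns(
    table_columns=["SingerId", "FirstName", "_metadata_spanner_mod_type"],
    watched_columns=["SingerId", "FirstName", "LastName"],
    metadata_fields=["_metadata_spanner_mod_type", "_metadata_spanner_table_name"],
    ignore_fields=["_metadata_spanner_table_name"],
)
```

In this example, missing contains only LastName, because the ignored metadata field is excluded from the required set.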

The following metadata fields are added to BigQuery tables. For more details about these fields, see Data change records in "Change streams partitions, records, and queries."

When using this template, be aware of the following details:

  • This template doesn't propagate schema changes from Spanner to BigQuery. Because a schema change in Spanner is likely to break the pipeline, you might need to recreate the pipeline after the schema change.
  • For OLD_AND_NEW_VALUES and NEW_VALUES value capture types, when the data change record contains an UPDATE change, the template needs to do a stale read to Spanner at the commit timestamp of the data change record to retrieve the unchanged but watched columns. Make sure to configure your database 'version_retention_period' properly for the stale read. For the NEW_ROW value capture type, the template is more efficient, because the data change record captures the full new row including columns that are not updated in UPDATE requests, and the template doesn't need to do a stale read.
  • To minimize network latency and network transport costs, run the Dataflow job from the same region as your Spanner instance or BigQuery tables. If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. For more information, see Dataflow regions.
  • This template supports all valid Spanner data types. If the Spanner type is more precise than the BigQuery type, precision loss might occur during the transformation. Specifically:
    • For the Spanner JSON type, the members of an object are ordered lexicographically, but there is no such guarantee for the BigQuery JSON type.
    • Spanner supports nanosecond precision for the TIMESTAMP type, but BigQuery supports only microsecond precision.
  • This template does not support using BigQuery Storage Write API in exactly-once mode.
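The TIMESTAMP precision difference can be illustrated with a short calculation. The following is a minimal sketch that assumes the sub-microsecond digits are truncated; the source doesn't state whether the template truncates or rounds, and the function name is hypothetical.

```python
def truncate_to_micros(nanos_since_epoch: int) -> tuple[int, int]:
    """Return (microseconds since epoch, nanoseconds dropped by truncation)."""
    micros, dropped_nanos = divmod(nanos_since_epoch, 1_000)
    return micros, dropped_nanos


# A nanosecond-precision value such as Spanner can store.
micros, dropped = truncate_to_micros(1_700_000_000_123_456_789)
```

Here micros is 1700000000123456 and dropped is 789: the last three digits of the Spanner value have no place in a microsecond-precision column.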

Learn more about change streams, how to build change streams Dataflow pipelines, and best practices.

Pipeline requirements

  • The Spanner instance must exist prior to running the pipeline.
  • The Spanner database must exist prior to running the pipeline.
  • The Spanner metadata instance must exist prior to running the pipeline.
  • The Spanner metadata database must exist prior to running the pipeline.
  • The Spanner change stream must exist prior to running the pipeline.
  • The BigQuery dataset must exist prior to running the pipeline.

Template parameters

  • spannerInstanceId: The Spanner instance to read change streams from.
  • spannerDatabase: The Spanner database to read change streams from.
  • spannerDatabaseRole: (Optional) The Spanner database role to use when running the template. This parameter is required only when the IAM principal who is running the template is a fine-grained access control user. The database role must have the SELECT privilege on the change stream and the EXECUTE privilege on the change stream's read function. For more information, see Fine-grained access control for change streams.
  • spannerMetadataInstanceId: The Spanner instance to use for the change streams connector metadata table.
  • spannerMetadataDatabase: The Spanner database to use for the change streams connector metadata table.
  • spannerChangeStreamName: The name of the Spanner change stream to read from.
  • bigQueryDataset: The BigQuery dataset for change streams output.
  • spannerProjectId: (Optional) The project to read change streams from. This value is also the project where the change streams connector metadata table is created. The default value is the project where the Dataflow pipeline is running.
  • spannerMetadataTableName: (Optional) The name of the Spanner change streams connector metadata table to use. If you don't provide a name, the pipeline creates a metadata table automatically. You must provide this parameter when updating an existing pipeline. Otherwise, don't provide this parameter.
  • rpcPriority: (Optional) The request priority for Spanner calls. The value must be HIGH, MEDIUM, or LOW. The default value is HIGH.
  • startTimestamp: (Optional) The starting DateTime, inclusive, to use for reading change streams. For example, 2021-10-12T07:20:50.52Z. The default value is the timestamp when the pipeline starts, that is, the current time.
  • endTimestamp: (Optional) The ending DateTime, inclusive, to use for reading change streams. For example, 2021-10-12T07:20:50.52Z. The default value is an infinite time in the future.
  • bigQueryProjectId: (Optional) The BigQuery project. The default value is the project for the Dataflow job.
  • bigQueryChangelogTableNameTemplate: (Optional) The template for the name of BigQuery changelog tables. Defaults to _metadata_spanner_table_name_changelog.
  • deadLetterQueueDirectory: (Optional) The path to store records that failed to be processed. The default path is a directory under the Dataflow job's temp location. The default value is usually sufficient.
  • dlqRetryMinutes: (Optional) The number of minutes between dead-letter queue retries. The default value is 10.
  • ignoreFields: (Optional) A comma-separated list of fields (case sensitive) to ignore. These fields can be fields of watched tables or metadata fields added by the pipeline. Ignored fields aren't inserted into BigQuery. When you ignore the _metadata_spanner_table_name field, the bigQueryChangelogTableNameTemplate parameter is also ignored.
  • useStorageWriteApi: (Optional) If true, the pipeline uses the BigQuery Storage Write API. The default value is false. For more information, see Using the Storage Write API.
  • useStorageWriteApiAtLeastOnce: (Optional) When using the Storage Write API, specifies the write semantics. To use at-least-once semantics, set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
  • numStorageWriteApiStreams: (Optional) When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
  • storageWriteApiTriggeringFrequencySec: (Optional) When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
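Several of the parameters constrain each other, so it can help to check a parameter set before launching a job. The following is a minimal sketch of the rules described above. The function and helper names are hypothetical, and the checks cover only the required parameters, the rpcPriority values, and the Storage Write API dependency; the template performs its own validation.

```python
REQUIRED = (
    "spannerInstanceId",
    "spannerDatabase",
    "spannerMetadataInstanceId",
    "spannerMetadataDatabase",
    "spannerChangeStreamName",
    "bigQueryDataset",
)
VALID_RPC_PRIORITIES = {"HIGH", "MEDIUM", "LOW"}


def _is_true(params, name):
    """Interpret a parameter value as a boolean flag that defaults to false."""
    return str(params.get(name, "false")).lower() == "true"


def validate_parameters(params):
    """Return a list of problems found in a template parameter dict."""
    errors = [f"missing required parameter: {name}"
              for name in REQUIRED if not params.get(name)]
    if params.get("rpcPriority", "HIGH") not in VALID_RPC_PRIORITIES:
        errors.append("rpcPriority must be HIGH, MEDIUM, or LOW")
    # Both settings are needed when the Storage Write API is enabled
    # without at-least-once semantics.
    if _is_true(params, "useStorageWriteApi") and not _is_true(
            params, "useStorageWriteApiAtLeastOnce"):
        for name in ("numStorageWriteApiStreams",
                     "storageWriteApiTriggeringFrequencySec"):
            if name not in params:
                errors.append(f"{name} is required in this mode")
    return errors


base = {name: "example" for name in REQUIRED}
ok = validate_parameters(base)
bad = validate_parameters({**base, "useStorageWriteApi": "true", "rpcPriority": "URGENT"})
```

With the placeholder values above, ok is an empty list and bad reports three problems: the invalid priority and the two missing Storage Write API settings.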

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Spanner change streams to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Spanner instance ID
  • SPANNER_DATABASE: Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Spanner metadata database
  • SPANNER_CHANGE_STREAM: Spanner change stream
  • BIGQUERY_DATASET: The BigQuery dataset for change streams output
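When the job is launched from a script, the same command can be assembled programmatically. The following is a minimal sketch that builds the argument list for the gcloud command shown above; the function name is hypothetical. It rejects parameter values that contain commas (such as a multi-field ignoreFields value), because those need gcloud's alternate-delimiter escaping, which this sketch doesn't implement.

```python
import shlex


def build_gcloud_command(job_name, region, version, parameters):
    """Build the argv list for `gcloud dataflow flex-template run`."""
    for name, value in parameters.items():
        if "," in str(value):
            raise ValueError(f"{name} contains a comma and needs gcloud escaping")
    template = (f"gs://dataflow-templates-{region}/{version}"
                "/flex/Spanner_Change_Streams_to_BigQuery")
    joined = ",".join(f"{name}={value}" for name, value in parameters.items())
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--template-file-gcs-location={template}",
        "--region", region,
        "--parameters", joined,
    ]


# Placeholder values; replace them with your own resources.
command = build_gcloud_command(
    "my-job", "us-central1", "latest",
    {"spannerInstanceId": "my-instance", "spannerDatabase": "my-db"},
)
printable = shlex.join(command)  # a shell-safe string, useful for logging
```

Passing the list to subprocess.run(command, check=True) would start the job, provided that gcloud is installed and authenticated.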

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery"
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Spanner instance ID
  • SPANNER_DATABASE: Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Spanner metadata database
  • SPANNER_CHANGE_STREAM: Spanner change stream
  • BIGQUERY_DATASET: The BigQuery dataset for change streams output
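The request above can also be prepared in code. The following is a minimal sketch that builds the URL and JSON body by using only the Python standard library; the function name is hypothetical. It doesn't send the request or add credentials: a real call also needs an Authorization header with a valid OAuth 2.0 access token.

```python
import json
import urllib.request


def build_launch_request(project_id, location, version, job_name, parameters):
    """Build (but don't send) the flexTemplates:launch POST request."""
    url = (f"https://dataflow.googleapis.com/v1b3/projects/{project_id}"
           f"/locations/{location}/flexTemplates:launch")
    body = {
        "launch_parameter": {
            "jobName": job_name,
            "parameters": parameters,
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/Spanner_Change_Streams_to_BigQuery"),
        }
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values; replace them with your own resources.
request = build_launch_request(
    "my-project", "us-central1", "latest", "my-job",
    {"spannerInstanceId": "my-instance", "bigQueryDataset": "my_dataset"},
)
payload = json.loads(request.data)
```

Serializing the body with json.dumps guarantees valid JSON, which avoids mistakes such as trailing commas in a hand-edited request.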

What's next