Cloud Spanner change streams to BigQuery template

The Cloud Spanner change streams to BigQuery template is a streaming pipeline that streams Cloud Spanner data change records and writes them into BigQuery tables using Dataflow Runner V2.

All change stream watched columns are included in each BigQuery table row, regardless of whether they were modified by the Cloud Spanner transaction. Columns that are not watched are not included in the BigQuery row. Every Cloud Spanner change with a timestamp earlier than the Dataflow watermark is either applied successfully to the BigQuery tables or stored in the dead-letter queue for retry. BigQuery rows can be inserted out of order relative to the original Cloud Spanner commit timestamp ordering.
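
Because rows can arrive out of order, downstream consumers that need commit-timestamp ordering can sort on the commit timestamp that the pipeline records with each row. As a minimal sketch, assuming the default changelog table name for a hypothetical Singers table and the _metadata_spanner_commit_timestamp metadata field added by the pipeline:

# Read the changelog in commit-timestamp order; the dataset and table
# names are hypothetical placeholders.
bq query --use_legacy_sql=false \
'SELECT *
 FROM `my_dataset.Singers_changelog`
 ORDER BY _metadata_spanner_commit_timestamp'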

If the necessary BigQuery tables don't exist, the pipeline creates them. Otherwise, the existing BigQuery tables are used. The schema of an existing BigQuery table must contain the corresponding tracked columns of the Cloud Spanner tables and any additional metadata columns that are not explicitly ignored through the ignoreFields option. Each new BigQuery row includes all columns watched by the change stream from its corresponding row in your Cloud Spanner table as of the change record's timestamp.
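
When pointing the pipeline at existing tables, it can help to confirm up front that their schemas contain the tracked columns and the expected metadata columns. A minimal sketch using the bq CLI, with hypothetical project, dataset, and table names (the table name follows the default {_metadata_spanner_table_name}_changelog template):

# Print the schema of an existing changelog table; all names below are
# hypothetical placeholders.
bq show --schema --format=prettyjson my-project:my_dataset.Singers_changelog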

The pipeline adds a set of metadata fields to the BigQuery tables. For more details about these fields, see Data change records in "Change streams partitions, records, and queries."

Note:

  • This template does not propagate schema changes from Cloud Spanner to BigQuery. Because a schema change in Cloud Spanner is likely to break the pipeline, you might need to recreate the pipeline after making the change.
  • For the OLD_AND_NEW_VALUES and NEW_VALUES value capture types, when the data change record contains an UPDATE change, the template must perform a stale read against Cloud Spanner at the commit timestamp of the data change record to retrieve the unchanged but watched columns. Make sure to configure your database's version_retention_period so that these stale reads succeed. For the NEW_ROW value capture type, the template is more efficient: the data change record already captures the full new row, including columns that are not updated, so the template does not need to perform a stale read (see the sketch after this list).
  • You can minimize network latency and network transport costs by running the Dataflow job from the same region as your Cloud Spanner instance or BigQuery tables. If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. See more about Dataflow regional endpoints.
  • This template supports all valid Cloud Spanner data types. However, if the BigQuery type is less precise than the Cloud Spanner type, precision loss can occur during the transformation. Specifically:
    • For the Cloud Spanner JSON type, the members of an object are ordered lexicographically, but there is no such guarantee for the BigQuery JSON type.
    • Cloud Spanner supports a nanosecond-precision TIMESTAMP type, while BigQuery's TIMESTAMP type supports only microsecond precision.
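
For the second note above, both database settings can be changed through DDL with the gcloud CLI. A minimal sketch, assuming the GoogleSQL dialect and hypothetical instance, database, and stream names; pick a retention period that covers how far behind the current time the pipeline might read:

# Extend the retention window so that stale reads at older commit
# timestamps still succeed (the allowed range is 1 hour to 7 days).
gcloud spanner databases ddl update mydb \
    --instance=my-instance \
    --ddl="ALTER DATABASE mydb SET OPTIONS (version_retention_period = '2d')"

# Alternatively, create the change stream with the NEW_ROW value
# capture type, which avoids the stale reads entirely.
gcloud spanner databases ddl update mydb \
    --instance=my-instance \
    --ddl="CREATE CHANGE STREAM my_stream FOR ALL OPTIONS (value_capture_type = 'NEW_ROW')"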

Learn more about change streams, how to build change streams Dataflow pipelines, and best practices.

Pipeline requirements

  • The Cloud Spanner instance must exist prior to running the pipeline.
  • The Cloud Spanner database must exist prior to running the pipeline.
  • The Cloud Spanner metadata instance must exist prior to running the pipeline.
  • The Cloud Spanner metadata database must exist prior to running the pipeline.
  • The Cloud Spanner change stream must exist prior to running the pipeline.
  • The BigQuery dataset must exist prior to running the pipeline.
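
A quick way to verify a couple of these prerequisites and create the output dataset, using hypothetical names (the same instance and database can also serve as the metadata instance and database):

# Confirm that the Cloud Spanner database (and thus its instance) exists.
gcloud spanner databases describe mydb --instance=my-instance

# Create the BigQuery dataset for the change stream output.
bq mk --dataset my-project:my_dataset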

Template parameters

  • spannerInstanceId: The Cloud Spanner instance to read change streams from.
  • spannerDatabase: The Cloud Spanner database to read change streams from.
  • spannerMetadataInstanceId: The Cloud Spanner instance to use for the change streams connector metadata table.
  • spannerMetadataDatabase: The Cloud Spanner database to use for the change streams connector metadata table.
  • spannerChangeStreamName: The name of the Cloud Spanner change stream to read from.
  • bigQueryDataset: The BigQuery dataset for change streams output.
  • spannerProjectId: (Optional) The project to read change streams from. This is also the project where the change streams connector metadata table is created. Defaults to the project where the Dataflow pipeline is running.
  • spannerMetadataTableName: (Optional) The Cloud Spanner change streams connector metadata table name to use. If not provided, a metadata table is created automatically during pipeline execution. This parameter must be provided when updating an existing pipeline, and should not be provided otherwise.
  • rpcPriority: (Optional) The request priority for Cloud Spanner calls. Must be one of HIGH, MEDIUM, or LOW. Defaults to HIGH.
  • startTimestamp: (Optional) The starting DateTime, inclusive, to use for reading change streams, for example 2021-10-12T07:20:50.52Z. Defaults to the timestamp when the pipeline starts, that is, the current time.
  • endTimestamp: (Optional) The ending DateTime, inclusive, to use for reading change streams, for example 2021-10-12T07:20:50.52Z. Defaults to an infinite time in the future.
  • bigQueryProjectId: (Optional) The BigQuery project. Defaults to the project of the Dataflow job.
  • bigQueryChangelogTableNameTemplate: (Optional) The template for the names of BigQuery changelog tables. Defaults to {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory: (Optional) The file path for storing any unprocessed records, along with the reason they failed to be processed. Defaults to a directory under the Dataflow job's temp location. The default value is sufficient under most conditions.
  • dlqRetryMinutes: (Optional) The number of minutes between dead-letter queue retries. Defaults to 10.
  • ignoreFields: (Optional) A comma-separated list of fields (case sensitive) to ignore. These can be fields of watched tables or metadata fields added by the pipeline. Ignored fields aren't inserted into BigQuery.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Spanner change streams to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region=REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
  • REGION_NAME: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner instance ID
  • SPANNER_DATABASE: Cloud Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
  • SPANNER_CHANGE_STREAM: Cloud Spanner change stream
  • BIGQUERY_DATASET: The BigQuery dataset for change streams output
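
Putting it together, here is a filled-in sketch with hypothetical names, including two of the optional parameters from the table above:

# Example invocation with hypothetical resource names; rpcPriority and
# ignoreFields are optional parameters, and MyLargeColumn is a
# hypothetical watched column to exclude from BigQuery.
gcloud dataflow flex-template run spanner-changestreams-to-bq \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Spanner_Change_Streams_to_BigQuery \
    --region=us-central1 \
    --parameters \
spannerInstanceId=my-instance,\
spannerDatabase=mydb,\
spannerMetadataInstanceId=my-instance,\
spannerMetadataDatabase=mydb,\
spannerChangeStreamName=my_stream,\
bigQueryDataset=my_dataset,\
rpcPriority=MEDIUM,\
ignoreFields=MyLargeColumn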

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}
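
One way to send this request is with curl, using an access token from the gcloud CLI. This sketch assumes the request body above has been saved locally as request.json:

# Launch the template through the REST API; PROJECT_ID and LOCATION
# are replaced as described below.
curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"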

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates/
  • LOCATION: the regional endpoint where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner instance ID
  • SPANNER_DATABASE: Cloud Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
  • SPANNER_CHANGE_STREAM: Cloud Spanner change stream
  • BIGQUERY_DATASET: The BigQuery dataset for change streams output
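
After the job is launched by either method, you can confirm that it is running with the gcloud CLI; the region below is the example used throughout this page:

# List active Dataflow jobs in the region to confirm the launch.
gcloud dataflow jobs list --region=us-central1 --status=active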