Spanner change streams to BigQuery template

The Spanner change streams to BigQuery template is a streaming pipeline that streams Spanner data change records and writes them into BigQuery tables using Dataflow Runner V2.

All change stream watched columns are included in each BigQuery table row, regardless of whether they are modified by a Spanner transaction. Columns not watched aren't included in the BigQuery row. Any Spanner changes less than the Dataflow watermark are either successfully applied to the BigQuery tables or are stored in the dead-letter queue for retry. BigQuery rows are inserted out of order compared to the original Spanner commit timestamp ordering.

If the necessary BigQuery tables don't exist, the pipeline creates them. Otherwise, existing BigQuery tables are used. The schema of existing BigQuery tables must contain the corresponding tracked columns of the Spanner tables and any additional metadata columns that aren't ignored explicitly by the ignoreFields option. See the description of the metadata fields in the following list. Each new BigQuery row includes all columns watched by the change stream from its corresponding row in your Spanner table at the timestamp of the change record.

The following metadata fields are added to BigQuery tables. For more details about these fields, see Data change records in "Change streams partitions, records, and queries."

  • _metadata_spanner_mod_type: The modification type (insert, update, or delete) of the Spanner transaction. Extracted from change stream data change record.
  • _metadata_spanner_table_name: The Spanner table name. This field is not the metadata table name of the connector.
  • _metadata_spanner_commit_timestamp: The Spanner commit timestamp, which is the time when a change is committed. This value is extracted from the change stream data change record.
  • _metadata_spanner_server_transaction_id: A globally unique string that represents the Spanner transaction in which the change was committed. Only use this value in the context of processing change stream records. It isn't correlated with the transaction ID in Spanner's API. This value is extracted from the change stream data change record.
  • _metadata_spanner_record_sequence: The sequence number for the record within the Spanner transaction. Sequence numbers are guaranteed to be unique and monotonically increasing, but not necessarily contiguous, within a transaction. This value is extracted from the change stream data change record.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: Indicates whether the record is the last record for a Spanner transaction in the current partition. This value is extracted from the change stream data change record.
  • _metadata_spanner_number_of_records_in_transaction: The number of data change records that are part of the Spanner transaction across all change stream partitions. This value is extracted from the change stream data change record.
  • _metadata_spanner_number_of_partitions_in_transaction: The number of partitions that return data change records for the Spanner transaction. This value is extracted from the change stream data change record.
  • _metadata_big_query_commit_timestamp: The commit timestamp when the row is inserted into BigQuery. If useStorageWriteApi is true, this column is not automatically created in the changelog table by the pipeline. In that case, you must manually add this column in the changelog table if needed.

When using this template, be aware of the following details:

  • You can use this template to propagate new columns on existing tables or new tables from Spanner to BigQuery. For more information, see Handle adding tracking tables or columns.
  • For OLD_AND_NEW_VALUES and NEW_VALUES value capture types, when the data change record contains an UPDATE change, the template needs to do a stale read to Spanner at the commit timestamp of the data change record to retrieve the unchanged but watched columns. Make sure to configure your database 'version_retention_period' properly for the stale read. For the NEW_ROW value capture type, the template is more efficient, because the data change record captures the full new row including columns that are not updated in UPDATE requests, and the template doesn't need to do a stale read.
  • To minimize network latency and network transport costs, run the Dataflow job from the same region as your Spanner instance or BigQuery tables. If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. For more information, see Dataflow regions.
  • This template supports all valid Spanner data types. If the BigQuery type is more precise than the Spanner type, precision loss might occur during the transformation. Specifically:
    • For Spanner JSON type, the order of the members of an object is lexicographically ordered, but there is no such guarantee for BigQuery JSON type.
    • Spanner supports nanoseconds TIMESTAMP type, but BigQuery only supports microseconds TIMESTAMP type.
  • This template does not support using BigQuery Storage Write API in exactly-once mode.

Learn more about change streams, how to build change streams Dataflow pipelines, and best practices.

Pipeline requirements

  • The Spanner instance must exist prior to running the pipeline.
  • The Spanner database must exist prior to running the pipeline.
  • The Spanner metadata instance must exist prior to running the pipeline.
  • The Spanner metadata database must exist prior to running the pipeline.
  • The Spanner change stream must exist prior to running the pipeline.
  • The BigQuery dataset must exist prior to running the pipeline.

Handle adding tracking tables or columns

This section describes best practices for handling adding tracking Spanner tables and columns while the pipeline is running.

  • Before you add a new column to a Spanner change stream scope, first add the column to the BigQuery changelog table. The added column must have a matching data type and be NULLABLE. Wait for at least 10 minutes before you continue to create the new column or table in Spanner. Writing into the new column without waiting might result in an unprocessed record with an invalid error code in your dead letter queue directory.
  • To add a new table, first add the table in the Spanner database. The table is automatically created in BigQuery when the pipeline receives a record for the new table.
  • After you add the new columns or tables in the Spanner database, make sure to alter your change stream to track the new columns or tables that you want if they aren't tracked implicitly already.
  • The template doesn't drop tables or columns from BigQuery. If a column is dropped from the Spanner table, then null values are populated to the BigQuery changelog columns for records generated after the columns are dropped from the Spanner table, unless you manually drop the column from BigQuery.
  • The template doesn't support column type updates. Although Spanner supports changing a STRING column to a BYTES column or a BYTES column to a STRING column, you can't modify an existing column's data type or use the same column name with different data types in BigQuery. If you drop and recreate a column with the same name but a different type in Spanner, the data might be written into the existing BigQuery column, but the type is unchanged.
  • This template doesn't support column mode updates. Metadata columns replicated into BigQuery are set to the REQUIRED mode. All other columns replicated into BigQuery are set to NULLABLE, regardless of whether they are defined as NOT NULL in the Spanner table. You can't update the NULLABLE columns to the REQUIRED mode in BigQuery.
  • Changing a change stream's value capture type is not supported for running pipelines.

Template parameters

Required parameters

  • spannerInstanceId : The Spanner instance to read change streams from.
  • spannerDatabase : The Spanner database to read change streams from.
  • spannerMetadataInstanceId : The Spanner instance to use for the change streams connector metadata table.
  • spannerMetadataDatabase : The Spanner database to use for the change streams connector metadata table.
  • spannerChangeStreamName : The name of the Spanner change stream to read from.
  • bigQueryDataset : The BigQuery dataset for change streams output.

Optional parameters

  • spannerProjectId : The project to read change streams from. This value is also the project where the change streams connector metadata table is created. The default value for this parameter is the project where the Dataflow pipeline is running.
  • spannerDatabaseRole : The Spanner database role to use when running the template. This parameter is required only when the IAM principal who is running the template is a fine-grained access control user. The database role must have the SELECT privilege on the change stream and the EXECUTE privilege on the change stream's read function. For more information, see Fine-grained access control for change streams (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName : The Spanner change streams connector metadata table name to use. If not provided, a Spanner change streams connector metadata table is automatically created during the pipeline flow. You must provide this parameter when updating an existing pipeline. Otherwise, don't provide this parameter.
  • rpcPriority : The request priority for Spanner calls. The value must be one of the following values: HIGH, MEDIUM, or LOW. The default value is HIGH.
  • spannerHost : The Cloud Spanner endpoint to call in the template. Only used for testing. (Example: https://batch-spanner.googleapis.com).
  • startTimestamp : The starting DateTime (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, to use for reading change streams. Ex-2021-10-12T07:20:50.52Z. Defaults to the timestamp when the pipeline starts, that is, the current time.
  • endTimestamp : The ending DateTime (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, to use for reading change streams.Ex-2021-10-12T07:20:50.52Z. Defaults to an infinite time in the future.
  • bigQueryProjectId : The BigQuery project. The default value is the project for the Dataflow job.
  • bigQueryChangelogTableNameTemplate : The template for the name of the BigQuery table that contains the changelog. Defaults to: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : The path to store any unprocessed records. The default path is a directory under the Dataflow job's temp location. The default value is usually sufficient.
  • dlqRetryMinutes : The number of minutes between dead-letter queue retries. The default value is 10.
  • ignoreFields : A comma-separated list of fields (case sensitive) to ignore. These fields might be fields of watched tables, or metadata fields added by the pipeline. Ignored fields aren't inserted into BigQuery. When you ignore the _metadata_spanner_table_name field, the bigQueryChangelogTableNameTemplate parameter is also ignored. Defaults to empty.
  • disableDlqRetries : Whether or not to disable retries for the DLQ. Defaults to: false.
  • useStorageWriteApi : If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : When using the Storage Write API, specifies the write semantics. To use at-least once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
  • numStorageWriteApiStreams : When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. Defaults to: 0.
  • storageWriteApiTriggeringFrequencySec : When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Cloud Spanner change streams to BigQuery template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Spanner instance ID
  • SPANNER_DATABASE: Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Spanner metadata database
  • SPANNER_CHANGE_STREAM: Spanner change stream
  • BIGQUERY_DATASET: The BigQuery dataset for change streams output

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • SPANNER_INSTANCE_ID: Spanner instance ID
  • SPANNER_DATABASE: Spanner database
  • SPANNER_METADATA_INSTANCE_ID: Spanner metadata instance ID
  • SPANNER_METADATA_DATABASE: Spanner metadata database
  • SPANNER_CHANGE_STREAM: Spanner change stream
  • BIGQUERY_DATASET: The BigQuery dataset for change streams output

What's next