The Cloud Spanner change streams to BigQuery template is a streaming pipeline that streams Cloud Spanner data change records and writes them into BigQuery tables using Dataflow Runner V2.
All change stream watched columns are included in each BigQuery table row, regardless of whether they were modified by a Cloud Spanner transaction. Columns that aren't watched aren't included in the BigQuery row. Any Cloud Spanner change with a commit timestamp earlier than the Dataflow watermark is either successfully applied to the BigQuery tables or stored in the dead-letter queue for retry. BigQuery rows are inserted out of order compared to the original Cloud Spanner commit timestamp ordering.
If the necessary BigQuery tables don't exist, the pipeline creates them. Otherwise,
existing BigQuery tables are used. The schema of existing BigQuery tables must
contain the corresponding tracked columns of the Cloud Spanner tables and any additional
metadata columns that aren't ignored explicitly by the ignoreFields
option.
See the description of the metadata fields in the following list.
Each new BigQuery row includes all columns
watched by the change stream from its corresponding row in your Cloud Spanner table at the
timestamp of the change record.
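If you reuse existing tables, one way to confirm that a table already contains the tracked columns and the metadata columns described in the following list is to inspect its schema with the bq tool. This is only an illustrative check; the project, dataset, and table names are placeholders, and the table name assumes the default changelog naming.
bq show --schema --format=prettyjson BIGQUERY_PROJECT_ID:BIGQUERY_DATASET.MY_SPANNER_TABLE_changelog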
The following metadata fields are added to BigQuery tables. For more details about these fields, see Data change records in "Change streams partitions, records, and queries."
_metadata_spanner_mod_type
: The modification type (insert, update, or delete) of the Cloud Spanner transaction. This value is extracted from the change stream data change record.
_metadata_spanner_table_name
: The Cloud Spanner table name. This field is not the metadata table name of the connector.
_metadata_spanner_commit_timestamp
: The Spanner commit timestamp, which is the time when a change is committed. This value is extracted from the change stream data change record.
_metadata_spanner_server_transaction_id
: A globally unique string that represents the Spanner transaction in which the change was committed. Only use this value in the context of processing change stream records. It isn't correlated with the transaction ID in Spanner's API. This value is extracted from the change stream data change record.
_metadata_spanner_record_sequence
: The sequence number for the record within the Spanner transaction. Sequence numbers are guaranteed to be unique and monotonically increasing, but not necessarily contiguous, within a transaction. This value is extracted from the change stream data change record.
_metadata_spanner_is_last_record_in_transaction_in_partition
: Indicates whether the record is the last record for a Spanner transaction in the current partition. This value is extracted from the change stream data change record.
_metadata_spanner_number_of_records_in_transaction
: The number of data change records that are part of the Spanner transaction across all change stream partitions. This value is extracted from the change stream data change record.
_metadata_spanner_number_of_partitions_in_transaction
: The number of partitions that return data change records for the Spanner transaction. This value is extracted from the change stream data change record.
_metadata_big_query_commit_timestamp
: The commit timestamp when the row is inserted into BigQuery. If useStorageWriteApi is true, this column is not automatically created in the changelog table by the pipeline. In that case, you must manually add this column to the changelog table if you need it.
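As an illustration only (not part of the template), the following query shows one way to use these metadata fields to inspect the most recent changes for a single table. It assumes the default changelog table naming, where the Spanner table name is followed by _changelog; BIGQUERY_PROJECT_ID, BIGQUERY_DATASET, and MY_SPANNER_TABLE_changelog are placeholders.
# Placeholder names; the changelog table follows the default <spanner_table_name>_changelog naming.
bq query --use_legacy_sql=false '
SELECT
  _metadata_spanner_mod_type,
  _metadata_spanner_commit_timestamp,
  _metadata_spanner_record_sequence,
  _metadata_spanner_server_transaction_id
FROM `BIGQUERY_PROJECT_ID.BIGQUERY_DATASET.MY_SPANNER_TABLE_changelog`
ORDER BY _metadata_spanner_commit_timestamp DESC
LIMIT 100'
Because rows are inserted out of commit-timestamp order, sorting on _metadata_spanner_commit_timestamp at query time is the usual way to view changes in commit order.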
When using this template, be aware of the following details:
- This template doesn't propagate schema changes from Cloud Spanner to BigQuery. Because performing a schema change in Cloud Spanner is likely to break the pipeline, you might need to recreate the pipeline after the schema change.
- For the OLD_AND_NEW_VALUES and NEW_VALUES value capture types, when the data change record contains an UPDATE change, the template needs to do a stale read to Cloud Spanner at the commit timestamp of the data change record to retrieve the unchanged but watched columns. Make sure to configure your database's version_retention_period properly for the stale read (see the example command after this list). For the NEW_ROW value capture type, the template is more efficient, because the data change record captures the full new row, including columns that are not updated in UPDATE requests, and the template doesn't need to do a stale read.
- To minimize network latency and network transport costs, run the Dataflow job from the same region as your Cloud Spanner instance or BigQuery tables. If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. For more information, see Dataflow regional endpoints.
- This template supports all valid Cloud Spanner data types. If the Cloud Spanner type is more precise than the BigQuery type, precision loss might occur during the transformation. Specifically:
  - For the Cloud Spanner JSON type, the order of the members of an object is lexicographically ordered, but there is no such guarantee for the BigQuery JSON type.
  - Cloud Spanner supports nanosecond precision for the TIMESTAMP type, but BigQuery only supports microsecond precision.
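For example, the version retention period used for those stale reads can be adjusted with Spanner DDL. This is only a sketch: the 7d value is an illustration, and you should choose a period that comfortably covers your pipeline's expected lag and downtime.
# Extend how long old data versions are retained so the template's stale reads succeed.
gcloud spanner databases ddl update SPANNER_DATABASE \
    --instance=SPANNER_INSTANCE_ID \
    --ddl="ALTER DATABASE SPANNER_DATABASE SET OPTIONS (version_retention_period = '7d')"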
Learn more about change streams, how to build change streams Dataflow pipelines, and best practices.
Pipeline requirements
- The Cloud Spanner instance must exist prior to running the pipeline.
- The Cloud Spanner database must exist prior to running the pipeline.
- The Cloud Spanner metadata instance must exist prior to running the pipeline.
- The Cloud Spanner metadata database must exist prior to running the pipeline.
- The Cloud Spanner change stream must exist prior to running the pipeline.
- The BigQuery dataset must exist prior to running the pipeline.
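As a sketch of the last two requirements, a change stream and the output dataset could be created with commands like the following. The placeholder names match the parameters in the next section, and the FOR ALL clause is only an example; scope the change stream to the tables and columns you actually want to replicate.
# Create a change stream that watches the entire database (example scope only).
gcloud spanner databases ddl update SPANNER_DATABASE \
    --instance=SPANNER_INSTANCE_ID \
    --ddl="CREATE CHANGE STREAM SPANNER_CHANGE_STREAM FOR ALL"

# Create the BigQuery dataset that receives the changelog tables.
bq mk --dataset BIGQUERY_PROJECT_ID:BIGQUERY_DATASET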
Template parameters
Parameter | Description |
---|---|
spannerInstanceId | The Cloud Spanner instance to read change streams from. |
spannerDatabase | The Cloud Spanner database to read change streams from. |
spannerDatabaseRole | Optional: The Cloud Spanner database role to use when running the template. This parameter is required only when the IAM principal who is running the template is a fine-grained access control user. The database role must have the SELECT privilege on the change stream and the EXECUTE privilege on the change stream's read function. For more information, see Fine-grained access control for change streams. |
spannerMetadataInstanceId | The Cloud Spanner instance to use for the change streams connector metadata table. |
spannerMetadataDatabase | The Cloud Spanner database to use for the change streams connector metadata table. |
spannerChangeStreamName | The name of the Cloud Spanner change stream to read from. |
bigQueryDataset | The BigQuery dataset for change streams output. |
spannerProjectId | Optional: The project to read change streams from. This value is also the project where the change streams connector metadata table is created. The default value for this parameter is the project where the Dataflow pipeline is running. |
spannerMetadataTableName | Optional: The Cloud Spanner change streams connector metadata table name to use. If not provided, a Cloud Spanner change streams connector metadata table is automatically created during the pipeline flow. You must provide this parameter when updating an existing pipeline. Otherwise, don't provide this parameter. |
rpcPriority | Optional: The request priority for Cloud Spanner calls. The value must be one of the following: HIGH, MEDIUM, or LOW. The default value is HIGH. |
startTimestamp | Optional: The starting DateTime, inclusive, to use for reading change streams. For example: 2021-10-12T07:20:50.52Z. Defaults to the timestamp when the pipeline starts, that is, the current time. |
endTimestamp | Optional: The ending DateTime, inclusive, to use for reading change streams. For example: 2021-10-12T07:20:50.52Z. Defaults to an infinite time in the future. |
bigQueryProjectId | Optional: The BigQuery project. The default value is the project for the Dataflow job. |
bigQueryChangelogTableNameTemplate | Optional: The template for the name of BigQuery changelog tables. Defaults to _metadata_spanner_table_name_changelog. |
deadLetterQueueDirectory | Optional: The path to store any unprocessed records that failed to be processed. The default path is a directory under the Dataflow job's temp location. The default value is usually sufficient. |
dlqRetryMinutes | Optional: The number of minutes between dead-letter queue retries. The default value is 10. |
ignoreFields | Optional: A comma-separated list of fields (case sensitive) to ignore. These fields might be fields of watched tables, or metadata fields added by the pipeline. Ignored fields aren't inserted into BigQuery. When you ignore the _metadata_spanner_table_name field, the bigQueryChangelogTableNameTemplate parameter is also ignored. |
useStorageWriteApi | Optional: If true, the pipeline uses the BigQuery Storage Write API. The default value is false. For more information, see Using the Storage Write API. |
useStorageWriteApiAtLeastOnce | Optional: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics, set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false. |
numStorageWriteApiStreams | Optional: When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. |
storageWriteApiTriggeringFrequencySec | Optional: When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. |
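For example, ignoreFields takes a single comma-separated, case-sensitive list. A hypothetical value that drops two of the transaction-level metadata columns would be passed as:
ignoreFields=_metadata_spanner_number_of_records_in_transaction,_metadata_spanner_number_of_partitions_in_transaction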
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Cloud Spanner change streams to BigQuery template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET
Replace the following:
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- SPANNER_INSTANCE_ID: Cloud Spanner instance ID
- SPANNER_DATABASE: Cloud Spanner database
- SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
- SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
- SPANNER_CHANGE_STREAM: Cloud Spanner change stream
- BIGQUERY_DATASET: the BigQuery dataset for change streams output
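Optional parameters from the table above are appended to the same --parameters list. For example, a hypothetical invocation that lowers the Spanner request priority and starts reading from a fixed timestamp might look like the following; the rpcPriority and startTimestamp values are only illustrations.
gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET,\
rpcPriority=MEDIUM,\
startTimestamp=2021-10-12T07:20:50.52Z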
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "spannerInstanceId": "SPANNER_INSTANCE_ID",
      "spannerDatabase": "SPANNER_DATABASE",
      "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
      "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
      "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
      "bigQueryDataset": "BIGQUERY_DATASET"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery"
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- SPANNER_INSTANCE_ID: Cloud Spanner instance ID
- SPANNER_DATABASE: Cloud Spanner database
- SPANNER_METADATA_INSTANCE_ID: Cloud Spanner metadata instance ID
- SPANNER_METADATA_DATABASE: Cloud Spanner metadata database
- SPANNER_CHANGE_STREAM: Cloud Spanner change stream
- BIGQUERY_DATASET: the BigQuery dataset for change streams output
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.