The Bigtable change streams to BigQuery template is a streaming pipeline that reads Bigtable data change records and writes them to BigQuery tables by using Dataflow.
A Bigtable change stream lets you subscribe to data mutations on a per-table basis. When you subscribe to table change streams, the following constraints apply:
- Only modified cells and descriptors of delete operations are returned.
- Only the new value of a modified cell is returned.
When data change records are written to BigQuery, rows might be inserted out of order compared to the original Bigtable commit timestamp ordering.
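If a downstream consumer needs the changelog in Bigtable commit order, it can sort the rows itself after reading them from BigQuery. The following Python sketch assumes the rows have already been fetched into dicts keyed by column name, with `commit_timestamp` as a timezone-aware `datetime` and `tiebreaker` as an `int` (both columns are described in the output table schema). The helper name `order_changelog_rows` is hypothetical and isn't part of the template.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

# One changelog row, keyed by BigQuery column name (assumed input shape).
Row = Dict[str, Any]


def order_changelog_rows(rows: List[Row]) -> List[Row]:
    """Sort changelog rows into Bigtable commit order (hypothetical helper).

    Primary key: commit_timestamp. Secondary key: tiebreaker, so that when
    two clusters register mutations at the same time, the mutation Bigtable
    applies (the highest tiebreaker) sorts last. If the tiebreaker column was
    dropped with bigQueryChangelogTableFieldsToIgnore, every row uses 0.
    """
    return sorted(rows, key=lambda r: (r["commit_timestamp"], r.get("tiebreaker", 0)))


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    t1 = datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc)
    rows = [
        {"value": "c", "commit_timestamp": t1, "tiebreaker": 1},
        {"value": "b", "commit_timestamp": t0, "tiebreaker": 7},
        {"value": "a", "commit_timestamp": t0, "tiebreaker": 2},
    ]
    print([r["value"] for r in order_changelog_rows(rows)])  # ['a', 'b', 'c']
```

Sorting ascending on the tiebreaker puts the winning mutation last among rows with the same commit timestamp, which suits a last-write-wins replay. The sketch only reorders rows; it doesn't reconstruct cell versions, which Bigtable keys by the cell timestamp rather than the commit timestamp.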
Changelog table rows that can't be written to BigQuery due to a persistent error are permanently placed into a dead-letter queue (unprocessed messages queue) directory in Cloud Storage for human review or further processing by the user.
If the necessary BigQuery table doesn't exist, the pipeline creates it. Otherwise, an existing BigQuery table is used. The schema of existing BigQuery tables must contain the columns in the following table.
Each new BigQuery row includes one data change record returned by the change stream from its corresponding row in your Bigtable table.
BigQuery output table schema
Column name | Type | Nullable | Description |
---|---|---|---|
row_key | STRING or BYTES | No | The row key of the changed row. When the writeRowkeyAsBytes pipeline option is set to true, the type of the column must be BYTES. Otherwise, use the STRING type. |
mod_type | STRING | No | The type of the row mutation. One of the following values: SET_CELL, DELETE_CELLS, or DELETE_FAMILY. |
column_family | STRING | No | The column family affected by the row mutation. |
column | STRING | Yes | The column qualifier affected by the row mutation. For the DELETE_FAMILY mutation type, set to NULL. |
commit_timestamp | TIMESTAMP | No | The time when Bigtable applies the mutation. |
big_query_commit_timestamp | TIMESTAMP | Yes | Optional: The time when BigQuery writes the row to an output table. The column isn't created or populated if its name is listed in the bigQueryChangelogTableFieldsToIgnore pipeline option value. |
timestamp | TIMESTAMP or INT64 | Yes | The timestamp value of the cell affected by the mutation. When the writeNumericTimestamps pipeline option is set to true, the type of the column must be INT64. Otherwise, use the TIMESTAMP type. For DELETE_CELLS and DELETE_FAMILY mutation types, set to NULL. |
timestamp_from | TIMESTAMP or INT64 | Yes | The inclusive start of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, set to NULL. The column type follows the same writeNumericTimestamps rule as timestamp. |
timestamp_to | TIMESTAMP or INT64 | Yes | The exclusive end of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, set to NULL. The column type follows the same writeNumericTimestamps rule as timestamp. |
is_gc | BOOL | No | Optional: Set to true when the mutation is triggered by a garbage collection policy, and to false in all other cases. The column isn't created or populated if its name is listed in the bigQueryChangelogTableFieldsToIgnore pipeline option value. |
source_instance | STRING | No | Optional: The name of the Bigtable instance that the mutation comes from. The column isn't created or populated if its name is listed in the bigQueryChangelogTableFieldsToIgnore pipeline option value. |
source_cluster | STRING | No | Optional: The name of the Bigtable cluster that the mutation comes from. The column isn't created or populated if its name is listed in the bigQueryChangelogTableFieldsToIgnore pipeline option value. |
source_table | STRING | No | Optional: The name of the Bigtable table that the mutation applies to. This column is useful when multiple Bigtable tables stream changes to the same BigQuery table. The column isn't created or populated if its name is listed in the bigQueryChangelogTableFieldsToIgnore pipeline option value. |
tiebreaker | INT64 | No | Optional: When two mutations are registered at the same time by different Bigtable clusters, the mutation with the highest tiebreaker value is applied to the source table. Mutations with lower tiebreaker values are discarded. The column isn't created or populated if its name is listed in the bigQueryChangelogTableFieldsToIgnore pipeline option value. |
value | STRING or BYTES | Yes | The new value set by the mutation. When the writeValuesAsBytes pipeline option is set to true, the type of the column must be BYTES. Otherwise, use the STRING type. The value is set for SET_CELL mutations. For other mutation types, the value is set to NULL. |
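If you create the changelog table yourself instead of letting the pipeline create it, its schema has to contain the columns in the table above. The following Python sketch generates the column list as BigQuery JSON schema objects (`name`, `type`, `mode`), for example to save as a schema file for the `bq` command-line tool. It uses the type names from the table (`INT64`, `BOOL`), which BigQuery accepts as aliases of `INTEGER` and `BOOLEAN`. Mapping the Nullable column to `NULLABLE` or `REQUIRED` is an assumption about how strictly to define the table, partitioning isn't covered, and the function name `changelog_schema` is hypothetical.

```python
import json
from typing import Dict, Iterable, List

# Columns that bigQueryChangelogTableFieldsToIgnore may drop, per the table above.
IGNORABLE = {"is_gc", "source_instance", "source_cluster", "source_table",
             "tiebreaker", "big_query_commit_timestamp"}


def changelog_schema(
    write_rowkey_as_bytes: bool = False,
    write_values_as_bytes: bool = False,
    write_numeric_timestamps: bool = False,
    fields_to_ignore: Iterable[str] = (),
) -> List[Dict[str, str]]:
    """Build a BigQuery JSON schema for the changelog table (hypothetical helper)."""
    ignore = set(fields_to_ignore)
    unknown = ignore - IGNORABLE
    if unknown:
        raise ValueError(f"cannot ignore columns: {sorted(unknown)}")

    # Column types that depend on pipeline options.
    key_type = "BYTES" if write_rowkey_as_bytes else "STRING"
    value_type = "BYTES" if write_values_as_bytes else "STRING"
    ts_type = "INT64" if write_numeric_timestamps else "TIMESTAMP"

    # (name, type, nullable) in the order of the schema table.
    columns = [
        ("row_key", key_type, False),
        ("mod_type", "STRING", False),
        ("column_family", "STRING", False),
        ("column", "STRING", True),
        ("commit_timestamp", "TIMESTAMP", False),
        ("big_query_commit_timestamp", "TIMESTAMP", True),
        ("timestamp", ts_type, True),
        ("timestamp_from", ts_type, True),
        ("timestamp_to", ts_type, True),
        ("is_gc", "BOOL", False),
        ("source_instance", "STRING", False),
        ("source_cluster", "STRING", False),
        ("source_table", "STRING", False),
        ("tiebreaker", "INT64", False),
        ("value", value_type, True),
    ]
    return [
        {"name": name, "type": col_type, "mode": "NULLABLE" if nullable else "REQUIRED"}
        for name, col_type, nullable in columns
        if name not in ignore
    ]


if __name__ == "__main__":
    schema = changelog_schema(write_numeric_timestamps=True, fields_to_ignore=["is_gc"])
    print(json.dumps(schema, indent=2))
```

Keeping the pipeline options as function arguments makes it harder for a hand-created table to drift from the options the job is launched with.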
Pipeline requirements
- The specified Bigtable source instance.
- The specified Bigtable source table. The table must have change streams enabled.
- The specified Bigtable application profile.
- The specified BigQuery destination dataset.
Template parameters
Required parameters
- bigQueryDataset: The dataset name of the destination BigQuery table.
- bigtableChangeStreamAppProfile: The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
- bigtableReadInstanceId: The source Bigtable instance ID.
- bigtableReadTableId: The source Bigtable table ID.
Optional parameters
- writeRowkeyAsBytes: Whether to write row keys as BigQuery BYTES. When set to true, row keys are written to the BYTES column. Otherwise, row keys are written to the STRING column. Defaults to false.
- writeValuesAsBytes: When set to true, values are written to a column of type BYTES. Otherwise, values are written to a column of type STRING. Defaults to false.
- writeNumericTimestamps: Whether to write the Bigtable timestamp as BigQuery INT64. When set to true, values are written to the INT64 column, and the time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC). Otherwise, values are written to the TIMESTAMP column. Columns affected: timestamp, timestamp_from, and timestamp_to. Defaults to false.
- bigQueryProjectId: The BigQuery dataset project ID. The default is the project for the Dataflow job.
- bigQueryChangelogTableName: The destination BigQuery table name. Defaults to empty, in which case the value bigtableReadTableId + "_changelog" is used.
- bigQueryChangelogTablePartitionGranularity: Specifies a granularity for partitioning the changelog table. When set, the table is partitioned. Use one of the following supported values: HOUR, DAY, MONTH, or YEAR. By default, the table isn't partitioned.
- bigQueryChangelogTablePartitionExpirationMs: Sets the changelog table partition expiration time, in milliseconds. When set, partitions older than the specified number of milliseconds are deleted. By default, no expiration is set.
- bigQueryChangelogTableFieldsToIgnore: A comma-separated list of changelog columns that aren't created or populated. Use any of the following supported values: is_gc, source_instance, source_cluster, source_table, tiebreaker, or big_query_commit_timestamp. By default, all columns are populated.
- dlqDirectory: The directory to use for the dead-letter queue. Records that fail to be processed are stored in this directory. The default is a directory under the Dataflow job's temp location. In most cases, you can use the default path.
- bigtableChangeStreamMetadataInstanceId: The Bigtable change streams metadata instance ID. Defaults to empty.
- bigtableChangeStreamMetadataTableTableId: The ID of the Bigtable change streams connector metadata table. If not provided, a Bigtable change streams connector metadata table is automatically created during pipeline execution. Defaults to empty.
- bigtableChangeStreamCharset: The Bigtable change streams charset name. Defaults to UTF-8.
- bigtableChangeStreamStartTimestamp: The starting timestamp, inclusive, to use for reading change streams, in RFC 3339 format (https://tools.ietf.org/html/rfc3339). For example, 2022-05-05T07:59:59Z. Defaults to the timestamp of the pipeline start time.
- bigtableChangeStreamIgnoreColumnFamilies: A comma-separated list of column families whose changes are ignored. Defaults to empty.
- bigtableChangeStreamIgnoreColumns: A comma-separated list of columns whose changes are ignored. Defaults to empty.
- bigtableChangeStreamName: A unique name for the client pipeline. Lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. See the Dataflow job logs for the value used.
- bigtableChangeStreamResume: When set to true, a new pipeline resumes processing from the point at which a previously running pipeline with the same bigtableChangeStreamName value stopped. If the pipeline with the given bigtableChangeStreamName value has never run, a new pipeline doesn't start. When set to false, a new pipeline starts. If a pipeline with the same bigtableChangeStreamName value has already run for the given source, a new pipeline doesn't start. Defaults to false.
- bigtableReadProjectId: The Bigtable project ID. The default is the project for the Dataflow job.
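Template parameters are passed as strings, so typed values such as a partition expiration or a start time have to be converted first. The following Python sketch, with hypothetical function names, builds a parameter map from typed inputs: it checks the supported partition granularities and ignorable columns listed above, converts a `timedelta` to whole milliseconds, and formats a timezone-aware start time in UTC like `2022-05-05T07:59:59Z`. It also shows the default changelog table name and how to decode an INT64 timestamp written with writeNumericTimestamps set to true.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Optional

GRANULARITIES = {"HOUR", "DAY", "MONTH", "YEAR"}
IGNORABLE_FIELDS = {"is_gc", "source_instance", "source_cluster", "source_table",
                    "tiebreaker", "big_query_commit_timestamp"}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def build_parameters(
    instance_id: str,
    table_id: str,
    app_profile_id: str,
    dataset: str,
    partition_granularity: Optional[str] = None,
    partition_expiration: Optional[timedelta] = None,
    fields_to_ignore: Iterable[str] = (),
    start_time: Optional[datetime] = None,
) -> Dict[str, str]:
    """Return template parameters as strings (hypothetical helper)."""
    params = {
        "bigtableReadInstanceId": instance_id,
        "bigtableReadTableId": table_id,
        "bigtableChangeStreamAppProfile": app_profile_id,
        "bigQueryDataset": dataset,
    }
    if partition_granularity is not None:
        if partition_granularity not in GRANULARITIES:
            raise ValueError(f"unsupported granularity: {partition_granularity}")
        params["bigQueryChangelogTablePartitionGranularity"] = partition_granularity
    if partition_expiration is not None:
        # Whole milliseconds, for example 30 days -> 2592000000.
        params["bigQueryChangelogTablePartitionExpirationMs"] = str(
            partition_expiration // timedelta(milliseconds=1))
    ignore = list(fields_to_ignore)
    if ignore:
        unknown = set(ignore) - IGNORABLE_FIELDS
        if unknown:
            raise ValueError(f"cannot ignore columns: {sorted(unknown)}")
        params["bigQueryChangelogTableFieldsToIgnore"] = ",".join(ignore)
    if start_time is not None:
        if start_time.tzinfo is None:
            raise ValueError("start_time must be timezone-aware")
        # RFC 3339 in UTC with a trailing Z; sub-second precision is dropped.
        params["bigtableChangeStreamStartTimestamp"] = (
            start_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"))
    return params


def default_changelog_table_name(table_id: str) -> str:
    """Table name used when bigQueryChangelogTableName is left empty."""
    return table_id + "_changelog"


def micros_to_datetime(micros: int) -> datetime:
    """Decode an INT64 value written with writeNumericTimestamps=true."""
    # Integer timedelta arithmetic avoids float rounding on large values.
    return EPOCH + timedelta(microseconds=micros)


if __name__ == "__main__":
    print(build_parameters(
        "my-instance", "orders", "cdc-profile", "cdc_dataset",
        partition_granularity="DAY",
        partition_expiration=timedelta(days=30),
        fields_to_ignore=["is_gc", "tiebreaker"],
        start_time=datetime(2022, 5, 5, 7, 59, 59, tzinfo=timezone.utc),
    ))
    print(default_changelog_table_name("orders"))
    print(micros_to_datetime(1651737599000000).isoformat())
```

Rejecting naive datetimes avoids silently interpreting the start time in the local time zone of the machine that builds the parameters.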
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bigtable change streams to BigQuery template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
- BIGTABLE_INSTANCE_ID: your Bigtable instance ID
- BIGTABLE_TABLE_ID: your Bigtable table ID
- BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile ID
- BIGQUERY_DESTINATION_DATASET: the BigQuery destination dataset name
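gcloud splits the --parameters value on commas, which collides with parameters whose own values are comma-separated, such as bigQueryChangelogTableFieldsToIgnore. gcloud lets you change the delimiter of such a flag with its `^DELIM^` escaping syntax (described in `gcloud topic escaping`). The following Python sketch, with a hypothetical function name, assembles the argument list for the command above and switches the separator to `~` only when a value contains a comma. It only builds the command; running it, for example with `subprocess.run`, is left to the caller.

```python
import shlex
from typing import Dict, List

TEMPLATE_PATH = "flex/Bigtable_Change_Streams_to_BigQuery"


def gcloud_run_command(
    job_name: str,
    project_id: str,
    region: str,
    version: str,
    parameters: Dict[str, str],
) -> List[str]:
    """Build argv for `gcloud dataflow flex-template run` (hypothetical helper)."""
    template = f"gs://dataflow-templates-{region}/{version}/{TEMPLATE_PATH}"
    pairs = [f"{key}={value}" for key, value in parameters.items()]
    if any("," in value for value in parameters.values()):
        # A value contains commas, so use gcloud's ^~^ escape to make "~"
        # the separator between KEY=VALUE pairs instead of ",".
        if any("~" in pair for pair in pairs):
            raise ValueError("parameters contain both ',' and '~'")
        param_arg = "^~^" + "~".join(pairs)
    else:
        param_arg = ",".join(pairs)
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--project={project_id}",
        f"--region={region}",
        f"--template-file-gcs-location={template}",
        f"--parameters={param_arg}",
    ]


if __name__ == "__main__":
    argv = gcloud_run_command(
        "bigtable-cdc", "my-project", "us-central1", "latest",
        {
            "bigtableReadInstanceId": "my-instance",
            "bigtableReadTableId": "orders",
            "bigtableChangeStreamAppProfile": "cdc-profile",
            "bigQueryDataset": "cdc_dataset",
            "bigQueryChangelogTableFieldsToIgnore": "is_gc,tiebreaker",
        },
    )
    # shlex.join quotes each argument for pasting into a POSIX shell.
    print(shlex.join(argv))
```

Building an argument list rather than one shell string avoids a second layer of quoting when the command is passed straight to a process launcher.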
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
      "parameters": {
          "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
          "bigtableReadTableId": "BIGTABLE_TABLE_ID",
          "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
          "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
      }
   }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
- BIGTABLE_INSTANCE_ID: your Bigtable instance ID
- BIGTABLE_TABLE_ID: your Bigtable table ID
- BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile ID
- BIGQUERY_DESTINATION_DATASET: the BigQuery destination dataset name
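The launch request can also be assembled programmatically. The following Python sketch, using only the standard library and a hypothetical function name, builds the launch URL and a JSON body with the same structure as the request above, assuming the template bucket is in the same region as the job's location. Because parameters travel as JSON strings, comma-separated values such as bigQueryChangelogTableFieldsToIgnore need no extra escaping. Sending the request requires an OAuth 2.0 access token in an `Authorization: Bearer` header; for manual testing, `gcloud auth print-access-token` prints one.

```python
import json
from typing import Dict, Tuple


def flex_template_launch_request(
    project_id: str,
    location: str,
    job_name: str,
    version: str,
    parameters: Dict[str, str],
) -> Tuple[str, str]:
    """Return (url, json_body) for a flexTemplates:launch call (hypothetical helper)."""
    url = (
        "https://dataflow.googleapis.com/v1b3/"
        f"projects/{project_id}/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launch_parameter": {
            "jobName": job_name,
            # Assumes the template bucket is in the same region as the job.
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/Bigtable_Change_Streams_to_BigQuery"
            ),
            # Template parameters are passed as a map of strings.
            "parameters": dict(parameters),
        }
    }
    return url, json.dumps(body, indent=2)


if __name__ == "__main__":
    url, body = flex_template_launch_request(
        "my-project", "us-central1", "bigtable-cdc", "latest",
        {
            "bigtableReadInstanceId": "my-instance",
            "bigtableReadTableId": "orders",
            "bigtableChangeStreamAppProfile": "cdc-profile",
            "bigQueryDataset": "cdc_dataset",
        },
    )
    print("POST", url)
    print(body)
```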
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.