Bigtable change streams to BigQuery template

The Bigtable change streams to BigQuery template is a streaming pipeline that reads Bigtable data change records and writes them to BigQuery tables by using Dataflow.

A Bigtable change stream lets you subscribe to data mutations on a per-table basis. When you subscribe to table change streams, the following constraints apply:

  • Only modified cells and descriptors of delete operations are returned.
  • Only the new value of a modified cell is returned.

When data change records are written to BigQuery, rows might be inserted out of order compared to the original Bigtable commit timestamp ordering.
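Because insertion order isn't guaranteed, a query that needs the changes in commit order has to sort explicitly. The following is a minimal sketch, assuming the changelog table has the commit_timestamp column described in the output table schema. The function name and the project, dataset, and table arguments are hypothetical placeholders.

```shell
#!/bin/sh
# Print a GoogleSQL query that returns changelog rows in Bigtable commit order.
# Arguments (placeholders): $1 project ID, $2 dataset name, $3 changelog table name.
changelog_in_commit_order_sql() {
  printf 'SELECT *\n'
  printf 'FROM `%s.%s.%s`\n' "$1" "$2" "$3"
  printf 'ORDER BY commit_timestamp\n'
}

# Show the query for review.
changelog_in_commit_order_sql my-project my_dataset my_table_changelog

# To run it, pass the text to the bq CLI (not executed here):
# bq query --use_legacy_sql=false "$(changelog_in_commit_order_sql my-project my_dataset my_table_changelog)"
```

Sorting a large changelog table scans the whole table, so in practice you would probably add a filter on row_key or on a commit_timestamp range.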

Changelog table rows that can't be written to BigQuery because of a persistent error are placed in a dead-letter queue (unprocessed messages queue) directory in Cloud Storage for manual review or further processing.

If the necessary BigQuery table doesn't exist, the pipeline creates it. Otherwise, an existing BigQuery table is used. The schema of existing BigQuery tables must contain the columns in the following table.

Each new BigQuery row includes one data change record returned by the change stream from its corresponding row in your Bigtable table.

BigQuery output table schema

Column name Type Nullable Description
row_key STRING or BYTES No The row key of the changed row. When the writeRowkeyAsBytes pipeline option is set to true, the type of the column must be BYTES. Otherwise, use the STRING type.
mod_type STRING No The type of the row mutation. Use one of the following values: SET_CELL, DELETE_CELLS, or DELETE_FAMILY.
column_family STRING No The column family affected by the row mutation.
column STRING Yes The column qualifier affected by the row mutation. For the DELETE_FAMILY mutation type, set to NULL.
commit_timestamp TIMESTAMP No The time when Bigtable applies the mutation.
big_query_commit_timestamp TIMESTAMP Yes Optional: Specifies the time when BigQuery writes the row to an output table. The field isn't populated if the column name is present in the bigQueryChangelogTableFieldsToIgnore pipeline option value.
timestamp TIMESTAMP or INT64 Yes The timestamp value of the cell affected by the mutation. When the writeNumericTimestamps pipeline option is set to true, the type of the column must be INT64. Otherwise, use the TIMESTAMP type. For DELETE_CELLS and DELETE_FAMILY mutation types, set to NULL.
timestamp_from TIMESTAMP or INT64 Yes Describes an inclusive start of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, set to NULL.
timestamp_to TIMESTAMP or INT64 Yes Describes an exclusive end of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, set to NULL.
is_gc BOOL No Optional: When the mutation is triggered by a garbage collection policy, set to true. In all other cases, set to false. The field isn't populated when the column name is present in the bigQueryChangelogTableFieldsToIgnore pipeline option value.
source_instance STRING No Optional: Describes the name of the Bigtable instance that the mutation comes from. The field isn't populated when the column name is present in the bigQueryChangelogTableFieldsToIgnore pipeline option value.
source_cluster STRING No Optional: Describes the name of the Bigtable cluster that the mutation comes from. The field isn't populated when the column name is present in the bigQueryChangelogTableFieldsToIgnore pipeline option value.
source_table STRING No Optional: Describes the name of the Bigtable table that the mutation applies to. The value in this column might be useful if multiple Bigtable tables stream changes to the same BigQuery table. The field isn't populated when the column name is present in the bigQueryChangelogTableFieldsToIgnore pipeline option value.
tiebreaker INT64 No Optional: When two mutations are registered at the same time by different Bigtable clusters, the mutation with the highest tiebreaker value is applied to the source table. Mutations with lower tiebreaker values are discarded. The field isn't populated when the column name is present in the bigQueryChangelogTableFieldsToIgnore pipeline option value.
value STRING or BYTES Yes The new value set by the mutation. When the writeValuesAsBytes pipeline option is set to true, the type of the column must be BYTES. Otherwise, use the STRING type. The value is set for SET_CELL mutations. For other mutation types, the value is set to NULL.
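The NULL rules in the schema table depend on the mutation type. As a reading aid, the following sketch summarizes which nullable columns are NULL for each mod_type value. The function name is hypothetical, and the mapping is derived only from the column descriptions above, not from the template's source code.

```shell
#!/bin/sh
# Print the changelog columns that are set to NULL for a given mod_type,
# based on the column descriptions in the output table schema.
null_columns_for() {
  case "$1" in
    SET_CELL)      echo "timestamp_from timestamp_to" ;;
    DELETE_CELLS)  echo "timestamp value" ;;
    DELETE_FAMILY) echo "column timestamp timestamp_from timestamp_to value" ;;
    *)             echo "unknown mod_type: $1" >&2; return 1 ;;
  esac
}

# Example: list the NULL columns for a DELETE_CELLS row.
null_columns_for DELETE_CELLS
```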

Pipeline requirements

  • The specified Bigtable source instance must exist.
  • The specified Bigtable source table must exist and must have change streams enabled.
  • The specified Bigtable application profile must exist.
  • The specified BigQuery destination dataset must exist.

Template parameters

Parameter Description
bigtableReadInstanceId The source Bigtable instance ID.
bigtableReadTableId The source Bigtable table ID.
bigtableChangeStreamAppProfile The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
bigQueryDataset The dataset name of the destination BigQuery table.
writeNumericTimestamps Optional: Write the Bigtable timestamp as BigQuery INT64. When set to true, values are written to the INT64 column. Otherwise, values are written to the TIMESTAMP column. Columns affected: timestamp, timestamp_from, and timestamp_to. Defaults to false. When set to true, the time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC).
writeRowkeyAsBytes Optional: Write row keys as BigQuery BYTES. When set to true, row keys are written to the BYTES column. Otherwise, row keys are written to the STRING column. Defaults to false.
writeValuesAsBytes Optional: Write values as BigQuery BYTES. When set to true, values are written to the BYTES column. Otherwise, values are written to the STRING column. Defaults to false.
bigQueryChangelogTableName Optional: Destination BigQuery table name. If not specified, the value bigtableReadTableId + "_changelog" is used.
bigQueryProjectId Optional: The BigQuery dataset project ID. The default is the project for the Dataflow job.
bigtableReadProjectId Optional: The Bigtable project ID. The default is the project for the Dataflow job.
bigtableChangeStreamMetadataInstanceId Optional: The Bigtable change streams metadata instance ID.
bigtableChangeStreamMetadataTableTableId Optional: The Bigtable change streams metadata table ID.
bigtableChangeStreamCharset Optional: The Bigtable change streams charset name when reading values and column qualifiers.
bigtableChangeStreamStartTimestamp Optional: The starting timestamp, inclusive, to use for reading change streams. For example, 2022-05-05T07:59:59Z. Defaults to the timestamp of the pipeline start time.
bigtableChangeStreamIgnoreColumnFamilies Optional: A comma-separated list of column family name changes to ignore.
bigtableChangeStreamIgnoreColumns Optional: A comma-separated list of column name changes to ignore.
bigtableChangeStreamName Optional: A unique name for the client pipeline. Lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an auto-generated name. See the Dataflow job logs for the value used.
bigtableChangeStreamResume Optional: When set to true, a new pipeline resumes processing from the point when a previously running pipeline with the same bigtableChangeStreamName value stopped. If the pipeline with the given bigtableChangeStreamName value has never run, a new pipeline doesn't start. When set to false, a new pipeline starts. If a pipeline with the same bigtableChangeStreamName value has already run for the given source, a new pipeline doesn't start. Defaults to false.
bigQueryChangelogTableFieldsToIgnore Optional: A comma-separated list of the changelog columns that, when specified, aren't created and populated. Use one of the following supported values: is_gc, source_instance, source_cluster, source_table, tiebreaker, or big_query_commit_timestamp. By default, all columns are populated.
bigQueryChangelogTablePartitionExpirationMs Optional: Sets the changelog table partition expiration time, in milliseconds. When set, partitions older than the specified number of milliseconds are deleted. By default, no expiration is set.
bigQueryChangelogTablePartitionGranularity Optional: Specifies a granularity for partitioning the changelog table. When set, the table is partitioned. Use one of the following supported values: HOUR, DAY, MONTH, or YEAR. By default, the table isn't partitioned.
dlqDirectory Optional: The directory for the dead-letter queue. Records that fail to be processed are stored in this directory. The default is a directory under the Dataflow job's temp location. In most cases, you can use the default path.
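The interaction between bigtableChangeStreamResume and bigtableChangeStreamName has four cases. The following sketch restates them as a small lookup. It is a model of the parameter descriptions above, not the template's implementation, and the function name and the yes/no argument are hypothetical.

```shell
#!/bin/sh
# Model of the documented start and resume rules.
# $1: value of bigtableChangeStreamResume (true or false)
# $2: whether a pipeline with the same bigtableChangeStreamName has already run (yes or no)
pipeline_start_outcome() {
  case "$1:$2" in
    true:yes)  echo "resumes from the point where the previous pipeline stopped" ;;
    true:no)   echo "does not start" ;;
    false:no)  echo "starts as a new pipeline" ;;
    false:yes) echo "does not start" ;;
    *)         echo "invalid input: $1 $2" >&2; return 1 ;;
  esac
}

# Example: resume requested, but no earlier pipeline used this name.
pipeline_start_outcome true no
```

In both "does not start" cases, the fix is to change either the resume flag or the pipeline name so that they agree with the pipeline's history.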

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Bigtable change streams to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest to use the most recent version
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  • BIGTABLE_INSTANCE_ID: your Bigtable instance ID
  • BIGTABLE_TABLE_ID: your Bigtable table ID
  • BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile ID
  • BIGQUERY_DESTINATION_DATASET: the BigQuery destination dataset name
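Optional parameters go in the same comma-separated --parameters list as the required ones. The following sketch assembles that list from key=value pairs and prints the resulting command for review instead of running it. The join_parameters helper and the my_changelog table name are hypothetical, and the uppercase values are the same placeholders used above. The sketch doesn't handle values that themselves contain commas, such as a bigQueryChangelogTableFieldsToIgnore list, which need additional escaping.

```shell
#!/bin/sh
# Join key=value arguments into one comma-separated string.
join_parameters() {
  result=""
  for pair in "$@"; do
    if [ -z "$result" ]; then
      result="$pair"
    else
      result="$result,$pair"
    fi
  done
  printf '%s\n' "$result"
}

# Required parameters plus two optional ones from the parameter table.
PARAMS="$(join_parameters \
  bigtableReadInstanceId=BIGTABLE_INSTANCE_ID \
  bigtableReadTableId=BIGTABLE_TABLE_ID \
  bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID \
  bigQueryDataset=BIGQUERY_DESTINATION_DATASET \
  writeNumericTimestamps=true \
  bigQueryChangelogTableName=my_changelog)"

# Dry run: print the command. Remove "echo" to execute it.
echo gcloud dataflow flex-template run JOB_NAME \
  --region=REGION_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
  --parameters "$PARAMS"
```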

API

To run the template by using the REST API, send an HTTP POST request. For more information about the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest to use the most recent version
  • LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  • BIGTABLE_INSTANCE_ID: your Bigtable instance ID
  • BIGTABLE_TABLE_ID: your Bigtable table ID
  • BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile ID
  • BIGQUERY_DESTINATION_DATASET: the BigQuery destination dataset name
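One way to send the request from a terminal is with curl. The following is a sketch under stated assumptions: curl is installed, the gcloud CLI is authenticated so that gcloud auth print-access-token returns a usable token, and the two function names are hypothetical. The script only defines the functions and doesn't send anything until you call launch_template.

```shell
#!/bin/sh
# Print the JSON request body shown above.
# $1 job name, $2 template Cloud Storage path, $3 Bigtable instance ID,
# $4 Bigtable table ID, $5 application profile ID, $6 BigQuery dataset
launch_request_body() {
  cat <<EOF
{
  "launch_parameter": {
    "jobName": "$1",
    "containerSpecGcsPath": "$2",
    "parameters": {
      "bigtableReadInstanceId": "$3",
      "bigtableReadTableId": "$4",
      "bigtableChangeStreamAppProfile": "$5",
      "bigQueryDataset": "$6"
    }
  }
}
EOF
}

# Send the launch request.
# $1 project ID, $2 location; the remaining arguments go to launch_request_body.
launch_template() {
  project="$1"
  location="$2"
  shift 2
  launch_request_body "$@" | curl -sS -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    --data @- \
    "https://dataflow.googleapis.com/v1b3/projects/${project}/locations/${location}/flexTemplates:launch"
}
```

Calling launch_request_body on its own prints the body, which is a convenient way to check the values before sending the request.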

What's next