The Cloud Bigtable change streams to Pub/Sub template is a streaming pipeline that streams Bigtable data change records and publishes them to a Pub/Sub topic by using Dataflow.
A Cloud Bigtable change stream lets you subscribe to data mutations on a per-table basis. When you subscribe to table change streams, the following constraints apply:
- Only modified cells and descriptors of delete operations are returned.
- Only the new value of a modified cell is returned.
When data change records are published to a Pub/Sub topic, messages might be inserted out of order compared to the original Cloud Bigtable commit timestamp ordering.
Cloud Bigtable data change records that can't be published to the Pub/Sub topic are temporarily placed in a dead-letter queue (unprocessed messages queue) directory in Cloud Storage. After the maximum number of unsuccessful retries, these records remain in the dead-letter queue directory indefinitely for human review or further processing.
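To inspect what lands in the dead-letter queue, you can list the files under the directory that the dlqDirectory option (or the job's default temp location) points to. The following is a minimal sketch using the Cloud Storage Python client; the bucket name and prefix are placeholders for your own job's configuration.

```python
from google.cloud import storage

# Placeholder values: substitute the bucket and prefix that your
# dlqDirectory pipeline option (or the job's default temp location) points to.
BUCKET_NAME = "my-dataflow-temp-bucket"
DLQ_PREFIX = "bigtable-changestream-dlq/"

client = storage.Client()

# List every unprocessed-record file currently sitting in the DLQ directory.
for blob in client.list_blobs(BUCKET_NAME, prefix=DLQ_PREFIX):
    print(blob.name, blob.size, blob.updated)
```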
The pipeline requires that the destination Pub/Sub topic exists. The destination topic might be configured to validate messages using a schema. When a Pub/Sub topic specifies a schema, the pipeline only starts if the schema is valid. Depending on the schema type, use one of the following schema definitions for the destination topic:
Protocol buffers
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Use the following Protobuf schema with JSON message encoding:

```proto
syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText {
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
```
Each new Pub/Sub message includes one data change record returned by the change stream from its corresponding row in your Cloud Bigtable table.
Pub/Sub output message description
Field name | Description
---|---
rowKey | The row key of the changed row. Arrives as a byte array. When JSON message encoding is configured, row keys are returned as strings. When useBase64Rowkeys is specified, row keys are Base64-encoded. Otherwise, a charset specified by bigtableChangeStreamCharset is used to decode row key bytes into a string.
modType | The type of the row mutation. One of the following values: SET_CELL, DELETE_CELLS, or DELETE_FAMILY.
columnFamily | The column family affected by the row mutation.
column | The column qualifier affected by the row mutation. For the DELETE_FAMILY mutation type, the column field isn't set. Arrives as a byte array. When JSON message encoding is configured, columns are returned as strings. When useBase64ColumnQualifiers is specified, the column field is Base64-encoded. Otherwise, a charset specified by bigtableChangeStreamCharset is used to decode column bytes into a string.
commitTimestamp | The time when Cloud Bigtable applies the mutation. The time is measured in microseconds since the Unix epoch (January 1, 1970, UTC).
timestamp | The timestamp value of the cell affected by the mutation. For the DELETE_CELLS and DELETE_FAMILY mutation types, timestamp isn't set. The time is measured in microseconds since the Unix epoch (January 1, 1970, UTC).
timestampFrom | The inclusive start of the timestamp interval for all cells deleted by a DELETE_CELLS mutation. For other mutation types, timestampFrom isn't set. The time is measured in microseconds since the Unix epoch (January 1, 1970, UTC).
timestampTo | The exclusive end of the timestamp interval for all cells deleted by a DELETE_CELLS mutation. For other mutation types, timestampTo isn't set.
isGC | A boolean value that indicates whether the mutation was generated by the Cloud Bigtable garbage-collection mechanism.
tieBreaker | When two mutations are registered at the same time by different Cloud Bigtable clusters, the mutation with the highest tiebreaker value is applied to the source table. Mutations with lower tiebreaker values are discarded.
value | The new value set by the mutation. Unless the stripValues pipeline option is set, the value is set for SET_CELL mutations. For other mutation types, the value isn't set. Arrives as a byte array. When JSON message encoding is configured, values are returned as strings. When useBase64Values is specified, the value is Base64-encoded. Otherwise, a charset specified by bigtableChangeStreamCharset is used to decode value bytes into a string.
sourceInstance | The name of the Bigtable instance that registered the mutation. Might be used when multiple pipelines stream changes from different instances to the same Pub/Sub topic.
sourceCluster | The name of the Bigtable cluster that registered the mutation. Might be used when multiple pipelines stream changes from different instances to the same Pub/Sub topic.
sourceTable | The name of the Bigtable table that received the mutation. Might be used when multiple pipelines stream changes from different tables to the same Pub/Sub topic.
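To show how these fields arrive on the consumer side, the following is a minimal subscriber sketch for a pipeline run with messageFormat=JSON and messageEncoding=JSON. The project and subscription IDs are placeholders, and the Base64 handling assumes the pipeline was started with useBase64Rowkeys=true and useBase64Values=true.

```python
import base64
import json
from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1

# Placeholder identifiers for this sketch.
PROJECT_ID = "my-project"
SUBSCRIPTION_ID = "bigtable-changelog-sub"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # With JSON message encoding, each payload is one changelog entry as a JSON object.
    entry = json.loads(message.data.decode("utf-8"))

    # Assumes useBase64Rowkeys=true and useBase64Values=true; otherwise these
    # fields are already charset-decoded strings.
    row_key = base64.b64decode(entry["rowKey"])
    value = base64.b64decode(entry["value"]) if entry.get("value") else None

    # Messages can arrive out of order relative to commitTimestamp (microseconds
    # since the Unix epoch), so downstream consumers may need to reorder.
    print(entry["modType"], row_key, entry["commitTimestamp"], value)
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
    try:
        streaming_pull_future.result(timeout=60)  # Pull for 60 seconds in this sketch.
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()
```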
Pipeline requirements
- The specified Cloud Bigtable source instance must exist.
- The specified Cloud Bigtable source table must exist, and change streams must be enabled on the table.
- The specified Cloud Bigtable application profile must exist.
- The specified Pub/Sub topic must exist.
Template parameters
Parameter | Description
---|---
bigtableReadInstanceId | The source Cloud Bigtable instance ID.
bigtableReadTableId | The source Cloud Bigtable table ID.
bigtableChangeStreamAppProfile | The Cloud Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
pubSubTopic | The name of the destination Pub/Sub topic.
messageFormat | Optional: The format of the messages to publish to the Pub/Sub topic. When the destination topic has a schema configured, the message format is determined by the configured schema and encoding. Supported values: AVRO, PROTOCOL_BUFFERS, and JSON. Defaults to JSON. When the JSON format is used, the rowKey, column, and value fields of the message are strings, whose contents are determined by the useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values, and bigtableChangeStreamCharset pipeline options.
messageEncoding | Optional: The encoding of the messages to publish to the Pub/Sub topic. When the destination topic has a schema configured, the message encoding is determined by the topic settings. Supported values: BINARY and JSON. Defaults to JSON.
stripValues | Optional: When set to true, SET_CELL mutations are returned without new values set. Defaults to false. This parameter is useful when you don't need the new value to be present, also known as cache invalidation, or when values are extremely large and exceed Pub/Sub message size limits.
bigtableReadProjectId | Optional: The Cloud Bigtable project ID. The default is the project of the Dataflow job.
pubSubProjectId | Optional: The Pub/Sub project ID. The default is the project of the Dataflow job.
bigtableChangeStreamMetadataInstanceId | Optional: The Cloud Bigtable change streams metadata instance ID.
bigtableChangeStreamMetadataTableTableId | Optional: The Cloud Bigtable change streams metadata table ID.
bigtableChangeStreamCharset | Optional: The charset name used when reading row keys, values, and column qualifiers. This option is used when the message encoding is JSON.
bigtableChangeStreamStartTimestamp | Optional: The starting timestamp, inclusive, to use for reading change streams. For example, 2022-05-05T07:59:59Z. Defaults to the timestamp of the pipeline start time.
bigtableChangeStreamIgnoreColumnFamilies | Optional: A comma-separated list of column family names whose changes are ignored.
bigtableChangeStreamIgnoreColumns | Optional: A comma-separated list of column names whose changes are ignored.
bigtableChangeStreamName | Optional: A unique name for the client pipeline. Lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. To find the value, see the Dataflow job logs.
bigtableChangeStreamResume | Optional: When set to true, a new pipeline resumes processing from the point where a previously running pipeline with the same bigtableChangeStreamName value stopped. If the pipeline with the given bigtableChangeStreamName value has never run, a new pipeline doesn't start. When set to false, a new pipeline starts. If a pipeline with the same bigtableChangeStreamName value has already run for the given source, a new pipeline doesn't start. Defaults to false.
useBase64Rowkeys | Optional: Used with JSON message encoding. When set to true, the rowKey field is a Base64-encoded string. Otherwise, the rowKey is produced by using bigtableChangeStreamCharset to decode bytes into a string. Defaults to false.
useBase64ColumnQualifiers | Optional: Used with JSON message encoding. When set to true, the column field is a Base64-encoded string. Otherwise, the column is produced by using bigtableChangeStreamCharset to decode bytes into a string. Defaults to false.
useBase64Values | Optional: Used with JSON message encoding. When set to true, the value field is a Base64-encoded string. Otherwise, the value is produced by using bigtableChangeStreamCharset to decode bytes into a string. Defaults to false.
dlqMaxRetries | Optional: The maximum number of dead-letter queue retries. Defaults to 5.
dlqRetryMinutes | Optional: The number of minutes between dead-letter queue retries. Defaults to 10.
dlqDirectory | Optional: The directory for the dead-letter queue. Records that fail to be processed are stored in this directory. The default is a directory under the temp location of the Dataflow job. In most cases, you can use the default path.
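As an illustration of how these options combine, the following is a hypothetical parameter set for a JSON pipeline that Base64-encodes its binary fields. All IDs are placeholders; you would pass these values through --parameters in the gcloud command or the parameters field of the REST request shown later.

```python
# Hypothetical parameter values for this template; substitute your own IDs.
# Only bigtableReadInstanceId, bigtableReadTableId, bigtableChangeStreamAppProfile,
# and pubSubTopic are required; the rest are optional overrides.
template_parameters = {
    "bigtableReadInstanceId": "my-bigtable-instance",
    "bigtableReadTableId": "my-table",
    "bigtableChangeStreamAppProfile": "change-stream-profile",
    "pubSubTopic": "bigtable-changelog-topic",
    "messageFormat": "JSON",
    "messageEncoding": "JSON",
    "useBase64Rowkeys": "true",   # rowKey arrives Base64-encoded
    "useBase64Values": "true",    # value arrives Base64-encoded
    "stripValues": "false",
    "dlqMaxRetries": "5",
    "dlqRetryMinutes": "10",
}
```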
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bigtable change streams to Pub/Sub template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
```sh
gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
    bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
    bigtableReadTableId=BIGTABLE_TABLE_ID,\
    bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
    pubSubTopic=PUBSUB_TOPIC
```
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- BIGTABLE_INSTANCE_ID: your Bigtable instance ID
- BIGTABLE_TABLE_ID: your Bigtable table ID
- BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile ID
- PUBSUB_TOPIC: the Pub/Sub destination topic name
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
```
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
      "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
      "bigtableReadTableId": "BIGTABLE_TABLE_ID",
      "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
      "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}
```
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- BIGTABLE_INSTANCE_ID: your Bigtable instance ID
- BIGTABLE_TABLE_ID: your Bigtable table ID
- BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile ID
- PUBSUB_TOPIC: the Pub/Sub destination topic name
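As an alternative to calling the REST endpoint directly, the same launch can be expressed with the Dataflow Python client library. The following is a minimal sketch that assumes the google-cloud-dataflow-client package is installed; the capitalized placeholders match the values described above.

```python
from google.cloud import dataflow_v1beta3

# Placeholders follow the REST example above; substitute your own values.
PROJECT_ID = "PROJECT_ID"
LOCATION = "us-central1"
TEMPLATE_PATH = (
    f"gs://dataflow-templates-{LOCATION}/latest/flex/Bigtable_Change_Streams_to_PubSub"
)

client = dataflow_v1beta3.FlexTemplatesServiceClient()

request = dataflow_v1beta3.LaunchFlexTemplateRequest(
    project_id=PROJECT_ID,
    location=LOCATION,
    launch_parameter=dataflow_v1beta3.LaunchFlexTemplateParameter(
        job_name="bigtable-changestream-to-pubsub",
        container_spec_gcs_path=TEMPLATE_PATH,
        parameters={
            "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
            "bigtableReadTableId": "BIGTABLE_TABLE_ID",
            "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
            "pubSubTopic": "PUBSUB_TOPIC",
        },
    ),
)

# Launch the flex template and print the resulting job ID and state.
response = client.launch_flex_template(request=request)
print(response.job.id, response.job.current_state)
```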
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.