The Bigtable change streams to Pub/Sub template is a streaming pipeline that streams Bigtable data change records and publishes them to a Pub/Sub topic by using Dataflow.
A Bigtable change stream lets you subscribe to data mutations on a per-table basis. When you subscribe to table change streams, the following constraints apply:
- Only modified cells and descriptors of delete operations are returned.
- Only the new value of a modified cell is returned.
When data change records are published to a Pub/Sub topic, messages might be inserted out of order compared to the original Bigtable commit timestamp ordering.
Bigtable data change records that can't be published to Pub/Sub topics are temporarily placed in a dead-letter queue (unprocessed messages queue) directory in Cloud Storage. After the maximum number of unsuccessful retries, these records are indefinitely placed in the same dead-letter queue directory for human review or further processing by the user.
The pipeline requires that the destination Pub/Sub topic exists. The destination topic might be configured to validate messages using a schema. When a Pub/Sub topic specifies a schema, the pipeline only starts if the schema is valid. Depending on the schema type, use one of the following schema definitions for the destination topic:
Protocol buffers
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Use the following Protobuf schema with JSON
message encoding:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Each new Pub/Sub message includes one entry from a data change record returned by the change stream from its corresponding row in your Bigtable table. The Pub/Sub template flattens the entries in each data change record into individual cell-level changes.
Pub/Sub output message description
Field name | Description |
---|---|
rowKey |
The row key of the changed row. Arrives in a form of a byte array. When JSON message encoding is configured, row keys are returned as strings. When useBase64Rowkeys is specified, row keys are Base64-encoded. Otherwise, a charset specified by bigtableChangeStreamCharset is used to decode row key bytes into a string. |
modType |
The type of the row mutation. Use one of the following values: SET_CELL , DELETE_CELLS , or DELETE_FAMILY . |
columnFamily |
The column family affected by the row mutation. |
column |
The column qualifier affected by the row mutation. For the DELETE_FAMILY mutation type, the column field isn't set. Arrives in a form of a byte array. When JSON message encoding is configured, columns are returned as strings. When useBase64ColumnQualifier is specified, the column field is Base64-encoded. Otherwise, a charset specified by bigtableChangeStreamCharset is used to decode row key bytes into a string. |
commitTimestamp |
The time when Bigtable applies the mutation. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC). |
timestamp |
The timestamp value of the cell affected by the mutation. For DELETE_CELLS and DELETE_FAMILY mutation types, timestamp is not set. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC). |
timestampFrom |
Describes an inclusive start of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, timestampFrom isn't set. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC). |
timestampTo |
Describes an exclusive end of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, timestampTo isn't set. |
isGC |
A boolean value that indicates whether the mutation is generated by a Bigtable garbage collection mechanism. |
tieBreaker |
When two mutations are registered at the same time by different Bigtable clusters, the mutation with the highest tiebreaker value is applied to the source table. Mutations with lower tiebreaker values are discarded. |
value |
The new value set by the mutation. Unless the stripValues pipeline option is set, the value is set for SET_CELL mutations. For other mutation types, the value isn't set. Arrives in a form of a byte array. When JSON message encoding is configured, values are returned as strings.
When useBase64Values is specified, the value is Base64-encoded. Otherwise, a charset specified by bigtableChangeStreamCharset is used to decode value bytes into a string. |
sourceInstance |
The name of the Bigtable instance that registered the mutation. Might be when multiple pipelines stream changes from different instances to the same Pub/Sub topic. |
sourceCluster |
The name of the Bigtable cluster that registered the mutation. Might be used when multiple pipelines stream changes from different instances to the same Pub/Sub topic. |
sourceTable |
The name of the Bigtable table that received the mutation. Might be used in case when a multiple pipelines stream changes from a different tables to the same Pub/Sub topic. |
Pipeline requirements
- The specified Bigtable source instance.
- The specified Bigtable source table. The table must have change streams enabled.
- The specified Bigtable application profile.
- The specified Pub/Sub topic must exist.
Template parameters
Required parameters
- pubSubTopic : The name of the destination Pub/Sub topic.
- bigtableChangeStreamAppProfile : The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
- bigtableReadInstanceId : The source Bigtable instance ID.
- bigtableReadTableId : The source Bigtable table ID.
Optional parameters
- messageEncoding : The encoding of the messages to be published to the Pub/Sub topic. When the schema of the destination topic is configured, the message encoding is determined by the topic settings. The following values are supported:
BINARY
andJSON
. Defaults toJSON
. - messageFormat : The encoding of the messages to publish to the Pub/Sub topic. When the schema of the destination topic is configured, the message encoding is determined by the topic settings. The following values are supported:
AVRO
,PROTOCOL_BUFFERS
, andJSON
. The default value isJSON
. When theJSON
format is used, the rowKey, column, and value fields of the message are strings, the contents of which are determined by the pipeline optionsuseBase64Rowkeys
,useBase64ColumnQualifiers
,useBase64Values
, andbigtableChangeStreamCharset
. - stripValues : When set to true, the SET_CELL mutations are returned without new values set. Defaults to false. This parameter is useful when you don't need a new value to be present, also known as cache invalidation, or when values are extremely large and exceed Pub/Sub message size limits.
- dlqDirectory : The directory for the dead-letter queue. Records that fail to be processed are stored in this directory. Defaults to a directory under the Dataflow job temp location. In most cases, you can use the default path.
- dlqRetryMinutes : The number of minutes between dead-letter queue retries. Defaults to
10
. - dlqMaxRetries : The dead letter maximum retries. Defaults to
5
. - useBase64Rowkeys : Used with JSON message encoding. When set to
true
, therowKey
field is a Base64-encoded string. Otherwise, therowKey
is produced by usingbigtableChangeStreamCharset
to decode bytes into a string. Defaults tofalse
. - pubSubProjectId : The Bigtable project ID. The default is the project of the Dataflow job.
- useBase64ColumnQualifiers : Used with JSON message encoding. When set to
true
, thecolumn
field is a Base64-encoded string. Otherwise, the column is produced by usingbigtableChangeStreamCharset
to decode bytes into a string. Defaults tofalse
. - useBase64Values : Used with JSON message encoding. When set to
true
, the value field is a Base64-encoded string. Otherwise, the value isproduced by usingbigtableChangeStreamCharset
to decode bytes into a string. Defaults tofalse
. - disableDlqRetries : Whether or not to disable retries for the DLQ. Defaults to: false.
- bigtableChangeStreamMetadataInstanceId : The Bigtable change streams metadata instance ID. Defaults to empty.
- bigtableChangeStreamMetadataTableTableId : The ID of the Bigtable change streams connector metadata table. If not provided, a Bigtable change streams connector metadata table is automatically created during pipeline execution. Defaults to empty.
- bigtableChangeStreamCharset : The Bigtable change streams charset name. Defaults to: UTF-8.
- bigtableChangeStreamStartTimestamp : The starting timestamp (https://tools.ietf.org/html/rfc3339), inclusive, to use for reading change streams. For example,
2022-05-05T07:59:59Z
. Defaults to the timestamp of the pipeline start time. - bigtableChangeStreamIgnoreColumnFamilies : A comma-separated list of column family name changes to ignore. Defaults to empty.
- bigtableChangeStreamIgnoreColumns : A comma-separated list of column name changes to ignore. Defaults to empty.
- bigtableChangeStreamName : A unique name for the client pipeline. Lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. See the Dataflow job logs for the value used.
- bigtableChangeStreamResume : When set to
true
, a new pipeline resumes processing from the point at which a previously running pipeline with the samebigtableChangeStreamName
value stopped. If the pipeline with the givenbigtableChangeStreamName
value has never run, a new pipeline doesn't start. When set tofalse
, a new pipeline starts. If a pipeline with the samebigtableChangeStreamName
value has already run for the given source, a new pipeline doesn't start. Defaults tofalse
. - bigtableReadProjectId : The Bigtable project ID. The default is the project for the Dataflow job.
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default
region is
us-central1
.For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bigtable change streams to Pub/Sub template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Replace the following:
PROJECT_ID
: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME
: a unique job name of your choiceVERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
REGION_NAME
: the region where you want to deploy your Dataflow job—for example,us-central1
BIGTABLE_INSTANCE_ID
: your Bigtable instance id.BIGTABLE_TABLE_ID
: your Bigtable table id.BIGTABLE_APPLICATION_PROFILE_ID
: your Bigtable application profile id.PUBSUB_TOPIC
: the Pub/Sub destination topic name
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Replace the following:
PROJECT_ID
: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME
: a unique job name of your choiceVERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
LOCATION
: the region where you want to deploy your Dataflow job—for example,us-central1
BIGTABLE_INSTANCE_ID
: your Bigtable instance id.BIGTABLE_TABLE_ID
: your Bigtable table id.BIGTABLE_APPLICATION_PROFILE_ID
: your Bigtable application profile id.PUBSUB_TOPIC
: the Pub/Sub destination topic name
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.