Bigtable change streams to Pub/Sub template

The Bigtable change streams to Pub/Sub template is a streaming pipeline that streams Bigtable data change records and publishes them to a Pub/Sub topic by using Dataflow.

A Bigtable change stream lets you subscribe to data mutations on a per-table basis. When you subscribe to table change streams, the following constraints apply:

  • Only modified cells and descriptors of delete operations are returned.
  • Only the new value of a modified cell is returned.

When data change records are published to a Pub/Sub topic, messages might arrive out of order relative to the original Bigtable commit timestamp ordering.
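If a subscriber needs commit order, it can re-sort the records it has buffered. The following is a minimal sketch, assuming the messages were already decoded into dictionaries that carry the commitTimestamp and tieBreaker fields. The function name in_commit_order and the sample batch are hypothetical, and using tieBreaker as a secondary sort key is an assumption made only to keep the ordering deterministic. Sorting restores order only within the buffered batch, not across batches.

```python
from typing import Iterable


def in_commit_order(records: Iterable[dict]) -> list[dict]:
    """Return records sorted by commitTimestamp, then by tieBreaker.

    The secondary key is an assumption of this sketch, not documented behavior.
    int() tolerates int64 values that were serialized as JSON strings.
    """
    return sorted(
        records,
        key=lambda r: (int(r["commitTimestamp"]), int(r["tieBreaker"])),
    )


# Hypothetical batch of decoded messages that arrived out of order.
batch = [
    {"rowKey": "user#2", "commitTimestamp": 1700000000000300, "tieBreaker": 0},
    {"rowKey": "user#1", "commitTimestamp": 1700000000000100, "tieBreaker": 0},
    {"rowKey": "user#3", "commitTimestamp": 1700000000000100, "tieBreaker": 1},
]
ordered = in_commit_order(batch)
ordered_keys = [r["rowKey"] for r in ordered]
print(ordered_keys)
```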

Bigtable data change records that can't be published to the Pub/Sub topic are temporarily placed in a dead-letter queue (unprocessed messages queue) directory in Cloud Storage. After the maximum number of unsuccessful retries, these records remain indefinitely in the same dead-letter queue directory for human review or further processing.
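The retry-then-park behavior described above can be modeled roughly as follows. This is an illustrative sketch only, not the template's implementation: process_with_dlq, the publish callback, and the two in-memory lists are hypothetical stand-ins for Pub/Sub and the Cloud Storage directory, max_retries plays the role of the dlqMaxRetries parameter, and the wait between retries is omitted.

```python
from typing import Callable


def process_with_dlq(record: dict, publish: Callable[[dict], None],
                     max_retries: int, retry_queue: list,
                     permanent_queue: list) -> bool:
    """Try to publish; on failure retry up to max_retries times, then park."""
    try:
        publish(record)
        return True
    except Exception:
        retry_queue.append(record)  # temporarily held for retry

    for _ in range(max_retries):
        try:
            publish(record)
            retry_queue.remove(record)  # retry succeeded
            return True
        except Exception:
            continue

    # Retries exhausted: keep the record for manual review.
    retry_queue.remove(record)
    permanent_queue.append(record)
    return False


attempts = {"count": 0}


def always_failing_publish(record: dict) -> None:
    """Hypothetical publisher that simulates a persistent failure."""
    attempts["count"] += 1
    raise RuntimeError("simulated publish failure")


retrying, parked = [], []
published = process_with_dlq({"rowKey": "user#1"}, always_failing_publish,
                             5, retrying, parked)
```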

The pipeline requires that the destination Pub/Sub topic exists. The destination topic might be configured to validate messages using a schema. When a Pub/Sub topic specifies a schema, the pipeline only starts if the schema is valid. Depending on the schema type, use one of the following schema definitions for the destination topic:

Protocol buffers

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

Use the following Protobuf schema with JSON message encoding:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Each new Pub/Sub message includes one data change record returned by the change stream from its corresponding row in your Bigtable table.

Pub/Sub output message description

Field name Description
rowKey The row key of the changed row. Arrives in the form of a byte array. When JSON message encoding is configured, row keys are returned as strings. When useBase64Rowkeys is specified, row keys are Base64-encoded. Otherwise, the charset specified by bigtableChangeStreamCharset is used to decode row key bytes into a string.
modType The type of the row mutation. One of the following values: SET_CELL, DELETE_CELLS, or DELETE_FAMILY.
columnFamily The column family affected by the row mutation.
column The column qualifier affected by the row mutation. For the DELETE_FAMILY mutation type, the column field isn't set. Arrives in the form of a byte array. When JSON message encoding is configured, columns are returned as strings. When useBase64ColumnQualifiers is specified, the column field is Base64-encoded. Otherwise, the charset specified by bigtableChangeStreamCharset is used to decode column qualifier bytes into a string.
commitTimestamp The time when Bigtable applies the mutation. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC).
timestamp The timestamp value of the cell affected by the mutation. For DELETE_CELLS and DELETE_FAMILY mutation types, timestamp is not set. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC).
timestampFrom Describes an inclusive start of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, timestampFrom isn't set. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC).
timestampTo Describes an exclusive end of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, timestampTo isn't set. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC).
isGC A boolean value that indicates whether the mutation is generated by a Bigtable garbage collection mechanism.
tieBreaker When two mutations are registered at the same time by different Bigtable clusters, the mutation with the highest tiebreaker value is applied to the source table. Mutations with lower tiebreaker values are discarded.
value The new value set by the mutation. Unless the stripValues pipeline option is set, the value is set for SET_CELL mutations. For other mutation types, the value isn't set. Arrives in the form of a byte array. When JSON message encoding is configured, values are returned as strings. When useBase64Values is specified, the value is Base64-encoded. Otherwise, the charset specified by bigtableChangeStreamCharset is used to decode value bytes into a string.
sourceInstance The name of the Bigtable instance that registered the mutation. Might be used when multiple pipelines stream changes from different instances to the same Pub/Sub topic.
sourceCluster The name of the Bigtable cluster that registered the mutation. Might be used when multiple pipelines stream changes from different instances to the same Pub/Sub topic.
sourceTable The name of the Bigtable table that received the mutation. Might be used when multiple pipelines stream changes from different tables to the same Pub/Sub topic.
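On the subscriber side, a JSON-encoded message can be decoded with the field semantics from the table above. The following is a minimal sketch, assuming JSON message encoding with useBase64Rowkeys and useBase64Values set to true. The helper names micros_to_datetime and decode_change_record, the added commitTime key, and the sample payload are hypothetical. Whether int64 fields arrive as JSON numbers or strings is not stated here, so the sketch accepts both.

```python
import base64
import json
from datetime import datetime, timezone


def micros_to_datetime(micros) -> datetime:
    """Convert microseconds since the Unix epoch to an aware UTC datetime."""
    seconds, remainder = divmod(int(micros), 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=remainder)


def decode_change_record(payload: bytes, base64_row_keys: bool = False,
                         base64_values: bool = False) -> dict:
    """Parse one JSON message and normalize its key, value, and commit time."""
    record = json.loads(payload)
    if base64_row_keys:
        record["rowKey"] = base64.b64decode(record["rowKey"])
    # value is unset for delete mutations or when stripValues is true.
    if base64_values and record.get("value") is not None:
        record["value"] = base64.b64decode(record["value"])
    record["commitTime"] = micros_to_datetime(record["commitTimestamp"])
    return record


# Hypothetical payload shaped like the fields described above.
sample_payload = json.dumps({
    "rowKey": base64.b64encode(b"user#1").decode("ascii"),
    "modType": "SET_CELL",
    "isGC": False,
    "tieBreaker": 0,
    "commitTimestamp": "1700000000000123",
    "columnFamily": "profile",
    "column": "name",
    "value": base64.b64encode(b"Ada").decode("ascii"),
}).encode("utf-8")

decoded = decode_change_record(sample_payload, base64_row_keys=True,
                               base64_values=True)
```

Keeping the original commitTimestamp field untouched and adding a separate commitTime key lets downstream code still compare the raw microsecond values.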

Pipeline requirements

  • The specified Bigtable source instance must exist.
  • The specified Bigtable source table must exist and must have change streams enabled.
  • The specified Bigtable application profile must exist.
  • The specified Pub/Sub topic must exist.

Template parameters

Parameter Description
bigtableReadInstanceId The source Bigtable instance ID.
bigtableReadTableId The source Bigtable table ID.
bigtableChangeStreamAppProfile The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
pubSubTopic The name of the destination Pub/Sub topic.
messageFormat Optional: The format of the messages to be published to the Pub/Sub topic. Supported values: AVRO, PROTOCOL_BUFFERS, and JSON. Defaults to JSON. When the destination topic has a schema configured, the message format is determined by the configured schema and encoding. When the JSON format is used, the rowKey, column, and value fields of the message are strings, the contents of which are determined by the useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values, and bigtableChangeStreamCharset pipeline options.
messageEncoding Optional: The encoding of the messages to be published to the Pub/Sub topic. Supported values: BINARY and JSON. Defaults to JSON. When the destination topic has a schema configured, the message encoding is determined by the topic settings.
stripValues Optional: When set to true, SET_CELL mutations are returned without the new values set. Defaults to false. This parameter is useful when you don't need the new value to be present, for example for cache invalidation, or when values are extremely large and exceed Pub/Sub message size limits.
bigtableReadProjectId Optional: The Bigtable project ID. The default is the project of the Dataflow job.
pubSubProjectId Optional: The Pub/Sub project ID. The default is the project of the Dataflow job.
bigtableChangeStreamMetadataInstanceId Optional: The Bigtable change streams metadata instance ID.
bigtableChangeStreamMetadataTableTableId Optional: The Bigtable change streams metadata table ID.
bigtableChangeStreamCharset Optional: The Bigtable change streams charset name to use when reading row keys, values, and column qualifiers. This option is used when message encoding is JSON.
bigtableChangeStreamStartTimestamp Optional: The starting timestamp, inclusive, to use for reading change streams. For example, 2022-05-05T07:59:59Z. Defaults to the timestamp of the pipeline start time.
bigtableChangeStreamIgnoreColumnFamilies Optional: A comma-separated list of column family name changes to ignore.
bigtableChangeStreamIgnoreColumns Optional: A comma-separated list of column name changes to ignore.
bigtableChangeStreamName Optional: A unique name for the client pipeline. Lets you resume processing from the point at which a previously running pipeline stopped. Defaults to the automatically generated name. To find the value, see the Dataflow job logs.
bigtableChangeStreamResume Optional: When set to true, a new pipeline resumes processing from the point when a previously running pipeline with the same bigtableChangeStreamName value stopped. If the pipeline with the given bigtableChangeStreamName value has never run, a new pipeline doesn't start. When set to false, a new pipeline starts. If a pipeline with the same bigtableChangeStreamName value has already run for the given source, a new pipeline doesn't start. Defaults to false.
useBase64Rowkeys Optional: Used with JSON message encoding. When set to true, the rowKey field is a Base64-encoded string. Otherwise, the rowKey is produced by using bigtableChangeStreamCharset to decode bytes into a string. Defaults to false.
useBase64ColumnQualifiers Optional: Used with JSON message encoding. When set to true, the column field is a Base64-encoded string. Otherwise, the column is produced by using bigtableChangeStreamCharset to decode bytes into a string. Defaults to false.
useBase64Values Optional: Used with JSON message encoding. When set to true, the value field is a Base64-encoded string. Otherwise, the value is produced by using bigtableChangeStreamCharset to decode bytes into a string. Defaults to false.
dlqMaxRetries Optional: The maximum number of dead-letter queue retries. Defaults to 5.
dlqRetryMinutes Optional: The number of minutes between dead-letter queue retries. Defaults to 10.
dlqDirectory Optional: The directory for the dead-letter queue. Records that fail to be processed are stored in this directory. The default is a directory under the temp location of the Dataflow job. In most cases, you can use the default path.
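The start and resume rules given for bigtableChangeStreamResume reduce to a small truth table: a new pipeline starts only when the resume flag matches whether a pipeline with the same bigtableChangeStreamName has already run. The following sketch models that rule as described in this table. The function pipeline_starts is a hypothetical name, not part of the template.

```python
def pipeline_starts(resume: bool, has_run_before: bool) -> bool:
    """Model whether a new pipeline starts for a given change stream name.

    resume=True  -> starts only if a pipeline with that name ran before.
    resume=False -> starts only if no pipeline with that name ran before.
    """
    return resume == has_run_before


# Enumerate all four combinations of the two conditions.
decision_table = {
    (resume, ran): pipeline_starts(resume, ran)
    for resume in (True, False)
    for ran in (True, False)
}
for (resume, ran), starts in decision_table.items():
    print(f"resume={resume}, has_run_before={ran} -> starts={starts}")
```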

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Bigtable change streams to Pub/Sub template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template
      • the version name, like 2023-09-12-00_RELEASE, to use a specific version of the template

  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • BIGTABLE_INSTANCE_ID: your Bigtable instance ID
  • BIGTABLE_TABLE_ID: your Bigtable table ID
  • BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile ID
  • PUBSUB_TOPIC: the Pub/Sub destination topic name

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template
      • the version name, like 2023-09-12-00_RELEASE, to use a specific version of the template

  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • BIGTABLE_INSTANCE_ID: your Bigtable instance ID
  • BIGTABLE_TABLE_ID: your Bigtable table ID
  • BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile ID
  • PUBSUB_TOPIC: the Pub/Sub destination topic name
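The request above can also be assembled programmatically. The following is a minimal sketch that only builds the URL and JSON body from the placeholders. It doesn't send the request, because that requires an OAuth access token that isn't covered here. The function build_launch_request and all sample values are hypothetical, and the sketch assumes the template bucket region is the same as the job location.

```python
import json


def build_launch_request(project_id: str, location: str, job_name: str,
                         version: str, parameters: dict) -> tuple[str, str]:
    """Return the launch URL and the JSON request body as a string."""
    url = (
        f"https://dataflow.googleapis.com/v1b3/projects/{project_id}"
        f"/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launch_parameter": {
            "jobName": job_name,
            # Assumes the template bucket region matches the job location.
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/Bigtable_Change_Streams_to_PubSub"
            ),
            "parameters": parameters,
        }
    }
    return url, json.dumps(body, indent=2)


# Hypothetical values for illustration only.
launch_url, launch_body = build_launch_request(
    project_id="my-project",
    location="us-central1",
    job_name="bigtable-cdc-to-pubsub",
    version="latest",
    parameters={
        "bigtableReadInstanceId": "my-instance",
        "bigtableReadTableId": "my-table",
        "bigtableChangeStreamAppProfile": "my-app-profile",
        "pubSubTopic": "my-topic",
    },
)
print(launch_url)
print(launch_body)
```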

What's next