Bigtable change streams to Vector Search template

This template creates a streaming pipeline that streams Bigtable data change records and writes them to a Vertex AI Vector Search index using Dataflow Runner V2.

Pipeline requirements

  • The Bigtable source instance must exist.
  • The Bigtable source table must exist, and the table must have change streams enabled.
  • The Bigtable application profile must exist.
  • The Vector Search index path must exist.
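
You can verify these prerequisites from the command line before you run the template. The following commands are a sketch; the instance, table, application profile, index, and project IDs are hypothetical placeholders.

# Confirm that the source instance and table exist.
gcloud bigtable instances describe my-instance
gcloud bigtable instances tables describe my-table --instance=my-instance

# Enable change streams on the table if they aren't already enabled.
gcloud bigtable instances tables update my-table --instance=my-instance \
    --change-stream-retention-period=7d

# Confirm that the application profile exists.
gcloud bigtable app-profiles describe my-app-profile --instance=my-instance

# Confirm that the Vector Search index exists.
gcloud ai indexes describe 456 --region=us-east1 --project=my-project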

Template parameters

Parameter Description
embeddingColumn The fully qualified column name where the embeddings are stored. In the format cf:col.
embeddingByteSize The byte size of each entry in the embeddings array. Use 4 for float and 8 for double. Defaults to 4.
vectorSearchIndex The Vector Search index where changes will be streamed, in the format 'projects/{projectID}/locations/{region}/indexes/{indexID}' (no leading or trailing spaces). For example: projects/123/locations/us-east1/indexes/456.
bigtableChangeStreamAppProfile The application profile that is used to distinguish workloads in Bigtable.
bigtableReadInstanceId The ID of the Bigtable instance that contains the table.
bigtableReadTableId The Bigtable table to read from.
bigtableMetadataTableTableId Optional: ID for the metadata table that is created. If unset, Bigtable generates an ID.
crowdingTagColumn Optional: The fully qualified column name where the crowding tag is stored, in the format cf:col.
allowRestrictsMappings Optional: The comma-separated, fully qualified column names of the columns to use as allow restricts, plus their aliases. Each column name must be in the format cf:col->alias.
denyRestrictsMappings Optional: The comma-separated, fully qualified column names of the columns to use as deny restricts, plus their aliases. Each column name must be in the format cf:col->alias.
intNumericRestrictsMappings Optional: The comma-separated, fully qualified column names of the columns to use as integer numeric_restricts, plus their aliases. Each column name must be in the format cf:col->alias.
floatNumericRestrictsMappings Optional: The comma-separated, fully qualified column names of the columns to use as float (4 bytes) numeric_restricts, plus their aliases. Each column name must be in the format cf:col->alias.
doubleNumericRestrictsMappings Optional: The comma-separated, fully qualified column names of the columns to use as double (8 bytes) numeric_restricts, plus their aliases. Each column name must be in the format cf:col->alias.
upsertMaxBatchSize Optional: The maximum number of upserts to buffer before upserting the batch to the Vector Search index. A batch is sent when either upsertMaxBatchSize records are ready or the upsertMaxBufferDuration delay has elapsed. Example: 10.
upsertMaxBufferDuration Optional: The maximum delay before a batch of upserts is sent to Vector Search. A batch is sent when either upsertMaxBatchSize records are ready or this delay has elapsed. Allowed formats are: Ns for seconds (example: 5s), Nm for minutes (example: 12m), and Nh for hours (example: 2h). Default: 10s.
deleteMaxBatchSize Optional: The maximum number of deletes to buffer before deleting the batch from the Vector Search index. A batch is sent when either deleteMaxBatchSize records are ready or the deleteMaxBufferDuration delay has elapsed. For example: 10.
deleteMaxBufferDuration Optional: The maximum delay before a batch of deletes is sent to Vector Search. A batch is sent when either deleteMaxBatchSize records are ready or this delay has elapsed. Allowed formats are: Ns for seconds (example: 5s), Nm for minutes (example: 12m), and Nh for hours (example: 2h). Default: 10s.
dlqDirectory Optional: The path to store any unprocessed records with the reason they failed to be processed. Default is a directory under the Dataflow job's temporary location. The default value is appropriate for most scenarios.
bigtableChangeStreamMetadataInstanceId Optional: The Bigtable instance to use for the change streams connector metadata table. Defaults to empty.
bigtableChangeStreamMetadataTableTableId Optional: The Bigtable change streams connector metadata table ID to use. If not provided, a Bigtable change streams connector metadata table is automatically created during the pipeline flow. Defaults to empty.
bigtableChangeStreamCharset Optional: Bigtable change streams charset name when reading values and column qualifiers. Default is UTF-8.
bigtableChangeStreamStartTimestamp Optional: The starting DateTime, inclusive, to use for reading change streams (https://tools.ietf.org/html/rfc3339). For example, 2022-05-05T07:59:59Z. Defaults to the timestamp when the pipeline starts.
bigtableChangeStreamIgnoreColumnFamilies Optional: A comma-separated list of column family names whose changes won't be captured. Defaults to empty.
bigtableChangeStreamIgnoreColumns Optional: A comma-separated list of column names whose changes won't be captured. Defaults to empty.
bigtableChangeStreamName Optional: A unique name for the client pipeline. This parameter lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. See the Dataflow job logs for the value used.
bigtableChangeStreamResume

Optional: When set to true, a new pipeline resumes processing from the point at which a previously running pipeline with the same name stopped. If a pipeline with that name never ran in the past, the new pipeline fails to start. Use the bigtableChangeStreamName parameter to specify the pipeline name.

When set to false, a new pipeline is started. If a pipeline with the same name as bigtableChangeStreamName already ran in the past for the given source, the new pipeline fails to start.

Defaults to false.

bigtableReadProjectId Optional: Project to read Bigtable data from. The default for this parameter is the project where the Dataflow pipeline is running.
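
To make the cf:col->alias mapping format and the duration format concrete, the following sketch shows possible values for some of the optional parameters, held in shell variables. The column families, qualifiers, and aliases are hypothetical.

# Hypothetical values that illustrate the parameter formats described above.
CROWDING_TAG_COLUMN="cf:crowding"                            # cf:col
ALLOW_RESTRICTS_MAPPINGS="cf:color->color,cf:brand->brand"   # comma-separated cf:col->alias pairs
INT_NUMERIC_RESTRICTS_MAPPINGS="cf:year->year"               # cf:col->alias
UPSERT_MAX_BATCH_SIZE=100
UPSERT_MAX_BUFFER_DURATION="30s"                             # Ns, Nm, or Nh
CHANGE_STREAM_START_TIMESTAMP="2022-05-05T07:59:59Z"         # RFC 3339, inclusive start time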

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Bigtable Change Streams to Vector Search template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud CLI

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use latest to run the most recent version of the template, or specify a version name to pin the job to a specific template release.

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • EMBEDDING_COLUMN: the fully qualified column where the embeddings are stored, in the format cf:col
  • EMBEDDING_BYTE_SIZE: the byte size of each entry in the embeddings array: 4 for float or 8 for double
  • VECTOR_SEARCH_INDEX: the Vector Search index path
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
  • BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
  • BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
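
For example, a fully substituted command might look like the following; all of the IDs, names, and paths are hypothetical.

gcloud dataflow flex-template run bigtable-to-vector-search-job \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=my-project \
    --region=us-central1 \
    --parameters \
       embeddingColumn=cf:embedding,\
       embeddingByteSize=4,\
       vectorSearchIndex=projects/123/locations/us-central1/indexes/456,\
       bigtableChangeStreamAppProfile=my-app-profile,\
       bigtableReadInstanceId=my-instance,\
       bigtableReadTableId=my-table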

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use latest to run the most recent version of the template, or specify a version name to pin the job to a specific template release.

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • EMBEDDING_COLUMN: the fully qualified column where the embeddings are stored, in the format cf:col
  • EMBEDDING_BYTE_SIZE: the byte size of each entry in the embeddings array: 4 for float or 8 for double
  • VECTOR_SEARCH_INDEX: the Vector Search index path
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
  • BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
  • BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
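
For example, after you replace the placeholder values, you can send the request with curl. The request body file, launch-request.json, is a hypothetical file that contains the launchParameter object shown above.

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @launch-request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"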