This template creates a streaming pipeline that streams Bigtable data change records and writes them to Vertex AI Vector Search using Dataflow Runner V2.
Pipeline requirements
- The Bigtable source instance must exist.
- The Bigtable source table must exist, and the table must have change streams enabled.
- The Bigtable application profile must exist.
- The Vector Search index path must exist.
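If the source table doesn't have change streams enabled yet, or the application profile doesn't exist, you can usually create them with the gcloud CLI before running the template. The following is a minimal sketch: the resource names (my-instance, my-table, my-cluster, my-app-profile) and the 7-day retention period are placeholders, and it assumes the change streams connector needs an application profile with single-cluster routing and single-row transactions enabled.

# Enable change streams on an existing table (placeholder names and retention period).
gcloud bigtable instances tables update my-table \
  --instance=my-instance \
  --change-stream-retention-period=7d

# Create an application profile that routes to a single cluster and allows
# single-row transactions (placeholder names).
gcloud bigtable app-profiles create my-app-profile \
  --instance=my-instance \
  --route-to=my-cluster \
  --transactional-writes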
Template parameters
Parameter | Description |
---|---|
embeddingColumn | The fully qualified column name where the embeddings are stored, in the format cf:col. |
embeddingByteSize | The byte size of each entry in the embeddings array. Use 4 for float and 8 for double. Defaults to 4. |
vectorSearchIndex | The Vector Search index where changes will be streamed, in the format projects/{projectID}/locations/{region}/indexes/{indexID} (no leading or trailing spaces). For example: projects/123/locations/us-east1/indexes/456. |
bigtableChangeStreamAppProfile | The application profile that is used to distinguish workloads in Bigtable. |
bigtableReadInstanceId | The ID of the Bigtable instance that contains the table. |
bigtableReadTableId | The Bigtable table to read from. |
bigtableMetadataTableTableId | Optional: ID for the metadata table that is created. If unset, Bigtable generates an ID. |
crowdingTagColumn | Optional: The fully qualified column name where the crowding tag is stored, in the format cf:col. |
allowRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as allow restricts, plus their aliases. Each column name must be in the format cf:col->alias. |
denyRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as deny restricts, plus their aliases. Each column name must be in the format cf:col->alias. |
intNumericRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as integer numeric_restricts, plus their aliases. Each column name must be in the format cf:col->alias. |
floatNumericRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as float (4 bytes) numeric_restricts, plus their aliases. Each column name must be in the format cf:col->alias. |
doubleNumericRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as double (8 bytes) numeric_restricts, plus their aliases. Each column name must be in the format cf:col->alias. |
upsertMaxBatchSize | Optional: The maximum number of upserts to buffer before upserting the batch to the Vector Search index. A batch is sent when either upsertMaxBatchSize records are ready or a record has been waiting for upsertMaxBufferDuration. Example: 10. |
upsertMaxBufferDuration | Optional: The maximum delay before a batch of upserts is sent to Vector Search. A batch is sent when either upsertMaxBatchSize records are ready or a record has been waiting for this duration. Allowed formats are: Ns for seconds (example: 5s), Nm for minutes (example: 12m), and Nh for hours (example: 2h). Default: 10s. |
deleteMaxBatchSize | Optional: The maximum number of deletes to buffer before deleting the batch from the Vector Search index. A batch is sent when either deleteMaxBatchSize records are ready or a record has been waiting for deleteMaxBufferDuration. Example: 10. |
deleteMaxBufferDuration | Optional: The maximum delay before a batch of deletes is sent to Vector Search. A batch is sent when either deleteMaxBatchSize records are ready or a record has been waiting for this duration. Allowed formats are: Ns for seconds (example: 5s), Nm for minutes (example: 12m), and Nh for hours (example: 2h). Default: 10s. |
dlqDirectory | Optional: The path to store any unprocessed records with the reason they failed to be processed. Default is a directory under the Dataflow job's temporary location. The default value is appropriate for most scenarios. |
bigtableChangeStreamMetadataInstanceId | Optional: The Bigtable instance to use for the change streams connector metadata table. Defaults to empty. |
bigtableChangeStreamMetadataTableTableId | Optional: The ID of the Bigtable change streams connector metadata table to use. If not provided, a Bigtable change streams connector metadata table is automatically created during the pipeline flow. Defaults to empty. |
bigtableChangeStreamCharset | Optional: The Bigtable change streams charset name when reading values and column qualifiers. Defaults to UTF-8. |
bigtableChangeStreamStartTimestamp | Optional: The starting DateTime, inclusive, to use for reading change streams (https://tools.ietf.org/html/rfc3339). For example, 2022-05-05T07:59:59Z. Defaults to the timestamp when the pipeline starts. |
bigtableChangeStreamIgnoreColumnFamilies | Optional: A comma-separated list of column family names whose changes aren't captured. Defaults to empty. |
bigtableChangeStreamIgnoreColumns | Optional: A comma-separated list of column names whose changes aren't captured. Defaults to empty. |
bigtableChangeStreamName | Optional: A unique name for the client pipeline. This parameter lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. See the Dataflow job logs for the value used. |
bigtableChangeStreamResume | Optional: When set to true, a new pipeline resumes processing from the point at which a previously running pipeline with the same bigtableChangeStreamName value stopped. If a pipeline with that name never ran in the past, the new pipeline fails to start. When set to false, a new pipeline is started. If a pipeline with the same name already ran for the given source, the new pipeline fails to start. Defaults to false. |
bigtableReadProjectId | Optional: Project to read Bigtable data from. The default for this parameter is the project where the Dataflow pipeline is running. |
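To illustrate the cf:col and cf:col->alias formats described above, the following hypothetical values (the column family names, qualifiers, and aliases are invented for this example) show how an embedding column and a few restrict mappings might be specified:

embeddingColumn=embeddings_cf:embedding
embeddingByteSize=4
crowdingTagColumn=metadata_cf:crowding
allowRestrictsMappings=metadata_cf:color->color,metadata_cf:brand->brand
intNumericRestrictsMappings=metadata_cf:year->year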
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bigtable Change Streams to Vector Search template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud CLI
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
  --project=PROJECT_ID \
  --region=REGION_NAME \
  --parameters \
embeddingColumn=EMBEDDING_COLUMN,\
embeddingByteSize=EMBEDDING_BYTE_SIZE,\
vectorSearchIndex=VECTOR_SEARCH_INDEX,\
bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_READ_TABLE_ID
Replace the following:
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
- EMBEDDING_COLUMN: the fully qualified column name where the embeddings are stored, in the format cf:col
- EMBEDDING_BYTE_SIZE: the byte size of each entry in the embeddings array; can be 4 or 8
- VECTOR_SEARCH_INDEX: the Vector Search index path
- BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
- BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
- BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
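As a sketch of a fully substituted command, the following uses placeholder project, region, index, and Bigtable resource names; replace them with your own values:

gcloud dataflow flex-template run bigtable-to-vector-search-example \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Bigtable_Change_Streams_to_Vector_Search \
  --project=my-project \
  --region=us-central1 \
  --parameters \
embeddingColumn=embeddings_cf:embedding,\
embeddingByteSize=4,\
vectorSearchIndex=projects/123456789/locations/us-central1/indexes/987654321,\
bigtableChangeStreamAppProfile=my-app-profile,\
bigtableReadInstanceId=my-instance,\
bigtableReadTableId=my-table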
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "embeddingColumn": "EMBEDDING_COLUMN",
      "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
      "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
      "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
      "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
      "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
    "environment": { "maxWorkers": "10" }
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
- EMBEDDING_COLUMN: the fully qualified column name where the embeddings are stored, in the format cf:col
- EMBEDDING_BYTE_SIZE: the byte size of each entry in the embeddings array; can be 4 or 8
- VECTOR_SEARCH_INDEX: the Vector Search index path
- BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
- BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
- BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
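One way to send the request is with curl, assuming the JSON body shown above is saved to a local file (request.json is an assumed filename here) and that you authenticate with your gcloud credentials:

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d @request.json \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"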