This template creates a streaming pipeline that streams Bigtable data change records and writes them to Vertex AI Vector Search using Dataflow Runner V2.
Pipeline requirements
- The Bigtable source instance must exist.
- The Bigtable source table must exist, and the table must have change streams enabled (see the example after this list).
- The Bigtable application profile must exist.
- The Vector Search index path must exist.
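If change streams aren't enabled on the source table yet, one way to turn them on is to set a change stream retention period with the gcloud CLI. The following is a sketch only; the instance name, table name, and seven-day retention period are placeholder values for this example.

```
# Enable change streams on an existing Bigtable table by setting a change
# stream retention period (instance, table, and retention are examples).
gcloud bigtable instances tables update my-table \
    --instance=my-instance \
    --change-stream-retention-period=7d
```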
Template parameters
Parameter | Description |
---|---|
embeddingColumn | The fully qualified column name where the embeddings are stored, in the format cf:col. |
embeddingByteSize | The byte size of each entry in the embeddings array. Use 4 for float and 8 for double. Defaults to 4. |
vectorSearchIndex | The Vector Search index where changes are streamed, in the format projects/{projectID}/locations/{region}/indexes/{indexID} (no leading or trailing spaces). For example: projects/123/locations/us-east1/indexes/456. |
bigtableChangeStreamAppProfile | The application profile that is used to distinguish workloads in Bigtable. |
bigtableReadInstanceId | The ID of the Bigtable instance that contains the table. |
bigtableReadTableId | The Bigtable table to read from. |
bigtableMetadataTableTableId | Optional: ID for the metadata table that is created. If unset, Bigtable generates an ID. |
crowdingTagColumn | Optional: The fully qualified column name where the crowding tag is stored, in the format cf:col. |
allowRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as allow restricts, plus their aliases. Each column name must be in the format cf:col->alias (see the example after this table). |
denyRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as deny restricts, plus their aliases. Each column name must be in the format cf:col->alias. |
intNumericRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as integer numeric_restricts, plus their aliases. Each column name must be in the format cf:col->alias. |
floatNumericRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as float (4-byte) numeric_restricts, plus their aliases. Each column name must be in the format cf:col->alias. |
doubleNumericRestrictsMappings | Optional: The comma-separated, fully qualified column names of the columns to use as double (8-byte) numeric_restricts, plus their aliases. Each column name must be in the format cf:col->alias. |
upsertMaxBatchSize | Optional: The maximum number of upserts to buffer before upserting the batch to the Vector Search index. A batch is sent when it holds upsertMaxBatchSize records or when a record has waited upsertMaxBufferDuration, whichever happens first. For example: 10. |
upsertMaxBufferDuration | Optional: The maximum delay before a batch of upserts is sent to Vector Search. A batch is sent when it holds upsertMaxBatchSize records or when a record has waited this long, whichever happens first. Allowed formats are: Ns for seconds (example: 5s), Nm for minutes (example: 12m), and Nh for hours (example: 2h). Default: 10s. |
deleteMaxBatchSize | Optional: The maximum number of deletes to buffer before deleting the batch from the Vector Search index. A batch is sent when it holds deleteMaxBatchSize records or when a record has waited deleteMaxBufferDuration, whichever happens first. For example: 10. |
deleteMaxBufferDuration | Optional: The maximum delay before a batch of deletes is sent to Vector Search. A batch is sent when it holds deleteMaxBatchSize records or when a record has waited this long, whichever happens first. Allowed formats are: Ns for seconds (example: 5s), Nm for minutes (example: 12m), and Nh for hours (example: 2h). Default: 10s. |
dlqDirectory | Optional: The path where unprocessed records are stored along with the reason they failed to be processed. Defaults to a directory under the Dataflow job's temporary location. The default value is appropriate for most scenarios. |
bigtableChangeStreamMetadataInstanceId | Optional: The Bigtable instance to use for the change streams connector metadata table. Defaults to empty. |
bigtableChangeStreamMetadataTableTableId | Optional: The ID of the Bigtable change streams connector metadata table to use. If not provided, a Bigtable change streams connector metadata table is automatically created while the pipeline runs. Defaults to empty. |
bigtableChangeStreamCharset | Optional: The Bigtable change streams charset name to use when reading values and column qualifiers. Defaults to UTF-8. |
bigtableChangeStreamStartTimestamp | Optional: The starting timestamp (RFC 3339, https://tools.ietf.org/html/rfc3339), inclusive, to use for reading change streams. For example, 2022-05-05T07:59:59Z. Defaults to the timestamp when the pipeline starts. |
bigtableChangeStreamIgnoreColumnFamilies | Optional: A comma-separated list of column family names whose changes aren't captured. Defaults to empty. |
bigtableChangeStreamIgnoreColumns | Optional: A comma-separated list of column names whose changes aren't captured. Defaults to empty. |
bigtableChangeStreamName | Optional: A unique name for the client pipeline. This parameter lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. See the Dataflow job logs for the value used. |
bigtableChangeStreamResume | Optional: When set to true, a new pipeline resumes processing from the point at which a previously running pipeline with the same bigtableChangeStreamName value stopped. If a pipeline with that name never ran in the past, the new pipeline fails to start. When set to false, a new pipeline starts; in that case a pipeline with the same bigtableChangeStreamName value must not have already run for the given source. Defaults to false. |
bigtableReadProjectId | Optional: The project to read Bigtable data from. Defaults to the project where the Dataflow pipeline is running. |
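The cf:col and cf:col->alias formats in the preceding table combine a column family, a column qualifier, and, for the restrict mappings, an alias. The following hypothetical values illustrate the syntax only; the cf1 column family and the qualifier and alias names are made up for this example.

```
# Hypothetical parameter values showing the cf:col and cf:col->alias formats.
embeddingColumn=cf1:embedding
crowdingTagColumn=cf1:crowding_tag
allowRestrictsMappings=cf1:genre->genre
intNumericRestrictsMappings=cf1:year->publication_year
```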
Run the template
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bigtable Change Streams to Vector Search template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
embeddingColumn=EMBEDDING_COLUMN,\
embeddingByteSize=EMBEDDING_BYTE_SIZE,\
vectorSearchIndex=VECTOR_SEARCH_INDEX,\
bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_READ_TABLE_ID
Replace the following:
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
- EMBEDDING_COLUMN: the embedding column
- EMBEDDING_BYTE_SIZE: the byte size of the embeddings array. Can be 4 or 8.
- VECTOR_SEARCH_INDEX: the Vector Search index path
- BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
- BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
- BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
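After the command returns, you can optionally confirm that the streaming job started, for example by listing active Dataflow jobs in the same region. The following is a sketch using gcloud, with REGION_NAME as in the command above.

```
# Optional check: list active Dataflow jobs in the region to confirm
# that the streaming job launched.
gcloud dataflow jobs list \
    --region=REGION_NAME \
    --status=active
```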
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "embeddingColumn": "EMBEDDING_COLUMN",
      "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
      "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
      "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
      "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
      "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
    "environment": { "maxWorkers": "10" }
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
- EMBEDDING_COLUMN: the embedding column
- EMBEDDING_BYTE_SIZE: the byte size of the embeddings array. Can be 4 or 8.
- VECTOR_SEARCH_INDEX: the Vector Search index path
- BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
- BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
- BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
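One way to send the request shown above from a shell is with curl, using gcloud to obtain an access token. This sketch assumes the JSON body (the launchParameter object) is saved in a local file named request.json.

```
# Sketch: launch the flex template through the REST API with curl.
# Assumes the request body shown above is stored in request.json.
curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"
```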