Esta plantilla crea un flujo de procesamiento en streaming para transmitir registros de cambios de datos de Bigtable y escribirlos en Vertex AI Vector Search mediante Dataflow Runner V2.
Requisitos del flujo de procesamiento
- La instancia de origen de Bigtable debe existir.
- La tabla de origen de Bigtable debe existir y tener habilitados los flujos de cambios.
- El perfil de aplicación de Bigtable debe existir.
- La ruta del índice de búsqueda de vectores debe existir.
Parámetros de plantilla
Parámetros obligatorios
- embeddingColumn nombre de columna completo en el que se almacenan las inserciones. En el formato cf:col.
- embeddingByteSize el tamaño en bytes de cada entrada de la matriz de incrustaciones. Usa 4 para Float y 8 para Double. El valor predeterminado es 4.
- vectorSearchIndex el índice de búsqueda vectorial en el que se transmitirán los cambios, con el formato "projects/{projectID}/locations/{region}/indexes/{indexID}" (sin espacios al principio ni al final). Por ejemplo,
projects/123/locations/us-east1/indexes/456
. - bigtableChangeStreamAppProfile el ID del perfil de aplicación de Bigtable. El perfil de la aplicación debe usar el enrutamiento de un solo clúster y permitir transacciones de una sola fila.
- bigtableReadInstanceId el ID de la instancia de Bigtable de origen.
- bigtableReadTableId el ID de la tabla de Bigtable de origen.
Parámetros opcionales
- bigtableMetadataTableTableId ID de la tabla que se usa para crear la tabla de metadatos.
- crowdingTagColumn nombre de columna completo en el que se almacena la etiqueta de aglomeración. En el formato cf:col.
- allowRestrictsMappings los nombres de columna completos separados por comas de las columnas que se deben usar como
allow
restricciones, con su alias. En el formato cf:col->alias. - denyRestrictsMappings los nombres de columna completos separados por comas de las columnas que se deben usar como restricciones de
deny
, con su alias. En el formato cf:col->alias. - intNumericRestrictsMappings los nombres de columna completos separados por comas de las columnas que se deben usar como
numeric_restricts
entero, con su alias. En el formato cf:col->alias. - floatNumericRestrictsMappings los nombres de columna completos separados por comas de las columnas que se deben usar como float (4 bytes)
numeric_restricts
, con su alias. En el formato cf:col->alias. - doubleNumericRestrictsMappings los nombres de columna completos separados por comas que se deben usar como
numeric_restricts
de doble precisión (8 bytes), con su alias. En el formato cf:col->alias. - upsertMaxBatchSize número máximo de inserciones para almacenar en el búfer antes de insertar el lote en el índice de búsqueda vectorial. Los lotes se enviarán cuando haya registros upsertBatchSize listos o cuando haya transcurrido el tiempo upsertBatchDelay desde que se creó un registro. Por ejemplo,
10
. El valor predeterminado es 10. - upsertMaxBufferDuration el retraso máximo antes de que se envíe un lote de inserciones y actualizaciones a la búsqueda vectorial.Los lotes se enviarán cuando haya registros upsertBatchSize listos o cuando haya transcurrido el tiempo upsertBatchDelay desde que se creó un registro. Los formatos permitidos son: Ns (para segundos, por ejemplo, 5s), Nm (para minutos, por ejemplo, 12m) y Nh (para horas, por ejemplo, 2h). Por ejemplo,
10s
. El valor predeterminado es 10 s. - deleteMaxBatchSize: número máximo de eliminaciones que se almacenarán en el búfer antes de eliminar el lote del índice de búsqueda vectorial. Los lotes se enviarán cuando haya registros deleteBatchSize listos o cuando haya transcurrido el tiempo deleteBatchDelay desde que se añadió un registro. Por ejemplo,
10
. El valor predeterminado es 10. - deleteMaxBufferDuration el tiempo máximo que debe transcurrir antes de que se envíe un lote de eliminaciones a la búsqueda vectorial.Los lotes se enviarán cuando haya deleteBatchSize registros listos o cuando haya transcurrido el tiempo deleteBatchDelay desde que se añadió un registro. Los formatos permitidos son: Ns (para segundos, por ejemplo, 5s), Nm (para minutos, por ejemplo, 12m) y Nh (para horas, por ejemplo, 2h). Por ejemplo,
10s
. El valor predeterminado es 10 s. - dlqDirectory la ruta para almacenar los registros sin procesar con el motivo por el que no se han podido procesar. El valor predeterminado es un directorio de la ubicación temporal de la tarea de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
- bigtableChangeStreamMetadataInstanceId el ID de instancia de metadatos de los flujos de cambios de Bigtable. El valor predeterminado es una cadena vacía.
- bigtableChangeStreamMetadataTableTableId el ID de la tabla de metadatos del conector de flujos de cambios de Bigtable. Si no se proporciona, se crea automáticamente una tabla de metadatos del conector de flujos de cambios de Bigtable durante la ejecución de la canalización. El valor predeterminado es una cadena vacía.
- bigtableChangeStreamCharset nombre del conjunto de caracteres de los flujos de cambios de Bigtable. El valor predeterminado es UTF-8.
- bigtableChangeStreamStartTimestamp marca de tiempo inicial (https://tools.ietf.org/html/rfc3339) (inclusive) que se usará para leer los flujos de cambios. Por ejemplo,
2022-05-05T07:59:59Z
. El valor predeterminado es la marca de tiempo de la hora de inicio de la canalización. - bigtableChangeStreamIgnoreColumnFamilies lista de cambios en los nombres de las familias de columnas separados por comas que se deben ignorar. El valor predeterminado es una cadena vacía.
- bigtableChangeStreamIgnoreColumns lista de cambios de nombres de columnas separados por comas que se deben ignorar. Ejemplo: "cf1:col1,cf2:col2". El valor predeterminado es una cadena vacía.
- bigtableChangeStreamName nombre único de la canalización del cliente. Te permite reanudar el procesamiento desde el punto en el que se detuvo una canalización que se estaba ejecutando. El valor predeterminado es un nombre generado automáticamente. Consulte los registros de trabajos de Dataflow para ver el valor utilizado.
- bigtableChangeStreamResume si se define como
true
, una nueva canalización reanuda el procesamiento desde el punto en el que se detuvo una canalización que se estaba ejecutando anteriormente con el mismo valor debigtableChangeStreamName
. Si la canalización con el valorbigtableChangeStreamName
proporcionado no se ha ejecutado nunca, no se iniciará ninguna canalización. Si se define comofalse
, se inicia una nueva canalización. Si ya se ha ejecutado una canalización con el mismo valor debigtableChangeStreamName
para la fuente en cuestión, no se iniciará una nueva canalización. El valor predeterminado esfalse
. - bigtableReadChangeStreamTimeoutMs tiempo de espera de las solicitudes ReadChangeStream de Bigtable en milisegundos.
- bigtableReadProjectId el ID del proyecto de Bigtable. El valor predeterminado es el proyecto de la tarea de Dataflow.
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Bigtable Change Streams to Vector Search template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Haz clic en Ejecutar trabajo.
CLI de gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Haz los cambios siguientes:
JOB_NAME
: un nombre de trabajo único que elijasVERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
EMBEDDING_COLUMN
: la columna EmbeddingEMBEDDING_BYTE_SIZE
: tamaño en bytes de la matriz de inserciones. Puede ser 4 u 8.VECTOR_SEARCH_INDEX
: la ruta del índice de búsqueda vectorialBIGTABLE_CHANGE_STREAM_APP_PROFILE
: el ID del perfil de aplicación de BigtableBIGTABLE_READ_INSTANCE_ID
: el ID de instancia de Bigtable de origenBIGTABLE_READ_TABLE_ID
: el ID de la tabla de Bigtable de origen
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasVERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
EMBEDDING_COLUMN
: la columna EmbeddingEMBEDDING_BYTE_SIZE
: tamaño en bytes de la matriz de inserciones. Puede ser 4 u 8.VECTOR_SEARCH_INDEX
: la ruta del índice de búsqueda vectorialBIGTABLE_CHANGE_STREAM_APP_PROFILE
: el ID del perfil de aplicación de BigtableBIGTABLE_READ_INSTANCE_ID
: el ID de instancia de Bigtable de origenBIGTABLE_READ_TABLE_ID
: el ID de la tabla de Bigtable de origen