Esta plantilla crea una canalización de transmisión para transmitir registros de cambios de datos de Bigtable y escribirlos en la búsqueda de vectores de Vertex AI con Dataflow Runner V2.
Requisitos de la canalización
- La instancia de origen de Bigtable debe existir.
- La tabla de origen de Bigtable debe existir y la tabla debe tener habilitados los flujos de cambios.
- El perfil de la aplicación de Bigtable debe existir.
- La ruta del índice de búsqueda de vectores debe existir.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
embeddingColumn |
El nombre de la columna completamente calificado en el que se almacenan las incorporaciones. En el formato cf:col. |
embeddingByteSize |
El tamaño en bytes de cada entrada en el array de embeddings. Usa 4 para el número de punto flotante y 8 para el doble. La configuración predeterminada es 4 . |
vectorSearchIndex |
El índice de búsqueda de vectores en el que se transmitirán los cambios, en el formato
'projects/{projectID}/locations/{region}/indexes/{indexID}' (sin espacios al principio ni al final). Por ejemplo:
projects/123/locations/us-east1/indexes/456 . |
bigtableChangeStreamAppProfile |
El perfil de la aplicación que se usa para distinguir las cargas de trabajo en Bigtable. |
bigtableReadInstanceId |
El ID de la instancia de Bigtable que contiene la tabla. |
bigtableReadTableId |
La tabla de Bigtable de la que se leerá. |
bigtableMetadataTableTableId |
Opcional: ID de la tabla de metadatos que se crea. Si no se configura, Bigtable generará un ID. |
crowdingTagColumn |
Opcional: El nombre de la columna completamente calificado en la que se almacena la etiqueta de agrupamiento, en el formato cf:col . |
allowRestrictsMappings |
Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como restricciones de allow , más
sus alias. El nombre de cada columna debe tener el formato cf:col->alias . |
denyRestrictsMappings |
Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como restricciones de deny , más
sus alias. El nombre de cada columna debe tener el formato cf:col->alias . |
intNumericRestrictsMappings |
Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como números enteros
numeric_restricts , más
sus alias. El nombre de cada columna debe tener el formato cf:col->alias . |
floatNumericRestrictsMappings |
Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como números de punto flotante (4 bytes)
numeric_restricts , más
sus alias. El nombre de cada columna debe tener el formato cf:col->alias |
doubleNumericRestrictsMappings |
Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como dobles (8 bytes)
numeric_restricts , más
sus alias. El nombre de cada columna debe tener el formato cf:col->alias |
upsertMaxBatchSize |
Opcional: La cantidad máxima de inserciones que se deben almacenar en búfer antes de insertar el lote en el índice de la búsqueda de vectores. Los lotes se envían cuando
hay upsertBatchSize registros listos.
Ejemplo: 10 . |
upsertMaxBufferDuration |
Opcional: El retraso máximo antes de que se envíe un lote de inserciones y actualizaciones a la búsqueda de vectores. Los lotes se envían cuando hay
upsertBatchSize registros listos. Los formatos permitidos son los siguientes:
Ns para los segundos (por ejemplo: 5 s), Nm para los
minutos (por ejemplo: 12 min) y Nh para las horas (por ejemplo:
2 h). Valor predeterminado: 10s . |
deleteMaxBatchSize |
Opcional: La cantidad máxima de eliminaciones que se deben almacenar en búfer antes de
borrar el lote del índice de búsqueda de vectores.
Los lotes se envían cuando hay
deleteBatchSize registros listos.
Por ejemplo: 10 . |
deleteMaxBufferDuration |
Opcional: El retraso máximo antes de enviar un lote de eliminaciones
a la búsqueda de vectores. Los lotes se envían cuando hay
deleteBatchSize registros listos. Los formatos permitidos
son siguientes: Ns para los segundos (por ejemplo: 5 s), Nm para los
minutos (por ejemplo: 12 min) y Nh para las horas (por ejemplo:
2 h). Valor predeterminado: 10s . |
dlqDirectory |
Opcional: La ruta de acceso que almacenará los registros no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es apropiado para la mayoría de las situaciones. |
bigtableChangeStreamMetadataInstanceId |
Opcional: La instancia de Bigtable que se usará para la tabla de metadatos del conector de flujos de cambios. La configuración predeterminada es vacía. |
bigtableChangeStreamMetadataTableTableId |
Opcional: El ID de la tabla de metadatos del conector de flujos de cambios de Bigtable que se usará. Si no se proporciona, una tabla de metadatos del conector de flujos de cambios de Bigtable se creará de forma automática durante el flujo de la canalización. La configuración predeterminada es vacía. |
bigtableChangeStreamCharset |
Opcional: El nombre del charset de flujos de cambios de Bigtable cuando se leen valores y calificadores de columnas. La configuración predeterminada es UTF-8. |
bigtableChangeStreamStartTimestamp |
Opcional: La fecha y hora de inicio, inclusive, que se usará para leer los flujos de cambios (https://tools.ietf.org/html/rfc3339). Por ejemplo, 2022-05-05T07:59:59Z. El valor predeterminado es la marca de tiempo del inicio de la canalización. |
bigtableChangeStreamIgnoreColumnFamilies |
Opcional: Una lista separada por comas de nombres de familias de columnas cuyos cambios no se capturarán. La configuración predeterminada es vacía. |
bigtableChangeStreamIgnoreColumns |
Opcional: Una lista separada por comas de nombres de columnas cuyos cambios no se capturarán. La configuración predeterminada es vacía. |
bigtableChangeStreamName |
Un nombre único para la canalización del cliente (opcional). Este parámetro te permite reanudar el procesamiento desde el momento en el que se detiene una canalización que se ejecutaba antes. El valor predeterminado es un nombre generado automáticamente. Consulta los registros de trabajos de Dataflow para el valor usado. |
bigtableChangeStreamResume |
Opcional: Cuando se configura como true, una canalización nueva reanuda
el procesamiento desde el momento en que se detiene una canalización que se
ejecutaba antes con el mismo nombre. Si una canalización con ese
nombre nunca se ejecutó en el pasado, la canalización nueva no se inicia.
Usa el parámetro Cuando se configura como false, se inicia una canalización nueva. Si una canalización
con el mismo nombre que La configuración predeterminada es "false". |
bigtableReadProjectId |
Opcional: Proyecto desde el que se leerán los datos de Bigtable. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow. |
Ejecuta la plantilla
Console
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Bigtable Change Streams to Vector Search template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haz clic en Ejecutar trabajo.
CLI de gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
REGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
EMBEDDING_COLUMN
: la columna de embeddingEMBEDDING_BYTE_SIZE
: el tamaño en bytes del array de embeddings Puede ser 4 o 8.VECTOR_SEARCH_INDEX
: la ruta del índice de búsqueda de vectoresBIGTABLE_CHANGE_STREAM_APP_PROFILE
: el ID de perfil de la aplicación de BigtableBIGTABLE_READ_INSTANCE_ID
: el ID de la instancia de Bigtable de origenBIGTABLE_READ_TABLE_ID
: el ID de la tabla de origen de Bigtable
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
LOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
EMBEDDING_COLUMN
: la columna de embeddingEMBEDDING_BYTE_SIZE
: el tamaño en bytes del array de embeddings Puede ser 4 o 8.VECTOR_SEARCH_INDEX
: la ruta del índice de búsqueda de vectoresBIGTABLE_CHANGE_STREAM_APP_PROFILE
: el ID de perfil de la aplicación de BigtableBIGTABLE_READ_INSTANCE_ID
: el ID de la instancia de Bigtable de origenBIGTABLE_READ_TABLE_ID
: el ID de la tabla de origen de Bigtable