Esta plantilla crea una canalización de transmisión para transmitir registros de cambios de datos de Bigtable y escribirlos en la búsqueda de vectores de Vertex AI con Dataflow Runner V2.
Requisitos de la canalización
- La instancia de origen de Bigtable debe existir.
- La tabla de origen de Bigtable debe existir y la tabla debe tener habilitados los flujos de cambios.
- El perfil de la aplicación de Bigtable debe existir.
- La ruta del índice de búsqueda de vectores debe existir.
Parámetros de la plantilla
Parámetros obligatorios
- embeddingColumn: El nombre de la columna completamente calificado en el que se almacenan las incorporaciones. En el formato cf:col.
- embeddingByteSize: Es el tamaño en bytes de cada entrada en el array de embeddings. Usa 4 para Float y 8 para Double. La configuración predeterminada es 4.
- vectorSearchIndex: Es el índice de Vector Search en el que se transmitirán los cambios, en el formato "projects/{projectID}/locations/{region}/indexes/{indexID}" (sin espacios al principio ni al final). Por ejemplo,
projects/123/locations/us-east1/indexes/456
. - bigtableChangeStreamAppProfile: Es el ID del perfil de la aplicación de Bigtable. El perfil de aplicación debe usar el enrutamiento de un solo clúster y permitir las transacciones de fila única.
- bigtableReadInstanceId: Es el ID de la instancia de origen de Bigtable.
- bigtableReadTableId: Es el ID de la tabla de origen de Bigtable.
Parámetros opcionales
- bigtableMetadataTableTableId: Es el ID de la tabla que se usa para crear la tabla de metadatos.
- crowdingTagColumn: El nombre de la columna completamente calificado en el que se almacena la etiqueta de agrupamiento. En el formato cf:col.
- allowRestrictsMappings: Son los nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como restricciones de
allow
, con su alias. En el formato cf:col->alias. - denyRestrictsMappings: Nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como restricciones de
deny
, con su alias. En el formato cf:col->alias. - intNumericRestrictsMappings: Los nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como
numeric_restricts
de número entero, con su alias. En el formato cf:col->alias. - floatNumericRestrictsMappings: Los nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como
numeric_restricts
de número de punto flotante (4 bytes), con su alias. En el formato cf:col->alias. - doubleNumericRestrictsMappings: Los nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como
numeric_restricts
dobles (8 bytes), con su alias. En el formato cf:col->alias. - upsertMaxBatchSize: Es la cantidad máxima de inserciones que se deben almacenar en búfer antes de insertar el lote en el índice de la búsqueda de vectores. Los lotes se enviarán cuando haya registros upsertBatchSize listos o cuando haya transcurrido el tiempo de espera upsertBatchDelay de cualquier registro. Por ejemplo,
10
La configuración predeterminada es 10. - upsertMaxBufferDuration: Es el retraso máximo antes de que se envíe un lote de inserciones y actualizaciones a la búsqueda de vectores.Los lotes se enviarán cuando haya registros upsertBatchSize listos o cuando haya transcurrido el tiempo upsertBatchDelay de espera de cualquier registro. Los formatos permitidos son: Ns (para los segundos, por ejemplo, 5 s), Nm (para los minutos, por ejemplo, 12 m) y Nh (para las horas, por ejemplo, 2 h). Por ejemplo,
10s
El valor predeterminado es 10 s. - deleteMaxBatchSize: Es la cantidad máxima de eliminaciones que se deben almacenar en búfer antes de borrar el lote del índice de la Búsqueda de Vectores. Los lotes se enviarán cuando haya registros deleteBatchSize listos o cuando haya transcurrido el tiempo deleteBatchDelay de espera de cualquier registro. Por ejemplo,
10
La configuración predeterminada es 10. - deleteMaxBufferDuration: Es el retraso máximo antes de que se envíe un lote de eliminaciones a la búsqueda de vectores.Los lotes se enviarán cuando haya registros deleteBatchSize listos o cuando haya transcurrido el tiempo deleteBatchDelay de espera de cualquier registro. Los formatos permitidos son: Ns (para los segundos, por ejemplo, 5 s), Nm (para los minutos, por ejemplo, 12 m) y Nh (para las horas, por ejemplo, 2 h). Por ejemplo,
10s
El valor predeterminado es 10 s. - dlqDirectory: Es la ruta de acceso que almacenará los registros no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
- bigtableChangeStreamMetadataInstanceId: Es el ID de la instancia de metadatos de flujos de cambios de Bigtable. La configuración predeterminada es vacía.
- bigtableChangeStreamMetadataTableTableId: Es el ID de la tabla de metadatos del conector de flujos de cambios de Bigtable. Si no se proporciona, una tabla de metadatos de conectores de transmisión de cambios de Bigtable se crea de forma automática durante la ejecución de la canalización. La configuración predeterminada es vacía.
- bigtableChangeStreamCharset: El nombre del conjunto de caracteres de los flujos de cambios de Bigtable. La configuración predeterminada es UTF-8.
- bigtableChangeStreamStartTimestamp: Es la marca de tiempo de inicio (https://tools.ietf.org/html/rfc3339), inclusiva, que se usará para leer los flujos de cambios. Por ejemplo,
2022-05-05T07:59:59Z
. El valor predeterminado es la marca de tiempo de la hora de inicio de la canalización. - bigtableChangeStreamIgnoreColumnFamilies: Una lista separada por comas de los cambios en el nombre de la familia de columnas que se deben ignorar (opcional). La configuración predeterminada es vacía.
- bigtableChangeStreamIgnoreColumns: Una lista separada por comas de los cambios en el nombre de la columna que se deben ignorar. Ejemplo: "cf1:col1,cf2:col2". La configuración predeterminada es vacía.
- bigtableChangeStreamName: Un nombre único para la canalización del cliente (opcional). Te permite reanudar el procesamiento desde el momento en que se detuvo una canalización que estaba en ejecución. El valor predeterminado es un nombre generado automáticamente. Consulta los registros de trabajos de Dataflow para el valor usado.
- bigtableChangeStreamResume: Cuando se establece en
true
, una canalización nueva reanuda el procesamiento desde el momento en que se detuvo una canalización que se ejecutaba antes con el mismo valorbigtableChangeStreamName
. Si nunca se ejecuta la canalización con el valorbigtableChangeStreamName
determinado, no se inicia una canalización nueva. Cuando se establece enfalse
, se inicia una canalización nueva. Si ya se ejecutó una canalización con el mismo valorbigtableChangeStreamName
para una fuente determinada, no se inicia una canalización nueva. La configuración predeterminada esfalse
. - bigtableReadChangeStreamTimeoutMs: Es el tiempo de espera para las solicitudes de ReadChangeStream de Bigtable en milisegundos.
- bigtableReadProjectId: Es el ID del proyecto de Bigtable. El valor predeterminado es el proyecto para el trabajo de Dataflow.
Ejecuta la plantilla
Console
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Bigtable Change Streams to Vector Search template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haz clic en Ejecutar trabajo.
gcloud CLI
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
REGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
EMBEDDING_COLUMN
: la columna de embeddingEMBEDDING_BYTE_SIZE
: el tamaño en bytes del array de embeddings Puede ser 4 o 8.VECTOR_SEARCH_INDEX
: la ruta del índice de búsqueda de vectoresBIGTABLE_CHANGE_STREAM_APP_PROFILE
: el ID de perfil de la aplicación de BigtableBIGTABLE_READ_INSTANCE_ID
: el ID de la instancia de Bigtable de origenBIGTABLE_READ_TABLE_ID
: el ID de la tabla de origen de Bigtable
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto Google Cloud en el que deseas ejecutar el trabajo de DataflowJOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
LOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
EMBEDDING_COLUMN
: la columna de embeddingEMBEDDING_BYTE_SIZE
: el tamaño en bytes del array de embeddings Puede ser 4 o 8.VECTOR_SEARCH_INDEX
: la ruta del índice de búsqueda de vectoresBIGTABLE_CHANGE_STREAM_APP_PROFILE
: el ID de perfil de la aplicación de BigtableBIGTABLE_READ_INSTANCE_ID
: el ID de la instancia de Bigtable de origenBIGTABLE_READ_TABLE_ID
: el ID de la tabla de origen de Bigtable