Plantilla de flujos de cambios de Bigtable a la búsqueda de vectores

Esta plantilla crea una canalización de transmisión para transmitir registros de cambios de datos de Bigtable y escribirlos en la búsqueda de vectores de Vertex AI con Dataflow Runner V2.

Requisitos de la canalización

  • La instancia de origen de Bigtable debe existir.
  • La tabla de origen de Bigtable debe existir y la tabla debe tener habilitados los flujos de cambios.
  • El perfil de la aplicación de Bigtable debe existir.
  • La ruta del índice de búsqueda de vectores debe existir.

Parámetros de la plantilla

Parámetro Descripción
embeddingColumn El nombre de la columna completamente calificado en el que se almacenan las incorporaciones. En el formato cf:col.
embeddingByteSize El tamaño en bytes de cada entrada en el array de embeddings. Usa 4 para el número de punto flotante y 8 para el doble. La configuración predeterminada es 4.
vectorSearchIndex El índice de búsqueda de vectores en el que se transmitirán los cambios, en el formato 'projects/{projectID}/locations/{region}/indexes/{indexID}' (sin espacios al principio ni al final). Por ejemplo: projects/123/locations/us-east1/indexes/456.
bigtableChangeStreamAppProfile El perfil de la aplicación que se usa para distinguir las cargas de trabajo en Bigtable.
bigtableReadInstanceId El ID de la instancia de Bigtable que contiene la tabla.
bigtableReadTableId La tabla de Bigtable de la que se leerá.
bigtableMetadataTableTableId Opcional: ID de la tabla de metadatos que se crea. Si no se configura, Bigtable generará un ID.
crowdingTagColumn Opcional: El nombre de la columna completamente calificado en la que se almacena la etiqueta de agrupamiento, en el formato cf:col.
allowRestrictsMappings Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como restricciones de allow, más sus alias. El nombre de cada columna debe tener el formato cf:col->alias.
denyRestrictsMappings Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como restricciones de deny, más sus alias. El nombre de cada columna debe tener el formato cf:col->alias.
intNumericRestrictsMappings Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como números enteros numeric_restricts, más sus alias. El nombre de cada columna debe tener el formato cf:col->alias.
floatNumericRestrictsMappings Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como números de punto flotante (4 bytes) numeric_restricts, más sus alias. El nombre de cada columna debe tener el formato cf:col->alias
doubleNumericRestrictsMappings Opcional: Los nombres de columna separados por comas y completamente calificados de las columnas que se usarán como dobles (8 bytes) numeric_restricts, más sus alias. El nombre de cada columna debe tener el formato cf:col->alias
upsertMaxBatchSize Opcional: La cantidad máxima de inserciones que se deben almacenar en búfer antes de insertar el lote en el índice de la búsqueda de vectores. Los lotes se envían cuando hay upsertBatchSize registros listos. Ejemplo: 10.
upsertMaxBufferDuration Opcional: El retraso máximo antes de que se envíe un lote de inserciones y actualizaciones a la búsqueda de vectores. Los lotes se envían cuando hay upsertBatchSize registros listos. Los formatos permitidos son los siguientes: Ns para los segundos (por ejemplo: 5 s), Nm para los minutos (por ejemplo: 12 min) y Nh para las horas (por ejemplo: 2 h). Valor predeterminado: 10s.
deleteMaxBatchSize Opcional: La cantidad máxima de eliminaciones que se deben almacenar en búfer antes de borrar el lote del índice de búsqueda de vectores. Los lotes se envían cuando hay deleteBatchSize registros listos. Por ejemplo: 10.
deleteMaxBufferDuration Opcional: El retraso máximo antes de enviar un lote de eliminaciones a la búsqueda de vectores. Los lotes se envían cuando hay deleteBatchSize registros listos. Los formatos permitidos son siguientes: Ns para los segundos (por ejemplo: 5 s), Nm para los minutos (por ejemplo: 12 min) y Nh para las horas (por ejemplo: 2 h). Valor predeterminado: 10s.
dlqDirectory Opcional: La ruta de acceso que almacenará los registros no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es apropiado para la mayoría de las situaciones.
bigtableChangeStreamMetadataInstanceId Opcional: La instancia de Bigtable que se usará para la tabla de metadatos del conector de flujos de cambios. La configuración predeterminada es vacía.
bigtableChangeStreamMetadataTableTableId Opcional: El ID de la tabla de metadatos del conector de flujos de cambios de Bigtable que se usará. Si no se proporciona, una tabla de metadatos del conector de flujos de cambios de Bigtable se creará de forma automática durante el flujo de la canalización. La configuración predeterminada es vacía.
bigtableChangeStreamCharset Opcional: El nombre del charset de flujos de cambios de Bigtable cuando se leen valores y calificadores de columnas. La configuración predeterminada es UTF-8.
bigtableChangeStreamStartTimestamp Opcional: La fecha y hora de inicio, inclusive, que se usará para leer los flujos de cambios (https://tools.ietf.org/html/rfc3339). Por ejemplo, 2022-05-05T07:59:59Z. El valor predeterminado es la marca de tiempo del inicio de la canalización.
bigtableChangeStreamIgnoreColumnFamilies Opcional: Una lista separada por comas de nombres de familias de columnas cuyos cambios no se capturarán. La configuración predeterminada es vacía.
bigtableChangeStreamIgnoreColumns Opcional: Una lista separada por comas de nombres de columnas cuyos cambios no se capturarán. La configuración predeterminada es vacía.
bigtableChangeStreamName Un nombre único para la canalización del cliente (opcional). Este parámetro te permite reanudar el procesamiento desde el momento en el que se detiene una canalización que se ejecutaba antes. El valor predeterminado es un nombre generado automáticamente. Consulta los registros de trabajos de Dataflow para el valor usado.
bigtableChangeStreamResume

Opcional: Cuando se configura como true, una canalización nueva reanuda el procesamiento desde el momento en que se detiene una canalización que se ejecutaba antes con el mismo nombre. Si una canalización con ese nombre nunca se ejecutó en el pasado, la canalización nueva no se inicia. Usa el parámetro bigtableChangeStreamName para especificar la línea de canalización.

Cuando se configura como false, se inicia una canalización nueva. Si una canalización con el mismo nombre que bigtableChangeStreamName ya se ejecutaba en el pasado para la fuente dada, la canalización nueva no se inicia.

La configuración predeterminada es "false".

bigtableReadProjectId Opcional: Proyecto desde el que se leerán los datos de Bigtable. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Bigtable Change Streams to Vector Search template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haz clic en Ejecutar trabajo.

CLI de gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • EMBEDDING_COLUMN: la columna de embedding
  • EMBEDDING_BYTE_SIZE: el tamaño en bytes del array de embeddings Puede ser 4 o 8.
  • VECTOR_SEARCH_INDEX: la ruta del índice de búsqueda de vectores
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: el ID de perfil de la aplicación de Bigtable
  • BIGTABLE_READ_INSTANCE_ID: el ID de la instancia de Bigtable de origen
  • BIGTABLE_READ_TABLE_ID: el ID de la tabla de origen de Bigtable

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • EMBEDDING_COLUMN: la columna de embedding
  • EMBEDDING_BYTE_SIZE: el tamaño en bytes del array de embeddings Puede ser 4 o 8.
  • VECTOR_SEARCH_INDEX: la ruta del índice de búsqueda de vectores
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: el ID de perfil de la aplicación de Bigtable
  • BIGTABLE_READ_INSTANCE_ID: el ID de la instancia de Bigtable de origen
  • BIGTABLE_READ_TABLE_ID: el ID de la tabla de origen de Bigtable