Plantilla de flujos de cambios de Bigtable a la búsqueda de vectores

Esta plantilla crea una canalización de transmisión para transmitir registros de cambios de datos de Bigtable y escribirlos en la búsqueda de vectores de Vertex AI con Dataflow Runner V2.

Requisitos de la canalización

  • La instancia de origen de Bigtable debe existir.
  • La tabla de origen de Bigtable debe existir y la tabla debe tener habilitados los flujos de cambios.
  • El perfil de la aplicación de Bigtable debe existir.
  • La ruta del índice de búsqueda de vectores debe existir.

Parámetros de la plantilla

Parámetros obligatorios

  • embeddingColumn: El nombre de la columna completamente calificado en el que se almacenan las incorporaciones. En el formato cf:col.
  • embeddingByteSize: Es el tamaño en bytes de cada entrada en el array de embeddings. Usa 4 para Float y 8 para Double. La configuración predeterminada es 4.
  • vectorSearchIndex: Es el índice de Vector Search en el que se transmitirán los cambios, en el formato "projects/{projectID}/locations/{region}/indexes/{indexID}" (sin espacios al principio ni al final). Por ejemplo, projects/123/locations/us-east1/indexes/456.
  • bigtableChangeStreamAppProfile: Es el ID del perfil de la aplicación de Bigtable. El perfil de aplicación debe usar el enrutamiento de un solo clúster y permitir las transacciones de fila única.
  • bigtableReadInstanceId: Es el ID de la instancia de origen de Bigtable.
  • bigtableReadTableId: Es el ID de la tabla de origen de Bigtable.

Parámetros opcionales

  • bigtableMetadataTableTableId: Es el ID de la tabla que se usa para crear la tabla de metadatos.
  • crowdingTagColumn: El nombre de la columna completamente calificado en el que se almacena la etiqueta de agrupamiento. En el formato cf:col.
  • allowRestrictsMappings: Son los nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como restricciones de allow, con su alias. En el formato cf:col->alias.
  • denyRestrictsMappings: Nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como restricciones de deny, con su alias. En el formato cf:col->alias.
  • intNumericRestrictsMappings: Los nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como numeric_restricts de número entero, con su alias. En el formato cf:col->alias.
  • floatNumericRestrictsMappings: Los nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como numeric_restricts de número de punto flotante (4 bytes), con su alias. En el formato cf:col->alias.
  • doubleNumericRestrictsMappings: Los nombres de columna completamente calificados y separados por comas de las columnas que se deben usar como numeric_restricts dobles (8 bytes), con su alias. En el formato cf:col->alias.
  • upsertMaxBatchSize: Es la cantidad máxima de inserciones que se deben almacenar en búfer antes de insertar el lote en el índice de la búsqueda de vectores. Los lotes se enviarán cuando haya registros upsertBatchSize listos o cuando haya transcurrido el tiempo de espera upsertBatchDelay de cualquier registro. Por ejemplo, 10 La configuración predeterminada es 10.
  • upsertMaxBufferDuration: Es el retraso máximo antes de que se envíe un lote de inserciones y actualizaciones a la búsqueda de vectores.Los lotes se enviarán cuando haya registros upsertBatchSize listos o cuando haya transcurrido el tiempo upsertBatchDelay de espera de cualquier registro. Los formatos permitidos son: Ns (para los segundos, por ejemplo, 5 s), Nm (para los minutos, por ejemplo, 12 m) y Nh (para las horas, por ejemplo, 2 h). Por ejemplo, 10s El valor predeterminado es 10 s.
  • deleteMaxBatchSize: Es la cantidad máxima de eliminaciones que se deben almacenar en búfer antes de borrar el lote del índice de la Búsqueda de Vectores. Los lotes se enviarán cuando haya registros deleteBatchSize listos o cuando haya transcurrido el tiempo deleteBatchDelay de espera de cualquier registro. Por ejemplo, 10 La configuración predeterminada es 10.
  • deleteMaxBufferDuration: Es el retraso máximo antes de que se envíe un lote de eliminaciones a la búsqueda de vectores.Los lotes se enviarán cuando haya registros deleteBatchSize listos o cuando haya transcurrido el tiempo deleteBatchDelay de espera de cualquier registro. Los formatos permitidos son: Ns (para los segundos, por ejemplo, 5 s), Nm (para los minutos, por ejemplo, 12 m) y Nh (para las horas, por ejemplo, 2 h). Por ejemplo, 10s El valor predeterminado es 10 s.
  • dlqDirectory: Es la ruta de acceso que almacenará los registros no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
  • bigtableChangeStreamMetadataInstanceId: Es el ID de la instancia de metadatos de flujos de cambios de Bigtable. La configuración predeterminada es vacía.
  • bigtableChangeStreamMetadataTableTableId: Es el ID de la tabla de metadatos del conector de flujos de cambios de Bigtable. Si no se proporciona, una tabla de metadatos de conectores de transmisión de cambios de Bigtable se crea de forma automática durante la ejecución de la canalización. La configuración predeterminada es vacía.
  • bigtableChangeStreamCharset: El nombre del conjunto de caracteres de los flujos de cambios de Bigtable. La configuración predeterminada es UTF-8.
  • bigtableChangeStreamStartTimestamp: Es la marca de tiempo de inicio (https://tools.ietf.org/html/rfc3339), inclusiva, que se usará para leer los flujos de cambios. Por ejemplo, 2022-05-05T07:59:59Z. El valor predeterminado es la marca de tiempo de la hora de inicio de la canalización.
  • bigtableChangeStreamIgnoreColumnFamilies: Una lista separada por comas de los cambios en el nombre de la familia de columnas que se deben ignorar (opcional). La configuración predeterminada es vacía.
  • bigtableChangeStreamIgnoreColumns: Una lista separada por comas de los cambios en el nombre de la columna que se deben ignorar. Ejemplo: "cf1:col1,cf2:col2". La configuración predeterminada es vacía.
  • bigtableChangeStreamName: Un nombre único para la canalización del cliente (opcional). Te permite reanudar el procesamiento desde el momento en que se detuvo una canalización que estaba en ejecución. El valor predeterminado es un nombre generado automáticamente. Consulta los registros de trabajos de Dataflow para el valor usado.
  • bigtableChangeStreamResume: Cuando se establece en true, una canalización nueva reanuda el procesamiento desde el momento en que se detuvo una canalización que se ejecutaba antes con el mismo valor bigtableChangeStreamName. Si nunca se ejecuta la canalización con el valor bigtableChangeStreamName determinado, no se inicia una canalización nueva. Cuando se establece en false, se inicia una canalización nueva. Si ya se ejecutó una canalización con el mismo valor bigtableChangeStreamName para una fuente determinada, no se inicia una canalización nueva. La configuración predeterminada es false.
  • bigtableReadChangeStreamTimeoutMs: Es el tiempo de espera para las solicitudes de ReadChangeStream de Bigtable en milisegundos.
  • bigtableReadProjectId: Es el ID del proyecto de Bigtable. El valor predeterminado es el proyecto para el trabajo de Dataflow.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Bigtable Change Streams to Vector Search template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud CLI

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • EMBEDDING_COLUMN: la columna de embedding
  • EMBEDDING_BYTE_SIZE: el tamaño en bytes del array de embeddings Puede ser 4 o 8.
  • VECTOR_SEARCH_INDEX: la ruta del índice de búsqueda de vectores
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: el ID de perfil de la aplicación de Bigtable
  • BIGTABLE_READ_INSTANCE_ID: el ID de la instancia de Bigtable de origen
  • BIGTABLE_READ_TABLE_ID: el ID de la tabla de origen de Bigtable

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto Google Cloud en el que deseas ejecutar el trabajo de Dataflow
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • EMBEDDING_COLUMN: la columna de embedding
  • EMBEDDING_BYTE_SIZE: el tamaño en bytes del array de embeddings Puede ser 4 o 8.
  • VECTOR_SEARCH_INDEX: la ruta del índice de búsqueda de vectores
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: el ID de perfil de la aplicación de Bigtable
  • BIGTABLE_READ_INSTANCE_ID: el ID de la instancia de Bigtable de origen
  • BIGTABLE_READ_TABLE_ID: el ID de la tabla de origen de Bigtable