Crea una canalización de embeddings de vectores en tiempo real para AlloyDB con Dataflow

En este documento, se muestra cómo crear una canalización de extracción, transformación y carga (ETL) de AlloyDB para PostgreSQL con Dataflow. Google Cloud Dataflow es un servicio Google Cloud completamente administrado para desarrollar y ejecutar canalizaciones de procesamiento de datos.

Puedes usar las instrucciones del documento, que se basan en el Colab Vector Embedding Ingestion with Apache Beam and AlloyDB, que usa Python para crear la canalización de transferencia de basic_ingestion_pipeline.py. Algunos de los casos de uso en los que puedes aplicar la información de este documento son la búsqueda semántica o la generación mejorada por recuperación (RAG).

En estas instrucciones, se describen los siguientes componentes de la canalización de Dataflow:

  • Cómo configurar una conexión entre AlloyDB y Dataflow
  • Genera embeddings en AlloyDB para PostgreSQL con el controlador VertexAITextEmbeddings de Apache Beam y el modelo de embedding de texto de Vertex AI
  • Crea una canalización de transmisión en Dataflow

Antes de comenzar

Antes de crear la canalización de Dataflow con Colab, completa estos requisitos previos:

Configura tu instancia de AlloyDB para PostgreSQL y los componentes de la canalización

Primero, configura tu canalización para que se conecte a una instancia de AlloyDB para PostgreSQL. Esta configuración incluye la definición del ID del proyecto, el URI de la instancia de AlloyDB para PostgreSQL, el usuario y la contraseña para conectarse con el conector de lenguaje de AlloyDB. Google Cloud Para obtener más información sobre cómo configurar la conexión, consulta Configuración de la base de datos.

Los módulos de Apache Beam específicos de la generación aumentada por recuperación (RAG) proporcionan clases para las siguientes tareas:

  • Ingiere datos de AlloyDB para PostgreSQL
  • Generación de embeddings
  • Cómo escribir estas incorporaciones de vectores en AlloyDB para PostgreSQL

Importa las clases necesarias en el código de tu canalización antes de compilar la lógica de la canalización. Para obtener más información sobre los componentes de canalización, consulta Importación de componentes de canalización.

Crea datos de muestra

El Colab Vector Embedding Ingestion with Apache Beam and AlloyDB proporciona datos de muestra de products_data para ejecutar la canalización. La canalización usa estos datos de muestra como entrada, junto con el modelo de incorporación, para generar incorporaciones.

Para obtener más información, consulta Crea datos de muestra.

Crea una tabla para almacenar embeddings

La canalización almacena los embeddings generados en la tabla default_dataflow_product_embeddings. Para obtener más información sobre cómo crear el esquema de la tabla, consulta Crea una tabla con el esquema predeterminado.

Opcional: Prepara los datos para la transferencia de incorporaciones

Según tu conjunto de datos, puedes dividir tus datos en metadatos y texto que el modelo de embedding debe convertir en embeddings. Las clases MLTransform() y VectorDatabaseWriteTransform() procesan los datos de entrada en un tamaño que admite el modelo de incorporación. Incluye los metadatos y formatea los datos de entrada según las especificaciones del modelo de incorporación que uses.

Para obtener más información sobre la preparación de datos, consulta Cómo asignar datos de productos a fragmentos.

Configura el controlador de embeddings para generar embeddings

La clase VertexAITextEmbeddings() define el modelo de incorporación de texto que crea incorporaciones vectoriales. Este modelo de incorporación convierte los datos segmentados en incorporaciones.

Para obtener más información, consulta Cómo configurar Embedding Handler.

También puedes usar un modelo previamente entrenado que se creó con el framework de SentenceTransformers de Hugging Face para generar embeddings de vectores. Para obtener más información, consulta Genera embeddings con HuggingFace.

Crea una canalización de transferencia de datos

La canalización basic_ingestion_pipeline.py, que se proporciona en el Colab Ingesta de embeddings de vectores con Apache Beam y AlloyDB, incorpora las configuraciones de las secciones anteriores, incluida la configuración de AlloyDB para PostgreSQL, la carga de datos en AlloyDB para PostgreSQL, la fragmentación de datos opcional y la configuración del controlador de embeddings.

La canalización de transferencia hace lo siguiente:

  • Crea tablas de datos de productos
  • Convierte los datos en fragmentos
  • Genera embeddings
  • Escribe los embeddings convertidos en la tabla products_data de AlloyDB para PostgreSQL

Puedes ejecutar esta canalización con un ejecutor local directo o un ejecutor basado en la nube, como Dataflow.

Para obtener más información sobre cómo crear la canalización de transferencia, consulta Cómo guardar nuestra canalización en un archivo de Python.

Ejecuta la canalización de Dataflow

Puedes ejecutar una canalización de Dataflow desde la línea de comandos. Pasa credenciales, como tu ID del proyecto, detalles de conexión de AlloyDB para PostgreSQL, ubicación del bucket de Cloud Storage, detalles del entorno de ejecución, información de la red y el nombre de la canalización de transferencia (basic_ingestion_pipeline.py).

En el Colab Vector Embedding Ingestion with Apache Beam and AlloyDB, la instancia de AlloyDB para PostgreSQL y los trabajos de Dataflow se ejecutan en la misma red de VPC y subred.

Para obtener más información sobre cómo ejecutar una canalización en Dataflow, consulta Ejecuta una canalización en Dataflow.

En la consola de Google Cloud , en el panel de Dataflow, puedes ver los gráficos de ejecución, los registros y las métricas mientras se ejecuta tu canalización.

Opcional: Ejecuta la canalización de transmisión de Dataflow

Para los datos que se espera que cambien con frecuencia, como las búsquedas de similitud o los motores de recomendación, considera crear una canalización de transmisión con Dataflow y Pub/Sub.

En lugar de procesar un lote de datos, esta canalización lee continuamente los mensajes entrantes de un tema de Pub/Sub, los convierte en fragmentos, genera incorporaciones con un modelo especificado (como Hugging Face o Vertex AI) y actualiza la tabla de AlloyDB para PostgreSQL.

Para obtener más información, consulta Cómo transmitir actualizaciones de incorporaciones desde Pub/Sub.

Verifica los embeddings de vectores en AlloyDB para PostgreSQL

Después de que se ejecute la canalización, verifica que haya escrito las incorporaciones en tu base de datos de AlloyDB para PostgreSQL.

Para obtener más información, consulta Cómo verificar las incorporaciones escritas.

¿Qué sigue?