Créer un pipeline d'embedding vectoriel en temps réel pour AlloyDB avec Dataflow

Ce document vous explique comment créer un pipeline ETL (Extract, Transform, Load) AlloyDB pour PostgreSQL à l'aide de Dataflow. Google Cloud Dataflow est un service Google Cloud entièrement géré permettant de développer et d'exécuter des pipelines de traitement de données.

Vous pouvez suivre les instructions du document, qui sont basées sur le notebook Colab Ingestion d'embeddings vectoriels avec Apache Beam et AlloyDB, qui utilise Python pour créer le pipeline d'ingestion basic_ingestion_pipeline.py. La recherche sémantique ou la génération augmentée par récupération (RAG) sont des exemples de cas d'utilisation où vous pouvez appliquer les informations de ce document.

Ces instructions décrivent les composants suivants du pipeline Dataflow :

  • Configurer une connexion AlloyDB et Dataflow
  • Générer des embeddings dans AlloyDB pour PostgreSQL à l'aide du gestionnaire VertexAITextEmbeddings Apache Beam et du modèle d'embedding de texte Vertex AI
  • Créer un pipeline de streaming dans Dataflow

Avant de commencer

Avant de créer le pipeline Dataflow à l'aide de Colab, remplissez les conditions préalables suivantes :

Configurer votre instance AlloyDB pour PostgreSQL et les composants du pipeline

Commencez par configurer votre pipeline pour qu'il se connecte à une instance AlloyDB pour PostgreSQL. Cette configuration inclut la définition de l'ID de projet Google Cloud , de l'URI de l'instance AlloyDB pour PostgreSQL, de l'utilisateur et du mot de passe pour se connecter à l'aide du connecteur de langage AlloyDB. Pour en savoir plus sur la configuration de la connexion, consultez Configuration de la base de données.

Les modules Apache Beam spécifiques à la génération augmentée par récupération (RAG) fournissent des classes pour les tâches suivantes :

  • Ingérer des données depuis AlloyDB pour PostgreSQL
  • Génération d'embeddings
  • Réécrire ces embeddings vectoriels dans AlloyDB pour PostgreSQL

Importez les classes requises dans le code de votre pipeline avant de créer la logique du pipeline. Pour en savoir plus sur les composants de pipeline, consultez Importer des composants de pipeline.

Créer des exemples de données

Le Colab Ingestion d'embeddings vectoriels avec Apache Beam et AlloyDB fournit des exemples de données products_data pour exécuter le pipeline. Le pipeline utilise ces exemples de données comme entrée, ainsi que le modèle d'embedding, pour générer des embeddings.

Pour en savoir plus, consultez Créer des exemples de données.

Créer une table pour stocker les embeddings

Le pipeline stocke les embeddings générés dans la table default_dataflow_product_embeddings. Pour en savoir plus sur la création du schéma de table, consultez Créer une table avec un schéma par défaut.

Facultatif : Préparer les données pour l'ingestion d'embeddings

En fonction de votre ensemble de données, vous pouvez diviser vos données en métadonnées et en texte que le modèle d'embedding doit convertir en embeddings. Les classes MLTransform() et VectorDatabaseWriteTransform() traitent les données d'entrée pour les adapter à la taille acceptée par le modèle d'embedding. Incluez les métadonnées et mettez en forme les données d'entrée en fonction des spécifications du modèle d'embedding que vous utilisez.

Pour en savoir plus sur la préparation des données, consultez Mapper les données produit à des blocs.

Configurer le gestionnaire d'embedding pour générer des embeddings

La classe VertexAITextEmbeddings() définit le modèle d'embedding de texte qui crée des embeddings vectoriels. Ce modèle d'embedding convertit les données segmentées en embeddings.

Pour en savoir plus, consultez Configurer le gestionnaire d'intégration.

Vous pouvez également utiliser un modèle pré-entraîné créé avec le framework Huggingface SentenceTransformers pour générer des embeddings vectoriels. Pour en savoir plus, consultez Générer des embeddings avec HuggingFace.

Créer un pipeline d'ingestion

Le pipeline basic_ingestion_pipeline.py, fourni dans le Colab Ingestion d'embeddings vectoriels avec Apache Beam et AlloyDB, intègre les configurations des sections précédentes, y compris la configuration d'AlloyDB pour PostgreSQL, le chargement de données dans AlloyDB pour PostgreSQL, le découpage facultatif des données et la configuration du gestionnaire d'embeddings.

Le pipeline d'ingestion effectue les opérations suivantes :

  • Crée des tables de données produit
  • Convertit les données en blocs
  • Générer des embeddings
  • Écrit les embeddings convertis dans la table products_data d'AlloyDB pour PostgreSQL

Vous pouvez exécuter ce pipeline à l'aide d'un exécuteur local direct ou d'un exécuteur basé sur le cloud, tel que Dataflow.

Pour en savoir plus sur la création du pipeline d'ingestion, consultez Enregistrer notre pipeline dans un fichier Python.

Exécuter le pipeline Dataflow

Vous pouvez exécuter un pipeline Dataflow à partir de la ligne de commande. Transmettez les identifiants, tels que l'ID de votre projet, les informations de connexion AlloyDB pour PostgreSQL, l'emplacement du bucket Cloud Storage, les détails de l'environnement d'exécution, les informations réseau et le nom du pipeline d'ingestion (basic_ingestion_pipeline.py).

Dans le Colab Ingestion d'embeddings vectoriels avec Apache Beam et AlloyDB, l'instance AlloyDB pour PostgreSQL et les jobs Dataflow s'exécutent dans le même réseau VPC et le même sous-réseau.

Pour en savoir plus sur l'exécution d'un pipeline dans Dataflow, consultez Exécuter un pipeline sur Dataflow.

Dans la console Google Cloud , dans le tableau de bord Dataflow, vous pouvez afficher les graphiques d'exécution, les journaux et les métriques pendant l'exécution de votre pipeline.

Facultatif : Exécuter le pipeline Dataflow de streaming

Pour les données qui sont censées changer souvent, comme les moteurs de recherche par similarité ou de recommandation, envisagez de créer un pipeline de streaming à l'aide de Dataflow et Pub/Sub.

Au lieu de traiter un lot de données, ce pipeline lit en continu les messages entrants d'un sujet Pub/Sub, les convertit en blocs, génère des embeddings à l'aide d'un modèle spécifié (comme Hugging Face ou Vertex AI) et met à jour la table AlloyDB pour PostgreSQL.

Pour en savoir plus, consultez Diffuser les mises à jour des embeddings depuis Pub/Sub.

Valider les embeddings vectoriels dans AlloyDB pour PostgreSQL

Une fois le pipeline exécuté, vérifiez qu'il a bien écrit les embeddings dans votre base de données AlloyDB pour PostgreSQL.

Pour en savoir plus, consultez Vérifier les embeddings écrits.

Étapes suivantes