Crea una pipeline di vector embedding in tempo reale per AlloyDB con Dataflow

Questo documento mostra come creare una pipeline di estrazione, trasformazione e caricamento (ETL) di AlloyDB per PostgreSQL utilizzando Dataflow. Google Cloud Dataflow è un servizio Google Cloud completamente gestito per lo sviluppo e l'esecuzione di pipeline di elaborazione dei dati.

Puoi utilizzare le istruzioni nel documento, basate su Vector Embedding Ingestion with Apache Beam and AlloyDB Colab, che utilizza Python per creare la pipeline di importazione basic_ingestion_pipeline.py. Alcuni casi d'uso in cui puoi applicare le informazioni contenute in questo documento sono la ricerca semantica o la generazione RAG (Retrieval-Augmented Generation).

Queste istruzioni descrivono i seguenti componenti della pipeline Dataflow:

  • Configurazione di una connessione AlloyDB e Dataflow
  • Generazione di embedding in AlloyDB per PostgreSQL utilizzando il gestore Apache Beam VertexAITextEmbeddings e il modello di text embedding Vertex AI
  • Creazione di una pipeline di streaming in Dataflow

Prima di iniziare

Prima di creare la pipeline Dataflow utilizzando Colab, completa i seguenti prerequisiti:

Configura i componenti dell'istanza e della pipeline di AlloyDB per PostgreSQL

Innanzitutto, configura la pipeline per connettersi a un'istanza AlloyDB per PostgreSQL. Questa configurazione include la definizione dell'ID progetto, dell'URI dell'istanza AlloyDB per PostgreSQL, dell'utente e della password per la connessione tramite il connettore del linguaggio AlloyDB. Google Cloud Per ulteriori informazioni sulla configurazione della connessione, vedi Configurazione del database.

I moduli Apache Beam specifici per la Retrieval-Augmented Generation (RAG) forniscono classi per le seguenti attività:

  • Importazione di dati da AlloyDB per PostgreSQL
  • Generazione degli incorporamenti
  • Scrivere questi incorporamenti vettoriali in AlloyDB per PostgreSQL

Importa le classi richieste nel codice della pipeline prima di creare la logica della pipeline. Per saperne di più sui componenti della pipeline, consulta Importazione di componenti della pipeline.

Crea dati di esempio

Il notebook Colab Vector Embedding Ingestion with Apache Beam and AlloyDB fornisce dati products_data di esempio per l'esecuzione della pipeline. La pipeline utilizza questi dati di esempio come input, insieme al modello di incorporamento, per generare incorporamenti.

Per saperne di più, vedi Creare dati di esempio.

Crea una tabella per archiviare gli incorporamenti

La pipeline archivia gli incorporamenti generati nella tabella default_dataflow_product_embeddings. Per saperne di più sulla creazione dello schema della tabella, vedi Creare una tabella con lo schema predefinito.

(Facoltativo) Prepara i dati per l'importazione degli incorporamenti

In base al set di dati, puoi dividere i dati in metadati e testo che il modello di embedding deve convertire in embedding. Le classi MLTransform() e VectorDatabaseWriteTransform() elaborano i dati di input in una dimensione supportata dal modello di incorporamento. Includi i metadati e formatta i dati di input in base alle specifiche del modello di incorporamento che stai utilizzando.

Per saperne di più sulla preparazione dei dati, consulta Mappare i dati dei prodotti in blocchi.

Configura il gestore degli incorporamenti per generare incorporamenti

La classe VertexAITextEmbeddings() definisce il modello di embedding di testo che crea gli embedding vettoriali. Questo modello di embedding converte i dati suddivisi in blocchi in embedding.

Per saperne di più, consulta Configurare l'handler di incorporamento.

Puoi anche utilizzare un modello preaddestrato creato con il framework Huggingface SentenceTransformers per generare incorporamenti vettoriali. Per saperne di più, consulta Generare incorporamenti con HuggingFace.

Crea una pipeline di importazione

La pipeline basic_ingestion_pipeline.py, fornita nel Colab Vector Embedding Ingestion with Apache Beam and AlloyDB, incorpora le configurazioni delle sezioni precedenti, tra cui la configurazione di AlloyDB per PostgreSQL, il caricamento dei dati in AlloyDB per PostgreSQL, la suddivisione facoltativa dei dati in blocchi e la configurazione del gestore di incorporamenti.

La pipeline di importazione esegue le seguenti operazioni:

  • Crea tabelle di dati di prodotto
  • Converte i dati in blocchi
  • Genera incorporamenti
  • Scrive gli incorporamenti convertiti nella tabella products_data in AlloyDB per PostgreSQL

Puoi eseguire questa pipeline utilizzando un runner locale diretto o un runner basato su cloud come Dataflow.

Per ulteriori informazioni sulla creazione della pipeline di importazione, consulta Salvare la pipeline in un file Python.

Esegui la pipeline Dataflow

Puoi eseguire una pipeline Dataflow dalla riga di comando. Trasmetti le credenziali, come l'ID progetto, i dettagli di connessione di AlloyDB per PostgreSQL, la posizione del bucket Cloud Storage, i dettagli dell'ambiente di esecuzione, le informazioni di rete e il nome della pipeline di importazione (basic_ingestion_pipeline.py).

Nel blocco note Vector Embedding Ingestion with Apache Beam and AlloyDB, l'istanza AlloyDB per PostgreSQL e i job Dataflow vengono eseguiti nella stessa rete VPC e subnet.

Per saperne di più sull'esecuzione di una pipeline in Dataflow, consulta Esegui pipeline su Dataflow.

Nella console Google Cloud , nella dashboard Dataflow, puoi visualizzare grafici di esecuzione, log e metriche durante l'esecuzione della pipeline.

(Facoltativo) Esegui la pipeline Dataflow in modalità flusso

Per i dati che dovrebbero cambiare spesso, come le ricerche di similarità o i motori di suggerimenti, valuta la possibilità di creare una pipeline di streaming utilizzando Dataflow e Pub/Sub.

Anziché elaborare un batch di dati, questa pipeline legge continuamente i messaggi in entrata da un argomento Pub/Sub, li converte in blocchi, genera incorporamenti utilizzando un modello specificato (come Hugging Face o Vertex AI) e aggiorna la tabella AlloyDB per PostgreSQL.

Per ulteriori informazioni, consulta Aggiornamenti degli incorporamenti in streaming da Pub/Sub.

Verifica degli incorporamenti vettoriali in AlloyDB per PostgreSQL

Dopo l'esecuzione della pipeline, verifica che abbia scritto gli incorporamenti nel database AlloyDB per PostgreSQL.

Per ulteriori informazioni, consulta Verificare gli incorporamenti scritti.

Passaggi successivi