Echtzeit-Pipeline für Vektoreinbettungen für AlloyDB mit Dataflow erstellen

In diesem Dokument wird beschrieben, wie Sie mit Dataflow eine ETL-Pipeline (Extrahieren, Transformieren, Laden) für AlloyDB for PostgreSQL erstellen. Google Cloud Dataflow ist ein vollständig verwalteter Google Cloud Dienst zum Entwickeln und Ausführen von Datenverarbeitungspipelines.

Sie können die Anleitungen im Dokument verwenden, die auf dem Colab Vector Embedding Ingestion with Apache Beam and AlloyDB basieren. Dort wird Python verwendet, um die basic_ingestion_pipeline.py-Aufnahmepipeline zu erstellen. Einige der Anwendungsfälle, in denen Sie die Informationen in diesem Dokument anwenden können, sind die semantische Suche oder Retrieval Augmented Generation (RAG).

In dieser Anleitung werden die folgenden Dataflow-Pipelinekomponenten beschrieben:

  • AlloyDB- und Dataflow-Verbindung einrichten
  • Einbettungen in AlloyDB for PostgreSQL mit dem Apache Beam-Handler VertexAITextEmbeddings und dem Vertex AI-Texteinbettungsmodell generieren
  • Streaming-Pipeline in Dataflow erstellen

Hinweise

Bevor Sie die Dataflow-Pipeline mit dem Colab erstellen, müssen Sie die folgenden Voraussetzungen erfüllen:

AlloyDB for PostgreSQL-Instanz und Pipelinekomponenten einrichten

Konfigurieren Sie zuerst Ihre Pipeline, um eine Verbindung zu einer AlloyDB for PostgreSQL-Instanz herzustellen. Diese Konfiguration umfasst die Definition der Google Cloud Projekt-ID, des AlloyDB for PostgreSQL-Instanz-URI, des Nutzers und des Passworts für die Verbindung über den AlloyDB-Sprach-Connector. Weitere Informationen zum Einrichten der Verbindung finden Sie unter Datenbank einrichten.

Die RAG-spezifischen Apache Beam-Module bieten Klassen für die folgenden Aufgaben:

  • Daten aus AlloyDB for PostgreSQL aufnehmen
  • Einbettungen generieren
  • Diese Vektoreinbettungen werden in AlloyDB for PostgreSQL zurückgeschrieben.

Importieren Sie die erforderlichen Klassen in Ihren Pipelinecode, bevor Sie die Pipelinelogik erstellen. Weitere Informationen zu Pipelinekomponenten finden Sie unter Pipelinekomponenten importieren.

Beispieldaten erstellen

Das Colab-Notebook für die Aufnahme von Vektoreinbettungen mit Apache Beam und AlloyDB enthält products_data-Beispieldaten zum Ausführen der Pipeline. Die Pipeline verwendet diese Beispieldaten zusammen mit dem Einbettungsmodell als Eingabe, um Einbettungen zu generieren.

Weitere Informationen finden Sie unter Beispieldaten erstellen.

Tabelle zum Speichern von Einbettungen erstellen

Die Pipeline speichert die generierten Einbettungen in der Tabelle default_dataflow_product_embeddings. Weitere Informationen zum Erstellen des Tabellenschemas finden Sie unter Tabelle mit Standardschema erstellen.

Optional: Daten für die Aufnahme von Einbettungen vorbereiten

Anhand Ihres Datasets können Sie Ihre Daten in Metadaten und Text aufteilen, die das Einbettungsmodell in Einbettungen konvertieren muss. Die Klassen MLTransform() und VectorDatabaseWriteTransform() verarbeiten Eingabedaten in einer Größe, die vom Einbettungsmodell unterstützt wird. Fügen Sie die Metadaten ein und formatieren Sie die Eingabedaten gemäß den Spezifikationen des von Ihnen verwendeten Einbettungsmodells.

Weitere Informationen zum Vorbereiten von Daten finden Sie unter Produktinformationen Chunks zuordnen.

Einbettungs-Handler zum Generieren von Einbettungen konfigurieren

Die Klasse VertexAITextEmbeddings() definiert das Texteinbettungsmodell, mit dem Vektoreinbettungen erstellt werden. Mit diesem Einbettungsmodell werden die in Chunks aufgeteilten Daten in Einbettungen konvertiert.

Weitere Informationen finden Sie unter Embedding-Handler konfigurieren.

Sie können auch ein vortrainiertes Modell verwenden, das mit dem Huggingface SentenceTransformers-Framework erstellt wurde, um Vektoreinbettungen zu generieren. Weitere Informationen finden Sie unter Einbettungen mit HuggingFace generieren.

Erstellen einer Ingestion-Pipeline

Die basic_ingestion_pipeline.py-Pipeline, die im Vector Embedding Ingestion with Apache Beam and AlloyDB-Colab enthalten ist, enthält die Konfigurationen aus den vorherigen Abschnitten, einschließlich der Einrichtung von AlloyDB for PostgreSQL, dem Laden von Daten in AlloyDB for PostgreSQL, dem optionalen Aufteilen von Daten in Chunks und der Konfiguration des Einbettungshandlers.

Die Ingest-Pipeline führt folgende Schritte aus:

  • Produktdatentabellen erstellen
  • Daten in Chunks umwandeln
  • Generiert Einbettungen
  • Schreibt die konvertierten Einbettungen in die Tabelle products_data in AlloyDB for PostgreSQL

Sie können diese Pipeline mit einem direkten lokalen Runner oder einem cloudbasierten Runner wie Dataflow ausführen.

Weitere Informationen zum Erstellen der Ingestionspipeline finden Sie unter Pipeline in einer Python-Datei speichern.

Dataflow-Pipeline ausführen

Sie können eine Dataflow-Pipeline über die Befehlszeile ausführen. Übergeben Sie Anmeldedaten wie Ihre Projekt-ID, AlloyDB for PostgreSQL-Verbindungsdetails, den Speicherort des Cloud Storage-Bucket, Details zur Ausführungsumgebung, Netzwerkinformationen und den Namen der Ingestion-Pipeline (basic_ingestion_pipeline.py).

Im Colab Vector Embedding Ingestion with Apache Beam and AlloyDB (Aufnahme von Vektoreinbettungen mit Apache Beam und AlloyDB) werden die AlloyDB for PostgreSQL-Instanz und die Dataflow-Jobs im selben VPC-Netzwerk und ‑Subnetzwerk ausgeführt.

Weitere Informationen zum Ausführen einer Pipeline in Dataflow finden Sie unter Pipeline in Dataflow ausführen.

In der Google Cloud Console können Sie im Dataflow-Dashboard Ausführungsgrafiken, Logs und Messwerte ansehen, während Ihre Pipeline ausgeführt wird.

Optional: Dataflow-Streamingpipeline ausführen

Für Daten, die sich voraussichtlich häufig ändern, z. B. Ähnlichkeitssuchen oder Empfehlungs-Engines, sollten Sie eine Streamingpipeline mit Dataflow und Pub/Sub erstellen.

Anstatt einen Batch von Daten zu verarbeiten, liest diese Pipeline kontinuierlich eingehende Nachrichten aus einem Pub/Sub-Thema, wandelt die Nachrichten in Chunks um, generiert Einbettungen mit einem angegebenen Modell (z. B. Hugging Face oder Vertex AI) und aktualisiert die AlloyDB for PostgreSQL-Tabelle.

Weitere Informationen finden Sie unter Streaming von Einbettungsaktualisierungen aus Pub/Sub.

Vektoreinbettungen in AlloyDB for PostgreSQL überprüfen

Prüfen Sie nach der Ausführung der Pipeline, ob die Einbettungen in Ihre AlloyDB for PostgreSQL-Datenbank geschrieben wurden.

Weitere Informationen finden Sie unter Geschriebene Einbettungen überprüfen.

Nächste Schritte