Dataflow を使用して AlloyDB のリアルタイム ベクトル エンベディング パイプラインを構築する

このドキュメントでは、Dataflow を使用して AlloyDB for PostgreSQL の抽出、変換、読み込み(ETL)パイプラインを作成する方法について説明します。 Google Cloud Dataflow は、データ処理パイプラインの開発と実行のためのフルマネージド Google Cloud サービスです。

使用可能なこのドキュメントの手順は、Apache Beam と AlloyDB を使用したベクトル エンベディングの取り込み Colab に基づいています。この Colab では、Python を使用して basic_ingestion_pipeline.py 取り込みパイプラインを作成しています。このドキュメントの情報が適用されるユースケースとしては、セマンティック検索や検索拡張生成(RAG)などがあります。

この手順では、次の Dataflow パイプライン コンポーネントについて説明します。

  • AlloyDB と Dataflow の接続を設定する
  • Apache Beam VertexAITextEmbeddings ハンドラと Vertex AI テキスト エンベディング モデルを使用して AlloyDB for PostgreSQL でエンベディングを生成する
  • Dataflow でのストリーミング パイプラインの作成

始める前に

Colab を使用して Dataflow パイプラインを作成する前に、次の前提条件を満たしてください。

AlloyDB for PostgreSQL インスタンスとパイプライン コンポーネントを設定する

まず、AlloyDB for PostgreSQL インスタンスに接続するようにパイプラインを構成します。この構成には、AlloyDB 言語コネクタを使用して接続するための Google Cloud プロジェクト ID、AlloyDB for PostgreSQL インスタンス URI、ユーザー、パスワードの定義が含まれます。接続の設定について詳しくは、データベースの設定をご覧ください。

検索拡張生成(RAG)固有の Apache Beam モジュールは、次のタスクのクラスを提供します。

  • AlloyDB for PostgreSQL からのデータの取り込み
  • エンベディングの生成
  • これらのベクトル エンベディングを AlloyDB for PostgreSQL に書き戻す

パイプライン ロジックを構築する前に、必要なクラスをパイプライン コードにインポートします。パイプライン コンポーネントの詳細については、パイプライン コンポーネントのインポートをご覧ください。

サンプルデータを作成する

Apache Beam と AlloyDB を使用したベクトル エンベディングの取り込みの Colab には、パイプラインの実行に使用するためのサンプル products_data データが用意されています。パイプラインは、このサンプルデータをエンベディング モデルとともに入力として使用して、エンベディングを生成します。

詳細については、サンプルデータを作成するをご覧ください。

エンベディングを格納するテーブルを作成する

パイプラインは、生成されたエンベディングを default_dataflow_product_embeddings テーブルに保存します。テーブル スキーマの作成の詳細については、デフォルトのスキーマでテーブルを作成するをご覧ください。

省略可: エンベディングの取り込み用にデータを準備する

データセットに基づいて、エンベディング モデルがエンベディングに変換する必要があるメタデータとテキストにデータを分割できます。MLTransform() クラスと VectorDatabaseWriteTransform() クラスは、入力データをエンベディング モデルがサポートするサイズに処理します。メタデータを配置し、使用しているエンベディング モデルの仕様に従って入力データをフォーマットします。

データの準備の詳細については、プロダクト データをチャンクにマッピングするをご覧ください。

エンベディングを生成するようにエンベディング ハンドラを構成する

VertexAITextEmbeddings() クラスは、ベクトル エンベディングを作成するテキスト エンベディング モデルを定義します。このエンベディング モデルは、チャンク化されたデータをエンベディングに変換します。

詳細については、エンベディング ハンドラを構成するをご覧ください。

Huggingface SentenceTransformers フレームワークを使用して作成された事前トレーニング済みモデルを使用して、ベクトル エンベディングを生成することもできます。詳細については、HuggingFace を使用してエンベディングを生成するをご覧ください。

取り込みパイプラインを作成する

Apache Beam と AlloyDB を使用したベクトル エンベディングの取り込み Colab に用意されている basic_ingestion_pipeline.py パイプラインには、AlloyDB for PostgreSQL の設定、AlloyDB for PostgreSQL へのデータの読み込み、オプションのデータチャンク、エンベディング ハンドラの構成など、先行するセクションの構成が組み込まれています。

取り込みパイプラインは次の処理を行います。

  • 商品データテーブルを作成する
  • データをチャンクに変換する
  • エンベディングを生成する
  • 変換されたエンベディングを AlloyDB for PostgreSQL の products_data テーブルに書き込む

このパイプラインは、直接ローカル ランナーまたはクラウドベースのランナー(Dataflow など)を使用して実行できます。

取り込みパイプラインの作成の詳細については、パイプラインを Python ファイルに保存するをご覧ください。

Dataflow パイプラインを実行する

コマンドラインから Dataflow パイプラインを実行できます。プロジェクト ID、AlloyDB for PostgreSQL 接続の詳細、Cloud Storage バケットのロケーション、実行環境の詳細、ネットワーク情報、取り込みパイプラインの名前(basic_ingestion_pipeline.py)などの認証情報を渡します。

Apache Beam と AlloyDB を使用したベクトル エンベディングの取り込みの Colab では、AlloyDB for PostgreSQL インスタンスと Dataflow ジョブは同じ VPC ネットワークとサブネットワークで実行されます。

Dataflow でパイプラインを実行する際の詳細については、Dataflow でパイプラインを実行するをご覧ください。

Google Cloud コンソールの Dataflow ダッシュボードで、パイプラインの実行中に実行グラフ、ログ、指標を表示できます。

省略可: ストリーミング Dataflow パイプラインを実行する

類似性検索やレコメンデーション エンジンなど、頻繁に変更されることが予想されるデータについては、Dataflow と Pub/Sub を使用してストリーミング パイプラインを作成することを検討してください。

このパイプラインは、データのバッチを処理するのではなく、Pub/Sub トピックから受信メッセージを継続的に読み取り、メッセージをチャンクに変換し、指定されたモデル(Hugging Face や Vertex AI など)を使用してエンベディングを生成し、AlloyDB for PostgreSQL テーブルを更新します。

詳細については、Pub/Sub からのエンベディングの更新のストリーミングをご覧ください。

AlloyDB for PostgreSQL でベクトル エンベディングを確認する

パイプラインの実行後、パイプラインがエンベディングを AlloyDB for PostgreSQL データベースに書き込んだことを確認します。

詳細については、書き込まれたエンベディングを確認するをご覧ください。

次のステップ