このドキュメントでは、Dataflow を使用して AlloyDB for PostgreSQL の抽出、変換、読み込み(ETL)パイプラインを作成する方法について説明します。 Google Cloud Dataflow は、データ処理パイプラインの開発と実行のためのフルマネージド Google Cloud サービスです。
使用可能なこのドキュメントの手順は、Apache Beam と AlloyDB を使用したベクトル エンベディングの取り込み Colab に基づいています。この Colab では、Python を使用して basic_ingestion_pipeline.py
取り込みパイプラインを作成しています。このドキュメントの情報が適用されるユースケースとしては、セマンティック検索や検索拡張生成(RAG)などがあります。
この手順では、次の Dataflow パイプライン コンポーネントについて説明します。
- AlloyDB と Dataflow の接続を設定する
- Apache Beam
VertexAITextEmbeddings
ハンドラと Vertex AI テキスト エンベディング モデルを使用して AlloyDB for PostgreSQL でエンベディングを生成する - Dataflow でのストリーミング パイプラインの作成
始める前に
Colab を使用して Dataflow パイプラインを作成する前に、次の前提条件を満たしてください。
- 環境を構成して、Dataflow パイプラインを作成します。
AlloyDB for PostgreSQL とその他の必要な API を有効にします。
gcloud services enable alloydb.googleapis.com cloudresourcemanager.googleapis.com \ servicenetworking.googleapis.com
Dataflow ユーザー アカウントに AlloyDB 管理者(roles/alloydb.admin)ロールを付与します。
AlloyDB for PostgreSQL インスタンスとパイプライン コンポーネントを設定する
まず、AlloyDB for PostgreSQL インスタンスに接続するようにパイプラインを構成します。この構成には、AlloyDB 言語コネクタを使用して接続するための Google Cloud プロジェクト ID、AlloyDB for PostgreSQL インスタンス URI、ユーザー、パスワードの定義が含まれます。接続の設定について詳しくは、データベースの設定をご覧ください。
検索拡張生成(RAG)固有の Apache Beam モジュールは、次のタスクのクラスを提供します。
- AlloyDB for PostgreSQL からのデータの取り込み
- エンベディングの生成
- これらのベクトル エンベディングを AlloyDB for PostgreSQL に書き戻す
パイプライン ロジックを構築する前に、必要なクラスをパイプライン コードにインポートします。パイプライン コンポーネントの詳細については、パイプライン コンポーネントのインポートをご覧ください。
サンプルデータを作成する
Apache Beam と AlloyDB を使用したベクトル エンベディングの取り込みの Colab には、パイプラインの実行に使用するためのサンプル products_data
データが用意されています。パイプラインは、このサンプルデータをエンベディング モデルとともに入力として使用して、エンベディングを生成します。
詳細については、サンプルデータを作成するをご覧ください。
エンベディングを格納するテーブルを作成する
パイプラインは、生成されたエンベディングを default_dataflow_product_embeddings
テーブルに保存します。テーブル スキーマの作成の詳細については、デフォルトのスキーマでテーブルを作成するをご覧ください。
省略可: エンベディングの取り込み用にデータを準備する
データセットに基づいて、エンベディング モデルがエンベディングに変換する必要があるメタデータとテキストにデータを分割できます。MLTransform()
クラスと VectorDatabaseWriteTransform()
クラスは、入力データをエンベディング モデルがサポートするサイズに処理します。メタデータを配置し、使用しているエンベディング モデルの仕様に従って入力データをフォーマットします。
データの準備の詳細については、プロダクト データをチャンクにマッピングするをご覧ください。
エンベディングを生成するようにエンベディング ハンドラを構成する
VertexAITextEmbeddings()
クラスは、ベクトル エンベディングを作成するテキスト エンベディング モデルを定義します。このエンベディング モデルは、チャンク化されたデータをエンベディングに変換します。
詳細については、エンベディング ハンドラを構成するをご覧ください。
Huggingface SentenceTransformers フレームワークを使用して作成された事前トレーニング済みモデルを使用して、ベクトル エンベディングを生成することもできます。詳細については、HuggingFace を使用してエンベディングを生成するをご覧ください。
取り込みパイプラインを作成する
Apache Beam と AlloyDB を使用したベクトル エンベディングの取り込み Colab に用意されている basic_ingestion_pipeline.py
パイプラインには、AlloyDB for PostgreSQL の設定、AlloyDB for PostgreSQL へのデータの読み込み、オプションのデータチャンク、エンベディング ハンドラの構成など、先行するセクションの構成が組み込まれています。
取り込みパイプラインは次の処理を行います。
- 商品データテーブルを作成する
- データをチャンクに変換する
- エンベディングを生成する
- 変換されたエンベディングを AlloyDB for PostgreSQL の
products_data
テーブルに書き込む
このパイプラインは、直接ローカル ランナーまたはクラウドベースのランナー(Dataflow など)を使用して実行できます。
取り込みパイプラインの作成の詳細については、パイプラインを Python ファイルに保存するをご覧ください。
Dataflow パイプラインを実行する
コマンドラインから Dataflow パイプラインを実行できます。プロジェクト ID、AlloyDB for PostgreSQL 接続の詳細、Cloud Storage バケットのロケーション、実行環境の詳細、ネットワーク情報、取り込みパイプラインの名前(basic_ingestion_pipeline.py
)などの認証情報を渡します。
Apache Beam と AlloyDB を使用したベクトル エンベディングの取り込みの Colab では、AlloyDB for PostgreSQL インスタンスと Dataflow ジョブは同じ VPC ネットワークとサブネットワークで実行されます。
Dataflow でパイプラインを実行する際の詳細については、Dataflow でパイプラインを実行するをご覧ください。
Google Cloud コンソールの Dataflow ダッシュボードで、パイプラインの実行中に実行グラフ、ログ、指標を表示できます。
省略可: ストリーミング Dataflow パイプラインを実行する
類似性検索やレコメンデーション エンジンなど、頻繁に変更されることが予想されるデータについては、Dataflow と Pub/Sub を使用してストリーミング パイプラインを作成することを検討してください。
このパイプラインは、データのバッチを処理するのではなく、Pub/Sub トピックから受信メッセージを継続的に読み取り、メッセージをチャンクに変換し、指定されたモデル(Hugging Face や Vertex AI など)を使用してエンベディングを生成し、AlloyDB for PostgreSQL テーブルを更新します。
詳細については、Pub/Sub からのエンベディングの更新のストリーミングをご覧ください。
AlloyDB for PostgreSQL でベクトル エンベディングを確認する
パイプラインの実行後、パイプラインがエンベディングを AlloyDB for PostgreSQL データベースに書き込んだことを確認します。
詳細については、書き込まれたエンベディングを確認するをご覧ください。