MLTransform を使用してエンベディングを生成する

このページでは、機械学習(ML)モデルのトレーニング用データを準備するために MLTransform 機能を使用する理由と方法について説明します。具体的には、このページでは、MLTransform を使用してエンベディングを生成してデータを処理する方法について説明します。

MLTransform は、複数のデータ処理変換を 1 つのクラスに統合することで、Apache Beam ML データ処理オペレーションをワークフローに適用するプロセスを簡素化しています。

データ処理のステップがハイライト表示された Dataflow ML ワークフローの図。

図 1. 完全な Dataflow ML ワークフロー。ワークフローの前処理ステップで MLTransform を使用します。

エンベディングの概要

エンベディングは、最新のセマンティック検索アプリケーションと検索拡張生成(RAG)アプリケーションに不可欠です。エンベディングにより、システムはより深くより概念的なレベルで情報を理解し、操作できます。セマンティック検索では、エンベディングによってクエリとドキュメントがベクトル表現に変換されます。これらの表現は、基盤となる意味と関係性を捉えています。そのため、キーワードが完全に一致しない場合でも、関連性の高い検索結果を見つけることが可能です。これは、標準的なキーワード ベースの検索を大きく上回るものです。エンベディングは、商品のおすすめにも使用できます。これには、画像とテキストを使用するマルチモーダル検索、ログ分析、重複除去などのタスクが含まれます。

RAG では、エンベディングは、ナレッジベースから関連するコンテキストを取得して、大規模言語モデル(LLM)のレスポンスをグラウンディングするうえで重要な役割を果たします。RAG システムは、ユーザーのクエリとナレッジベース内の情報のチャンクの両方をエンベディングすることで、最も意味的に類似した情報を効率的に特定して取得できます。このセマンティック マッチングにより、LLM は正確で有益な回答を生成するために必要な情報にアクセスできます。

エンベディング用のデータを取り込んで処理する

チャンク化とエンベディング生成のデータ処理ステップがハイライト表示されたエンベディング ナレッジの取り込みの図。

図 2.ナレッジの取り込み図。これは、入力マルチモーダル ドキュメント データと、チャンク化とエンベディング生成の 2 つの処理ステップを示しています。チャンク化は、エンベディング生成の前に複雑なデータに使用される前処理ステップです。データが処理されると、エンベディングがベクトル データベースに保存されます。

コア エンベディングのユースケースでは、知識の取り込みと処理の方法が重要な考慮事項となります。この取り込みは、バッチ方式またはストリーミング方式で行えます。この知識のソースは多岐にわたります。たとえば、この情報は Cloud Storage に保存されているファイルから取得することも、Pub/Sub や Google Cloud Managed Service for Apache Kafka などのストリーミング ソースから取得することもできます。

ストリーミング ソースの場合、データ自体は未加工のコンテンツ(例: 書式なしテキスト)またはドキュメントを指す URI である可能性があります。通常、最初のステージでは、ソースに関係なく、情報の前処理が行われます。生テキストの場合、基本的なデータ クレンジングなど、最小限の処理になることがあります。ただし、大きなドキュメントや複雑なコンテンツの場合は、重要なステップはチャンク化です。チャンク化には、ソースデータを管理しやすい小さな単位に分割することが含まれます。最適なチャンク化戦略は標準化されておらず、特定のデータとアプリケーションによって異なります。Dataflow などのプラットフォームには、さまざまなチャンク化のニーズに対応する機能が組み込まれており、この重要な前処理ステージを簡素化できます。

利点

MLTransform クラスには次のようなメリットがあります。

  • データをベクトル データベースに push したり、推論を実行するために使用できるエンベディングを生成します。
  • 複雑なコードの記述や基盤となるライブラリの管理を行わずに、データを変換できます。
  • 複数のタイプの処理オペレーションを 1 つのインターフェースで効率的に連結できます。

サポートと制限事項

MLTransform クラスには次の制限があります。

  • Apache Beam Python SDK バージョン 2.53.0 以降を使用するパイプラインで使用できます。
  • パイプラインでは、デフォルト ウィンドウを使用する必要があります。

テキスト エンベディング変換:

ユースケース

サンプル ノートブックは、特定のユースケースで MLTransform を使用する方法を示しています。

Vertex AI を使用して LLM のテキスト エンベディングを生成したい
Apache Beam MLTransform クラスと Vertex AI text-embeddings API を使用して、テキスト エンベディングを生成します。テキスト エンベディングは、テキストを数値ベクトルとして表現する方法であり、多くの自然言語処理(NLP)タスクで必要になります。
Hugging Face を使用して LLM のテキスト エンベディングを生成したい
Apache Beam MLTransform クラスと Hugging Face Hub モデルを使用して、テキスト エンベディングを生成します。Hugging Face SentenceTransformers フレームワークは、Python を使用して文、テキスト、画像のエンベディングを生成します。
テキスト エンベディングを生成して AlloyDB for PostgreSQL に取り込みたい
Apache Beam(特に MLTransform クラスと Hugging Face Hub モデル)を使用して、テキスト エンベディングを生成します。次に、VectorDatabaseWriteTransform を使用して、これらのエンベディングと関連するメタデータを AlloyDB for PostgreSQL に読み込みます。このノートブックでは、AlloyDB for PostgreSQL ベクトル データベースにデータを入力するためのスケーラブルなバッチとストリーミング Beam データ パイプラインの構築方法を示します。これには、Pub/Sub や既存のデータベース テーブルなどのさまざまなソースからのデータの処理、カスタム スキーマの作成、データの更新が含まれます。
テキスト エンベディングを生成して BigQuery に取り込みたい
Apache Beam MLTransform クラスと Hugging Face Hub モデルを使用して、商品カタログなどのアプリケーション データからテキスト エンベディングを生成します。これには Apache Beam HuggingfaceTextEmbeddings 変換が使用されます。この変換では、文とテキストのエンベディングを生成するモデルを提供する Hugging Face SentenceTransformers フレームワークを使用します。生成されたエンベディングとそのメタデータは、Apache Beam VectorDatabaseWriteTransform を使用して BigQuery に取り込まれます。このノートブックでは、拡充変換を使用して BigQuery でベクトル類似度検索を行う方法も示します。

使用可能な変換の完全なリストについては、Apache Beam ドキュメントの変換をご覧ください。

エンベディング生成に MLTransform を使用する

MLTransform クラスを使用して情報をチャンク化し、エンベディングを生成するには、次のコードをパイプラインに含めます。

  
  def create_chunk(product: Dict[str, Any]) -> Chunk:
    return Chunk(
        content=Content(
            text=f"{product['name']}: {product['description']}"
        ),
        id=product['id'],  # Use product ID as chunk ID
        metadata=product,  # Store all product info in metadata
    )

  [...]
  with beam.Pipeline() as p:
    _ = (
            p
            | 'Create Products' >> beam.Create(products)
            | 'Convert to Chunks' >> beam.Map(create_chunk)
            | 'Generate Embeddings' >> MLTransform(
  write_artifact_location=tempfile.mkdtemp())
  .with_transform(huggingface_embedder)
            | 'Write to AlloyDB' >> VectorDatabaseWriteTransform(alloydb_config)
        )

上記の例では、要素ごとに 1 つのチャンクが作成されますが、代わりに LangChain を使用してチャンクを作成することもできます。

splitter = CharacterTextSplitter(chunk_size=100, chunk_overlap=20)
provider = beam.ml.rag.chunking.langchain.LangChainChunker(
document_field='content', metadata_fields=[], text_splitter=splitter)

with beam.Pipeline() as p:
_ = (
p
| 'Create Products' >> beam.io.textio.ReadFromText(products)
| 'Convert to Chunks' >> provider.get_ptransform_for_processing()

次のステップ