リアルタイムで埋め込みを行う類似度マッチング システムの構築

この記事では、機械学習を使用して特定のアイテムに類似したアイテムを見つける手法である類似度マッチングの概要について説明します。また、テキスト セマンティック検索をリアルタイムで行うサンプル ソリューションとその実行方法についても説明します。このサンプル ソリューションは、realtime-embeddings-matching GitHub リポジトリにあります。

この記事では、機械学習のコンセプトを理解し、Google Cloud や Apache Beam などのツールに精通していることを前提としています。

概要

検索システムやレコメンデーション エンジンでは、特定のクエリに似たアイテムを見つけることが中核的な機能となります。たとえば、類似度マッチングは、次のようなものを検索する際に有効な手段となります。

  • ペットの画像に類似した画像
  • 検索クエリに関連するニュース記事
  • 過去に見た映画や聞いた音楽に似ている映画や音楽
  • おすすめの商品やサービス

類似度マッチングのシステムを設計する場合、まず、アイテムを数値ベクトルで表す必要があります。このベクトルは、機械学習(ML)で見つかったアイテムの意味的な埋め込みを表します。詳しくは、概要: 機械学習のための特徴埋め込みの抽出と提供をご覧ください。

次に、ユーザーのクエリの埋め込みベクトルに類似するアイテムを検索できるように、これらの埋め込みを最近傍探索用に(類似度指標に基づいて)整理し、格納する必要があります。ただし、おすすめをリアルタイムで検索、取得、提供するには類似度マッチングが必要で、近似最近傍アルゴリズムを使用してアイテムの埋め込みにインデックスを作成し、類似アイテムの探索プロセスを高速化するのが実用的です。

この記事で説明するサンプルは、次のようなソリューションで使用されます。

  • ウィキペディアのタイトルのテキスト埋め込みを抽出する
  • tf.Hub のユニバーサル センテンス エンコーダの使用
  • Spotify の Annoy ライブラリを使用して近似類似度マッチングのインデックスを作成する
  • ウェブアプリでリアルタイム セマンティック検索にインデックスを提供する

サンプル ソリューションのコードは、realtime-embeddings-matching GitHub リポジトリにあります。

近似類似度マッチング

マッチング検索の場合、一般的な手順は次のようになります。

  1. アイテムとクエリを適切な特徴空間のベクトルに変換します。これらの特徴を埋め込みといいます。
  2. 埋め込みベクトルのペアに近接度を定義します。これは、コサイン類似度またはユークリッド距離で定義します。
  3. アイテムセット全体に対して明示的な検索を使用して、最近傍を探索します。

アイテム数が数百または数千の場合、アイテムセット全体を探索してクエリベクトルとアイテム ベクトルの類似度を計算しても、許容できる時間内に処理が完了します。また、オンラインで結果を必要としないバッチジョブで類似度マッチングを行う場合にも、許容できるパフォーマンスを得ることができます。しかし、リアルタイムの検索システムや推薦システムを提供し、アイテム数が数千万件に及ぶ場合、近似的な最近傍探索が必要になります。この場合、レスポンスのレイテンシを短くするため、プロセスの最適化を行う必要があります。

実用的な解決策は、近似類似度マッチングを使用することです。近似類似度マッチングでは、類似アイテムをすばやく取得できるようにアイテムのベクトルをインデックスに整理します。指定したクエリに最も類似したアイテムが取得されない可能性もありますが、通常、インデックスの精度とレイテンシ(とサイズ)の間で調整を行うことができます。

近似類似度マッチングには、ツリーベースとハッシング ベースの 2 つの方法があります。

ツリーベースの方法

ツリーベース(または指標ツリーデータ構造)では、相互に類似したベクトルをツリー内に配置し、分割統治の方法でデータを再帰的に分割します。予想されるクエリの処理時間は O(log(n)) で、n は対象のアイテム数(ベクトル数)を表します。ツリー インデックスは大量のメモリを必要とするため、データの次元数が多くなると、パフォーマンスが低下します。ツリーベース(指標ツリーデータ構造)の使用例としては、次のようなものがあります。

ハッシング ベースの方法

ツリーベースではなく、ハッシュを使用する方法もあります。ツリーとは異なり、ハッシュでは再帰的なパーティショニングは行われません。ここでは、アイテムをコードに変換するモデルを学習します。この場合、類似アイテムが同じまたは類似のコードが生成されます(ハッシュの競合)。この方法では、必要なメモリが大幅に少なくなります。予想されるクエリの処理時間は O(1) ですが、n が劣線形になることもあります。ここで、n は対象のアイテム数(ベクトル数)です。ハッシング ベースの使用例としては、次のようなものがあります。

近似類似度マッチングを実装するオープンソース ライブラリがいくつかあります。精度、クエリのレイテンシ、メモリ効率、インデックスの作成時間、特徴、使いやすさを検討して、最適なライブラリを選択してください。

この記事で説明するサンプル ソリューションでは、Annoy(Approximate Nearest Neighbors Oh Yeah)を使用しています。これは、曲の推奨を行うために Spotify が構築したライブラリです。Annoy は、ランダム プロジェクション ツリーを構築する Python バインディングを含む C ++ ライブラリです。インデックスは k ツリーのフォレストで構築します。k は精度とパフォーマンスのトレードオフを調整するパラメータです。また、多くのプロセスでデータを共有できるように、メモリにマッピングされる読み取り専用の大規模なファイルベースのデータ構造を作成します。

この他にも NMSLIB(非距離空間ライブラリ)や Faiss(Facebook AI の類似度検索)などがよく利用されています。近似類似度マッチングの実装に使用するライブラリは、ソリューション アーキテクチャ全体やこの記事で説明するワークフローに影響を及ぼしません。

この記事で説明するサンプル ソリューションでは、テキスト セマンティック検索で埋め込みの類似度マッチングを使用しています。このソリューションは、入力検索クエリに対して意味的に関連のあるドキュメント(ニュース記事、ブログ投稿、研究論文など)をリアルタイムで取得することを目的としています。

トークンベースの検索技術では、ドキュメント内での検索語句(個々または組み合わせ)のオカレンス(時期、頻度など)に基づいてドキュメントを取得します。これに対して、セマンティック検索ではマッチングにクエリとドキュメントの埋め込みを使用します。たとえば、後述の検索ウェブアプリの照会で説明するように、クエリで「tropical wild animals」を検索すると、検索結果には「in the african jungle, it's every lion, wildebeest and crocodile for itself! bbc wildlife」というタイトルが含まれます。クエリで使用した単語が結果に含まれていませんが、熱帯の野生動物を説明する記事のタイトルが返されています。

Wikipedia BigQuery データセット

この例では、データソースに BigQuery の bigquery-samples:wikipedia_benchmark.Wiki100B データセットを使用しています。この公開データセットには、Wikipedia のタイトルに基づいて 1,000 億個のエントリが格納されています。この例のデータは、閲覧回数が 3 回以上、単語数が 5 単語以上、文字数が 500 文字未満の一意のタイトルに限定されています。このフィルタにより、タイトルが約 1,050 万個に絞り込まれています。

システムの技術的要件

サンプルのセマンティック検索システムには、次の技術的要件があります。

  • セマンティクスまたは Wikipedia のタイトルをエンコードするベクトル表現(つまり埋め込み)を探す労力を最小限にする。このため、言語モデルをゼロからトレーニングするのではなく、トレーニング済みのテキスト埋め込みモデルを使用する必要があります。
  • 埋め込みを抽出してインデックスを構築するために使用する専用のコンピューティング インフラストラクチャを最小限にする。このため、ジョブの十分なリソース(メモリと CPU)を取得し、ジョブの終了時にリソースを解放するフルマネージドのオンデマンド コンピューティング サービスを使用します。
  • 埋め込み抽出プロセスを自動的にスケーリングする。この例では、並列データ処理サービスを使用する必要があります。
  • クエリのインデックスで類似の埋め込みを検索する際のレイテンシを最小限に抑える。このため、インデックスをメモリに完全に読み込む必要があります。
  • 埋め込みベクトルに類似した Wikipedia のタイトルをリアルタイムで取得する際のレイテンシを最小限に抑える。このため、低レイテンシの読み取りデータベースに Wikipedia のタイトルを格納する必要があります。
  • ウェブアプリとして検索サービスをデプロイする際の DevOps の作業を最小限に抑える。このため、フルマネージドのサービスを使用する必要があります。
  • ウェブアプリのワークロードの増加に対応する(レイテンシの平均時間を 1 秒未満にし、1 秒あたりに数千件のクエリを処理できるようにする)。このため、検索ウェブアプリの複数のノードをデプロイし、ロードバランサをデプロイする必要があります。

ソリューション アーキテクチャ

図 1 は、リアルタイムでのテキスト セマンティック検索システムの概要を示しています。このシステムでは、Wikipedia のタイトルから埋め込みを抽出して、Annoy を使用して近似類似度マッチングのインデックスを作成し、このインデックスを使用してセマンティック検索をリアルタイムで行います。

サンプル ソリューションのアーキテクチャ

図 1. テキスト セマンティック検索システムのソリューション アーキテクチャの概要

アーキテクチャの主要コンポーネント

次の表に、図 1 の主要コンポーネントを示します。

コンポーネント 説明
BigQuery BigQuery は、Google が提供するペタバイト規模の低料金フルマネージド アナリティクス データ ウェアハウスです。このサンプル ソリューションでソースとして使用する Wikipedia のタイトルは BigQuery に格納されています。
Apache Beam Apache Beam は、ストリーミングとバッチの両方のデータ処理を行うオープンソースの統合プログラミング フレームワークです。サンプル ソリューションでは、Apache Beam を使用して埋め込み抽出と ID 格納のパイプラインを実装し、Datastore でタイトルの検索を行います。
Dataflow Dataflow は、Google Cloud 上で Apache Beam パイプラインを大規模に実行するための、信頼性の高いフルマネージド サーバーレス サービスです。Dataflow は、入力テキストの処理と埋め込み抽出のスケーリングに使用されます。
tf.Hub TensorFlow Hub は、再利用可能な機械学習モジュールのライブラリです。このソリューションでは、ユニバーサル センテンス エンコーダによって事前トレーニングされたテキスト埋め込みモジュールを使用して、各タイトルを埋め込みベクトルに変換します。
Cloud Storage Cloud Storage は、バイナリラージ オブジェクトを格納する可用性と耐久性に優れたストレージです。このサンプル ソリューションでは、抽出された埋め込みを TFRecords として Cloud Storage に格納します。また、近似類似度マッチングのインデックスが作成された後、インデックスがシリアル化され、Cloud Storage に格納されます。
Datastore Datastore は、自動スケーリングと高パフォーマンスを実現し、アプリケーション開発を簡素化するように構築された NoSQL ドキュメント データベースです。Datastore を使用して Wikipedia のタイトルと ID を格納することで、レイテンシを抑えながら、これらの情報をリアルタイムで取得できます。
AI Platform AI Platform は、ML モデルのトレーニングを大規模に行うためのサーバーレス サービスです。このサンプル ソリューションでは、AI Platform を使用することで、専用のコンピューティング インフラストラクチャを用意することなく、Annoy ライブラリを使用して近似類似度マッチングのインデックスを作成します。
App Engine App Engine を使用すると、フルマネージドのプラットフォームを使用して、スケーラブルで信頼性の高いアプリケーションを構築し、デプロイできます。このサンプル ソリューションでは、App Engine を使用して Flask ウェブアプリを提供し、ユーザークエリに意味的に関連する Wikipedia タイトルを検索します。App Engine では、増加する QPS に対応するため、単純な構成のみを使用して負荷分散を行い、アプリの複数のインスタンスをデプロイできます。

全体的なワークフロー

図 1 に示したリアルタイムのテキスト セマンティック検索システムのワークフローは、以下のステップに分けることができます。

  1. Dataflow を使用して埋め込みを抽出する

    1. BigQuery から Wikipedia のタイトルを読み込みます。
    2. ユニバーサル センテンス エンコーダ モジュールを使用して、タイトルの埋め込みを抽出します。
    3. 抽出した埋め込みを TFRecord として Cloud Storage に格納します。
    4. リアルタイムで取得できるように、タイトルとその ID を Datastore に格納します。
  2. AI Platform を使用してインデックスを作成する

    1. Cloud Storage に格納された埋め込みを Annoy インデックスに読み込みます。
    2. メモリ内でインデックスを作成します。
    3. インデックスをディスクに保存します。
    4. 保存したモデルを Cloud Storage にアップロードします。
  3. App Engine を使用して検索アプリを提供する

    1. Cloud Storage から Annoy インデックスをダウンロードします。
    2. ユーザークエリを取得します。
    3. ユニバーサル センテンス エンコーダ モジュールを使用して、クエリの埋め込みを抽出します。
    4. Annoy インデックスを使用して、クエリの埋め込みに類似した埋め込みを見つけます。
    5. 類似した埋め込みのアイテム ID を取得します。
    6. Datastore に格納した ID を使用して、Wikipedia のタイトルを取得します。
    7. 結果を返します。

実際の検索システム

実際の検索システムでは、セマンティック ベースの検索技術とトークンベースの技術(転置インデックス)を組み合わせることがよくあります。これらの結果は、ユーザーに提供される前に組み合わされてランク付けされます。このタスクには ElasticsearchGoogle Cloud Marketplace で入手可能)を使用できます。これは、Apache Lucene ライブラリをベースにした全文検索の転置インデックスでよく利用されているフレームワークです。

このソリューションでは使用しませんが、最適化の手段として Memorystore などでクエリと関連タイトルの ID をキャッシュに格納する方法も実際のシステムではよく利用されています。同じクエリがすでに実行されている場合、タイトル ID を Memorystore から直接取得できるので、ユニバーサル センテンス エンコーダを呼び出してクエリの埋め込みを生成する必要がなくなります。また、類似アイテムの近似一致インデックスを検索する必要もありません。これにより、負荷の高い 2 つのオペレーションをスキップできます。クエリ リクエストの冗長性レベルによっては、クエリをキャッシュに保存することでシステムの平均レイテンシを改善できます。図 2 は、クエリ キャッシュを使用した場合のワークフローです。

キャッシュを使用したソリューションのアーキテクチャ

図 2. クエリ キャッシュを使用したテキスト セマンティック検索のソリューション アーキテクチャの概要

図 2 は次のフローを表しています。

  1. 検索クエリを受け取ります。
  2. キャッシュ内のクエリを探します。
  3. クエリが見つからない場合:
    1. クエリから埋め込みを抽出します。
    2. インデックスで類似アイテムを検索します。
    3. キャッシュを更新します。
  4. Datastore に格納された ID でアクセス権を取得します。
  5. 結果を返します。

サービスとアクセス権を有効にする

図 1 のエンドツーエンド ソリューションでは、Cloud Console で次のサービス API を有効にする必要があります。

さらに、次の権限をサービス アカウントに付与する必要があります。同じ Google Cloud プロジェクトに属している場合、デフォルトのサービス アカウントには必要なリソースにアクセスできる十分なアクセス権が付与されています。ただし、サービス アカウントの権限が変更されている場合は、変更が必要になることがあります。必要な権限は次のとおりです。

  • Dataflow
    • BigQuery データセットに対する読み取り権限
    • TFRecord が格納されている Cloud Storage バケットに対する読み取り / 書き込み権限
    • Datastore への書き込み権限
  • AI Platform
    • インデックスが格納されている Cloud Storage バケットに対する読み取り / 書き込み権限
  • App Engine
    • インデックスが格納されている Cloud Storage バケットに対する読み取り権限
    • Datastore に対する読み取り権限

次のセクションのコード スニペットは、この記事で説明したコンセプトを表しています。エンドツーエンドのサンプルの実行方法については、関連する GitHub リポジトリREADME.md ファイルをご覧ください。

Dataflow で埋め込みを抽出する

Wikipedia のタイトルから埋め込みを抽出するパイプラインは、Apache Beam を使用して pipeline.py に実装されています。次のコード スニペットに全体的なパイプラインを示します。

def run(pipeline_options, known_args):

 pipeline = beam.Pipeline(options=pipeline_options)
 gcp_project = pipeline_options.get_all_options()['project']

 with impl.Context(known_args.transform_temp_dir):
   articles = (
       pipeline
       | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
     project=gcp_project, query=get_source_query(known_args.limit),
     use_standard_sql=True)))

   articles_dataset = (articles, get_metadata())
   embeddings_dataset, _ = (
       articles_dataset
       | 'Extract embeddings' >> impl.AnalyzeAndTransformDataset(
preprocess_fn))

   embeddings, transformed_metadata = embeddings_dataset

   embeddings | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
     file_path_prefix='{0}'.format(known_args.output_dir),
     file_name_suffix='.tfrecords',
     coder=tft_coders.example_proto_coder.ExampleProtoCoder(
transformed_metadata.schema))

   (articles
       | "Convert to entity" >> beam.Map(
lambda input_features: create_entity(input_features, known_args.kind))
       | "Write to Datastore" >> WriteToDatastore(project=gcp_project))

...

 job = pipeline.run()

 if pipeline_options.get_all_options()['runner'] == 'DirectRunner':
   job.wait_until_finish()

BigQuery から読み取る

パイプラインの最初のステップは、Wikipedia BigQuery データセットからのタイトルの読み取りです。この処理には、beam.io.Read メソッドと beam.io.BigQuerySource オブジェクトを使用します。pipeline.pyget_source_query メソッドは、データの取得に使用する SQL スクリプトを準備します。BigQuery から取得する Wikipedia のタイトル数は、get_source_query 関数の limit パラメータで構成できます。

def get_source_query(limit=1000000):
 query = """
   SELECT
     GENERATE_UUID() as id,
     text
   FROM
   (
       SELECT
         DISTINCT LOWER(title) text
       FROM
         `bigquery-samples.wikipedia_benchmark.Wiki100B`
       WHERE
         ARRAY_LENGTH(split(title,' ')) >= 5
       AND
         language = 'en'
       AND
         LENGTH(title) < 500
    )
   LIMIT {0}
 """.format(limit)
 return query

組み込みの BigQuery GENERATE_UUID 関数を使用して、ID がタイトル(ここでは id)に追加されます。この値は、Datastore に格納された ID で Wikipedia のタイトルを検索し、Wikipedia のタイトルをその埋め込みに関連付けるために使用されます。

Beam パイプラインのこのステップは PCollection オブジェクトを返します。コレクション内の各アイテムには、id(文字列)と title(文字列)の 2 つの要素が含まれます。

埋め込みを抽出する

パイプラインの 2 番目のステップは、tf.Hub のユニバーサル センテンス エンコーダ モジュールの使用です。このモジュールを使用して、BigQuery から読み込まれた Wikipedia のタイトルの埋め込みベクトルを抽出します。このモジュールを実行するため、この例では、TensorFlow Transformtf.Transform)API を使用しています。

TensorFlow Transform は、Apache Beam でデータを前処理するためのライブラリです。この例では、tf.Hub モジュールの呼び出しコンテキストとして tf.TransformAnalyzeAndTransformDataset メソッドを使用して、テキスト埋め込みを抽出します。

次のスニペットに示すように、AnalyzeAndTransformDataset メソッドは変換ロジックを含む preprocess_fn 関数を実行します。

def preprocess_fn(input_features):
 import tensorflow_transform as tft
 embedding = tft.apply_function(embed_text, input_features['text'])
 output_features = {
   'id': input_features['id'],
   'embedding': embedding
 }
 return output_features

def embed_text(text):
 import tensorflow_hub as hub
 global encoder
 if encoder is None:
   encoder = hub.Module(
'https://tfhub.dev/google/universal-sentence-encoder/2')
 embedding = encoder(text)
 return embedding

パイプラインのこのステップでは、他の PCollection オブジェクトを生成します。コレクション内の各アイテムには、Wikipedia のタイトルの id 値(文字列)とユニバーサル センテンス エンコーダから抽出した embedding 値(数値配列)が含まれます。この配列の次元数は 512 です。

TFRecords に埋め込みを書き込む

Wikipedia のタイトルの埋め込みを抽出した後、beam.io.tfrecordio.WriteToTFRecord メソッドを使用して、これらのタイトルと ID を TFRecord として Cloud Storage に格納します。

TFRecord は、一連のバイナリ レコードを格納するシンプルな形式です。TFRecord ファイルの各レコードは tf.Example プロトコル バッファで、Key-Value のマッピングを表す柔軟なメッセージ タイプから構成されます。このタイプは、構造化データのシリアル化に役立ちます。
作成する埋め込みファイルの数を指定するには、WriteToTFRecord メソッドの num_shards パラメータを設定します。

Datastore への書き込み

次のステップは、Datastore への書き込みです。このステップは、埋め込みの抽出ステップと並行して実行されます。これは、Wikipedia のタイトルを Datastore に保存し、ID でタイトルを取得できるようにすることを目的にしています。Wikipedia のタイトル ID は、埋め込みと一緒に TFRecord ファイルに保存されるため、Annoy インデックスに追加されるアイテム(埋め込みベクトル)の ID としても使用できます。

BigQuery からの読み取りで生成されたアイテムを Datastore に格納するには、pipeline.py のスニペットのコードを使用して、各アイテムを Datastore エンティティに変換する必要があります。

def create_entity(input_features, kind):
 entity = entity_pb2.Entity()
 datastore_helper.add_key_path(
   entity.key, kind, input_features['id'])
 datastore_helper.add_properties(
   entity, {
     "text": unicode(input_features['text'])
   })
 return entity

このコードの実行後、WriteToDatastore メソッドが Datastore にアイテムを格納します。図 3 に、Datastore kind パラメータに wikipedia を設定してパイプラインを実行した後に Datastore に書き込まれたエンティティの一部を示します。

画像

図 3. パイプライン実行後の Datastore エンティティ

Dataflow でパイプラインを実行する

Apache Beam パイプラインを実行するには、必要な引数を渡して --runner 引数を DataflowRunner に設定し、run.py スクリプトを実行します。この操作を行うには、run.sh スクリプト ファイルで構成パラメータを設定し、run.py スクリプトを実行します。

次のコマンドは、パイプラインの実行方法を示しています。このスクリプトには、run.sh スクリプトの実行時に設定される変数(たとえば、$OUTPUT_PREFIX)が含まれています。

python run.py \
 --output_dir=$OUTPUT_PREFIX \
 --transform_temp_dir=$TRANSFORM_TEMP_DIR \
 --transform_export_dir=$TRANSFORM_EXPORT_DIR \
 --project=$PROJECT \
 --runner=$RUNNER \
 --region=$REGION \
 --kind=$KIND \
 --limit=$LIMIT \
 --staging_location=$STAGING_LOCATION \
 --temp_location=$TEMP_LOCATION \
 --setup_file=$(pwd)/setup.py \
 --job_name=$JOB_NAME \
 --worker_machine_type=$MACHINE_TYPE \
 --enable_debug \
 --debug_output_prefix=$DEBUG_OUTPUT_PREFIX

Cloud Console で、図 4 のような Dataflow パイプラインの流れを確認できます。

Cloud Console に表示される Cloud Dataflow パイプライン

図 4. Cloud Console に表示されるパイプラインを表す Dataflow 実行グラフ

AI Platform でインデックスを作成する

このサンプル ソリューションでは、Wikipedia のタイトルから埋め込みベクトルを抽出した後、Annoy ライブラリを使用して、これらのベクトルに近似類似度マッピングのインデックスを作成します。このソリューションの index_builder フォルダに、このタスクで使用できるコードが含まれています。

まず、インデックスの構築と保存を行うタスクを実装します。次に、AI Platform で実行するタスクを送信します。これにより、専用のインフラストラクチャを用意しなくても、インデックスを作成できます。

インデックス ビルダータスクを実装する

task.py ファイルは、次の手順を行うインデックス ビルダーのエントリ ポイントとなります。

  • Annoy インデックスを作成します。
  • (オプション)インデックスを圧縮します。
  • 作成したアーティファクトを Cloud Storage にアップロードします。

index.py モジュールに含まれる次のコード スニペットは、Annoy インデックスを構築するロジックの一例です。

def build_index(embedding_files_pattern, index_filename,
                num_trees=100):

 annoy_index = AnnoyIndex(VECTOR_LENGTH, metric=METRIC)
 mapping = {}

 embed_files = tf.gfile.Glob(embedding_files_pattern)
 logging.info('{} embedding files are found.'.format(len(embed_files)))

 item_counter = 0
 for f, embed_file in enumerate(embed_files):
   logging.info('Loading embeddings in file {} of {}...'.format(f, len(embed_files)))
   record_iterator = tf.python_io.tf_record_iterator(path=embed_file)

   for string_record in record_iterator:
     example = tf.train.Example()
     example.ParseFromString(string_record)
     string_identifier = example.features.feature['id'].bytes_list.value[0]
     mapping[item_counter] = string_identifier
     embedding = np.array(example.features.feature['embedding'].float_list.value)
     annoy_index.add_item(item_counter, embedding)
     item_counter += 1

   logging.info('Loaded {} items to the index'.format(item_counter))

 logging.info('Start building the index with {} trees...'.format(num_trees))
 annoy_index.build(n_trees=num_trees)
 logging.info('Index is successfully built.')
 logging.info('Saving index to disk...')
 annoy_index.save(index_filename)
 logging.info('Index is saved to disk.')
 logging.info('Saving mapping to disk...')
 with open(index_filename + '.mapping', 'wb') as handle:
   pickle.dump(mapping, handle, protocol=pickle.HIGHEST_PROTOCOL)
 logging.info('Mapping is saved to disk.')

手順は次のとおりです。

  1. 指定したパターンに一致するすべての埋め込みファイルの名前を取得します。
  2. 埋め込みファイルごとに、次の操作を行います。
    1. TFRecord ファイルで tf.Example インスタンスを繰り返します。
    2. string_identifier(ID)を読み取り、値として mapping 辞書に追加します。ここで、キーは現在の item_counter 値になります。
    3. embedding ベクトルを読み取り、annoy_index に追加します。ここで、item_id 値は現在の item_counter 値に設定されます。
  3. num_trees 値を指定して、annoy_index.build メソッドを呼び出します。
  4. annoy_index.save メソッドを呼び出して、インデックスを格納します。
  5. pickle.dump メソッドを使用して、mapping 辞書をシリアル化します。

mapping 辞書を使用するのは、Datastore に保存されている Wikipedia のタイトルの ID が文字列のためです。この ID は、データが BigQuery から読み取られるときに GENERATE_UUID メソッドで生成されます。Annoy インデックス内のアイテム(埋め込みベクトル)の ID は整数のため、辞書を作成して、サロゲート整数インデックスと Wikipedia のアイテムの文字列 ID のマッピングを行います。

AnnoyIndex コンストラクタに渡された METRIC 値は角度で、これはコサイン類似度のバリエーションです。VECTOR_LENGTH 値は 512 に設定されています。これは、ユニバーサル センテンス エンコーダ モジュールによって生成されたテキスト埋め込みの長さです。

埋め込みベクトルの数と num_trees パラメータの値によっては、保存されるインデックスのサイズは数 GB になります。したがって、インデックスを Cloud Storage にアップロードするには、ソリューションでチャンクをサポートする API を使用する必要があります。task.py の次のコード スニペットのように、サンプル ソリューションは google.cloud.storage ではなく googleapiclient.http.MediaFileUpload メソッドを使用します。

media = MediaFileUpload(
 local_file_name, mimetype='application/octet-stream', chunksize=CHUNKSIZE,
 resumable=True)
request = gcs_services.objects().insert(
 bucket=bucket_name, name=gcs_location, media_body=media)
response = None
while response is None:
 progress, response = request.next_chunk()

インデックス ビルダータスクを AI Platform に送信する

このサンプル ソリューションでは、AI Platform ジョブとしてインデックス ビルダータスクを実行するときに、次のファイルを使用します。

  • submit.sh。このファイルを更新して、インデックス出力のプロジェクト、バケット名、リージョンの変数を設定します。
  • config.yaml。このファイルの scale_tier パラメータに、ジョブの実行に使用するマシンのサイズを指定します。
  • setup.py。このファイルには、ジョブに必要なパッケージを指定します。サンプル ソリューションでは、Annoygoogle-api-python-client が必要です。

これらのファイルを更新した後、submit.sh スクリプトを実行してビルダータスクを AI Platform ジョブとして送信します。スクリプトには次のコマンドが含まれています。

gcloud ml-engine jobs submit training ${JOB_NAME} \
    --job-dir=${JOB_DIR} \
    --runtime-version=1.12 \
    --region=${REGION} \
    --scale-tier=${TIER} \
    --module-name=builder.task \
    --package-path=${PACKAGE_PATH}  \
    --config=config.yaml \
    -- \
    --embedding-files=${EMBED_FILES} \
    --index-file=${INDEX_FILE} \
    --num-trees=${NUM_TREES}

インデックスのサイズによっては、ジョブに数時間かかることがあります。この時間は、ベクトルの数とその次元数、インデックスの作成に使用されるツリーの数によって異なります。

インデックスを作成する AI Platform ジョブが完了すると、指定した Cloud Storage バケット内で次のアーティファクトが使用可能になります。

  • gs://your_bucket/wikipedia/index/embeds.index
  • gs://your_bucket/wikipedia/index/embeds.index.mapping

セマンティック検索サービスを実装する

このセクションでは、Datastore から関連の Wikipedia のタイトルを取得するため、以前に作成された Annoy のインデックスを使用するセマンティック検索サービス ユーティリティを実装します。セマンティック検索サービスは以下のユーティリティを使用します。

  • クエリ埋め込みユーティリティ
  • 埋め込みマッチング ユーティリティ
  • Datastore 検索ユーティリティ
  • 検索サービス ラッパー

クエリ埋め込みユーティリティ

ユーザーが検索クエリを入力すると、クエリの埋め込みを抽出し、インデックス内で類似する埋め込みと照合します。この処理は、embedding.py の次のコード スニペットで実行します。

class EmbedUtil:

 def __init__(self):
   logging.info("Initialising embedding utility...")
   embed_module = hub.Module(
"https://tfhub.dev/google/universal-sentence-encoder/2")
   placeholder = tf.placeholder(dtype=tf.string)
   embed = embed_module(placeholder)
   session = tf.Session()
   session.run([tf.global_variables_initializer(), tf.tables_initializer()])
   logging.info('tf.Hub module is loaded.')

   def _embeddings_fn(sentences):
     computed_embeddings = session.run(
       embed, feed_dict={placeholder: sentences})
     return computed_embeddings

   self.embedding_fn = _embeddings_fn
   logging.info("Embedding utility initialised.")

 def extract_embeddings(self, query):
   return self.embedding_fn([query])[0]

コードは次の処理を行います。

  1. tf.Hub からユニバーサル センテンス エンコーダを読み込みます。
  2. ユーザークエリのテキストを受け取る extract_embeddings メソッドを提供します。
  3. クエリのセンテンス エンコード(埋め込み)を返します。

このコードにより、EmbedUtil メソッドはクラスのコンストラクタに tf.Hub モジュールを 1 度だけ読み込みます。extract_embeddings メソッドが呼び出されるたびに読み込まれることはありません。この処理を行うのは、ユニバーサル センテンス エンコーダ モジュールの読み込みに数秒かかることがあるためです。

埋め込みマッチング ユーティリティ

matching.py で実装した MatchingUtil クラスは、マッピング辞書の読み込みだけでなく、ローカル ディスク ファイルから Annoy インデックスを読み込みます。MatchingUtil クラスの実装は、次のコード スニペットで行います。

class MatchingUtil:

 def __init__(self, index_file):
   logging.info("Initialising matching utility...")
   self.index = AnnoyIndex(VECTOR_LENGTH)
   self.index.load(index_file, prefault=True)
   logging.info("Annoy index {} is loaded".format(index_file))
   with open(index_file + '.mapping', 'rb') as handle:
     self.mapping = pickle.load(handle)
   logging.info("Mapping file {} is loaded".format(index_file + '.mapping'))
   logging.info("Matching utility initialised.")

 def find_similar_items(self, vector, num_matches):
   item_ids = self.index.get_nns_by_vector(
     vector, num_matches, search_k=-1, include_distances=False)
   identifiers = [self.mapping[item_id]
                 for item_id in item_ids]
   return identifiers

インデックスはクラスのコンストラクタに読み込まれます。このコードでは、index.load メソッドの prefault パラメータが True に設定されているため、インデックス ファイル全体がメモリに読み込まれます。

このクラスは、次の処理を行う find_similar_items メソッドも公開しています。

  1. ベクトル(ユーザークエリの埋め込みベクトル)を受け取ります。
  2. 特定のベクトルに最も類似した埋め込みの item_ids(整数 ID)を Annoy index で検索します。
  3. mapping 辞書から identifiers(GUID 文字列 ID)を取得します。
  4. identifiers オブジェクトを返します。このオブジェクトは、Datastore から Wikipedia のタイトルを取得するために使用されます。

Datastore 検索ユーティリティ

次のスニペットは lookup.pyDatastoreUtil クラスを示しています。これは、Datastore から Wikipedia のタイトルを取得するためのロジックを実装しています。コンストラクタは Datastore の kind 値を使用します。この値は、タイトルが属するエンティティを表しています。

class DatastoreUtil:

 def __init__(self, kind):
   logging.info("Initialising datastore lookup utility...")
   self.kind = kind
   self.client = datastore.Client()
   logging.info("Datastore lookup utility initialised.")

 def get_items(self, keys):
   keys = [self.client.key(self.kind, key)
           for key in keys]
   items = self.client.get_multi(keys)
   return items

get_items メソッドは、ID のリストである keys パラメータを受け取り、これらのキーに関連付けられている Datastore items オブジェクトを返します。

検索サービス ラッパー

次のスニペットは search.pySearchUtil クラスを示しています。これは、前述のユーティリティ モジュールのラッパーとして機能します。

class SearchUtil:

 def __init__(self):
   logging.info("Initialising search utility...")
   dir_path = os.path.dirname(os.path.realpath(__file__))
   service_account_json = os.path.join(dir_path, SERVICE_ACCOUNT_JSON)
   index_file = os.path.join(dir_path, INDEX_FILE)
   download_artifacts(index_file, GCS_BUCKET, GCS_INDEX_LOCATION)
   self.match_util = matching.MatchingUtil(index_file)
   self.embed_util = embedding.EmbedUtil()
   self.datastore_util = lookup.DatastoreUtil(KIND, service_account_json)
   logging.info("Search utility is up and running.")

 def search(self, query, num_matches=10):
   query_embedding = self.embed_util.extract_embeddings(query)
   item_ids = self.match_util.find_similar_items(query_embedding, num_matches)
   items = self.datastore_util.get_items(item_ids)
   return items

SearchUtil のコンストラクタでは、download_artifacts メソッドを使用して、Annoy インデックス ファイルとシリアル化されたマッピング辞書が Cloud Storage からローカル ディスクにダウンロードされます。その後、match_utilembed_utildatastore_util オブジェクトが初期化されます。

search メソッドは、ユーザー検索の query パラメータと、取得する一致数を指定する num_matches パラメータを使用します。search メソッドは、次のメソッドを呼び出します。

  • embed_util.extract_embeddings メソッドは、ユニバーサル センテンス エンコーダ モジュールを使用して、クエリの埋め込みベクトルを取得します。
  • match_util.find_similar_items メソッドは、Annoy インデックスでクエリ埋め込みに類似するアイテムの ID を検索します。
  • datastore_util.get_items メソッドは、item_ids のアイテムを Datastore から取得します。ここに、Wikipedia のタイトルが含まれています。

検索後は、アイテムを返す前に、インデックスによって生成されたアイテムを類似度でランク付けします。

App Engine で検索サービスを提供する

このセクションでは、セマンティック検索サービスをウェブアプリとして提供し、App Engine にデプロイする方法について説明します。

Flask ウェブアプリを実装する

main.py の次のコード スニペットでは、Wikipedia のタイトルにセマンティック検索を行う Flask ウェブアプリを実装します。

...
search_util = utils.search.SearchUtil()
app = Flask(__name__)

@app.route('/search', methods=['GET'])
def search():
   try:
       query = request.args.get('query')
       show = request.args.get('show')
       is_valid, error = validate_request(query, show)

       if is_valid:
           results = search_util.search(query, show)
       else:
           results = error

   except Exception as error:
       results = "Unexpected error: {}".format(error)

   response = jsonify(results)
   return response

if __name__ == '__main__':
 app.run(host='127.0.0.1', port=8080)

search_util オブジェクトは、モジュール レベルで一度だけ初期化されます。RESTful /search エンドポイントにより、HTTP GET リクエストが search メソッドにリダイレクトされます。このメソッドは、ユーザーの検索 query(文字列)と結果数 show(整数)を取得し、search_util.search メソッドを呼び出して、取得したマッチングを返します。

App Engine にウェブアプリをデプロイする

Flask ウェブアプリは、gunicorn を使用して HTTP ウェブサーバー ゲートウェイ インターフェース(WSGI)として App Engine フレキシブル環境にデプロイされます。App Engine にデプロイするには、次のファイルの構成が必要です。

  • app.yaml。このファイルは、Python ランタイムの構成、一般的なアプリ、ネットワーク、リソースの設定を定義します。このファイルに次の変更を加えます。

    • readiness_check セクションの app_start_timeout_sec を設定します。インデックスのダウンロードとユーティリティ オブジェクトの読み込みが完了するように、十分な時間を指定します。
    • インデックスがメモリに完全に読み込まれるように、resources セクションの memory にインデックス サイズよりも大きい値を設定します。
    • gunicorn --timeout を設定します。インデックスのダウンロードとユーティリティ オブジェクトの読み込みが完了するように、十分な時間を指定します。
    • app.yaml ファイルの resources セクションでリクエストされた CPU コア数の 2 倍から 4 倍に gunicorn --threading を設定して、同時実行性を改善します。
  • requirement.txt。ランタイムは、アプリのソース ディレクトリの requirements.txt ファイルを検索し、アプリを起動する前に pip を使用して依存関係をインストールします。

次のコマンドを含む deploy.sh スクリプトを実行して、アプリを App Engine にデプロイします。

gcloud --verbosity=info -q app deploy app.yaml --project=${PROJECT}

検索ウェブアプリにクエリを実行する

ウェブアプリを App Engine にデプロイしたら、次の URL を呼び出して検索を呼び出すことができます。

https://service_name-dot-project_name.appspot.com/search?query=query

service_name 値は、app.yaml ファイルに指定されたファイル名と同じです。query に渡されたクエリにスペースが含まれる場合は、これらを %20 に変換する必要があります。クエリ文字列に show=num_results を追加することで、検索するマッチングの数を決定できます。デフォルトは 10 です。

次の例に、検索クエリの例と、サンプル データセットに基づいて一致した Wikipedia のタイトルを示しています。

クエリ サンプル結果
Tropical wild animals 「in the african jungle, it's every lion, wildebeest and crocodile for itself! bbc wildlife
Global technology concerns 「worldwide risk of artificial intelligence
Fresh summer drinks 「great ideas for non-alcoholic mojito
Winter sports 「cross-country skiing at the fis nordic world ski championships 2007

容積測定と負荷テスト

サンプル ソリューションの作成後、パフォーマンス情報を取得するためにサンプルを実行しました。以下の表に、bigquery-samples.wikipedia_benchmark.Wiki100B データセットを使用してエンドツーエンドのサンプルを実行するために使用された設定を示します。

埋め込みの抽出

次の表は、埋め込みの抽出に使用された Dataflow ジョブの構成とその結果の実行時間を示しています。

構成
  • レコード制限: 500 万
  • 埋め込みベクトルサイズ: 512
  • vCPU: 64(32 ワーカー)
  • ワーカーのマシンタイプ: n1-highmem-2
結果
  • ジョブの実行時間: 32 分

インデックスの作成

次の表は、AI Platform ジョブを使用してインデックスを構築するタスクの構成と結果を示しています。

構成
  • Annoy インデックスのツリー数: 100
  • AI Platform スケール階層: large_model(n1-highmem-8
結果
  • ジョブの実行時間: 2 時間 56 分
  • インデックス ファイルのサイズ: 19.28 GB
  • マッピング ファイルのサイズ: 263.21 MB

検索アプリの提供

次の表は、App Engine を使用して検索アプリを提供するための構成と結果を示しています。負荷テストは、ab - Apache HTTP サーバー ベンチマーク ツールを使用して 180 秒間実行しました。

構成
  • vCPU: 6
  • メモリ: 24 GB
  • ディスク: 50 GB
  • スケーリング: 手動で 10 インスタンス
結果
  • デプロイ時間(起動と稼働): ~19 分
  • コンテナ イメージの構築とアップロード: 〜6 分
  • アプリのデプロイ: 〜13 分
  • 同時実行レベル: 1500
  • 1 秒あたりのリクエスト数: ~2500
  • レイテンシ(95 パーセンタイル): ~903 ミリ秒
  • レイテンシ(50 パーセンタイル): 〜514 ミリ秒

さらに改善する方法

現在のシステムで以下の点を改善できます。

  • サービスの提供に GPU を使用する。ユニバーサル センテンス エンコーダ モードは、アクセラレータ上で実行するとメリットがあります。Annoy ライブラリは GPU をサポートしませんが、GPU をサポートする Faiss のようなライブラリを使用すると、近似類似度マッチングのインデックスの検索時間を改善できます。ただし、App Engine は GPU の使用をサポートしていないため、GPU を使用するには、App Engine の代わりに Compute Engine または Google Kubernetes Engine(GKE)を使用する必要があります。

  • ディスクからインデックスを読み取る。コストを最適化する方法として、大容量メモリノード(例: 26 GB RAM)の代わりに、より小さいメモリノード(例: 4 GB RAM)を使用してインデックスをディスクから読み取ることができます。ディスクからインデックスを読み取る場合は、SSD を指定する必要があります。この指定を行わないと、十分なパフォーマンスが得られません。インデックスをディスク上に保持すると、サービスを提供するノードの数を増やし、システムのスループットを向上させることができます。また、システムのコストも削減できます。ただし、ディスクにインデックスを保持するには、Compute Engine または GKE を使用する必要があります。App Engine は、永続ディスクの SSD をサポートしていません。

  • ライブシステムでインデックスを更新する。新しいデータ(たとえば、新しい Wikipedia 記事)を受信すると、インデックスの更新が必要になります。これは通常、毎日または毎週実行されるバッチプロセスとして実行されます。更新後、新しいインデックスを使用するように検索アプリを更新することで、ダウンタイムの発生を防ぐことができます。

次のステップ