Building a real-time embeddings similarity matching system

This article gives an overview of approximate similarity matching, which is a technique for using machine learning to find items similar to a given item. The article also describes an end-to-end example solution for performing real-time text semantic search and explains various aspects of how you can run the example solution. The example solution is in an associated realtime-embeddings-matching GitHub repository.

The article assumes that you're familiar with machine learning concepts and have some familiarity with Google Cloud and tools like Apache Beam.

Introduction

Finding items that are similar to a given query is the core aspect of search and retrieval systems, as well as of recommendation engines. For example, similarity matching helps your users find:

  • Images similar to their pet's image.
  • News articles relevant to their search query.
  • Movies or songs similar to one they've watched or listened to.
  • Product and service recommendations.

To design a similarity matching system, you first need to represent items as numeric vectors. These vectors in turn represent semantic embeddings of the items discovered through machine learning (ML). For more details, see Overview: Extracting and serving feature embeddings for machine learning.

Second, you need to organize and store these embeddings for nearest neighbor search (based on a similarity metric) in order to find items similar to the embedding vector of the user's query. However, in order to search, retrieve, and serve recommendations in real time, similarity matching needs to be fast. Therefore, it's more practical to apply an approximate nearest-neighbor algorithm to build an index of the item embeddings to speed the process of finding similar items.

The example solution associated with this article covers the following:

  • Extracting text embeddings of Wikipedia titles.
  • Using the Universal Sentence Encoder module of tf.Hub.
  • Building an approximate similarity matching index using Spotify's Annoy library.
  • Serving the index for real-time semantic search in a web app.

The code for the example solution is in the realtime-embeddings-matching GitHub repository.

Approximate similarity matching

For matching and retrieval, a typical procedure is as follows:

  1. Convert the items and the query into vectors in an appropriate feature space. These features are referred to as embeddings.
  2. Define a proximity measure for a pair of embedding vectors. This measure could be cosine similarity or Euclidean distance.
  3. Find nearest neighbors using an explicit search over the entire item set.

If you have just hundreds or a few thousand items, searching over the entire item set to compute the similarity between your query vector and each item's vector takes an acceptable amount of time. You can also get acceptable performance if you do the similarity matching as a batch job where you don't need the results online. However, if you're serving a real-time search and retrieval system or a recommendation system, and you have tens of millions of items, finding the nearest neighbors has to be approximate. In that case, you need to optimize the process for a low-latency response.

A practical solution is to perform approximate similarity matching. Approximate similarity matching involves organizing your item vectors into an index, which is a data structure that allows fast similar-items retrieval. A potential issue is that the retrieved items might not be the items most similar to the given query. However, you can usually control the trade-offs between the index's precision and its latency (and size).

There are two main approaches for approximate similarity matching: tree-based approaches and hashing-based approaches.

Tree-based approaches

The idea behind tree-based approaches (or metric tree data structures) is to recursively partition the data in a divide-and-conquer fashion, which puts similar vectors near each other in the tree. The expected query time is O(log(n)), where n is the number of items (vectors) that you have. Tree indexes require large amounts of memory, and the performance degrades with higher-dimensional data. Examples of tree-based approaches—also referred to as metric tree data structures—include:

Hashing-based approaches

An alternative to the tree-based approach is the hash-based approach. Unlike trees, in hashes there's no recursive partitioning. The idea is to learn a model that converts an item into a code, where similar items will produce the same or similar code (hashing collision). This approach significantly reduces the memory that's needed. The expected query time is O(1), but can be sublinear in n, where n is the number of items (vectors) you have. Examples of hashing-based approaches include the following:

There are several open source libraries that implement approximate similarity matching techniques, with different trade-offs between precision, query latency, memory efficiency, time to build the index, features, and ease of use.

The example solution described in this article uses Annoy (Approximate Nearest Neighbors Oh Yeah), a library built by Spotify for music recommendations. Annoy is a C++ library with Python bindings that builds random projection trees. An index is built with a forest of k trees, where k is a tunable parameter that trades off between precision and performance. It also creates large read-only, file-based data structures that are mapped into memory so that many processes can share the data.

Other widely used libraries are NMSLIB (non-metric space library) and Faiss (Facebook AI Similarity Search). The library you use to implement approximate similarity matching shouldn't affect the overall solution architecture or the workflow discussed in this article.

The example solution described in this article illustrates an application of embeddings similarity matching in text semantic search. The goal of the solution is to retrieve semantically relevant documents (for example, news articles, blog posts, or research papers) for an input search query, and to do so in real time.

Token-based search techniques retrieve documents based on some metric (like recency or frequency) of the occurrence of the query words (individually or combined) in the documents. In contrast, semantic search uses the embeddings of the query and documents for matching. For example, as shown later in the Query the search web app section, a query might be "tropical wild animals", and the results can include a title like "in the african jungle, it's every lion, wildebeest and crocodile for itself! bbc wildlife". Notice that none of the query words appear in the result, but that the result is nonetheless an article that discusses tropical wild animals.

The Wikipedia BigQuery dataset

In the example, the data source is the bigquery-samples:wikipedia_benchmark.Wiki100B dataset in BigQuery, which is a public dataset that includes 100 billion entries based on Wikipedia titles. For the example, the data is restricted to unique titles that have more than 2 views, that have at least 5 words, and that have less than 500 characters. This filter results in about 10.5 million unique titles.

Technical requirements for the system

The example semantic search system has the following technical requirements:

  • Minimize the effort of finding a vector representation (that is, the embeddings) that encodes the semantics or the Wikipedia titles. Thus the example must use a pre-trained text-embedding model rather than training a language model from scratch.
  • Minimize the need for a dedicated compute infrastructure that extracts embeddings and builds the index. Thus the example must use fully managed, on-demand compute services that acquire sufficient resources (memory and CPU) for the job, and release them when the job finishes.
  • Automatically scale the embedding extraction process. Thus the example must use a parallel data processing service.
  • Minimize the latency for finding similar embeddings in the index for a given query. Thus the index must be fully loaded in memory.
  • Minimize the latency for fetching the Wikipedia titles for the similar embedding vectors in real time. Thus the example must store the Wikipedia titles in a low-latency read database.
  • Minimize the DevOps effort for deploying the search service as a web app. Thus the example must use fully managed services.
  • Handle increasing workload for the web app—up to thousands of queries per second (QPS) with sub-second average latency. Thus the example must be able to deploy several nodes of the search web app and deploy a load balancer.

Solution architecture

Figure 1 shows an overview of the real-time text semantic search system. The system extracts the embeddings from the Wikipedia titles, builds an approximate similarity matching index using Annoy, and serves the build index for real-time semantic search and retrieval.

Architecture of example solution

Figure 1. High-level solution architecture for the text semantic search system

Key components of the architecture

The following table explains the key components illustrated in Figure 1.

Component Description
BigQuery BigQuery is Google's fully managed, petabyte-scale, low-cost analytics data warehouse. In the example solution, the source Wikipedia titles are stored in BigQuery.
Apache Beam Apache Beam is an open source unified programming framework that runs both streaming and batch data processing jobs. The example solution uses Apache Beam to implement a pipeline to extract the embeddings and store an ID to perform title lookups in Datastore.
Dataflow Dataflow is a fully managed, serverless, reliable service for running Apache Beam pipelines at scale on Google Cloud. Dataflow is used to scale the processing of the input text and extraction of the embeddings.
tf.Hub TensorFlow Hub is a library of reusable machine learning modules. The example solution uses the Universal Sentence Encoder pre-trained text-embedding module to convert each title to an embedding vector.
Cloud Storage Cloud Storage is a highly available and durable storage for binary large objects. In the example solution, the extracted embeddings are stored in Cloud Storage as TFRecords. In addition, after the approximate similarity matching index is built, it's serialized and stored in Cloud Storage.
Datastore Datastore is a NoSQL document database built for automatic scaling, high performance, and ease of application development. The example solution uses Datastore to store the Wikipedia titles and their IDs so that they can be fetched in real time with low latency.
AI Platform AI Platform is a serverless service to train ML models at scale. The example solution uses AI Platform to build the approximate similarity matching index using the Annoy library, without the need for a dedicated compute infrastructure.
App Engine App Engine allows you to build and deploy scalable, reliable applications on a fully managed platform. The example solution uses App Engine to serve a Flask web app for searching the Wikipedia titles that are semantically relevant to a user query. App Engine allows you to deploy many instances of the app with load balancing using only a simple configuration in order to handle increasing QPS.

Overall workflow

The workflow of the real-time text semantic search system illustrated in Figure 1 can be divided into the following steps:

  1. Extract embeddings using Dataflow

    1. Read the Wikipedia titles from BigQuery.
    2. Extract the title embeddings using the Universal Sentence Encoder module.
    3. Store the extracted embeddings as TFRecords in Cloud Storage.
    4. Store the titles and their identifiers in Datastore for real-time retrieval.
  2. Build the index using AI Platform

    1. Load the embeddings from the files in Cloud Storage into the Annoy index.
    2. Build the index in memory.
    3. Save the index to disk.
    4. Upload the saved index to Cloud Storage.
  3. Serve the search app using App Engine

    1. Download the Annoy index from Cloud Storage.
    2. Get the user query.
    3. Extract the query embedding using the Universal Sentence Encoder module.
    4. Using the Annoy index, find embeddings that are similar to the query embedding.
    5. Get the item IDs of the similar embeddings.
    6. Retrieve the Wikipedia titles using the identifiers from Datastore.
    7. Return the results.

Search systems in practice

In practice, search and retrieval systems often combine semantic-based search techniques with token-based (inverted index) techniques. The results from both techniques are combined and ranked before being served to the user. You might already be familiar with Elasticsearch (available on Google Cloud Marketplace) for this task, which is a widely-used framework for full-text search based on the Apache Lucene library for inverted indexing.

Another optimization that's often implemented in real-world systems (which isn't covered in this solution) is to cache queries and their relevant title identifiers using something like Memorystore. If the query was seen before, the title identifiers can be retrieved directly from Memorystore. This skips the two expensive operations of invoking the Universal Sentence Encoder to generate the query embedding, and searching the approximate matching index for similar items. Caching the queries can frequently improve the average latency of your system, depending on the redundancy level of the query request. Figure 2 shows the workflow with a query cache.

Architecture of solution using a cache

Figure 2. High-level solution architecture for text semantic search with query cache

Figure 2 illustrates the following flow:

  1. Receive the search query.
  2. Look up the query in the cache.
  3. If the query is not found:
    1. Extract embedding from the query.
    2. Find similar items in the index.
    3. Update the cache.
  4. Get access by IDs from Datastore.
  5. Return results.

Enabling services and access permissions

The end-to-end solution described in Figure 1 requires the following service APIs to be enabled in the Cloud Console:

In addition, the following permissions need to be granted to the service accounts. The default service accounts have sufficient access permission to the required resources if they belong to the same Google Cloud project. However, if the service account permissions have been altered, you might need to make changes. The required permissions are:

  • Dataflow
    • Read permission to the BigQuery Dataset
    • Read/write permission to the Cloud Storage bucket where the TFRecords are stored
    • Write permission to Datastore
  • AI Platform
    • Read/write permission to the Cloud Storage bucket where the index is stored
  • App Engine
    • Read permission to the Cloud Storage bucket where the index is stored
    • Read permission to Datastore

The code snippets in the following sections illustrate the concepts discussed in this article. For information about how to run the end-to-end example, see the README.md file in the associated GitHub repository.

Extracting the embeddings with Dataflow

The pipeline for extracting the embedding from the Wikipedia titles is implemented in pipeline.py using Apache Beam. The overall pipeline is shown in the following code snippet:

def run(pipeline_options, known_args):

 pipeline = beam.Pipeline(options=pipeline_options)
 gcp_project = pipeline_options.get_all_options()['project']

 with impl.Context(known_args.transform_temp_dir):
   articles = (
       pipeline
       | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
     project=gcp_project, query=get_source_query(known_args.limit),
     use_standard_sql=True)))

   articles_dataset = (articles, get_metadata())
   embeddings_dataset, _ = (
       articles_dataset
       | 'Extract embeddings' >> impl.AnalyzeAndTransformDataset(
preprocess_fn))

   embeddings, transformed_metadata = embeddings_dataset

   embeddings | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
     file_path_prefix='{0}'.format(known_args.output_dir),
     file_name_suffix='.tfrecords',
     coder=tft_coders.example_proto_coder.ExampleProtoCoder(
transformed_metadata.schema))

   (articles
       | "Convert to entity" >> beam.Map(
lambda input_features: create_entity(input_features, known_args.kind))
       | "Write to Datastore" >> WriteToDatastore(project=gcp_project))

...

 job = pipeline.run()

 if pipeline_options.get_all_options()['runner'] == 'DirectRunner':
   job.wait_until_finish()

Read from BigQuery

The first step in the pipeline is to read the titles from the Wikipedia BigQuery dataset using the beam.io.Read method and a beam.io.BigQuerySource object. The get_source_query method in pipeline.py prepares the SQL script that's used to retrieve the data. The number of Wikipedia titles to retrieve from BigQuery is configurable through the limit parameter of the get_source_query function.

def get_source_query(limit=1000000):
 query = """
   SELECT
     GENERATE_UUID() as id,
     text
   FROM
   (
       SELECT
         DISTINCT LOWER(title) text
       FROM
         `bigquery-samples.wikipedia_benchmark.Wiki100B`
       WHERE
         ARRAY_LENGTH(split(title,' ')) >= 5
       AND
         language = 'en'
       AND
         LENGTH(title) < 500
    )
   LIMIT {0}
 """.format(limit)
 return query

An identifier is added to the title (here, id) using the built-in BigQuery GENERATE_UUID function. This value is used to look up a Wikipedia title by ID from Datastore, and to map a Wikipedia title to its embedding.

This step in the Beam pipeline returns a PCollection object, where each item in the collection includes two elements: id (string) and title (string).

Extract embeddings

The second step in the pipeline is to use the Universal Sentence Encoder module of tf.Hub to extract an embedding vector for each Wikipedia title that's been read from BigQuery. To execute the module, the example uses the TensorFlow Transform (tf.Transform) API.

TensorFlow Transform is a library for preprocessing data with Apache Beam. The example uses the AnalyzeAndTransformDataset method of tf.Transform as the context for calling the tf.Hub module to extract text embedding.

The AnalyzeAndTransformDataset method executes the preprocess_fn function, which includes the transformation logic, as shown in the following snippet:

def preprocess_fn(input_features):
 import tensorflow_transform as tft
 embedding = tft.apply_function(embed_text, input_features['text'])
 output_features = {
   'id': input_features['id'],
   'embedding': embedding
 }
 return output_features

def embed_text(text):
 import tensorflow_hub as hub
 global encoder
 if encoder is None:
   encoder = hub.Module(
'https://tfhub.dev/google/universal-sentence-encoder/2')
 embedding = encoder(text)
 return embedding

This step of the pipeline produces another PCollection object, where each item in the collection includes the id value (a string) of the Wikipedia title and the embedding value (a numeric array) extracted from the Universal Sentence Encoder, which is 512 dimensions.

Write embeddings to TFRecords

After extracting the embeddings for the Wikipedia titles, the solution stores them along with title IDs as TFRecords in Cloud Storage, using the beam.io.tfrecordio.WriteToTFRecord method.

The TFRecord format is a simple format for storing a sequence of binary records. Each record in a TFRecord file is a tf.Example Protocol buffer, which constitutes a flexible message type that represents a key-value mapping. This type is efficient for serializing structured data.
You can specify how many embedding files are created by setting the num_shards parameter in the WriteToTFRecord method.

Write to Datastore

The next step is to write to Datastore. This step is executed in parallel with the embedding extraction step. The purpose is to store the Wikipedia titles in Datastore so that they can be retrieved by their IDs. The Wikipedia title IDs are also saved with the embedding in the TFRecord files, so that they can be used as identifiers to the items (embedding vectors) added to the Annoy index.

To store the items produced by the read from BigQuery step to Datastore, the solution first needs to convert each item to a Datastore entity, using the code in the following snippet in pipeline.py:

def create_entity(input_features, kind):
 entity = entity_pb2.Entity()
 datastore_helper.add_key_path(
   entity.key, kind, input_features['id'])
 datastore_helper.add_properties(
   entity, {
     "text": unicode(input_features['text'])
   })
 return entity

After this code runs, the WriteToDatastore method stores the items to Datastore. Figure 3 shows some of the entities written to Datastore after the pipeline has been executed with the Datastore kind parameter set to wikipedia.

image

Figure 3. Datastore entities after the pipeline executes

Execute the pipeline on Dataflow

You can execute the Apache Beam pipeline by running the run.py script, passing the required arguments, and setting the --runner argument to DataflowRunner. To do this, you can set configuration parameters in the run.sh script file and then run the run.py script.

The following command shows how to run the pipeline. The script includes a number of variables (for example, $OUTPUT_PREFIX) that are set when you run the run.sh script.

python run.py \
 --output_dir=$OUTPUT_PREFIX \
 --transform_temp_dir=$TRANSFORM_TEMP_DIR \
 --transform_export_dir=$TRANSFORM_EXPORT_DIR \
 --project=$PROJECT \
 --runner=$RUNNER \
 --region=$REGION \
 --kind=$KIND \
 --limit=$LIMIT \
 --staging_location=$STAGING_LOCATION \
 --temp_location=$TEMP_LOCATION \
 --setup_file=$(pwd)/setup.py \
 --job_name=$JOB_NAME \
 --worker_machine_type=$MACHINE_TYPE \
 --enable_debug \
 --debug_output_prefix=$DEBUG_OUTPUT_PREFIX

You can see the flow of the Dataflow pipeline in the Cloud Console; it looks like what you see in Figure 4.

Cloud Dataflow pipeline as displayed in the Cloud Console

Figure 4. Dataflow execution graph of the pipeline as displayed in the Cloud Console

Building the index with AI Platform

In the example solution, after the embedding vectors are extracted from the Wikipedia titles, the next step is to build an approximate similarity matching index for these vectors by using the Annoy library. In the example solution, the index_builder folder contains the code that you can use for this task.

First, you implement a task that builds and saves the index. Second, you submit the task to run on AI Platform. Using this approach lets you create the index without creating a dedicated computer infrastructure.

Implementing the index builder task

The task.py file is the entry point for the index builder, which performs the following steps:

  • Build the Annoy index.
  • (Optional) Compress the index.
  • Upload the produced artifacts to Cloud Storage.

The logic for building the Annoy index is shown in the following code snippet from the index.py module.

def build_index(embedding_files_pattern, index_filename,
                num_trees=100):

 annoy_index = AnnoyIndex(VECTOR_LENGTH, metric=METRIC)
 mapping = {}

 embed_files = tf.gfile.Glob(embedding_files_pattern)
 logging.info('{} embedding files are found.'.format(len(embed_files)))

 item_counter = 0
 for f, embed_file in enumerate(embed_files):
   logging.info('Loading embeddings in file {} of {}...'.format(f, len(embed_files)))
   record_iterator = tf.python_io.tf_record_iterator(path=embed_file)

   for string_record in record_iterator:
     example = tf.train.Example()
     example.ParseFromString(string_record)
     string_identifier = example.features.feature['id'].bytes_list.value[0]
     mapping[item_counter] = string_identifier
     embedding = np.array(example.features.feature['embedding'].float_list.value)
     annoy_index.add_item(item_counter, embedding)
     item_counter += 1

   logging.info('Loaded {} items to the index'.format(item_counter))

 logging.info('Start building the index with {} trees...'.format(num_trees))
 annoy_index.build(n_trees=num_trees)
 logging.info('Index is successfully built.')
 logging.info('Saving index to disk...')
 annoy_index.save(index_filename)
 logging.info('Index is saved to disk.')
 logging.info('Saving mapping to disk...')
 with open(index_filename + '.mapping', 'wb') as handle:
   pickle.dump(mapping, handle, protocol=pickle.HIGHEST_PROTOCOL)
 logging.info('Mapping is saved to disk.')

The steps are as follows:

  1. Get all the embedding filenames that match a given pattern.
  2. For each embedding file:
    1. Iterate through the tf.Example instances in the TFRecord file.
    2. Read the string_identifier (the ID) and add it to the mapping dictionary as a value, where the key is the current item_countervalue.
    3. Read the embedding vector and add it to the annoy_index, where the item_id value is set to the current item_counter value.
  3. Invoke the annoy_index.build method with the num_trees value specified.
  4. Save the index by invoking the annoy_index.save method.
  5. Serialize the mappingdictionary using the pickle.dump method.

The rationale behind having the mapping dictionary is that the identifier for the Wikipedia titles stored in Datastore is a string (it's generated using the GENERATE_UUID method when data is read from BigQuery). However, the identifier for an item (the embedding vector) in the Annoy index can only be integer. Therefore, the code creates a dictionary to map between a surrogate integer index to the string identifier for the Wikipedia item.

The METRIC value passed in the AnnoyIndex constructor is angular, which is a variation of cosine similarity. The VECTOR_LENGTH value is set to 512, which is the length of the text embedding produced by the Universal Sentence Encoder module.

The size of the saved index could be several gigabytes, depending on the number of embedding vectors and on the value of the num_trees parameter. Therefore, in order to upload the index to Cloud Storage, the solution needs to use APIs that support chunking. The example solution uses the googleapiclient.http.MediaFileUpload method instead of google.cloud.storage, as shown in the following code snippet in task.py:

media = MediaFileUpload(
 local_file_name, mimetype='application/octet-stream', chunksize=CHUNKSIZE,
 resumable=True)
request = gcs_services.objects().insert(
 bucket=bucket_name, name=gcs_location, media_body=media)
response = None
while response is None:
 progress, response = request.next_chunk()

Submit the index builder task to AI Platform

In the example solution, running the index builder task as an AI Platform job involves the following files:

  • submit.sh. This file must be updated to set variables for the project, bucket name, and region for the index output.
  • config.yaml. This file uses the scale_tier parameter to specify the machine size that's used to run the job.
  • setup.py. This file specifies the packages that are required for the job. The example solution needs Annoy and google-api-python-client.

After these files are updated, a builder task can be submitted as an AI Platform job by running the submit.shscript. The script includes the following command:

gcloud ml-engine jobs submit training ${JOB_NAME} \
    --job-dir=${JOB_DIR} \
    --runtime-version=1.12 \
    --region=${REGION} \
    --scale-tier=${TIER} \
    --module-name=builder.task \
    --package-path=${PACKAGE_PATH}  \
    --config=config.yaml \
    -- \
    --embedding-files=${EMBED_FILES} \
    --index-file=${INDEX_FILE} \
    --num-trees=${NUM_TREES}

Depending on the size of the index, the job might take several hours. The time depends on the number of vectors and their dimensionality, and on the number of trees that are used for building the index.

After the AI Platform job for building the index finishes, the following artifacts are available in the specified Cloud Storage bucket:

  • gs://your_bucket/wikipedia/index/embeds.index
  • gs://your_bucket/wikipedia/index/embeds.index.mapping

Implementing the semantic search service

This section describes the implementation of the semantic search service utilities that use the Annoy index built earlier to retrieve relevant Wikipedia titles from Datastore. The semantic search service uses the following utilities:

  • Query embedding utility
  • Embedding matching utility
  • Datastore lookup utility
  • Search service wrapper

Query embedding utility

When the user enters a search query, the solution needs to extract the embedding of the query to match it with similar ones in the index. The following code snippet in embedding.py performs the task:

class EmbedUtil:

 def __init__(self):
   logging.info("Initialising embedding utility...")
   embed_module = hub.Module(
"https://tfhub.dev/google/universal-sentence-encoder/2")
   placeholder = tf.placeholder(dtype=tf.string)
   embed = embed_module(placeholder)
   session = tf.Session()
   session.run([tf.global_variables_initializer(), tf.tables_initializer()])
   logging.info('tf.Hub module is loaded.')

   def _embeddings_fn(sentences):
     computed_embeddings = session.run(
       embed, feed_dict={placeholder: sentences})
     return computed_embeddings

   self.embedding_fn = _embeddings_fn
   logging.info("Embedding utility initialised.")

 def extract_embeddings(self, query):
   return self.embedding_fn([query])[0]

The code does the following:

  1. Loads the Universal Sentence Encoder from tf.Hub.
  2. Provides the extract_embeddings method, which accepts the text of the user query.
  3. Returns the sentence encoding (embeddings) for the query.

The code makes sure that the EmbedUtil method loads the tf.Hub module only once in the constructor of the class, not each time the extract_embeddings method is invoked. This is because loading the Universal Sentence Encoder module can take several seconds.

Embedding matching utility

The MatchingUtil class, implemented in matching.py, is responsible for loading the Annoy index from local disk file, as well as for loading the mapping dictionary. The following code snippet shows the implementation of the MatchingUtil class.

class MatchingUtil:

 def __init__(self, index_file):
   logging.info("Initialising matching utility...")
   self.index = AnnoyIndex(VECTOR_LENGTH)
   self.index.load(index_file, prefault=True)
   logging.info("Annoy index {} is loaded".format(index_file))
   with open(index_file + '.mapping', 'rb') as handle:
     self.mapping = pickle.load(handle)
   logging.info("Mapping file {} is loaded".format(index_file + '.mapping'))
   logging.info("Matching utility initialised.")

 def find_similar_items(self, vector, num_matches):
   item_ids = self.index.get_nns_by_vector(
     vector, num_matches, search_k=-1, include_distances=False)
   identifiers = [self.mapping[item_id]
                 for item_id in item_ids]
   return identifiers

The index is loaded in the class constructor. The code sets the prefault parameter in the index.load method to True so that the entire index file is loaded into memory.

The class also exposes the find_similar_items method, which does the following:

  1. Receives a vector (the embedding vector of a user query).
  2. Finds the item_ids (integer IDs) of the most similar embedding in the Annoy index for the given vector.
  3. Gets the identifiers(GUID string IDs) from the mapping dictionary.
  4. Returns the identifiers object to be used to fetch the Wikipedia titles from Datastore.

Datastore lookup utility

The following snippet shows the DatastoreUtil class in lookup.py, which implements the logic for fetching the Wikipedia titles from Datastore. The constructor takes a Datastore kind value that describes which entities the titles belong to.

class DatastoreUtil:

 def __init__(self, kind):
   logging.info("Initialising datastore lookup utility...")
   self.kind = kind
   self.client = datastore.Client()
   logging.info("Datastore lookup utility initialised.")

 def get_items(self, keys):
   keys = [self.client.key(self.kind, key)
           for key in keys]
   items = self.client.get_multi(keys)
   return items

The get_items method accepts a keys parameter, which is a list of identifiers, and returns the Datastore items object associated with these keys.

Search service wrapper

The following snippet shows the SearchUtil class in search.py, which acts as a wrapper to the utility modules described earlier.

class SearchUtil:

 def __init__(self):
   logging.info("Initialising search utility...")
   dir_path = os.path.dirname(os.path.realpath(__file__))
   service_account_json = os.path.join(dir_path, SERVICE_ACCOUNT_JSON)
   index_file = os.path.join(dir_path, INDEX_FILE)
   download_artifacts(index_file, GCS_BUCKET, GCS_INDEX_LOCATION)
   self.match_util = matching.MatchingUtil(index_file)
   self.embed_util = embedding.EmbedUtil()
   self.datastore_util = lookup.DatastoreUtil(KIND, service_account_json)
   logging.info("Search utility is up and running.")

 def search(self, query, num_matches=10):
   query_embedding = self.embed_util.extract_embeddings(query)
   item_ids = self.match_util.find_similar_items(query_embedding, num_matches)
   items = self.datastore_util.get_items(item_ids)
   return items

In the constructor of SearchUtil, the Annoy index file and the serialized mapping dictionary are downloaded from Cloud Storage to local disk using the download_artifacts method. Then the match_util, embed_util, and datastore_util objects are initialized.

The search method accepts a user search query parameter and the num_matches parameter, which specifies the number of matches to retrieve. The search method calls the following methods:

  • The embed_util.extract_embeddings method gets the embedding vector of the query using the Universal Sentence Encoder module.
  • The match_util.find_similar_items method finds the item IDs of matches similar to the query embedding in the Annoy index.
  • The datastore_util.get_items method fetches the items from the Datastore given item_ids, which include the Wikipedia titles.

A typical post-retrieval step is to rank the items produced by the index with respect to the similarity measure before returning the items.

Serving the search with App Engine

This section describes how to serve the semantic search service as a web app and deploy it to App Engine.

Implement the Flask web app

The following code snippet in main.py implements a Flask web app to serve the semantic search for the Wikipedia titles.

...
search_util = utils.search.SearchUtil()
app = Flask(__name__)

@app.route('/search', methods=['GET'])
def search():
   try:
       query = request.args.get('query')
       show = request.args.get('show')
       is_valid, error = validate_request(query, show)

       if is_valid:
           results = search_util.search(query, show)
       else:
           results = error

   except Exception as error:
       results = "Unexpected error: {}".format(error)

   response = jsonify(results)
   return response

if __name__ == '__main__':
 app.run(host='127.0.0.1', port=8080)

The search_util object is initialized only once at the module level. The RESTful /search endpoint redirects the HTTP GET request to the search method. The method gets the user search query (string) and how many results to show (integer), calls the search_util.search method, and returns the retrieved matches.

Deploy the web app to App Engine

The Flask web app is deployed to App Engine flexible environment, with gunicorn as an HTTP Web Server Gateway Interface (WSGI). Deploying to App Engine requires configuration settings in the following files:

  • app.yaml. This file defines configuration settings for the Python runtime, as well as general app, network, and resource settings. In this file, you need to make the following changes:

    • Set app_start_timeout_sec in the readiness_check section to allow enough time to download the index and load the utility objects.
    • Set memory in the resources section to a value larger than your index size, so that the index can be loaded completely to the memory.
    • Set gunicorn --timeout to allow enough time to download and load the index and load the utility objects.
    • Set gunicorn --threading to two to four times the number of CPU cores that are requested in the resources section of the app.yaml file in order to increase concurrency.
  • requirement.txt. The runtime looks for a requirements.txt file in the app's source directory and uses pip to install any dependencies before starting the app.

You can run the deploy.sh script to deploy the app to App Engine, which includes the following command:

gcloud --verbosity=info -q app deploy app.yaml --project=${PROJECT}

Query the search web app

After the web app is deployed to App Engine, a search can be invoked by calling the following URL:

https://service_name-dot-project_name.appspot.com/search?query=query

The service_name value is the same name that's provided in the app.yaml file. If the query passed in query contains spaces, they must be converted to %20. Adding show=num_results to the query string specifies how many matches to retrieve. The default is 10.

The following examples show search query examples and the matched Wikipedia titles based on the sample dataset.

Query Sample results
Tropical wild animals "in the african jungle, it's every lion, wildebeest and crocodile for itself! bbc wildlife"
Global technology concerns "worldwide risk of artificial intelligence"
Fresh summer drinks "great ideas for non-alcoholic mojito"
Winter sports "cross-country skiing at the fis nordic world ski championships 2007"

Volumetrics and load testing

After the example solution was created, a sample run was performed to get performance information. The following tables show the settings that were used to run the end-to-end example using the bigquery-samples.wikipedia_benchmark.Wiki100B dataset.

Extracting embeddings

The following table shows the configurations of the Dataflow job used to extract the embeddings and the resulting execution time.

Configuration
  • Record limit: 5 million
  • Embedding vector size: 512
  • vCPUs: 64 (32 worker)
  • Worker machine type: n1-highmem-2
Results
  • Job time : 32 minutes

Building the index

The following table shows configuration and results information for the task of building the index using an AI Platform job.

Configuration
  • Annoy index number of trees: 100
  • AI Platform scale tier: large_model (n1-highmem-8)
Results
  • Job time: 2 hours, 56 minutes
  • Index file size: 19.28 GB
  • Mapping file size: 263.21 MB

Serving the search app

The following table shows configuration and results information for serving the search app using App Engine. The load test was performed using the ab - Apache HTTP server benchmarking tool for 180 seconds.

Configuration
  • vCPUs: 6
  • Memory: 24 GB
  • Disk: 50 GB
  • Scaling: manual 10 instances
Results
  • Deployment time (up and running): ~19 minutes
  • Building and uploading the container image: ~6 minutes
  • Deploying the app: ~13 minutes
  • Concurrency level: 1500
  • Requests per seconds: ~2500
  • Latency (95th percentile) : ~903 milliseconds
  • Latency (50th percentile): ~514 milliseconds

Further improvements

The following improvements can be made to the current system:

  • Using GPUs for serving. The Universal Sentence Encoder mode can benefit from running on an accelerator. The Annoy library doesn't support GPUs, but a library like Faiss that does support GPUs can improve search time for the approximate similarity matching index. However, App Engine doesn't support using GPUs, so to use GPUs, you have to use Compute Engine or Google Kubernetes Engine (GKE) instead of App Engine.

  • Reading the index from disk. As a cost-optimization technique, instead of serving the index using a large-memory node (in the example, 26 GB RAM), you can use smaller memory nodes (for example, 4 GB RAM) and read the index from disk. If you read the index from disk, you must specify an SSD, or performance probably won't be adequate. Keeping the index on disk allows you to increase the number of serving nodes, which in turn increases system throughput. It also reduces the cost of the system. However, if you want to keep the index on disk, you must use Compute Engine or GKE, because App Engine doesn't support SSD for persistent disks.

  • Updating the index in the live system. As new data is received (in the example, new Wikipedia articles), the index needs to be updated. This is usually performed as a batch process that runs daily or weekly. After the update, the search app has to be updated to use the new index, with no downtime.

What's next