Compila un sistema de coincidencia de similitud de incorporaciones en tiempo real

En este artículo, se ofrece una descripción general de la coincidencia de similitud aproximada, que es una técnica para usar el aprendizaje automático a fin de encontrar elementos similares a uno determinado. En el artículo también se describe una solución de ejemplo de extremo a extremo para realizar búsquedas semánticas de texto en tiempo real y se explican varios aspectos sobre cómo puedes ejecutar la solución de ejemplo. La solución de ejemplo está en un repositorio de GitHub de coincidencia de incorporaciones en tiempo real asociado.

En el artículo, se supone que estás familiarizado con los conceptos de aprendizaje automático, con Google Cloud y con otras herramientas, como Apache Beam.

Introducción

Encontrar elementos que sean similares a una consulta determinada es el aspecto central de los sistemas de búsqueda y recuperación y, también, de los motores de recomendaciones. Por ejemplo, la coincidencia de similitud ayuda a tus usuarios a encontrar lo siguiente:

  • Imágenes similares a la imagen de su mascota
  • Artículos de noticias relevantes para su búsqueda
  • Películas o canciones similares a las que ya miraron o escucharon
  • Recomendaciones de productos y servicios

Para diseñar un sistema de coincidencia de similitud, primero debes representar los elementos como vectores numéricos. A su vez, estos vectores representan incorporaciones semánticas de los elementos descubiertos a través del aprendizaje automático (AA). Para obtener más detalles, consulta Descripción general: Extrae y entrega las incorporaciones de atributos para el aprendizaje automático.

Luego, debes organizar y almacenar estas incorporaciones para la búsqueda de vecino más cercano (basada en una métrica de similitud) a fin de encontrar elementos similares al vector de incorporación de la consulta del usuario. Sin embargo, para buscar, recuperar y entregar recomendaciones en tiempo real, la coincidencia de similitud debe ser rápida. Por lo tanto, es más práctico aplicar un algoritmo de vecino más cercano aproximado para crear un índice de las incorporaciones de elementos a fin de acelerar el proceso de búsqueda de elementos similares.

En la solución de ejemplo asociada con este artículo, se aborda lo siguiente:

  • Extraer incorporaciones de texto de títulos de Wikipedia
  • Usar el módulo de Codificador de oraciones universal de tf.Hub
  • Crear un índice de coincidencia de similitud aproximada con la biblioteca Annoy de Spotify
  • Entregar el índice para la búsqueda semántica en tiempo real en una app web

El código para la solución de ejemplo se encuentra en el repositorio de GitHub de coincidencias en tiempo real.

Coincidencia de similitud aproximada

Para la coincidencia y la recuperación, un procedimiento típico consta de los siguientes pasos:

  1. Convertir los elementos y la consulta en vectores en un espacio de características apropiado. Estas características se denominan incorporaciones.
  2. Definir una medida de proximidad para un par de vectores de incorporación. Esta medida podría ser de similitud de coseno o de distancia euclidiana.
  3. Encontrar los vecinos más cercanos mediante una búsqueda explícita en todo el conjunto de elementos.

Si solo tienes cientos o miles de elementos, la búsqueda en todo el conjunto de elementos para calcular la similitud entre el vector de consulta y el vector de cada elemento toma una cantidad de tiempo aceptable. También puedes obtener un rendimiento aceptable si realizas la coincidencia de similitud como un trabajo por lotes en el que no se necesitan los resultados en línea. Sin embargo, si entregas un sistema de búsqueda y recuperación en tiempo real o un sistema de recomendación y tienes decenas de millones de elementos, encontrar los vecinos más cercanos debe ser un proceso aproximado. En ese caso, debes optimizar el proceso para obtener una respuesta de baja latencia.

Una solución práctica es realizar coincidencias de similitud aproximadas. La coincidencia de similitud aproximada implica organizar los vectores de tus elementos en un índice, que es una estructura de datos que permite la recuperación rápida de elementos similares. Un problema potencial es que es posible que los elementos recuperados no sean los elementos más similares a la consulta dada. Sin embargo, por lo general, puedes controlar las compensaciones entre la precisión del índice y su latencia (y tamaño).

Hay dos enfoques principales para la coincidencia de similitud aproximada: enfoques basados en árboles y enfoques basados en hash.

Enfoques basados en árboles

La idea detrás de los enfoques basados en árboles (o estructuras de datos de árbol de métricas) es particionar los datos de manera recursiva de acuerdo con el principio de divide y vencerás, que consiste en colocar vectores similares cerca el uno del otro en el árbol. El tiempo de consulta esperado es O(log(n)), en el que n es la cantidad de elementos (vectores) que tienes. Los índices de árbol requieren grandes cantidades de memoria y el rendimiento se degrada con datos de dimensiones superiores. Entre los ejemplos de enfoques basados en árboles, también conocidos como estructuras de datos de árbol de métricas, se incluyen los siguientes:

Enfoques basados en hash

El enfoque basado en hash es una alternativa al enfoque basado en árboles. A diferencia de los árboles, en los hashes no hay particiones recursivas. La idea es enseñarle a un modelo que convierta un elemento en un código, en el que elementos similares producirán el mismo código o uno similar (colisión de hash). Este enfoque reduce de manera significativa la cantidad de memoria necesaria. El tiempo de consulta esperado es O(1), pero puede ser sublineal en n, en el que n es la cantidad de elementos (vectores) que tienes. Entre los ejemplos de enfoques basados en hash, se incluyen los siguientes:

Existen varias bibliotecas de código abierto que implementan técnicas de coincidencia de similitud aproximada, con diferentes compensaciones entre precisión, latencia de consulta, eficiencia de memoria, tiempo para compilar el índice, características y facilidad de uso.

En la solución de ejemplo descrita en este artículo, se usa Annoy (Approximate Nearest Neighbors Oh Yeah), una biblioteca compilada por Spotify para recomendaciones de música. Annoy es una biblioteca C++ con vinculaciones de Python que compila árboles de proyección aleatoria. Un índice se compila con un conjunto de árboles k, en el que k es un parámetro ajustable que compensa entre la precisión y el rendimiento. También crea estructuras grandes de datos de solo lectura basados en archivos que se asignan en la memoria a fin de que muchos procesos puedan compartir los datos.

Otras bibliotecas muy usadas son NMSLIB (biblioteca de espacio sin métricas) y Faiss (Facebook AI Similarity Search). La biblioteca que uses para implementar la coincidencia de similitud aproximada no debería afectar al flujo de trabajo o a la arquitectura general de la solución que se analizaron en este artículo.

En la solución de ejemplo descrita en este artículo, se ilustra una aplicación de incorporaciones de similitud en la búsqueda semántica de texto. El objetivo de la solución es recuperar documentos relevantes en el aspecto semántico (por ejemplo, artículos de noticias, entradas de blog o trabajos de investigación) para una búsqueda de entrada, y hacerlo en tiempo real.

Las técnicas de búsqueda basadas en tokens recuperan documentos basados en alguna métrica (como las compras recientes o la frecuencia) del caso de las palabras de consulta (de manera individual o combinadas) en los documentos. En cambio, la búsqueda semántica usa las incorporaciones de la consulta y los documentos para la coincidencia. Por ejemplo, como se muestra más adelante en la sección Consulta la app web de búsqueda, una consulta podría ser “animales salvajes tropicales”, y los resultados pueden incluir un título como “en la selva africana, cada león, ñu y cocodrilo se vale por sí mismo. bbc vida salvaje”. Ten en cuenta que ninguna de las palabras de la consulta aparece en el resultado, pero este es un artículo que trata sobre animales salvajes tropicales.

El conjunto de datos de BigQuery de Wikipedia

En el ejemplo, la fuente de datos es el conjunto de datos bigquery-samples:wikipedia_benchmark.Wiki100B en BigQuery, que es un conjunto de datos públicos que incluye 100,000 millones de entradas basadas en títulos de Wikipedia. Para el ejemplo, los datos están restringidos a títulos únicos que tienen más de 2 vistas, que tienen al menos 5 palabras y menos de 500 caracteres. Este filtro da como resultado unos 10.5 millones de títulos únicos.

Requisitos técnicos para el sistema

El sistema de búsqueda semántica de ejemplo tiene los siguientes requisitos técnicos:

  • Minimizar el esfuerzo de encontrar una representación vectorial (es decir, las incorporaciones) que codifica la semántica o los títulos de Wikipedia. Por lo tanto, en el ejemplo se debe usar un modelo de incorporación de texto con entrenamiento previo en lugar de entrenar un modelo de lenguaje desde cero.
  • Minimizar la necesidad de una infraestructura de procesamiento dedicada que extraiga incorporaciones y compile el índice. Por lo tanto, en el ejemplo se deben usar servicios de procesamiento a pedido completamente administrados que adquieran recursos suficientes (memoria y CPU) para el trabajo y los liberen cuando termina el trabajo.
  • Escalar de forma automática el proceso de extracción de incorporaciones. Por lo tanto, en el ejemplo se debe usar un servicio paralelo de procesamiento de datos.
  • Minimizar la latencia para encontrar incorporaciones similares en el índice de una consulta determinada. Por lo tanto, el índice debe estar cargado por completo en la memoria.
  • Minimizar la latencia a fin de recuperar los títulos de Wikipedia para los vectores de incorporación similares en tiempo real. Por lo tanto, se deben almacenar los títulos de Wikipedia en una base de datos de lectura de baja latencia.
  • Minimizar el esfuerzo de DevOps para implementar el servicio de búsqueda como una app web. Por lo tanto, se deben usar servicios completamente administrados.
  • Controlar una carga de trabajo cada vez mayor para la app web: hasta miles de consultas por segundo (QPS) con una latencia promedio de menos de un segundo. Por lo tanto, el sistema de ejemplo debe poder implementar un balanceador de cargas y varios nodos de la aplicación web de búsqueda.

Arquitectura de soluciones

En la Figura 1, se muestra una descripción general del sistema de búsqueda semántica de texto en tiempo real. El sistema extrae las incorporaciones de los títulos de Wikipedia, compila un índice de coincidencia de similitud aproximada mediante Annoy y entrega el índice de compilación para la búsqueda y recuperación semántica en tiempo real.

Arquitectura de la solución de ejemplo

Figura 1. Arquitectura de una solución de alto nivel para el sistema de búsqueda semántica de texto

Componentes clave de la arquitectura

En la siguiente tabla, se explican los componentes clave que se ilustran en la Figura 1.

Componente Descripción
BigQuery BigQuery es el almacén de datos de estadísticas de bajo costo, completamente administrado y con escala de petabytes de Google. En la solución de ejemplo, los títulos de Wikipedia de origen se almacenan en BigQuery.
Apache Beam Apache Beam es un marco de trabajo de programación unificado de código abierto que ejecuta trabajos de transmisión y de procesamiento de datos por lotes. En la solución de ejemplo, se usa Apache Beam a fin de implementar una canalización para extraer las incorporaciones y almacenar un ID con el fin de realizar búsquedas de títulos en Datastore.
Dataflow Dataflow es un servicio completamente administrado, sin servidores y confiable para ejecutar canalizaciones de Apache Beam a gran escala en Google Cloud. Dataflow se usa para escalar el procesamiento del texto de entrada y la extracción de las incorporaciones.
tf.Hub TensorFlow Hub es una biblioteca de módulos de aprendizaje automático reutilizables. En la solución de ejemplo, se usa el módulo de incorporación de texto con entrenamiento previo del Codificador de oraciones universal para convertir cada título en un vector de incorporación.
Cloud Storage Cloud Storage es un almacenamiento duradero y con alta disponibilidad para objetos binarios de gran tamaño. En la solución de ejemplo, las incorporaciones extraídas se almacenan en Cloud Storage como TFRecords. Además, después de que se compila el índice de coincidencia de similitud aproximada, se serializa y se almacena en Cloud Storage.
Datastore Datastore es una base de datos de documentos NoSQL diseñada para el ajuste de escala automático, el alto rendimiento y el desarrollo fácil de aplicaciones. En la solución de ejemplo, se usa Datastore para almacenar los títulos de Wikipedia y sus ID a fin de que puedan recuperarse en tiempo real con latencia baja.
AI Platform AI Platform es un servicio sin servidores para entrenar modelos de AA a gran escala. En la solución de ejemplo, se usa AI Platform para crear el índice de coincidencia de similitud aproximada mediante la biblioteca Annoy, sin la necesidad de usar una infraestructura de procesamiento dedicada.
App Engine App Engine te permite implementar y compilar aplicaciones escalables y confiables en una plataforma completamente administrada. En la solución de ejemplo, se usa App Engine a fin de entregar una app web de Flask para buscar los títulos de Wikipedia que sean semánticamente relevantes en una consulta del usuario. App Engine te permite implementar muchas instancias de la app con balanceo de cargas mediante una configuración simple a fin de controlar el aumento de QPS.

Flujo de trabajo general

El flujo de trabajo del sistema de búsqueda semántica de texto en tiempo real ilustrado en la Figura 1 se puede dividir en los siguientes pasos:

  1. Extrae incorporaciones con Dataflow

    1. Lee los títulos de Wikipedia de BigQuery.
    2. Extrae las incorporaciones de títulos con el módulo del Codificador de oraciones universal.
    3. Almacena las incorporaciones extraídas como TFRecords en Cloud Storage.
    4. Almacena los títulos y sus identificadores en Datastore para recuperarlos en tiempo real.
  2. Compila el índice con AI Platform

    1. Carga las incorporaciones de los archivos de Cloud Storage en el índice de Annoy.
    2. Compila el índice en la memoria.
    3. Guarda el índice en el disco.
    4. Sube el índice guardado a Cloud Storage.
  3. Entrega la app de búsqueda mediante App Engine

    1. Descarga el índice de Annoy de Cloud Storage.
    2. Obtén la consulta del usuario.
    3. Extrae la incorporación de consultas con el módulo del Codificador de oraciones universal.
    4. Con el índice de Annoy, encuentra incorporaciones que sean similares a la incorporación de consultas.
    5. Obtén los ID de elementos de las incorporaciones similares.
    6. Recupera los títulos de Wikipedia con los identificadores de Datastore.
    7. Mostrar los resultados

Sistemas de búsqueda en la práctica

En la práctica, los sistemas de búsqueda y recuperación a menudo combinan técnicas de búsqueda basadas en la semántica con técnicas basadas en tokens (índice invertido). Los resultados de ambas técnicas se combinan y clasifican antes de que se entreguen al usuario. Es posible que ya estés familiarizado con Elasticsearch (disponible en Google Cloud Marketplace) para esta tarea, que es un marco de trabajo muy usado en la búsqueda en el texto completo que se basa en la biblioteca Apache Lucene para la indexación invertida.

Otra optimización que se implementa con frecuencia en sistemas reales (lo que no se describe en esta solución) es almacenar las consultas en caché junto con sus identificadores de títulos relevantes mediante servicios como Memorystore. Si la consulta ya se vio antes, los identificadores de título se pueden recuperar directamente desde Memorystore. Esto omite las dos operaciones costosas de la invocación del Codificador de oraciones universal a fin de generar la incorporación de consultas y buscar el índice de coincidencia aproximada para elementos similares. Almacenar las consultas en caché puede mejorar con frecuencia la latencia promedio del sistema, según el nivel de redundancia de la solicitud de consulta. En la Figura 2, se muestra el flujo de trabajo con la caché de una consulta.

Arquitectura de la solución que usa una caché

Figura 2. Arquitectura de una solución de alto nivel para la búsqueda semántica de texto con caché de consultas

En la Figura 2, se ilustra el siguiente flujo:

  1. Recibe la búsqueda.
  2. Busca la consulta en la caché.
  3. Si no se encuentra la consulta, realiza lo siguiente:
    1. Extrae la incorporación de la consulta.
    2. Busca elementos similares en el índice.
    3. Actualiza la caché.
  4. Obtén acceso mediante ID desde Datastore.
  5. Muestra los resultados.

Habilita permisos de acceso y servicios

La solución de extremo a extremo descrita en la Figura 1 requiere que las siguientes API de servicio estén habilitadas en Cloud Console:

Además, se deben otorgar los siguientes permisos a las cuentas de servicio. Las cuentas de servicio predeterminadas tienen suficiente permiso de acceso a los recursos requeridos si pertenecen al mismo proyecto de Google Cloud. Sin embargo, si se modificaron los permisos de la cuenta de servicio, es posible que debas realizar cambios. Los permisos necesarios son los siguientes:

  • Dataflow
    • Permiso de lectura para el conjunto de datos BigQuery
    • Permiso de lectura o escritura para el bucket de Cloud Storage en el que se almacenan los TFRecords
    • Permiso de escritura para Datastore
  • AI Platform
    • Permiso de lectura o escritura en el bucket de Cloud Storage en el que se almacena el índice
  • App Engine
    • Permiso de lectura en el bucket de Cloud Storage en el que se almacena el índice
    • Permiso de lectura para Datastore

En los fragmentos de código de las siguientes secciones, se ilustran los conceptos analizados en este artículo. Para obtener más información sobre cómo ejecutar el ejemplo de extremo a extremo, consulta el archivo README.md en el repositorio de GitHub asociado.

Extrae las incorporaciones con Dataflow

La canalización para extraer la incorporación de los títulos de Wikipedia se implementa en pipeline.py con Apache Beam. La canalización completa se muestra en el siguiente fragmento de código:

def run(pipeline_options, known_args):

 pipeline = beam.Pipeline(options=pipeline_options)
 gcp_project = pipeline_options.get_all_options()['project']

 with impl.Context(known_args.transform_temp_dir):
   articles = (
       pipeline
       | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
     project=gcp_project, query=get_source_query(known_args.limit),
     use_standard_sql=True)))

   articles_dataset = (articles, get_metadata())
   embeddings_dataset, _ = (
       articles_dataset
       | 'Extract embeddings' >> impl.AnalyzeAndTransformDataset(
preprocess_fn))

   embeddings, transformed_metadata = embeddings_dataset

   embeddings | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
     file_path_prefix='{0}'.format(known_args.output_dir),
     file_name_suffix='.tfrecords',
     coder=tft_coders.example_proto_coder.ExampleProtoCoder(
transformed_metadata.schema))

   (articles
       | "Convert to entity" >> beam.Map(
lambda input_features: create_entity(input_features, known_args.kind))
       | "Write to Datastore" >> WriteToDatastore(project=gcp_project))

...

 job = pipeline.run()

 if pipeline_options.get_all_options()['runner'] == 'DirectRunner':
   job.wait_until_finish()

Lee desde BigQuery

El primer paso de la canalización es leer los títulos del conjunto de datos de BigQuery de Wikipedia mediante el método beam.io.Read y un objeto beam.io.BigQuerySource. El método get_source_query en pipeline.py prepara la secuencia de comandos de SQL que se usa para recuperar los datos. La cantidad de títulos de Wikipedia que se recuperarán de BigQuery se puede configurar a través del parámetro limit de la función get_source_query.

def get_source_query(limit=1000000):
 query = """
   SELECT
     GENERATE_UUID() as id,
     text
   FROM
   (
       SELECT
         DISTINCT LOWER(title) text
       FROM
         `bigquery-samples.wikipedia_benchmark.Wiki100B`
       WHERE
         ARRAY_LENGTH(split(title,' ')) >= 5
       AND
         language = 'en'
       AND
         LENGTH(title) < 500
    )
   LIMIT {0}
 """.format(limit)
 return query

Se agrega un identificador al título (aquí, id) con la función GENERATE_UUID incorporada de BigQuery. Este valor se usa a fin de buscar un título de Wikipedia por ID en Datastore y para asignar un título de Wikipedia a su incorporación.

Este paso en la canalización de Beam muestra un objeto PCollection, en el que cada elemento de la colección incluye dos elementos: id (string) y title (string).

Extrae incorporaciones

El segundo paso de la canalización es usar el módulo del Codificador de oraciones universal de tf.Hub a fin de extraer un vector de incorporación para cada título de Wikipedia leído desde BigQuery. Para ejecutar el módulo, en el ejemplo se usa la API de TensorFlow Transform (tf.Transform).

TensorFlow Transform es una biblioteca para el procesamiento previo de datos con Apache Beam. En el ejemplo se usa el método AnalyzeAndTransformDataset de tf.Transform como el contexto para llamar al módulo tf.Hub a fin de extraer la incorporación de texto.

El método AnalyzeAndTransformDataset ejecuta la función preprocess_fn, que incluye la lógica de transformación, como se muestra en el siguiente fragmento:

def preprocess_fn(input_features):
 import tensorflow_transform as tft
 embedding = tft.apply_function(embed_text, input_features['text'])
 output_features = {
   'id': input_features['id'],
   'embedding': embedding
 }
 return output_features

def embed_text(text):
 import tensorflow_hub as hub
 global encoder
 if encoder is None:
   encoder = hub.Module(
'https://tfhub.dev/google/universal-sentence-encoder/2')
 embedding = encoder(text)
 return embedding

En este paso de la canalización, se produce otro objeto PCollection, en el que cada elemento de la colección incluye el valor id (una string) del título de Wikipedia y el valor embedding (un arreglo numérico) extraído del Codificador de oraciones universal, que tiene 512 dimensiones.

Escribe incorporaciones en TFRecords

Después de extraer las incorporaciones de los títulos de Wikipedia, la solución los almacena junto con los ID de título como TFRecords en Cloud Storage, mediante el método beam.io.tfrecordio.WriteToTFRecord.

El formato TFRecord es un formato simple para almacenar una secuencia de registros binarios. Cada registro en un archivo TFRecord es un búfer de protocolos de tf.Example, que constituye un tipo de mensaje flexible que representa una asignación de clave-valor. Este tipo es eficiente para serializar datos estructurados.
Puedes especificar cuántos archivos de incorporación se crean mediante la configuración del parámetro num_shards en el método WriteToTFRecord.

Escribe en Datastore

El siguiente paso es escribir en Datastore. Este paso se ejecuta en paralelo con el paso de extracción de incorporaciones. El objetivo es almacenar los títulos de Wikipedia en Datastore para que se puedan recuperar por sus ID. Los ID de título de Wikipedia también se guardan con la incorporación en los archivos TFRecord, de modo que puedan usarse como identificadores de los elementos (vectores de incorporación) agregados al índice de Annoy.

Para almacenar los elementos generados en el paso Lee desde BigQuery en Datastore, la solución primero debe convertir cada elemento en una entidad de Datastore, mediante el código del siguiente fragmento en pipeline.py:

def create_entity(input_features, kind):
 entity = entity_pb2.Entity()
 datastore_helper.add_key_path(
   entity.key, kind, input_features['id'])
 datastore_helper.add_properties(
   entity, {
     "text": unicode(input_features['text'])
   })
 return entity

Después de que se ejecuta este código, el método WriteToDatastore almacena los elementos en Datastore. En la Figura 3, se muestran algunas de las entidades escritas en Datastore después de que la canalización se ejecutó con el parámetro kind de Datastore configurado como wikipedia.

imagen

Figura 3. Entidades de Datastore después de la ejecución de la canalización

Ejecuta la canalización en Dataflow

Puedes ejecutar la canalización de Apache Beam mediante la ejecución de la secuencia de comandos run.py, el paso de los argumentos necesarios y la configuración del argumento --runner como DataflowRunner. Para hacerlo, puedes establecer parámetros de configuración en el archivo de la secuencia de comandos run.sh y ejecutar la secuencia de comandos run.py.

En el siguiente comando, se muestra cómo ejecutar la canalización. La secuencia de comandos incluye una serie de variables (por ejemplo, $OUTPUT_PREFIX) que se configuran cuando ejecutas la secuencia de comandos run.sh.

python run.py \
 --output_dir=$OUTPUT_PREFIX \
 --transform_temp_dir=$TRANSFORM_TEMP_DIR \
 --transform_export_dir=$TRANSFORM_EXPORT_DIR \
 --project=$PROJECT \
 --runner=$RUNNER \
 --region=$REGION \
 --kind=$KIND \
 --limit=$LIMIT \
 --staging_location=$STAGING_LOCATION \
 --temp_location=$TEMP_LOCATION \
 --setup_file=$(pwd)/setup.py \
 --job_name=$JOB_NAME \
 --worker_machine_type=$MACHINE_TYPE \
 --enable_debug \
 --debug_output_prefix=$DEBUG_OUTPUT_PREFIX

Puedes ver el flujo de la canalización de Dataflow en Cloud Console; se parece a lo que se muestra en la Figura 4.

Canalización de Cloud Dataflow como se muestra en Cloud Console

Figura 4. Grafo de ejecución de Dataflow de la canalización como se muestra en Cloud Console

Compila el índice con AI Platform

En la solución de ejemplo, después de extraer los vectores de incorporación de los títulos de Wikipedia, el siguiente paso es compilar un índice de coincidencia de similitud aproximada para estos vectores mediante el uso de la biblioteca Annoy. En la solución de ejemplo, la carpeta index_builder contiene el código que puedes usar para esta tarea.

Primero, implementa una tarea que compile y guarde el índice. Luego, envía la tarea para que se ejecute en AI Platform. El uso de este enfoque te permite crear el índice sin crear una infraestructura de procesamiento exclusiva.

Implementa la tarea del compilador de índices

El archivo task.py es el punto de entrada para el compilador de índices, que realiza los siguientes pasos:

  • Compila el índice de Annoy.
  • Comprime el índice (opcional).
  • Sube los artefactos generados a Cloud Storage.

La lógica para compilar el índice de Annoy se muestra en el siguiente fragmento de código del módulo index.py.

def build_index(embedding_files_pattern, index_filename,
                num_trees=100):

 annoy_index = AnnoyIndex(VECTOR_LENGTH, metric=METRIC)
 mapping = {}

 embed_files = tf.gfile.Glob(embedding_files_pattern)
 logging.info('{} embedding files are found.'.format(len(embed_files)))

 item_counter = 0
 for f, embed_file in enumerate(embed_files):
   logging.info('Loading embeddings in file {} of {}...'.format(f, len(embed_files)))
   record_iterator = tf.python_io.tf_record_iterator(path=embed_file)

   for string_record in record_iterator:
     example = tf.train.Example()
     example.ParseFromString(string_record)
     string_identifier = example.features.feature['id'].bytes_list.value[0]
     mapping[item_counter] = string_identifier
     embedding = np.array(example.features.feature['embedding'].float_list.value)
     annoy_index.add_item(item_counter, embedding)
     item_counter += 1

   logging.info('Loaded {} items to the index'.format(item_counter))

 logging.info('Start building the index with {} trees...'.format(num_trees))
 annoy_index.build(n_trees=num_trees)
 logging.info('Index is successfully built.')
 logging.info('Saving index to disk...')
 annoy_index.save(index_filename)
 logging.info('Index is saved to disk.')
 logging.info('Saving mapping to disk...')
 with open(index_filename + '.mapping', 'wb') as handle:
   pickle.dump(mapping, handle, protocol=pickle.HIGHEST_PROTOCOL)
 logging.info('Mapping is saved to disk.')

Los pasos son los siguientes:

  1. Obtén todos los nombres de los archivos de incorporación que coincidan con un patrón determinado.
  2. Para cada archivo de incorporación, realiza lo siguiente:
    1. Itera a través de las instancias de tf.Example en el archivo TFRecord.
    2. Lee el string_identifier (el ID) y agrégalo al diccionario mapping como un valor, en el que la clave es el valor item_counter actual.
    3. Lee el vector embedding y agrégalo a annoy_index, en el que el valor item_id se configura con el valor item_counter actual.
  3. Invoca el método annoy_index.build con el valor num_trees especificado.
  4. Guarda el índice mediante la invocación del método annoy_index.save.
  5. Serializa el diccionario mapping con el método pickle.dump.

El motivo por el que se debe tener el diccionario mapping es que el identificador de los títulos de Wikipedia almacenados en Datastore es una string (se genera con el método GENERATE_UUID cuando se leen datos de BigQuery). Sin embargo, el identificador de un elemento (el vector de incorporación) en el índice Annoy solo puede ser un número entero. Por lo tanto, el código crea un diccionario a fin de asignar un índice de número entero subrogado al identificador de string para el elemento de Wikipedia.

El valor METRIC que se pasa en el constructor AnnoyIndex es angular, lo que es una variación de la similitud de coseno. El valor VECTOR_LENGTH se configura como 512, que es la longitud de la incorporación de texto producida por el módulo del Codificador de oraciones universal.

El tamaño del índice guardado puede ser de varios gigabytes, según la cantidad de vectores de incorporación y el valor del parámetro num_trees. Por lo tanto, para subir el índice a Cloud Storage, en la solución se necesita usar API que admitan la fragmentación. En la solución de ejemplo, se usa el método googleapiclient.http.MediaFileUpload en lugar de google.cloud.storage, como se muestra en el siguiente fragmento de código en task.py:

media = MediaFileUpload(
 local_file_name, mimetype='application/octet-stream', chunksize=CHUNKSIZE,
 resumable=True)
request = gcs_services.objects().insert(
 bucket=bucket_name, name=gcs_location, media_body=media)
response = None
while response is None:
 progress, response = request.next_chunk()

Envía la tarea del compilador de índices a AI Platform

En la solución de ejemplo, la ejecución de la tarea del compilador de índices como un trabajo de AI Platform implica los siguientes archivos:

  • submit.sh. Este archivo se debe actualizar a fin de configurar variables en el proyecto, el nombre del depósito y la región para el resultado del índice.
  • config.yaml. Este archivo usa el parámetro scale_tier a fin de especificar el tamaño de la máquina que se usa para ejecutar el trabajo.
  • setup.py. Este archivo especifica los paquetes que se requieren para el trabajo. En la solución de ejemplo, se necesita Annoy y google-api-python-client.

Después de actualizar estos archivos, se puede enviar una tarea del compilador como un trabajo de AI Platform mediante la ejecución de la secuencia de comandos submit.sh. La secuencia de comandos incluye el siguiente comando:

gcloud ml-engine jobs submit training ${JOB_NAME} \
    --job-dir=${JOB_DIR} \
    --runtime-version=1.12 \
    --region=${REGION} \
    --scale-tier=${TIER} \
    --module-name=builder.task \
    --package-path=${PACKAGE_PATH}  \
    --config=config.yaml \
    -- \
    --embedding-files=${EMBED_FILES} \
    --index-file=${INDEX_FILE} \
    --num-trees=${NUM_TREES}

Según el tamaño del índice, el trabajo puede demorar varias horas. El tiempo depende de la cantidad de vectores y su dimensionalidad, y de la cantidad de árboles que se usen para crear el índice.

Una vez que finaliza el trabajo de AI Platform para compilar el índice, los siguientes artefactos están disponibles en el bucket de Cloud Storage especificado:

  • gs://your_bucket/wikipedia/index/embeds.index
  • gs://your_bucket/wikipedia/index/embeds.index.mapping

Implementa el servicio de búsqueda semántica

En esta sección, se describe la implementación de las utilidades del servicio de búsqueda semántica que usan el índice de Annoy que se creó antes para recuperar títulos relevantes de Wikipedia de Datastore. El servicio de búsqueda semántica usa las siguientes utilidades:

  • Utilidad de incorporación de consultas
  • Utilidad de coincidencia de incorporaciones
  • Utilidad de búsqueda de Datastore
  • Wrapper de servicio de búsqueda

Utilidad de incorporación de consultas

Cuando el usuario ingresa una búsqueda, la solución necesita extraer la incorporación de la consulta para que coincida con otras similares en el índice. El siguiente fragmento de código en embedding.py realiza la tarea:

class EmbedUtil:

 def __init__(self):
   logging.info("Initialising embedding utility...")
   embed_module = hub.Module(
"https://tfhub.dev/google/universal-sentence-encoder/2")
   placeholder = tf.placeholder(dtype=tf.string)
   embed = embed_module(placeholder)
   session = tf.Session()
   session.run([tf.global_variables_initializer(), tf.tables_initializer()])
   logging.info('tf.Hub module is loaded.')

   def _embeddings_fn(sentences):
     computed_embeddings = session.run(
       embed, feed_dict={placeholder: sentences})
     return computed_embeddings

   self.embedding_fn = _embeddings_fn
   logging.info("Embedding utility initialised.")

 def extract_embeddings(self, query):
   return self.embedding_fn([query])[0]

El código hace lo siguiente:

  1. Carga el Codificador de oraciones universal de tf.Hub.
  2. Proporciona el método extract_embeddings, que acepta el texto de la consulta del usuario.
  3. Muestra la codificación de la oración (incorporaciones) para la consulta.

El código garantiza que el método EmbedUtil cargue el módulo tf.Hub solo una vez en el constructor de la clase, no cada vez que se invoque al método extract_embeddings. Esto se debe a que la carga del módulo del Codificador de oraciones universal puede demorar varios segundos.

Utilidad de coincidencia de incorporaciones

La clase MatchingUtil, implementada en matching.py, es responsable de cargar el índice de Annoy del archivo del disco local y, también, de cargar el diccionario de asignación. En el siguiente fragmento de código, se muestra la implementación de la clase MatchingUtil.

class MatchingUtil:

 def __init__(self, index_file):
   logging.info("Initialising matching utility...")
   self.index = AnnoyIndex(VECTOR_LENGTH)
   self.index.load(index_file, prefault=True)
   logging.info("Annoy index {} is loaded".format(index_file))
   with open(index_file + '.mapping', 'rb') as handle:
     self.mapping = pickle.load(handle)
   logging.info("Mapping file {} is loaded".format(index_file + '.mapping'))
   logging.info("Matching utility initialised.")

 def find_similar_items(self, vector, num_matches):
   item_ids = self.index.get_nns_by_vector(
     vector, num_matches, search_k=-1, include_distances=False)
   identifiers = [self.mapping[item_id]
                 for item_id in item_ids]
   return identifiers

El índice se carga en el constructor de clase. El código configura el parámetro prefault en el método index.load como True para que todo el archivo de índice se cargue en la memoria.

La clase también expone el método find_similar_items, que hace lo siguiente:

  1. Recibe un vector (el vector de incorporación de una consulta del usuario).
  2. Busca los item_ids (ID de número entero) de la incorporación más similar en el index de Annoy para el vector dado.
  3. Obtiene los identifiers(ID de la string de GUID) del diccionario de mapping.
  4. Muestra el objeto identifiers que se usará para recuperar los títulos de Wikipedia de Datastore.

Utilidad de búsqueda de Datastore

En el siguiente fragmento, se muestra la clase DatastoreUtil en lookup.py, que implementa la lógica para recuperar los títulos de Wikipedia de Datastore. El constructor toma un valor kind de Datastore que describe a qué entidades pertenecen los títulos.

class DatastoreUtil:

 def __init__(self, kind):
   logging.info("Initialising datastore lookup utility...")
   self.kind = kind
   self.client = datastore.Client()
   logging.info("Datastore lookup utility initialised.")

 def get_items(self, keys):
   keys = [self.client.key(self.kind, key)
           for key in keys]
   items = self.client.get_multi(keys)
   return items

El método get_items acepta un parámetro keys, que es una lista de identificadores, y muestra el objeto items de Datastore asociado con estas claves.

Wrapper de servicio de búsqueda

En el siguiente fragmento, se muestra la clase SearchUtil en search.py, que actúa como wrapper de los módulos de utilidad que se describieron antes.

class SearchUtil:

 def __init__(self):
   logging.info("Initialising search utility...")
   dir_path = os.path.dirname(os.path.realpath(__file__))
   service_account_json = os.path.join(dir_path, SERVICE_ACCOUNT_JSON)
   index_file = os.path.join(dir_path, INDEX_FILE)
   download_artifacts(index_file, GCS_BUCKET, GCS_INDEX_LOCATION)
   self.match_util = matching.MatchingUtil(index_file)
   self.embed_util = embedding.EmbedUtil()
   self.datastore_util = lookup.DatastoreUtil(KIND, service_account_json)
   logging.info("Search utility is up and running.")

 def search(self, query, num_matches=10):
   query_embedding = self.embed_util.extract_embeddings(query)
   item_ids = self.match_util.find_similar_items(query_embedding, num_matches)
   items = self.datastore_util.get_items(item_ids)
   return items

En el constructor de SearchUtil, el archivo del índice de Annoy y el diccionario de asignación serializado se descargan de Cloud Storage al disco local con el método download_artifacts. Luego, se inicializan los objetos match_util, embed_util y datastore_util.

El método search acepta un parámetro de búsqueda de usuario query y el parámetro num_matches, que especifica la cantidad de coincidencias que se recuperarán. El método search invoca los siguientes métodos:

  • El método embed_util.extract_embeddings obtiene el vector de incorporación de la consulta mediante el módulo del Codificador de oraciones universal.
  • El método match_util.find_similar_items busca los ID de elemento de coincidencias similares a la incorporación de consultas en el índice de Annoy.
  • El método datastore_util.get_items recupera los elementos de Datastore que tienen item_ids, incluidos los títulos de Wikipedia.

Un paso típico posterior a la recuperación es clasificar los elementos generados por el índice con respecto a la medida de similitud antes de mostrar los elementos.

Entrega la búsqueda con App Engine

En esta sección, se describe cómo entregar el servicio de búsqueda semántica como una app web y cómo implementarlo en App Engine.

Implementa la app web de Flask

Con el siguiente fragmento de código en main.py, se implementa una app web de Flask para entregar la búsqueda semántica de los títulos de Wikipedia.

...
search_util = utils.search.SearchUtil()
app = Flask(__name__)

@app.route('/search', methods=['GET'])
def search():
   try:
       query = request.args.get('query')
       show = request.args.get('show')
       is_valid, error = validate_request(query, show)

       if is_valid:
           results = search_util.search(query, show)
       else:
           results = error

   except Exception as error:
       results = "Unexpected error: {}".format(error)

   response = jsonify(results)
   return response

if __name__ == '__main__':
 app.run(host='127.0.0.1', port=8080)

El objeto search_util solo se inicializa solo una vez a nivel del módulo. El extremo /search de RESTful redirecciona la solicitud HTTP GET al método search. El método obtiene la query (string) de búsqueda de usuario y la cantidad de resultados para show (número entero), llama al método search_util.search y muestra las coincidencias recuperadas.

Implementa la app web en App Engine

La app web de Flask se implementa en el entorno flexible de App Engine, con gunicorn como una interfaz de puerta de enlace del servidor web (WSGI) HTTP. Para la implementación en App Engine, se requiere que cambies la configuración en los siguientes archivos:

  • app.yaml. Este archivo define los parámetros de configuración del entorno de ejecución de Python, además de la configuración general de apps, redes y recursos. En este archivo, debes realizar los siguientes cambios:

    • Configura app_start_timeout_sec en la sección readiness_check a fin de permitir suficiente tiempo para descargar el índice y cargar los objetos de utilidad.
    • Configura memory en la sección resources como un valor mayor que tu tamaño de índice, de modo que el índice se pueda cargar por completo en la memoria.
    • Configura gunicorn --timeout a fin de permitir el tiempo suficiente para descargar y cargar el índice, y cargar los objetos de utilidad.
    • Configura gunicorn --threading como el doble o el cuádruple de la cantidad de núcleos de CPU que se solicitan en la sección resources del archivo app.yaml para aumentar la simultaneidad.
  • requirement.txt. El entorno de ejecución busca un archivo requirements.txt en el directorio de origen de la app y usa pip para instalar las dependencias antes de iniciar la app.

Puedes ejecutar la secuencia de comandos deploy.sh para implementar la app en App Engine, que incluye el siguiente comando:

gcloud --verbosity=info -q app deploy app.yaml --project=${PROJECT}

Consulta la app web de búsqueda

Después de implementar la app web en App Engine, se puede invocar una búsqueda mediante una llamada a la siguiente URL:

https://service_name-dot-project_name.appspot.com/search?query=query

El valor service_name es el mismo nombre que se proporciona en el archivo app.yaml. Si la consulta que se pasó en query contiene espacios, deben convertirse en %20. Agregar show=num_results a la string de consulta especifica cuántas coincidencias se deben recuperar. El valor predeterminado es 10.

A continuación, se muestran ejemplos de búsquedas y los títulos de Wikipedia coincidentes en función del conjunto de datos de muestra.

Consulta Resultados de muestra
Animales salvajes tropicales “en la jungla africana, cada león, ñu y cocodrilo se vale por sí mismo”
Problemas globales de tecnología “riesgo mundial de inteligencia artificial”
Bebidas frescas de verano “ideas estupendas para un mojito sin alcohol”
Deportes de invierno “esquí de fondo en el campeonato mundial de esquí nórdico de la fis de 2007”

Métricas de volumen y pruebas de carga

Después de crear la solución de ejemplo, se realizó una ejecución de muestra para obtener información sobre el rendimiento. En las siguientes tablas, se muestra la configuración que se usó para ejecutar el ejemplo de extremo a extremo con el conjunto de datos bigquery-samples.wikipedia_benchmark.Wiki100B.

Extrae incorporaciones

En la siguiente tabla, se muestra la configuración del trabajo de Dataflow que se usa para extraer las incorporaciones y el tiempo de ejecución resultante.

Configuración
  • Límite de registros: 5 millones
  • Tamaño del vector de incorporación: 512
  • CPU virtuales: 64 (32 trabajadores)
  • Tipo de máquina de trabajador: n1-highmem-2
Resultados
  • Duración del trabajo: 32 minutos

Compila el índice

En la siguiente tabla, se muestra información de configuración y resultados para la tarea de compilar el índice mediante un trabajo de AI Platform.

Configuración
  • Cantidad de árboles del índice de Annoy: 100
  • Nivel de escala de AI Platform: large_model (n1-highmem-8)
Resultados
  • Duración del trabajo: 2 horas, 56 minutos
  • Tamaño del archivo de índice: 19.28 GB
  • Tamaño del archivo de asignación: 263.21 MB

Entrega la app de búsqueda

En la siguiente tabla, se muestra información de configuración y resultados para entregar la app de búsqueda con App Engine. La prueba de carga se realizó mediante ab, la herramienta de Apache para realizar evaluaciones comparativas en el servidor HTTP, durante 180 segundos.

Configuración
  • CPU virtuales: 6
  • Memoria: 24 GB
  • Disco: 50 GB
  • Escalamiento: 10 instancias manuales
Resultados
  • Duración de la implementación (en funcionamiento): ~19 minutos
  • Compilación y carga de la imagen de contenedor: ~6 minutos
  • Implementación de la app: ~13 minutos
  • Nivel de simultaneidad: 1,500
  • Solicitudes por segundos: ~2,500
  • Latencia (percentil 95): ~903 milisegundos
  • Latencia (percentil 50): ~514 milisegundos

Mejoras adicionales

Se pueden realizar las siguientes mejoras en el sistema actual:

  • Usa GPU para la entrega. Ejecutarse en un acelerador puede ser conveniente para el modo del Codificador de oraciones universal. La biblioteca de Annoy no admite GPU, pero una biblioteca como Faiss, que sí admite GPU, puede mejorar el tiempo de búsqueda para el índice de coincidencia de similitud aproximada. Sin embargo, App Engine no admite el uso de GPU, así que para usarlas debes usar Compute Engine o Google Kubernetes Engine (GKE) en lugar de App Engine.

  • Lee el índice desde el disco. Como técnica de optimización de costos, en lugar de entregar el índice con un nodo de memoria grande (en el ejemplo, de 26 GB de RAM), puedes usar nodos de memoria más pequeños (por ejemplo, de 4 GB de RAM) y leer el índice del disco. Si lees el índice desde el disco, debes especificar un SSD o es probable que el rendimiento no sea adecuado. Mantener el índice en el disco te permite aumentar la cantidad de nodos de entrega y esto, a su vez, aumenta la capacidad de procesamiento del sistema. También reduce el costo del sistema. Sin embargo, si deseas mantener el índice en el disco, debes usar Compute Engine o GKE, porque App Engine no admite SSD para discos persistentes.

  • Actualiza el índice en el sistema activo. A medida que se reciben datos nuevos (en el ejemplo, artículos de Wikipedia nuevos), el índice debe actualizarse. Por lo general, esto se realiza como un proceso por lotes que se ejecuta a diario o cada semana. Después de la actualización, la app de búsqueda debe actualizarse para usar el índice nuevo, sin tiempo de inactividad.

¿Qué sigue?