Como criar um sistema de correspondência por similaridade de embeddings em tempo real

Neste artigo, fornecemos uma visão geral da correspondência por similaridade aproximada, uma técnica que usa machine learning para encontrar itens semelhantes a um determinado item. Também descrevemos aqui uma solução de exemplo completa para realizar pesquisa semântica de texto em tempo real e explicamos vários aspectos da execução da solução de exemplo. A solução de exemplo está em um repositório realtime-embeddings-matching do GitHub (em inglês) associado.

O artigo pressupõe que você já conheça o Google Cloud, os conceitos de machine learning e ferramentas como o Apache Beam.

Introdução

Encontrar itens semelhantes a uma determinada consulta é o aspecto principal dos sistemas de pesquisa e recuperação, bem como dos mecanismos de recomendação. Por exemplo, a correspondência por similaridade ajuda os usuários a encontrar os itens a seguir:

  • Imagens semelhantes à imagem do animal de estimação
  • Artigos de notícias relevantes para a consulta de pesquisa
  • Filmes ou músicas semelhantes ao que eles assistiram ou ouviram
  • Recomendações de produtos e serviços

Para projetar um sistema de correspondência por similaridade, primeiro é preciso representar itens como vetores numéricos. Esses vetores, por sua vez, representam embeddings (em inglês) semânticos dos itens descobertos por meio de machine learning (ML). Para mais detalhes, consulte Visão geral: como extrair e exibir embeddings de atributos para machine learning.

Em segundo lugar, é necessário organizar e armazenar esses embeddings para a pesquisa do vizinho mais próximo (em inglês) (com base em uma métrica de similaridade (em inglês)) para encontrar itens semelhantes ao vetor de embedding da consulta do usuário. No entanto, para pesquisar, recuperar e exibir recomendações em tempo real, a correspondência por similaridade precisa ser rápida. Portanto, é mais prático aplicar um algoritmo de vizinho mais próximo aproximado (em inglês) para criar um índice de embeddings do item e acelerar o processo de localização de itens semelhantes.

A solução de exemplo associada a este artigo abrange os itens a seguir:

  • Extração de embeddings de texto de títulos da Wikipédia
  • Uso do módulo do Universal Sentence Encoder de tf.Hub.
  • Criação de um índice de correspondência por similaridade aproximada usando a biblioteca Annoy (em inglês) do Spotify
  • Exibição do índice para pesquisa semântica em tempo real em um app da Web

O código da solução de exemplo está no repositório realtime-embeddings-matching do GitHub (em inglês).

Correspondência por similaridade aproximada

Veja a seguir um procedimento típico para correspondência e recuperação:

  1. Converta os itens e a consulta em vetores em um espaço de atributo apropriado. Esses atributos são chamados de embeddings.
  2. Defina uma medida de proximidade para um par de vetores de embedding. Essa medida poderia ser similaridade por cosseno (em inglês) ou distância euclidiana.
  3. Encontre os vizinhos mais próximos usando uma pesquisa explícita em todo o conjunto de itens.

Se você tiver centenas ou milhares de itens, a pesquisa de todo o conjunto de itens para calcular a similaridade entre o vetor de consulta e o vetor de cada item levará um tempo aceitável. Também será possível ter um desempenho aceitável se você fizer a correspondência por similaridade como um trabalho em lote, em que os resultados on-line não serão necessários. No entanto, se você estiver exibindo um sistema de pesquisa ou recuperação em tempo real ou um sistema de recomendação e tiver dezenas de milhões de itens, a localização dos vizinhos mais próximos precisa ser aproximada. Nesse caso, é preciso otimizar o processo para uma resposta de baixa latência.

Uma solução prática é realizar uma correspondência por similaridade aproximada. Com a correspondência por similaridade aproximada, é possível organizar seus vetores de itens em um índice, que é uma estrutura de dados que permite a recuperação rápida de itens semelhantes. Um possível problema é que os itens recuperados podem não ser os itens mais parecidos com a consulta especificada. No entanto, normalmente é possível controlar a compensação entre a precisão do índice e a latência (e o tamanho) dele.

Há duas abordagens principais para a correspondência por similaridade aproximada: abordagens baseadas em árvore e abordagens baseadas em hash.

Abordagens baseadas em árvore

A ideia por trás das abordagens baseadas em árvore (ou estruturas de dados de árvore métrica) é particionar de modo recursivo os dados no estilo "dividir e conquistar", o que aproxima os vetores semelhantes na árvore. O tempo de consulta esperado é O(log(n)), em que n é o número de itens (vetores) que você tem. Os índices de árvore exigem grandes quantidades de memória, e o desempenho diminui com dados de dimensões superiores. Veja abaixo alguns exemplos de abordagens baseadas em árvore (também denominadas estruturas de dados de árvore métrica):

Abordagens baseadas em hash

Uma alternativa para a abordagem baseada em árvore é a abordagem baseada em hash. Ao contrário das árvores, nos hashes não há particionamento recursivo. A ideia é aprender um modelo que converta um item em um código, em que itens semelhantes produzirão o mesmo código ou um código semelhante (colisão de hash). Essa abordagem reduz significativamente a memória necessária. O tempo de consulta esperado é O(1), mas pode ser sub-linear em n, em que n é o número de itens (vetores) que você tem. Veja abaixo exemplos de abordagens baseadas em hash:

várias bibliotecas de código aberto que implementam técnicas de correspondência por similaridade aproximada, com diferentes equilíbrios entre precisão, latência de consulta, eficiência de memória, tempo para criação do índice, atributos e facilidade de uso.

Na solução de exemplo descrita neste artigo, utilizamos a Annoy (Approximate Nearest Neighbours Oh Yeah), uma biblioteca criada pelo Spotify para recomendações musicais. Annoy é uma biblioteca C++ com vinculações Python que cria árvores de projeção aleatória (em inglês). Um índice é criado com uma floresta de árvores k, em que k é um parâmetro ajustável que faz o equilíbrio entre precisão e desempenho. Ela também cria grandes estruturas de dados somente de leitura baseadas em arquivo que são mapeadas na memória para que muitos processos possam compartilhar os dados.

Outras bibliotecas amplamente usadas são NMSLIB (biblioteca espacial não métrica) e FBRs (Facebook AI similarity Search). A biblioteca que você usa para implementar a correspondência por similaridade aproximada não afetará a arquitetura geral da solução ou o fluxo de trabalho discutido neste artigo.

Na solução de exemplo descrita neste artigo, ilustramos uma aplicação da correspondência por similaridade de embeddings na pesquisa semântica de texto. O objetivo da solução é recuperar documentos semanticamente relevantes (por exemplo, artigos de notícias, postagens de blogs ou trabalhos de pesquisa) para uma consulta de pesquisa de entrada e fazer isso em tempo real.

As técnicas de pesquisa baseadas em tokens recuperam documentos com base em alguma métrica (como tempo para retorno ou frequência) da ocorrência das palavras da consulta (individualmente ou combinadas) nos documentos. Por outro lado, na pesquisa semântica, os embeddings da consulta e dos documentos são usados para a correspondência. Por exemplo, conforme mostrado posteriormente na seção Consultar o app da Web de pesquisa, uma consulta pode ser "animais selvagens tropicais" e os resultados podem incluir um título como "na selva africana, é cada leão, gnu e crocodilo por si. BBC vida na selva". Observe que nenhuma das palavras de consulta aparece no resultado, mas o resultado é um artigo que discute animais selvagens tropicais.

O conjunto de dados do BigQuery da Wikipédia

No exemplo, a fonte de dados é o conjunto de dados bigquery-samples:wikipedia_benchmark.Wiki100B no BigQuery, que é um conjunto de dados público que inclui 100 bilhões de entradas com base em títulos da Wikipédia. Por exemplo, os dados são restritos a títulos exclusivos que tenham mais de 2 visualizações, pelo menos 5 palavras e menos de 500 caracteres. Esse filtro resulta em cerca de 10,5 milhões de títulos exclusivos.

Requisitos técnicos do sistema

O exemplo de sistema de pesquisa semântica tem os requisitos técnicos a seguir:

  • Minimizar o esforço de encontrar uma representação vetorial (isto é, os embeddings) que codifica a semântica ou os títulos da Wikipédia. Assim, o exemplo precisa usar um modelo de embedding de texto pré-treinado em vez de treinar um modelo de linguagem do zero.
  • Minimizar a necessidade de uma infraestrutura de computação dedicada que extraia embeddings e crie o índice. Portanto, o exemplo precisa usar serviços de computação sob demanda e totalmente gerenciados que adquirem recursos suficientes (memória e CPU) para o job e os libera quando ele é concluído.
  • Escalonar automaticamente o processo de extração de embedding. Assim, o exemplo precisa usar um serviço paralelo de processamento de dados.
  • Minimizar a latência para encontrar embeddings semelhantes no índice de uma determinada consulta. Assim, o índice precisa estar totalmente carregado na memória.
  • Minimizar a latência para buscar os títulos da Wikipédia pelos vetores de embedding semelhantes em tempo real. Assim, o exemplo precisa armazenar os títulos da Wikipédia em um banco de dados de leitura de baixa latência.
  • Minimizar o esforço de DevOps para implantar o serviço de pesquisa como um app da Web. Assim, o exemplo precisa usar serviços totalmente gerenciados.
  • Lidar com o aumento da carga de trabalho do app da Web, até milhares de consultas por segundo (QPS, na sigla em inglês) com latência média abaixo de um segundo. Assim, o exemplo precisa conseguir implantar vários nós do app da Web de pesquisa e um balanceador de carga.

Arquitetura da solução

Na figura 1, mostramos uma visão geral do sistema de pesquisa semântica de texto em tempo real. O sistema extrai os embeddings dos títulos da Wikipédia, cria um índice de correspondência por similaridade aproximada usando a Annoy e exibe o índice de versão para pesquisa e recuperação semântica em tempo real.

Arquitetura da solução de exemplo

Figura 1. Arquitetura de solução de alto nível do sistema de pesquisa semântica de texto

Principais componentes da arquitetura

Na tabela a seguir, explicamos os principais componentes ilustrados na figura 1.

Componente Descrição
BigQuery BigQuery é o serviço de armazenamento de dados para análise de baixo custo, totalmente gerenciável e com escala em petabyte desenvolvido pelo Google. Na solução de exemplo, os títulos da Wikipédia de origem são armazenados no BigQuery.
Apache Beam O Apache Beam é um framework de programação unificada de código aberto que executa jobs de processamento de dados em lote e por streaming. Na solução de exemplo, usamos o Apache Beam para implementar um pipeline para extrair os embeddings e armazenar um ID para realizar pesquisas de título no Datastore.
Dataflow O Dataflow é um serviço totalmente gerenciado, sem servidor e confiável para executar pipelines do Apache Beam em grande escala no Google Cloud. O Dataflow é usado para escalonar o processamento do texto de entrada e a extração dos embeddings.
tf.Hub O TensorFlow Hub é uma biblioteca de módulos de machine learning reutilizáveis. Na solução de exemplo, usamos o módulo de embedding de texto pré-treinado do Universal Sentence Encoder para converter cada título em um vetor de embedding.
Cloud Storage O Cloud Storage é um serviço de armazenamento altamente disponível e durável para objetos binários grandes. Na solução de exemplo, os embeddings extraídos são armazenados no Cloud Storage como TFRecords. Além disso, após a criação do índice de correspondência por similaridade aproximada, ele é serializado e armazenado no Cloud Storage.
Datastore O Datastore é um banco de dados de documentos NoSQL criado para fornecer escalonamento automático, alto desempenho e facilidade no desenvolvimento de aplicativos. Na solução de exemplo, usamos o Datastore para armazenar os títulos da Wikipédia e seus IDs para que possam ser buscados em tempo real e com baixa latência.
AI Platform O AI Platform é um serviço sem servidor para treinar modelos de ML em escala. Na solução de exemplo, usamos o AI Platform para criar o índice de correspondência por similaridade aproximada usando a biblioteca Annoy, sem a necessidade de uma infraestrutura de computação dedicada.
App Engine O App Engine permite criar e implantar aplicativos escalonáveis e confiáveis em uma plataforma totalmente gerenciada. A solução de exemplo usa o App Engine para exibir um app da Web Flask para pesquisar os títulos da Wikipédia semanticamente relevantes para a consulta de um usuário. O App Engine permite implantar muitas instâncias do app com balanceamento de carga usando apenas uma configuração simples para lidar com QPS crescente.

Fluxo de trabalho global

O fluxo de trabalho do sistema de pesquisa semântica de texto em tempo real ilustrado na figura 1 pode ser dividido nas etapas a seguir:

  1. Extrair embeddings usando o Dataflow

    1. Ler os títulos da Wikipédia no BigQuery.
    2. Extrair as embeddings do título usando o módulo do Universal Sentence Encoder.
    3. Armazenar os embeddings extraídos como TFRecords no Cloud Storage.
    4. Armazenar os títulos e os identificadores deles no Datastore para recuperação em tempo real.
  2. Criar o índice usando o AI Platform

    1. Carregar os embeddings dos arquivos no Cloud Storage no índice da Annoy.
    2. Criar o índice na memória.
    3. Salvar o índice em disco.
    4. Fazer upload do índice salvo no Cloud Storage.
  3. Exibir o app de pesquisa usando o App Engine

    1. Fazer o download do índice da Annoy no Cloud Storage.
    2. Receber a consulta do usuário.
    3. Extrair o embedding de consulta usando o módulo do Universal Sentence Encoder.
    4. Com o uso do índice da Annoy, encontrar os embeddings que são semelhantes ao embedding de consulta.
    5. Conseguir os códigos dos itens de embeddings semelhantes.
    6. Recuperar os títulos da Wikipédia usando os identificadores do Datastore.
    7. retorne os resultados.

Pesquisar sistemas na prática

Na prática, os sistemas de pesquisa e recuperação geralmente combinam técnicas de pesquisa baseadas em semântica com técnicas baseadas em token (índice invertido (em inglês)). Os resultados das duas técnicas são combinados e classificados antes de serem exibidos ao usuário. Talvez você já conheça o Elasticsearch (disponível no Marketplace do Google Cloud) para esta tarefa, que é um framework amplamente utilizado para pesquisa de texto completo com base na biblioteca do Apache Lucene para indexação invertida.

Outra otimização muitas vezes implementada em sistemas reais (que não é abordada nesta solução) é armazenar em cache as consultas e os identificadores de título relevantes usando algo como o Memorystore. Se a consulta foi vista antes, os identificadores de título podem ser recuperados diretamente do Memorystore. Isso ignora as duas operações pesadas de invocação do Universal Sentence Encoder para gerar o embedding de consulta e de pesquisa do índice por correspondência aproximada para itens semelhantes. O armazenamento em cache das consultas pode melhorar a latência média do seu sistema, dependendo do nível de redundância da solicitação de consulta. A Figura 2 mostra o fluxo de trabalho com um cache de consulta.

Arquitetura da solução usando um cache

Figura 2. Arquitetura de solução de alto nível para pesquisa semântica de texto com cache de consulta

A figura 2 ilustra o fluxo a seguir:

  1. Receber a consulta de pesquisa.
  2. Pesquisar a consulta no cache.
  3. Se a consulta não for encontrada:
    1. Extrair o embedding da consulta.
    2. Encontrar itens semelhantes no índice.
    3. Atualizar o cache.
  4. Conseguir acesso por IDs do Datastore.
  5. Retornar resultados.

Como ativar serviços e permissões de acesso

A solução completa descrita na Figura 1 exige que as seguintes APIs de serviço estejam ativadas no Console do Cloud:

Além disso, as permissões a seguir precisam ser concedidas às contas de serviço. As contas de serviço padrão têm permissão de acesso suficiente aos recursos necessários, se pertencerem ao mesmo projeto do Google Cloud. No entanto, se as permissões da conta de serviço tiverem sido alteradas, talvez seja necessário fazer novas alterações. Veja abaixo as permissões necessárias:

  • Dataflow
    • Permissão de leitura no conjunto de dados do BigQuery.
    • Permissão de leitura e gravação no bucket do Cloud Storage em que os TFRecords são armazenados.
    • Permissão de gravação no Datastore
  • AI Platform
    • Permissão de leitura e gravação no bucket do Cloud Storage em que o índice está armazenado.
  • App Engine
    • Permissão de leitura no bucket do Cloud Storage em que o índice está armazenado.
    • Permissão de leitura no Datastore

Com os snippets de código nas seções a seguir, ilustramos os conceitos discutidos neste artigo. Para informações sobre como executar o exemplo completo, consulte o arquivo README.md no repositório GitHub (em inglês) associado.

Como extrair os embeddings com o Dataflow

O pipeline para extrair o embedding dos títulos da Wikipédia é implementado em pipeline.py usando o Apache Beam. O pipeline geral é mostrado no trecho de código a seguir:

def run(pipeline_options, known_args):

 pipeline = beam.Pipeline(options=pipeline_options)
 gcp_project = pipeline_options.get_all_options()['project']

 with impl.Context(known_args.transform_temp_dir):
   articles = (
       pipeline
       | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
     project=gcp_project, query=get_source_query(known_args.limit),
     use_standard_sql=True)))

   articles_dataset = (articles, get_metadata())
   embeddings_dataset, _ = (
       articles_dataset
       | 'Extract embeddings' >> impl.AnalyzeAndTransformDataset(
preprocess_fn))

   embeddings, transformed_metadata = embeddings_dataset

   embeddings | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
     file_path_prefix='{0}'.format(known_args.output_dir),
     file_name_suffix='.tfrecords',
     coder=tft_coders.example_proto_coder.ExampleProtoCoder(
transformed_metadata.schema))

   (articles
       | "Convert to entity" >> beam.Map(
lambda input_features: create_entity(input_features, known_args.kind))
       | "Write to Datastore" >> WriteToDatastore(project=gcp_project))

...

 job = pipeline.run()

 if pipeline_options.get_all_options()['runner'] == 'DirectRunner':
   job.wait_until_finish()

Ler pelo BigQuery

A primeira etapa no pipeline é ler os títulos do conjunto de dados do BigQuery da Wikipédia usando o método beam.io.Read e um objeto beam.io.BigQuerySource. O método get_source_query em pipeline.py prepara o script SQL usado para recuperar os dados. O número de títulos da Wikipédia recuperados do BigQuery pode ser configurado por meio do parâmetro limit da função get_source_query.

def get_source_query(limit=1000000):
 query = """
   SELECT
     GENERATE_UUID() as id,
     text
   FROM
   (
       SELECT
         DISTINCT LOWER(title) text
       FROM
         `bigquery-samples.wikipedia_benchmark.Wiki100B`
       WHERE
         ARRAY_LENGTH(split(title,' ')) >= 5
       AND
         language = 'en'
       AND
         LENGTH(title) < 500
    )
   LIMIT {0}
 """.format(limit)
 return query

Um identificador é adicionado ao título (aqui, id) usando a função GENERATE_UUID integrada do BigQuery. Esse valor é usado para procurar um título da Wikipédia pelo ID do Datastore e mapear um título da Wikipédia para o embedding dele.

Esta etapa no pipeline do Beam retorna um objeto PCollection, em que cada item da coleção inclui dois elementos: id (string) e title (string).

Extrair embeddings

A segunda etapa no pipeline é usar o módulo Universal Sentence Encoder de tf.Hub para extrair um vetor de incorporação para cada título da Wikipédia que foi lido no BigQuery. Para executar o módulo, o exemplo usa a API TensorFlow Transform (tf.Transform).

O TensorFlow Transform é uma biblioteca de pré-processamento de dados com o Apache Beam. O exemplo usa o método AnalyzeAndTransformDataset de tf.Transform como contexto para chamar o módulo tf.Hub para extrair a incorporação de texto.

O método AnalyzeAndTransformDataset executa a função preprocess_fn, que inclui a lógica de transformação, conforme mostrado no seguinte snippet:

def preprocess_fn(input_features):
 import tensorflow_transform as tft
 embedding = tft.apply_function(embed_text, input_features['text'])
 output_features = {
   'id': input_features['id'],
   'embedding': embedding
 }
 return output_features

def embed_text(text):
 import tensorflow_hub as hub
 global encoder
 if encoder is None:
   encoder = hub.Module(
'https://tfhub.dev/google/universal-sentence-encoder/2')
 embedding = encoder(text)
 return embedding

Essa etapa do pipeline produz outro objeto PCollection, em que cada item da coleção inclui o valor id (uma string) do título da Wikipédia e o valor de embedding (uma matriz numérica) extraído do Universal Sentence Encoder, que tem 512 dimensões.

Gravar embeddings nos TFRecords

Depois de extrair os embeddings dos títulos da Wikipédia, a solução os armazena com os IDs de título, como o TFRecords no Cloud Storage, usando o método beam.io.tfrecordio.WriteToTFRecord.

TFRecord é um formato simples para armazenar uma sequência de registros binários. Cada registro em um arquivo TFRecord é um tf.Example Buffer de protocolo, que constitui um tipo de mensagem flexível que representa um mapeamento de chave-valor. Esse tipo é eficiente para a serialização de dados estruturados.
Para especificar quantos arquivos de incorporação são criados, defina o parâmetro num_shards no método WriteToTFRecord.

Gravar no Datastore

A próxima etapa é gravar no Datastore. Essa etapa é realizada em paralelo com a etapa de extração do embedding. O objetivo é armazenar os títulos da Wikipédia no Datastore para que possam ser recuperados pelos IDs correspondentes. Os IDs dos títulos da Wikipédia também são salvos com o embedding nos arquivos TFRecord, para que possam ser usados como identificadores dos itens (vetores incorporados) adicionados ao índice da Annoy.

Para armazenar os itens produzidos pela etapa de leitura do BigQuery para o Datastore, primeiro a solução precisa converter cada item em uma entidade do Datastore, usando o código no snippet a seguir em pipeline.py:

def create_entity(input_features, kind):
 entity = entity_pb2.Entity()
 datastore_helper.add_key_path(
   entity.key, kind, input_features['id'])
 datastore_helper.add_properties(
   entity, {
     "text": unicode(input_features['text'])
   })
 return entity

Após a execução desse código, o método WriteToDatastore armazena os itens no Datastore. A figura 3 mostra algumas das entidades gravadas no Datastore após a execução do pipeline com o parâmetro kind do Datastore definido como wikipedia.

imagem

Figura 3. Entidades do Datastore após a execução do pipeline

Executar o pipeline no Dataflow

Para executar o pipeline do Apache Beam, execute o script run.py, transmita os argumentos necessários e defina o argumento --runner como DataflowRunner. Para fazer isso, defina parâmetros de configuração no arquivo de script run.sh e execute o script run.py.

O comando a seguir mostra como executar o pipeline. O script inclui diversas variáveis (por exemplo, $OUTPUT_PREFIX) que são definidas quando você executa o script run.sh.

python run.py \
 --output_dir=$OUTPUT_PREFIX \
 --transform_temp_dir=$TRANSFORM_TEMP_DIR \
 --transform_export_dir=$TRANSFORM_EXPORT_DIR \
 --project=$PROJECT \
 --runner=$RUNNER \
 --region=$REGION \
 --kind=$KIND \
 --limit=$LIMIT \
 --staging_location=$STAGING_LOCATION \
 --temp_location=$TEMP_LOCATION \
 --setup_file=$(pwd)/setup.py \
 --job_name=$JOB_NAME \
 --worker_machine_type=$MACHINE_TYPE \
 --enable_debug \
 --debug_output_prefix=$DEBUG_OUTPUT_PREFIX

Veja o fluxo do pipeline do Dataflow no Console do Cloud. Ele se parece com o que você vê na Figura 4.

Pipeline do Cloud Dataflow conforme exibido no Console do Cloud

Figura 4. Gráfico de execução do Dataflow do pipeline, conforme exibido no Console do Cloud

Como criar o índice com o AI Platform

Na solução de exemplo, depois que os vetores de embedding são extraídos dos títulos da Wikipédia, o próximo passo é criar um índice de correspondência por similaridade aproximada para esses vetores usando a biblioteca Annoy. Na solução de exemplo, a pasta index_builder contém o código que pode ser usado para essa tarefa.

Primeiro, implemente uma tarefa que cria e salva o índice. Em segundo lugar, envie a tarefa para que seja executada no AI Platform. Usar essa abordagem permite criar o índice sem criar uma infraestrutura de computador dedicada.

Como implementar a tarefa do criador de índice

O arquivo task.py é o ponto de entrada do builder do índice, que executa as seguintes etapas:

  • Criar o índice da Annoy.
  • (Opcional) Compactar o índice.
  • Fazer upload dos artefatos produzidos no Cloud Storage.

A lógica para criar o índice Annoy é mostrada no snippet de código a seguir do módulo index.py.

def build_index(embedding_files_pattern, index_filename,
                num_trees=100):

 annoy_index = AnnoyIndex(VECTOR_LENGTH, metric=METRIC)
 mapping = {}

 embed_files = tf.gfile.Glob(embedding_files_pattern)
 logging.info('{} embedding files are found.'.format(len(embed_files)))

 item_counter = 0
 for f, embed_file in enumerate(embed_files):
   logging.info('Loading embeddings in file {} of {}...'.format(f, len(embed_files)))
   record_iterator = tf.python_io.tf_record_iterator(path=embed_file)

   for string_record in record_iterator:
     example = tf.train.Example()
     example.ParseFromString(string_record)
     string_identifier = example.features.feature['id'].bytes_list.value[0]
     mapping[item_counter] = string_identifier
     embedding = np.array(example.features.feature['embedding'].float_list.value)
     annoy_index.add_item(item_counter, embedding)
     item_counter += 1

   logging.info('Loaded {} items to the index'.format(item_counter))

 logging.info('Start building the index with {} trees...'.format(num_trees))
 annoy_index.build(n_trees=num_trees)
 logging.info('Index is successfully built.')
 logging.info('Saving index to disk...')
 annoy_index.save(index_filename)
 logging.info('Index is saved to disk.')
 logging.info('Saving mapping to disk...')
 with open(index_filename + '.mapping', 'wb') as handle:
   pickle.dump(mapping, handle, protocol=pickle.HIGHEST_PROTOCOL)
 logging.info('Mapping is saved to disk.')

Estas são as etapas:

  1. Conseguir todos os nomes de arquivos de embedding que correspondem a um determinado padrão.
  2. Para cada arquivo de embedding:
    1. Faça a iteração nas instâncias tf.Example no arquivo TFRecord.
    2. Leia o string_identifier (o ID) e adicione-o ao dicionário de mapping como um valor, em que a chave é o valor item_counter atual.
    3. Leia o vetor embedding e adicione-o ao annoy_index, em que o valor item_id é definido como o valor item_counter atual.
  3. Invoque o método annoy_index.build com o valor num_trees especificado.
  4. Salve o índice invocando o método annoy_index.save.
  5. Serialize o dicionário mapping usando o método pickle.dump.

A lógica por trás do dicionário mapping é que o identificador dos títulos da Wikipédia armazenados no Datastore é uma string. Ele é gerado usando o método GENERATE_UUID quando os dados são lidos do BigQuery. No entanto, o identificador de um item (o vetor de embedding) no índice da Annoy só pode ser um número inteiro. Portanto, o código cria um dicionário para mapear entre um índice inteiro alternativo e o identificador de string do item da Wikipédia.

O valor METRIC passado no construtor AnnoyIndex é angular, que é uma variação da semelhança de cossenos. O valor VECTOR_LENGTH é definido como 512, que é o tamanho do embedding de texto produzido pelo módulo do Universal Sentence Encoder.

O tamanho do índice salvo pode ser de vários gigabytes, dependendo do número de vetores de incorporação e do valor do parâmetro num_trees. Portanto, para fazer upload do índice para o Cloud Storage, a solução precisa usar APIs compatíveis com agrupamento. A solução de exemplo usa o método googleapiclient.http.MediaFileUpload em vez de google.cloud.storage, conforme mostrado no snippet de código a seguir em task.py:

media = MediaFileUpload(
 local_file_name, mimetype='application/octet-stream', chunksize=CHUNKSIZE,
 resumable=True)
request = gcs_services.objects().insert(
 bucket=bucket_name, name=gcs_location, media_body=media)
response = None
while response is None:
 progress, response = request.next_chunk()

Enviar a tarefa do criador de índice para o AI Platform

Na solução de exemplo, a execução da tarefa do criador de índice como um job do AI Platform envolve os arquivos a seguir:

  • submit.sh. Este arquivo precisa ser atualizado para configurar variáveis do projeto, do nome do bucket e da região para a saída do índice.
  • config.yaml. Esse arquivo usa o parâmetro scale_tier para especificar o tamanho da máquina usado para executar o job.
  • setup.py. Este arquivo especifica os pacotes necessários para o job. A solução de exemplo precisa de Annoy e google-api-python-client.

Após a atualização desses arquivos, é possível enviar uma tarefa do criador como um job do AI Platform por meio da execução do script submit.sh. O script inclui o comando a seguir:

gcloud ml-engine jobs submit training ${JOB_NAME} \
    --job-dir=${JOB_DIR} \
    --runtime-version=1.12 \
    --region=${REGION} \
    --scale-tier=${TIER} \
    --module-name=builder.task \
    --package-path=${PACKAGE_PATH}  \
    --config=config.yaml \
    -- \
    --embedding-files=${EMBED_FILES} \
    --index-file=${INDEX_FILE} \
    --num-trees=${NUM_TREES}

Dependendo do tamanho do índice, o job pode levar várias horas. O tempo depende do número de vetores e da dimensionalidade deles, além do número de árvores usadas para criar o índice.

Após a conclusão do job do AI Platform para a criação do índice, os artefatos a seguir estarão disponíveis no bucket especificado do Cloud Storage:

  • gs://your_bucket/wikipedia/index/embeds.index
  • gs://your_bucket/wikipedia/index/embeds.index.mapping

Como implementar o serviço de pesquisa semântica

Nesta seção, descrevemos a implementação dos utilitários do serviço de pesquisa semântica que usam o índice da Annoy criado anteriormente para recuperar títulos relevantes da Wikipédia do Datastore. O serviço de pesquisa semântica usa os utilitários a seguir:

  • Utilitário de embedding de consulta
  • Utilitário de correspondência de embedding
  • Utilitário de pesquisa do Datastore
  • Wrapper de serviço de pesquisa

Utilitário de embedding de consulta

Quando o usuário insere uma consulta de pesquisa, a solução precisa extrair o embedding da consulta para associá-lo a outros semelhantes no índice. O snippet de código a seguir em embedding.py executa a tarefa:

class EmbedUtil:

 def __init__(self):
   logging.info("Initialising embedding utility...")
   embed_module = hub.Module(
"https://tfhub.dev/google/universal-sentence-encoder/2")
   placeholder = tf.placeholder(dtype=tf.string)
   embed = embed_module(placeholder)
   session = tf.Session()
   session.run([tf.global_variables_initializer(), tf.tables_initializer()])
   logging.info('tf.Hub module is loaded.')

   def _embeddings_fn(sentences):
     computed_embeddings = session.run(
       embed, feed_dict={placeholder: sentences})
     return computed_embeddings

   self.embedding_fn = _embeddings_fn
   logging.info("Embedding utility initialised.")

 def extract_embeddings(self, query):
   return self.embedding_fn([query])[0]

O código realiza as ações a seguir:

  1. Carrega o Universal Sentence Encoder pelo tf.Hub.
  2. Fornece o método extract_embeddings, que aceita o texto da consulta do usuário.
  3. Retorna a codificação da frase (embeddings) para a consulta.

O código garante que o método EmbedUtil carregue o módulo tf.Hub apenas uma vez no construtor da classe, e não sempre que o método extract_embeddings for invocado. Isso ocorre porque o carregamento do módulo do Universal Sentence Encoder pode levar vários segundos.

Utilitário de correspondência de embedding

A classe MatchingUtil, implementada em matching.py, é responsável por carregar o índice da Annoy do arquivo de disco local, bem como por carregar o dicionário de mapeamento. O snippet de código a seguir mostra a implementação da classe MatchingUtil.

class MatchingUtil:

 def __init__(self, index_file):
   logging.info("Initialising matching utility...")
   self.index = AnnoyIndex(VECTOR_LENGTH)
   self.index.load(index_file, prefault=True)
   logging.info("Annoy index {} is loaded".format(index_file))
   with open(index_file + '.mapping', 'rb') as handle:
     self.mapping = pickle.load(handle)
   logging.info("Mapping file {} is loaded".format(index_file + '.mapping'))
   logging.info("Matching utility initialised.")

 def find_similar_items(self, vector, num_matches):
   item_ids = self.index.get_nns_by_vector(
     vector, num_matches, search_k=-1, include_distances=False)
   identifiers = [self.mapping[item_id]
                 for item_id in item_ids]
   return identifiers

O índice é carregado no construtor da classe. O código define o parâmetro prefault no método index.load como True para que todo o arquivo de índice seja carregado na memória.

A classe também expõe o método find_similar_items, que faz o seguinte:

  1. Recebe um vetor (o vetor de embedding de uma consulta do usuário).
  2. Encontra os item_ids (IDs de números inteiros) do embedding mais semelhante no Annoy index para o vetor especificado.
  3. Consegue o identifiers (IDs de string GUID) do dicionário de mapping.
  4. Retorna o objeto identifiers a ser usado para buscar os títulos da Wikipédia no Datastore.

Utilitário de pesquisa do Datastore

O snippet a seguir mostra a classe DatastoreUtil em lookup.py, que implementa a lógica de busca dos títulos da Wikipédia no Datastore. O construtor usa um valor de kind do Datastore que descreve a quais entidades os títulos pertencem.

class DatastoreUtil:

 def __init__(self, kind):
   logging.info("Initialising datastore lookup utility...")
   self.kind = kind
   self.client = datastore.Client()
   logging.info("Datastore lookup utility initialised.")

 def get_items(self, keys):
   keys = [self.client.key(self.kind, key)
           for key in keys]
   items = self.client.get_multi(keys)
   return items

O método get_items aceita um parâmetro keys, que é uma lista de identificadores, e retorna o objeto items do Datastore associado a essas chaves.

Wrapper de serviço de pesquisa

O snippet a seguir mostra a classe SearchUtil em search.py, que atua como um wrapper para os módulos utilitários descritos anteriormente.

class SearchUtil:

 def __init__(self):
   logging.info("Initialising search utility...")
   dir_path = os.path.dirname(os.path.realpath(__file__))
   service_account_json = os.path.join(dir_path, SERVICE_ACCOUNT_JSON)
   index_file = os.path.join(dir_path, INDEX_FILE)
   download_artifacts(index_file, GCS_BUCKET, GCS_INDEX_LOCATION)
   self.match_util = matching.MatchingUtil(index_file)
   self.embed_util = embedding.EmbedUtil()
   self.datastore_util = lookup.DatastoreUtil(KIND, service_account_json)
   logging.info("Search utility is up and running.")

 def search(self, query, num_matches=10):
   query_embedding = self.embed_util.extract_embeddings(query)
   item_ids = self.match_util.find_similar_items(query_embedding, num_matches)
   items = self.datastore_util.get_items(item_ids)
   return items

No construtor de SearchUtil, o arquivo de índice Annoy e o dicionário de mapeamento serializado são salvos do Cloud Storage para o disco local usando o método download_artifacts. Em seguida, os objetos match_util, embed_util e datastore_util são inicializados.

O método search aceita um parâmetro query de pesquisa do usuário e o parâmetro num_matches, que especifica o número de correspondências a serem recuperadas. O método search chama os métodos a seguir:

  • O método embed_util.extract_embeddings recebe o vetor de embedding da consulta usando o módulo do Universal Sentence Encoder.
  • O método match_util.find_similar_items encontra os IDs de item de correspondências semelhantes à incorporação de consulta no índice Annoy.
  • O método datastore_util.get_items busca os itens do Datastore fornecidos como item_ids, que incluem os títulos da Wikipédia.

Uma etapa de pós-recuperação comum é classificar os itens produzidos pelo índice em relação à medida de similaridade antes de retorná-los.

Como exibir a pesquisa com o App Engine

Nesta seção, descrevemos como exibir o serviço de pesquisa semântica como um app da Web e implantá-lo no App Engine.

Implementar o app da Web Flask

O snippet de código a seguir em main.py implementa um app da Web Flask para exibir a pesquisa semântica dos títulos da Wikipédia.

...
search_util = utils.search.SearchUtil()
app = Flask(__name__)

@app.route('/search', methods=['GET'])
def search():
   try:
       query = request.args.get('query')
       show = request.args.get('show')
       is_valid, error = validate_request(query, show)

       if is_valid:
           results = search_util.search(query, show)
       else:
           results = error

   except Exception as error:
       results = "Unexpected error: {}".format(error)

   response = jsonify(results)
   return response

if __name__ == '__main__':
 app.run(host='127.0.0.1', port=8080)

O objeto search_util é inicializado apenas uma vez no nível do módulo. O endpoint RESTful /search redireciona a solicitação HTTP GET para o método search. O método recebe a query de pesquisa do usuário (string) e quantos resultados show (inteiro), chama o método search_util.search e retorna as correspondências recuperadas.

Implantar o app da Web no App Engine

O app da Web Flask é implantado no ambiente flexível do App Engine, com o gunicorn como uma interface de gateway do servidor da Web (WSGI, na sigla em inglês) HTTP. A implantação no App Engine exige configurações nos arquivos a seguir:

  • app.yaml. Este arquivo estabelece as definições de configuração para o ambiente de execução do Python, bem como configurações gerais de apps, redes e recursos. Neste arquivo, é preciso fazer as alterações a seguir:

    • Configure app_start_timeout_sec na seção readiness_check para permitir tempo suficiente para fazer o download do índice e carregar os objetos de utilitário.
    • Defina memory na seção resources como um valor maior que o tamanho do índice para que o índice possa ser carregado completamente na memória.
    • Configure gunicorn --timeout para permitir tempo suficiente para fazer o download e carregar o índice e os objetos de utilitário.
    • Defina gunicorn --threading como duas a quatro vezes o número de núcleos de CPU solicitados na seção resources do arquivo app.yaml para aumentar a simultaneidade.
  • requirement.txt. O ambiente de execução procura um arquivo requirements.txt no diretório de origem do app e usa o pip para instalar as dependências antes de iniciar o aplicativo.

Execute o script deploy.sh para implantar o app no App Engine, que inclui o comando a seguir:

gcloud --verbosity=info -q app deploy app.yaml --project=${PROJECT}

Consultar o app da Web de pesquisa

Depois que o app da Web é implantado no App Engine, uma pesquisa pode ser invocada pelo URL a seguir:

https://service_name-dot-project_name.appspot.com/search?query=query

O valor service_name é o mesmo nome fornecido no arquivo app.yaml. Se a consulta passada em query contiver espaços, eles precisarão ser convertidos em %20. Adicionar show=num_results à string de consulta especifica quantas correspondências recuperar. O padrão é 10.

A seguir, veja exemplos de consulta de pesquisa e os títulos da Wikipédia correspondentes com base no conjunto de dados de amostra.

Consulta Resultados da amostra
Animais selvagens tropicais "na selva africana, é cada leão, gnu e crocodilo por si. BBC vida selvagem"
Preocupações com tecnologia global "o risco mundial da inteligência artificial"
Bebidas geladas de verão "ótimas ideias para um mojito sem álcool"
Esportes de inverno "esqui cross-country no fis nordic world ski championships 2007"

Testes de carga e volumétricos

Após a criação da solução de exemplo, uma amostra de execução foi realizada para conseguir informações de desempenho. As tabelas a seguir mostram as configurações usadas para executar o exemplo completo usando o conjunto de dados bigquery-samples.wikipedia_benchmark.Wiki100B.

Como extrair embeddings

A tabela a seguir mostra as configurações do job do Dataflow usadas para extrair os embeddings e o tempo de execução resultante.

Configuração
  • Limite de registro: 5 milhões
  • Tamanho do vetor de embedding: 512
  • vCPUs: 64 (32 workers)
  • Tipo de máquina do worker: n1-highmem-2
Resultados
  • Tempo de trabalho: 32 minutos

Como criar o índice

A tabela a seguir mostra informações de configuração e resultados para a tarefa de criar o índice usando um job do AI Platform.

Configuração
Resultados
  • Tempo do job: 2 horas e 56 minutos
  • Tamanho do arquivo de índice: 19,28 GB
  • Tamanho do arquivo de mapeamento: 263,21 MB

Como exibir o app de pesquisa

A tabela a seguir mostra informações de configuração e resultados para exibir o app de pesquisa usando o App Engine. O teste de carga foi realizado usando a ferramenta ab - comparação de servidores HTTP Apache (em inglês) por 180 segundos.

Configuração
  • vCPUs: 6
  • Memória: 24 GB
  • Disco: 50 GB
  • Escalonamento: 10 instâncias manuais
Resultados
  • Tempo de implantação (em funcionamento): ~19 minutos
  • Criação e upload da imagem do container: ~6 minutos
  • Implantação do aplicativo: ~13 minutos
  • Nível de concorrência: 1500
  • Solicitações por segundo: ~2500
  • Latência (95º percentil): ~903 milissegundos
  • Latência (50º percentil): ~514 milissegundos

Outras melhorias

As melhorias a seguir podem ser feitas no sistema atual:

  • Uso de GPUs para exibição. O modo Universal Sentence Encoder pode aproveitar a execução em um acelerador. A biblioteca Annoy não é compatível com GPUs, mas uma biblioteca como a Faiss, que aceita GPUs, pode melhorar o tempo de pesquisa do índice de correspondência por similaridade aproximada. No entanto, o App Engine não aceita o uso de GPUs. Por isso, para usar GPUs, é preciso usar o Compute Engine ou o Google Kubernetes Engine (GKE) em vez do App Engine.

  • Leitura do índice pelo disco. Como técnica de otimização de custos, em vez de exibir o índice usando um nó de memória grande (no exemplo, 26 GB de RAM), é possível usar nós de memória menores (por exemplo, 4 GB de RAM) e ler o índice pelo disco. Se você ler o índice pelo disco, precisará especificar um SSD ou o desempenho provavelmente não será adequado. Manter o índice no disco permite aumentar o número de nós de exibição, o que, por sua vez, aumenta a capacidade do sistema. Isso também reduz o custo do sistema. No entanto, se você quiser manter o índice em disco, use o Compute Engine ou o GKE, porque o App Engine não é compatível com SSD para discos permanentes.

  • Atualização do índice no sistema ativo. À medida que novos dados são recebidos (no exemplo, novos artigos da Wikipédia), o índice precisa ser atualizado. Isso geralmente é executado como um processo em lote que é executado de maneira diária ou semanal. Após a atualização, o app de pesquisa precisa ser atualizado para usar o índice novo, sem inatividade.

A seguir