Creazione di un sistema di corrispondenza di incorporamenti in tempo reale

Questo articolo fornisce una panoramica della corrispondenza della somiglianza approssimativa, una tecnica che consente di utilizzare il machine learning per trovare elementi simili a un determinato elemento. L'articolo descrive anche una soluzione di esempio end-to-end per la ricerca semantica del testo in tempo reale e spiega i vari aspetti di come eseguire la soluzione di esempio. La soluzione di esempio è in un repository GitHub corrispondente in tempo reale-incorporato-corrispondente.

L'articolo presuppone che tu conosca i concetti di machine learning e abbia familiarità con Google Cloud e strumenti come Apache Beam.

Introduzione

La ricerca di elementi simili a una determinata query è l'aspetto principale dei sistemi di ricerca e recupero, nonché dei motori per suggerimenti. Ad esempio, la corrispondenza di affinità aiuta gli utenti a trovare:

  • Immagini simili a quelle dell'animale domestico.
  • Articoli di notizie pertinenti alla loro query di ricerca.
  • Film o brani simili a quelli che hanno guardato o ascoltato.
  • Suggerimenti su prodotti e servizi.

Per progettare un sistema di corrispondenza di somiglianza, devi prima rappresentare gli elementi come vettori numerici. A loro volta, questi vettori rappresentano le incorporazioni semantiche degli elementi rilevati tramite il machine learning (ML). Per maggiori dettagli, consulta la sezione Panoramica: estrazione e pubblicazione degli incorporamenti delle funzionalità per il machine learning.

In secondo luogo, devi organizzare e archiviare gli incorporamenti per eseguire una ricerca più vicina (in base a una metrica di similitudine) in modo da trovare elementi simili al vettoriali incorporato della query dell'utente. Tuttavia, per cercare, recuperare e pubblicare consigli in tempo reale, la corrispondenza delle corrispondenze deve essere veloce. Pertanto, è più pratico applicare un algoritmo approssimato più vicino per creare un indice degli incorporamenti degli elementi per velocizzare il processo di ricerca di elementi simili.

La soluzione di esempio associata a questo articolo riguarda i seguenti argomenti:

  • Estrazione degli incorporamenti di testo dei titoli di Wikipedia.
  • Tramite il modulo Codificatore universale di frase di tf.Hub.
  • Creazione di un indice di corrispondenza simile con la Libreria anno di Spotify.
  • Pubblicazione dell'indice per la ricerca semantica in tempo reale in un'app web.

Il codice per la soluzione di esempio si trova nel repository GitHub in tempo reale-embeddings-match.

Corrispondenza approssimativa

Per l'identificazione e il recupero, di seguito è riportata una procedura tipica:

  1. Converti gli elementi e la query in vettori in uno spazio delle funzionalità appropriato. Queste funzionalità sono definite incorporamenti.
  2. Definisci una misura di prossimità per una coppia di vettori di incorporamento. Questa misura potrebbe essere la similitudine della coina o la distanza ucraina.
  3. Trova i vicini più vicini utilizzando una ricerca esplicita sull'intero insieme di elementi.

Se hai solo centinaia o alcune migliaia di elementi, la ricerca sull'intero elemento impostato per calcolare la somiglianza tra i vettori di query e i vettori di ogni elemento richiede un periodo di tempo accettabile. Puoi inoltre ottenere prestazioni accettabili se abbini la corrispondenza di somiglianza come lavoro batch e non hai bisogno dei risultati online. Tuttavia, se pubblichi un sistema di ricerca e recupero in tempo reale o un sistema di consigli e hai decine di milioni di elementi, trovare i vicini più vicini deve essere approssimativo. In tal caso, devi ottimizzare il processo per una risposta a bassa latenza.

Una soluzione pratica consiste nell'eseguire la corrispondenza di approssimazione della somiglianza. La corrispondenza della affinità approssimativa comporta l'organizzazione dei vettori degli elementi in un indice, che è una struttura di dati che consente il recupero rapido di elementi simili. Un potenziale problema è che gli elementi recuperati potrebbero non essere gli elementi più simili alla query specificata. Tuttavia, puoi di solito controllare i compromessi tra la precisione dell'indice e la sua latenza (e dimensioni).

Esistono due approcci principali per la corrispondenza della affinità simile: approcci basati sull'albero e approcci basati sull'hashing.

Approcci basati su alberi

L'idea che si cela dietro approcci basati su alberi (o strutture di dati ad albero metrica) è quello di suddividere in modo ricorsivo i dati in una modalità di divisione e conquista, che posiziona i vettori simili l'uno nell'altro nell'albero. Il tempo di query previsto è O(log(n)), dove n è il numero di elementi (vettori) di cui disponi. Gli indici ad albero richiedono grandi quantità di memoria, mentre le prestazioni si riducono con dati più alti. Esempi di approcci basati su alberi, chiamati anche struttura di dati ad albero delle metriche, includono:

Approcci basati su hashing

Un'alternativa all'approccio basato sugli alberi è l'approccio basato su hash. A differenza degli alberi, negli hash non c'è un partizionamento ricorrente. L'idea è quella di imparare un modello che converte un elemento in un codice, in cui elementi simili produrranno lo stesso codice o un codice simile (hashing di collisione). Questo approccio riduce notevolmente la memoria necessaria. Il tempo di query previsto è O(1), ma può essere sublineare in n, dove n è il numero di elementi (vettori) di cui disponi. Ecco alcuni esempi di approcci basati sull'hashing:

Esistono diverse librerie open source che implementano tecniche di corrispondenza della somiglianza approssimative, con diversi compromessi tra precisione, latenza delle query, efficienza della memoria, tempo per creare l'indice, funzionalità e facilità d'uso.

La soluzione di esempio descritta in questo articolo utilizza Annoy (Approfondimento approssimativo più vicino nei pressi di Oh), una libreria creata da Spotify per consigli musicali. Annoy è una libreria C++ con associazioni Python che crea alberi di proiezione casuale. Un indice viene creato con una foresta di k alberi, dove k è un parametro regolabile che si alterna tra precisione e prestazioni. Inoltre, crea grandi strutture di dati di sola lettura basate su file che sono mappate in memoria, in modo che molti processi possono condividere i dati.

Altre librerie molto utilizzate sono NMSLIB (libreria spaziale non metrica) e Faiss (Facebook AI Similarity Search). La libreria utilizzata per implementare la corrispondenza approssimativa della affinità non dovrebbe influire sull'architettura complessiva della soluzione o sul flusso di lavoro descritto in questo articolo.

La soluzione di esempio descritta in questo articolo illustra un'applicazione di corrispondenze di incorporamenti corrispondenti nella ricerca semantica del testo. L'obiettivo della soluzione è recuperare i documenti semanticamente pertinenti (ad esempio articoli di notizie, post di blog o articoli di ricerca) per una query di ricerca di input e farlo in tempo reale.

Le tecniche di ricerca basate su token recuperano i documenti in base a qualche metrica (come recency o frequenza) dell'occorrenza delle parole della query (individualmente o combinate) nei documenti. La ricerca semantica utilizza invece gli incorporamenti della query e dei documenti per la corrispondenza. Ad esempio, come mostrato più avanti nella sezione Esegui query sull'app web di ricerca, una query potrebbe essere "Animali selvatici tropicali", e i risultati possono includere un titolo simile a "nella giungla africana, "ogni leone, gabbiano e coccodrilli". Nota che nel risultato non viene mostrata nessuna delle parole della query, ma che è comunque un articolo che parla di animali selvatici tropicali.

Il set di dati BigQuery di Wikipedia

Nell'esempio, l'origine dati è il set di dati bigquery-samples:wikipedia_benchmark.Wiki100B in BigQuery, un set di dati pubblico che include 100 miliardi di voci basate su titoli di Wikipedia. Ad esempio, i dati sono limitati ai titoli univoci che hanno più di 2 visualizzazioni, che contengono almeno 5 parole e che contengono meno di 500 caratteri. Questo filtro produce circa 10,5 milioni di titoli unici.

Requisiti tecnici del sistema

Il sistema di ricerca semantico di esempio presenta i seguenti requisiti tecnici:

  • Riduci al minimo l'impegno nel trovare una rappresentazione vettoriale (ovvero gli incorporamenti) che codifica la semantica o i titoli di Wikipedia. Pertanto, l'esempio deve utilizzare un modello di incorporamento di testo preaddestrato anziché addestrare un modello di linguaggio da zero.
  • Riduci al minimo l'esigenza di un'infrastruttura di computing dedicata che estragga le incorporazioni e crei l'indice. Pertanto, l'esempio deve utilizzare servizi di computing on demand completamente gestiti che acquisiscono risorse sufficienti (memoria e CPU) per il job e rilasciali al termine del job.
  • Scala automaticamente il processo di estrazione dell'incorporamento. Pertanto, l'esempio deve utilizzare un servizio di elaborazione dati parallela.
  • Riduci al minimo la latenza per trovare incorporamenti simili nell'indice di una determinata query. Pertanto, l'indice deve essere completamente caricato in memoria.
  • Riduci al minimo la latenza di recupero dei titoli di Wikipedia per i vettori di incorporamento simili in tempo reale. Pertanto, l'esempio deve archiviare i titoli di Wikipedia in un database a lettura a bassa latenza,
  • Riduci al minimo l'impegno DevOps per il deployment del servizio di ricerca come app web. Di conseguenza, l'esempio deve utilizzare servizi completamente gestiti.
  • Gestisci l'aumento del carico di lavoro per l'applicazione web, fino a migliaia di query al secondo (QPS) con latenza media di secondi. Pertanto, l'esempio deve essere in grado di eseguire il deployment di diversi nodi dell'app web di ricerca e di eseguire il deployment di un bilanciatore del carico.

Architettura della soluzione

La Figura 1 mostra una panoramica del sistema di ricerca semantico del testo in tempo reale. Il sistema estrae gli incorporamenti dai titoli di Wikipedia, crea un indice di corrispondenza delle approssimazioni simile a Annoy e pubblica l'indice delle build per la ricerca e il recupero semantico in tempo reale.

Architettura della soluzione di esempio

Figura 1. Architettura della soluzione di alto livello per il sistema di ricerca semantico del testo

Componenti chiave dell'architettura

La tabella seguente illustra i componenti chiave illustrati nella Figura 1.

Componente Descrizione
BigQuery BigQuery è il data warehouse di analisi di Google completamente gestito, a basso costo e con capacità di petabyte. Nella soluzione di esempio, i titoli di origine di Wikipedia vengono archiviati in BigQuery.
Fascio di Apache Apache Beam è un framework di programmazione unificato open source che esegue job di flusso e di elaborazione dati in batch. La soluzione di esempio utilizza Apache Beam per implementare una pipeline per estrarre gli incorporamenti e archiviare un ID per eseguire ricerche nei titoli in Datastore.
Dataflow Dataflow è un servizio completamente gestito, serverless e affidabile per l'esecuzione di pipeline Apache Beam su larga scala su Google Cloud. Dataflow viene utilizzato per scalare l'elaborazione del testo di input e l'estrazione degli incorporamenti.
tf.Hub TensorFlow Hub è una libreria di moduli di machine learning riutilizzabili. La soluzione di esempio utilizza il Universal Sentence Encoder modulo di incorporamento di testo preaddestrato per convertire ogni titolo in un vettore di incorporamento.
Cloud Storage Cloud Storage è uno spazio di archiviazione a disponibilità elevata e durevole per oggetti binari di grandi dimensioni. Nella soluzione di esempio, gli incorporamenti estratti vengono archiviati in Cloud Storage come TFRecord. Inoltre, dopo aver creato l'indice di corrispondenza della similità, viene serializzato e archiviato in Cloud Storage.
Datastore Datastore è un database di documenti NoSQL pensato per la scalabilità automatica, le prestazioni elevate e la facilità di sviluppo delle applicazioni. La soluzione di esempio utilizza Datastore per archiviare i titoli di Wikipedia e i relativi ID in modo che possano essere recuperati in tempo reale con bassa latenza.
AI Platform AI Platform è un servizio serverless per l'addestramento di modelli ML su larga scala. La soluzione di esempio utilizza AI Platform per creare l'indice di corrispondenza della similitudine approssimativo utilizzando la libreria Annoy, senza la necessità di un'infrastruttura di calcolo dedicata.
App Engine App Engine consente di creare ed eseguire il deployment di applicazioni scalabili e affidabili su una piattaforma completamente gestita. La soluzione di esempio utilizza App Engine per pubblicare un'applicazione web Flask per la ricerca nei titoli di Wikipedia semanticamente pertinenti a una query dell'utente. App Engine consente di eseguire il deployment di molte istanze dell'app con bilanciamento del carico utilizzando solo una semplice configurazione per gestire l'aumento delle QPS.

Flusso di lavoro complessivo

Il flusso di lavoro del sistema di ricerca semantico del testo in tempo reale illustrato nella Figura 1 può essere suddiviso nei seguenti passaggi:

  1. Estrarre incorporamenti utilizzando Dataflow

    1. Leggi i titoli di Wikipedia da BigQuery.
    2. Estrai i incorporamenti dei titoli utilizzando il modulo Encoder Universal Sentence.
    3. Archivia gli incorporamenti estratti come TFRecord in Cloud Storage.
    4. Archivia i titoli e i relativi identificatori in Datastore per il recupero in tempo reale.
  2. Crea l'indice con AI Platform

    1. Carica gli incorporamenti dai file in Cloud Storage nell'indice di Annoy.
    2. Crea l'indice in memoria.
    3. Salva l'indice su disco.
    4. Carica l'indice salvato in Cloud Storage.
  3. Pubblicare l'app di ricerca con App Engine

    1. Scarica l'indice Annoy da Cloud Storage.
    2. Ottieni la query dell'utente.
    3. Estrai l'incorporamento della query utilizzando il modulo Universal Sentence Encoder.
    4. Utilizzando l'indice di Annoy, trova incorporamenti simili all'incorporamento della query.
    5. Recupera gli ID elemento degli incorporamenti simili.
    6. Recupera i titoli di Wikipedia utilizzando gli identificatori dal datastore.
    7. Restituisce i risultati.

Pratica sistemi di ricerca

In pratica, i sistemi di ricerca e recupero spesso combinano tecniche di ricerca semantiche con tecniche basate su token (indice invertito). I risultati di entrambe le tecniche vengono combinati e classificati prima di essere pubblicati per l'utente. Forse hai già dimestichezza con Elasticsearch (disponibile su Google Cloud Marketplace) per questa attività, che è un framework ampiamente utilizzato per la ricerca a testo intero basato sulla libreria Apache Lucene per l'indicizzazione invertita.

Un'altra ottimizzazione spesso implementata in sistemi reali (che non è coperta da questa soluzione) consiste nel memorizzare le query nella cache e i relativi identificatori dei titoli pertinenti mediante un sistema di memoria. Se la query è stata già eseguita in precedenza, gli identificatori del titolo possono essere recuperati direttamente da Memorystore. In questo modo vengono ignorate le due costose operazioni di chiamata al codificatore universale di frasi per generare l'incorporamento della query e la ricerca di elementi simili nell'indice di corrispondenza approssimativo. Memorizzare nella cache le query può migliorare spesso la latenza media del sistema, a seconda del livello di ridondanza della richiesta di query. La Figura 2 mostra il flusso di lavoro con una cache di query.

Architettura della soluzione utilizzando una cache

Figura 2. Architettura della soluzione di alto livello per la ricerca semantica del testo con cache delle query

La Figura 2 illustra il seguente flusso:

  1. Ricevi la query di ricerca.
  2. Cerca la query nella cache.
  3. Se la query non viene trovata:
    1. Estrai l'incorporamento dalla query.
    2. Trova elementi simili nell'indice.
    3. Aggiorna la cache.
  4. Ottieni l'accesso tramite ID da Datastore.
  5. Restituisci risultati.

Attivazione di servizi e autorizzazioni di accesso

La soluzione end-to-end descritta nella figura 1 richiede l'abilitazione delle seguenti API del servizio nella console:

Inoltre, è necessario concedere le seguenti autorizzazioni agli account di servizio. Gli account di servizio predefiniti hanno un'autorizzazione di accesso sufficiente alle risorse richieste se appartengono allo stesso progetto Google Cloud. Tuttavia, se le autorizzazioni dell'account di servizio sono state alterate, potresti dover apportare modifiche. Le autorizzazioni richieste sono:

  • Dataflow
    • Autorizzazione di lettura per il set di dati BigQuery
    • Autorizzazione di lettura/scrittura per il bucket Cloud Storage in cui sono archiviati i TFRecord
    • Autorizzazione di scrittura a Datastore
  • AI Platform
    • Autorizzazione di lettura/scrittura nel bucket Cloud Storage in cui è archiviato l'indice
  • Ambiente standard
    • Autorizzazione di lettura nel bucket Cloud Storage in cui è archiviato l'indice
    • Autorizzazione di lettura a Datastore

Gli snippet di codice nelle seguenti sezioni illustrano i concetti illustrati in questo articolo. Per informazioni su come eseguire l'esempio end-to-end, consulta il file README.md nel repository GitHub associato.

Estrazione degli incorporamenti con Dataflow

La pipeline per l'estrazione dell'incorporamento dai titoli di Wikipedia viene implementata in pipeline.py utilizzando Apache Beam. La pipeline complessiva è mostrata nel seguente snippet di codice:

def run(pipeline_options, known_args):

 pipeline = beam.Pipeline(options=pipeline_options)
 gcp_project = pipeline_options.get_all_options()['project']

 with impl.Context(known_args.transform_temp_dir):
   articles = (
       pipeline
       | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
     project=gcp_project, query=get_source_query(known_args.limit),
     use_standard_sql=True)))

   articles_dataset = (articles, get_metadata())
   embeddings_dataset, _ = (
       articles_dataset
       | 'Extract embeddings' >> impl.AnalyzeAndTransformDataset(
preprocess_fn))

   embeddings, transformed_metadata = embeddings_dataset

   embeddings | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
     file_path_prefix='{0}'.format(known_args.output_dir),
     file_name_suffix='.tfrecords',
     coder=tft_coders.example_proto_coder.ExampleProtoCoder(
transformed_metadata.schema))

   (articles
       | "Convert to entity" >> beam.Map(
lambda input_features: create_entity(input_features, known_args.kind))
       | "Write to Datastore" >> WriteToDatastore(project=gcp_project))

...

 job = pipeline.run()

 if pipeline_options.get_all_options()['runner'] == 'DirectRunner':
   job.wait_until_finish()

Lettura da BigQuery

Il primo passaggio della pipeline è leggere i titoli dal set di dati BigQuery di Wikipedia utilizzando il metodo beam.io.Read e un oggetto beam.io.BigQuerySource. Il metodo get_source_query in pipeline.py prepara lo script SQL utilizzato per recuperare i dati. Il numero di titoli di Wikipedia da recuperare da BigQuery è configurabile tramite il parametro limit della funzione get_source_query.

def get_source_query(limit=1000000):
 query = """
   SELECT
     GENERATE_UUID() as id,
     text
   FROM
   (
       SELECT
         DISTINCT LOWER(title) text
       FROM
         `bigquery-samples.wikipedia_benchmark.Wiki100B`
       WHERE
         ARRAY_LENGTH(split(title,' ')) >= 5
       AND
         language = 'en'
       AND
         LENGTH(title) < 500
    )
   LIMIT {0}
 """.format(limit)
 return query

Al titolo viene aggiunto un identificatore (qui, id) utilizzando la funzione BigQuery GENERATE_UUID integrata. Questo valore viene utilizzato per cercare un titolo di Wikipedia in base all'ID in Datastore e per mappare un titolo di Wikipedia al rispettivo incorporamento.

Questo passaggio della pipeline Beam restituisce un oggetto PCollection, in cui ogni elemento della raccolta include due elementi: id (stringa) e title (stringa).

Estrai incorporamenti

Il secondo passaggio della pipeline consiste nell'utilizzare il modulo Universal Sentence Encoder di tf.Hub per estrarre un vettore di incorporamento per ogni titolo di Wikipedia letto da BigQuery. Per eseguire il modulo, l'esempio utilizza l'API TensorFlow Transform (tf.Transform).

TensorFlow Transform è una libreria per la pre-elaborazione dei dati con Apache Beam. L'esempio utilizza il metodo AnalyzeAndTransformDataset di tf.Transform come contesto per chiamare il modulo tf.Hub per estrarre l'incorporamento del testo.

Il metodo AnalyzeAndTransformDataset esegue la funzione preprocess_fn, che include la logica di trasformazione, come mostrato nel seguente snippet:

def preprocess_fn(input_features):
 import tensorflow_transform as tft
 embedding = tft.apply_function(embed_text, input_features['text'])
 output_features = {
   'id': input_features['id'],
   'embedding': embedding
 }
 return output_features

def embed_text(text):
 import tensorflow_hub as hub
 global encoder
 if encoder is None:
   encoder = hub.Module(
'https://tfhub.dev/google/universal-sentence-encoder/2')
 embedding = encoder(text)
 return embedding

Questo passaggio della pipeline produce un altro oggetto PCollection, in cui ogni elemento nella raccolta include il valore id (una stringa) del titolo di Wikipedia e il valore embedding (un array numerico) estratti dallo strumento di codifica universale della frase, 512.

Scrivere incorporamenti in TFRecord

Dopo aver estratto gli incorporamenti dei titoli di Wikipedia, la soluzione li archivia insieme agli ID titolo come TFRecord in Cloud Storage, utilizzando il metodo beam.io.tfrecordio.WriteToTFRecord.

Il formato TFRecord è un formato semplice per l'archiviazione di una sequenza di record binari. Ogni record in un file TFRecord è un tf.Example buffer di protocollo, che costituisce un tipo di messaggio flessibile che rappresenta una mappatura della coppia chiave-valore. Questo tipo è efficace per la serializzazione dei dati strutturati.
Puoi specificare quanti file di incorporamento vengono creati impostando il parametro num_shards nel metodo WriteToTFRecord.

Scrittura in Datastore

Il passaggio successivo è scrivere su Datastore. Questo passaggio viene eseguito in parallelo al passaggio per l'estrazione dell'incorporamento. Lo scopo è archiviare i titoli di Wikipedia in Datastore in modo che possano essere recuperati dai loro ID. Gli ID titolo di Wikipedia vengono salvati anche con l'incorporamento nei file TFRecord, in modo che possano essere utilizzati come identificatori per gli elementi (incorporando vettori) aggiunti all'indice Annoy.

Per archiviare gli elementi prodotti dal passaggio di lettura da BigQuery a Datastore, la soluzione deve prima convertire ogni elemento in un'entità di Datastore, utilizzando il codice nel seguente snippet in pipeline.py:

def create_entity(input_features, kind):
 entity = entity_pb2.Entity()
 datastore_helper.add_key_path(
   entity.key, kind, input_features['id'])
 datastore_helper.add_properties(
   entity, {
     "text": unicode(input_features['text'])
   })
 return entity

Una volta eseguito questo codice, il metodo WriteToDatastore archivia gli elementi in Datastore. La Figura 3 mostra alcune delle entità scritte in Datastore dopo l'esecuzione della pipeline con il parametro kindDatastore impostato su wikipedia.

immagine

Figura 3. Entità Datastore dopo l'esecuzione della pipeline

Esegui la pipeline su Dataflow

Puoi eseguire la pipeline di Apache Beam eseguendo lo script run.py, passando gli argomenti richiesti e impostando l'argomento --runner su DataflowRunner. Per farlo, puoi impostare i parametri di configurazione nel file dello script run.sh ed eseguire lo script run.py.

Il comando seguente mostra come eseguire la pipeline. Lo script include un numero di variabili (ad esempio, $OUTPUT_PREFIX) che vengono impostate quando esegui lo script run.sh.

python run.py \
 --output_dir=$OUTPUT_PREFIX \
 --transform_temp_dir=$TRANSFORM_TEMP_DIR \
 --transform_export_dir=$TRANSFORM_EXPORT_DIR \
 --project=$PROJECT \
 --runner=$RUNNER \
 --region=$REGION \
 --kind=$KIND \
 --limit=$LIMIT \
 --staging_location=$STAGING_LOCATION \
 --temp_location=$TEMP_LOCATION \
 --setup_file=$(pwd)/setup.py \
 --job_name=$JOB_NAME \
 --worker_machine_type=$MACHINE_TYPE \
 --enable_debug \
 --debug_output_prefix=$DEBUG_OUTPUT_PREFIX

Puoi vedere il flusso della pipeline Dataflow nella console; ha un aspetto simile a quello nella Figura 4.

Pipeline Cloud Dataflow visualizzata nella console

Figura 4. Grafico di esecuzione Dataflow della pipeline, come visualizzato nella console

Creazione dell'indice con AI Platform

Nella soluzione di esempio, dopo che i vettori di incorporamento vengono estratti dai titoli di Wikipedia, il passaggio successivo è quello di creare un indice di corrispondenza della somiglianza approssimativo per questi vettori utilizzando la libreria Annoy. Nella soluzione di esempio, la cartella index_builder contiene il codice che puoi utilizzare per questa attività.

Per prima cosa, implementi un'attività che crea e salva l'indice. In secondo luogo, devi inviare l'attività da eseguire su AI Platform. Questo approccio ti consente di creare l'indice senza creare un'infrastruttura informatica dedicata.

Implementare l'attività di creazione dell'indice

Il file task.py è il punto di contatto del generatore dell'indice, che esegue i seguenti passaggi:

  • Creare l'indice di Annoy.
  • (Facoltativo) Comprimi l'indice.
  • Carica gli artefatti prodotti in Cloud Storage.

La logica per la creazione dell'indice Annoy è mostrata nel seguente snippet di codice del modulo index.py.

def build_index(embedding_files_pattern, index_filename,
                num_trees=100):

 annoy_index = AnnoyIndex(VECTOR_LENGTH, metric=METRIC)
 mapping = {}

 embed_files = tf.gfile.Glob(embedding_files_pattern)
 logging.info('{} embedding files are found.'.format(len(embed_files)))

 item_counter = 0
 for f, embed_file in enumerate(embed_files):
   logging.info('Loading embeddings in file {} of {}...'.format(f, len(embed_files)))
   record_iterator = tf.python_io.tf_record_iterator(path=embed_file)

   for string_record in record_iterator:
     example = tf.train.Example()
     example.ParseFromString(string_record)
     string_identifier = example.features.feature['id'].bytes_list.value[0]
     mapping[item_counter] = string_identifier
     embedding = np.array(example.features.feature['embedding'].float_list.value)
     annoy_index.add_item(item_counter, embedding)
     item_counter += 1

   logging.info('Loaded {} items to the index'.format(item_counter))

 logging.info('Start building the index with {} trees...'.format(num_trees))
 annoy_index.build(n_trees=num_trees)
 logging.info('Index is successfully built.')
 logging.info('Saving index to disk...')
 annoy_index.save(index_filename)
 logging.info('Index is saved to disk.')
 logging.info('Saving mapping to disk...')
 with open(index_filename + '.mapping', 'wb') as handle:
   pickle.dump(mapping, handle, protocol=pickle.HIGHEST_PROTOCOL)
 logging.info('Mapping is saved to disk.')

I passaggi sono i seguenti:

  1. Ottieni tutti i nomi file di incorporamento che corrispondono a un determinato pattern.
  2. Per ogni file di incorporamento:
    1. Esegui l'iterazione attraverso le istanze tf.Example del file TFRecord.
    2. Leggi string_identifier (ID) e aggiungilo al dizionario mapping come valore, dove la chiave è il valore item_counter attuale.
    3. Leggi il vettore embedding e aggiungilo a annoy_index, dove il valore item_id è impostato sul valore corrente di item_counter.
  3. Richiama il metodo annoy_index.build con il valore num_trees specificato.
  4. Salva l'indice richiamando il metodo annoy_index.save.
  5. Serializzazione del mappingdizionario utilizzando il metodo pickle.dump.

La motivazione alla base del dizionario mapping è che l'identificatore per i titoli di Wikipedia archiviati in Datastore è una stringa (viene generata utilizzando il metodo GENERATE_UUID quando i dati vengono letti da BigQuery). Tuttavia, l'identificatore di un elemento (il vettore da incorporare) nell'indice Annoy può essere solo un numero intero. Pertanto, il codice crea un dizionario per mappare un indice intero surrogato all'identificatore della stringa per l'elemento Wikipedia.

Il valore METRIC trasmesso nel costruttore AnnoyIndex è angolare, che è una variante della somiglianza cosena. Il valore VECTOR_LENGTH è impostato su 512, che è la lunghezza del testo incorporato dall'interno del modulo Universal Encoder Encoder.

Le dimensioni dell'indice salvato potrebbero corrispondere a diversi gigabyte, a seconda del numero di vettori di incorporamento e del valore del parametro num_trees. Di conseguenza, al fine di caricare l'indice in Cloud Storage, la soluzione deve utilizzare API che supportano il blocco. La soluzione di esempio utilizza il metodo googleapiclient.http.MediaFileUpload anziché google.cloud.storage, come mostrato nel seguente snippet di codice in task.py:

media = MediaFileUpload(
 local_file_name, mimetype='application/octet-stream', chunksize=CHUNKSIZE,
 resumable=True)
request = gcs_services.objects().insert(
 bucket=bucket_name, name=gcs_location, media_body=media)
response = None
while response is None:
 progress, response = request.next_chunk()

Inviare l'attività di creazione dell'indice ad AI Platform

Nella soluzione di esempio, l'esecuzione dell'attività di creazione dell'indice come job AI Platform prevede i seguenti file:

  • submit.sh. Questo file deve essere aggiornato per impostare le variabili per il progetto, il nome del bucket e l'area geografica per l'output dell'indice.
  • config.yaml. Questo file utilizza il parametro scale_tier per specificare la dimensione della macchina utilizzata per eseguire il job.
  • setup.py. Questo file specifica i pacchetti necessari per il job. La soluzione di esempio richiede Annoy e google-api-python-client.

Una volta aggiornati questi file, un'attività del builder può essere inviata come job AI Platform eseguendo lo script submit.sh. Lo script include il seguente comando:

gcloud ml-engine jobs submit training ${JOB_NAME} \
    --job-dir=${JOB_DIR} \
    --runtime-version=1.12 \
    --region=${REGION} \
    --scale-tier=${TIER} \
    --module-name=builder.task \
    --package-path=${PACKAGE_PATH}  \
    --config=config.yaml \
    -- \
    --embedding-files=${EMBED_FILES} \
    --index-file=${INDEX_FILE} \
    --num-trees=${NUM_TREES}

A seconda delle dimensioni dell'indice, il job potrebbe richiedere diverse ore. Il tempo dipende dal numero di vettori e dalla loro dimensionalità e dal numero di alberi utilizzati per creare l'indice.

Al termine del job di AI Platform per la creazione dell'indice, nel bucket Cloud Storage specificato sono disponibili i seguenti artefatti:

  • gs://your_bucket/wikipedia/index/embeds.index
  • gs://your_bucket/wikipedia/index/embeds.index.mapping

Implementare il servizio di ricerca semantico

Questa sezione descrive l'implementazione delle utilità del servizio di ricerca semantico che utilizzano l'indice di Annoy creato in precedenza per recuperare i titoli pertinenti di Wikipedia da Datastore. Il servizio di ricerca semantico utilizza le seguenti utilità:

  • Utilità di incorporamento della query
  • Utilità di incorporamento incorporata
  • Utilità di ricerca del datastore
  • Wrapper servizio di ricerca

Utilità di incorporamento della query

Quando l'utente inserisce una query di ricerca, la soluzione deve estrarre l'incorporamento della query per farla corrispondere a quelle simili nell'indice. L'attività viene eseguita nel seguente snippet di codice: embedding.py

class EmbedUtil:

 def __init__(self):
   logging.info("Initialising embedding utility...")
   embed_module = hub.Module(
"https://tfhub.dev/google/universal-sentence-encoder/2")
   placeholder = tf.placeholder(dtype=tf.string)
   embed = embed_module(placeholder)
   session = tf.Session()
   session.run([tf.global_variables_initializer(), tf.tables_initializer()])
   logging.info('tf.Hub module is loaded.')

   def _embeddings_fn(sentences):
     computed_embeddings = session.run(
       embed, feed_dict={placeholder: sentences})
     return computed_embeddings

   self.embedding_fn = _embeddings_fn
   logging.info("Embedding utility initialised.")

 def extract_embeddings(self, query):
   return self.embedding_fn([query])[0]

Il codice svolge le seguenti operazioni:

  1. Carica il codificatore di frase universale da tf.Hub.
  2. Fornisce il metodo extract_embeddings, che accetta il testo della query dell'utente.
  3. Restituisce la codifica della frase (incorporazioni) per la query.

Il codice assicura che il metodo EmbedUtil carichi il modulo tf.Hub solo una volta nel costruttore della classe, non ogni volta che il metodo extract_embeddings viene richiamato. Ciò avviene perché il caricamento del modulo Codificatore universale di frase può richiedere diversi secondi.

Utilità di incorporamento incorporata

La classe MatchingUtil, implementata in matching.py, è responsabile del caricamento dell'indice Annoy dal file del disco locale, nonché del caricamento del dizionario di mappatura. Il seguente snippet di codice mostra l'implementazione della classe MatchingUtil.

class MatchingUtil:

 def __init__(self, index_file):
   logging.info("Initialising matching utility...")
   self.index = AnnoyIndex(VECTOR_LENGTH)
   self.index.load(index_file, prefault=True)
   logging.info("Annoy index {} is loaded".format(index_file))
   with open(index_file + '.mapping', 'rb') as handle:
     self.mapping = pickle.load(handle)
   logging.info("Mapping file {} is loaded".format(index_file + '.mapping'))
   logging.info("Matching utility initialised.")

 def find_similar_items(self, vector, num_matches):
   item_ids = self.index.get_nns_by_vector(
     vector, num_matches, search_k=-1, include_distances=False)
   identifiers = [self.mapping[item_id]
                 for item_id in item_ids]
   return identifiers

L'indice viene caricato nel costruttore della classe. Il codice imposta il parametro prefault nel metodo index.load su True, in modo che venga caricato l'intero file indice.

La classe espone inoltre il metodo find_similar_items, che esegue le seguenti operazioni:

  1. Riceve un vettore (il vettore di incorporamento di una query dell'utente).
  2. Trova l'item_ids (ID intero) dell'incorporamento più simile nell'elemento Annoy index per il Dato specifico.
  3. Visualizza identifiers(ID stringa GUID) dal dizionario mapping.
  4. Restituisce l'oggetto identifiers da utilizzare per recuperare i titoli di Wikipedia da Datastore.

Utilità di ricerca del datastore

Il seguente snippet mostra la classe DatastoreUtil in lookup.py, che implementa la logica per il recupero dei titoli di Wikipedia dal datastore. Il costruttore prende un valore Datastorekind che descrive a quali entità appartengono i titoli.

class DatastoreUtil:

 def __init__(self, kind):
   logging.info("Initialising datastore lookup utility...")
   self.kind = kind
   self.client = datastore.Client()
   logging.info("Datastore lookup utility initialised.")

 def get_items(self, keys):
   keys = [self.client.key(self.kind, key)
           for key in keys]
   items = self.client.get_multi(keys)
   return items

Il metodo get_items accetta un parametro keys, che è un elenco di identificatori, e restituisce l'oggetto Datastore items associato a queste chiavi.

Wrapper servizio di ricerca

Il seguente snippet mostra la classe SearchUtil in search.py, che funge da wrapper per i moduli di utilità descritti in precedenza.

class SearchUtil:

 def __init__(self):
   logging.info("Initialising search utility...")
   dir_path = os.path.dirname(os.path.realpath(__file__))
   service_account_json = os.path.join(dir_path, SERVICE_ACCOUNT_JSON)
   index_file = os.path.join(dir_path, INDEX_FILE)
   download_artifacts(index_file, GCS_BUCKET, GCS_INDEX_LOCATION)
   self.match_util = matching.MatchingUtil(index_file)
   self.embed_util = embedding.EmbedUtil()
   self.datastore_util = lookup.DatastoreUtil(KIND, service_account_json)
   logging.info("Search utility is up and running.")

 def search(self, query, num_matches=10):
   query_embedding = self.embed_util.extract_embeddings(query)
   item_ids = self.match_util.find_similar_items(query_embedding, num_matches)
   items = self.datastore_util.get_items(item_ids)
   return items

Nel costruttore SearchUtil, il file Indice Annoy e il dizionario di mappatura serializzato vengono scaricati da Cloud Storage sul disco locale utilizzando il metodo download_artifacts. Quindi gli oggetti match_util, embed_util e datastore_util vengono inizializzati.

Il metodo search accetta un parametro query per la ricerca utente e il parametro num_matches, che specifica il numero di corrispondenze da recuperare. Il metodo search chiama i seguenti metodi:

  • Il metodo embed_util.extract_embeddings riceve il vettore di incorporamento della query utilizzando il modulo Encoder di Universal Sentence.
  • Il metodo match_util.find_similar_items trova gli ID elemento delle corrispondenze, simili a quelli della query di incorporamento nell'indice di Annoy.
  • Il metodo datastore_util.get_items recupera gli elementi dal datastore item_ids, che includono i titoli di Wikipedia.

In genere, un passaggio successivo al recupero consiste nel classificare gli elementi prodotti dall'indice in relazione alla misura di somiglianza prima di restituire gli elementi.

Gestione della ricerca con App Engine

Questa sezione descrive come pubblicare il servizio di ricerca semantico come app web e eseguirne il deployment in App Engine.

Implementare l'app web Flask

Il seguente snippet di codice in main.py implementa un'app web Flask per pubblicare la ricerca semantica dei titoli di Wikipedia.

...
search_util = utils.search.SearchUtil()
app = Flask(__name__)

@app.route('/search', methods=['GET'])
def search():
   try:
       query = request.args.get('query')
       show = request.args.get('show')
       is_valid, error = validate_request(query, show)

       if is_valid:
           results = search_util.search(query, show)
       else:
           results = error

   except Exception as error:
       results = "Unexpected error: {}".format(error)

   response = jsonify(results)
   return response

if __name__ == '__main__':
 app.run(host='127.0.0.1', port=8080)

L'oggetto search_util viene inizializzato solo una volta a livello di modulo. L'endpoint RESTful /search reindirizza la richiesta HTTP GET al metodo search. Il metodo recupera la ricerca dell'utente query (stringa) e il numero di risultati a show (numero intero), chiama il metodo search_util.search e restituisce le corrispondenze recuperate.

Eseguire il deployment dell'applicazione web in App Engine

Viene eseguito il deployment dell'app web Flask in un ambiente flessibile App Engine, con gunicorn come WSGI (HTTP Web Server Gateway Interface). Il deployment in App Engine richiede impostazioni di configurazione nei seguenti file:

  • app.yaml. Questo file definisce le impostazioni di configurazione per il runtime Python, nonché le impostazioni generali di app, rete e risorse. In questo file, devi apportare le seguenti modifiche:

    • Imposta app_start_timeout_sec nella sezione readiness_check per concedere tempo sufficiente per il download dell'indice e il caricamento degli oggetti di utilità.
    • Imposta memory nella sezione resources su un valore maggiore di quello della dimensione dell'indice, in modo che l'indice possa essere caricato completamente nella memoria.
    • Imposta gunicorn --timeout per concedere tempo sufficiente per scaricare e caricare l'indice e caricare gli oggetti di utilità.
    • Per aumentare la contemporaneità, imposta gunicorn --threading da due a quattro volte il numero di core CPU richiesti nella sezione resources del file app.yaml.
  • requirement.txt. Il runtime cerca un file requirements.txt nella directory di origine dell'app e utilizza pip per installare le dipendenze prima di avviare l'app.

Puoi eseguire lo script deploy.sh per eseguire il deployment dell'app in App Engine, che include il seguente comando:

gcloud --verbosity=info -q app deploy app.yaml --project=${PROJECT}

Eseguire query sull'app web di ricerca

Dopo aver eseguito il deployment dell'applicazione web in App Engine, puoi richiamare una ricerca chiamando il seguente URL:

https://service_name-dot-project_name.appspot.com/search?query=query

Il valore service_name è lo stesso nome indicato nel file app.yaml. Se la query trasmessa in query contiene spazi, deve essere convertita in %20. L'aggiunta di show=num_results alla stringa di query specifica il numero di corrispondenze da recuperare. Il valore predefinito è 10.

Gli esempi seguenti mostrano esempi di query di ricerca e titoli di Wikipedia corrispondenti basati sul set di dati di esempio.

Query Risultati di esempio
Animali selvatici tropicali "Nella giungla africana, per ogni leone, gnu e coccodrilli è tutto per sé! bbc fauna selvatica;
Problemi globali relativi alla tecnologia "rischio a livello mondiale di intelligenza artificiale"
Bevande estive fresche "un'ottima idea di mojito analcolici".
Sport invernali "sci di fondo ai campionati di sci nordico di sci di fondo 2007"

Volumetria e test del carico

Dopo aver creato la soluzione di esempio, è stata eseguita un'esecuzione di esempio per ottenere informazioni sulle prestazioni. Le seguenti tabelle mostrano le impostazioni utilizzate per eseguire l'esempio end-to-end utilizzando il set di dati bigquery-samples.wikipedia_benchmark.Wiki100B.

Estrazione degli incorporamenti

La tabella seguente mostra le configurazioni del job Dataflow utilizzate per estrarre gli incorporamenti e il tempo di esecuzione risultante.

Configurazione
  • Limite record: 5 milioni
  • Dimensione di incorporamento: 512
  • vCPU: 64 (32 worker)
  • Tipo di macchina worker: n1-highmem-2
Risultati
  • Orario del job : 32 minuti

Creare l'indice

La tabella seguente mostra le informazioni sulla configurazione e sui risultati relativi all'attività di creazione dell'indice utilizzando un job AI Platform.

Configurazione
Risultati
  • Orario di prestazione: 2 ore e 56 minuti
  • Dimensioni file indice: 19,28 GB
  • Dimensioni file di mappatura: 263,21 MB

Pubblicazione dell'app di ricerca

La tabella seguente mostra le informazioni sulla configurazione e sui risultati per la pubblicazione dell'app di ricerca utilizzando App Engine. Il test di carico è stato eseguito utilizzando lo strumento di benchmarking di ab - Apache HTTP per 180 secondi.

Configurazione
  • vCPU: 6
  • Memoria: 24 GB
  • Disco: 50 GB
  • Scalabilità: 10 istanze manuali
Risultati
  • Durata del deployment (in esecuzione): ~19 minuti
  • Creazione e caricamento dell'immagine container: ~6 minuti
  • Deployment dell'app: ~13 minuti
  • Livello di contemporaneità: 1500
  • Richieste al secondo: ~2500
  • Latenza (95° percentile): ~903 millisecondi
  • Latenza (50° percentile): ~514 millisecondi

Ulteriori miglioramenti

Al sistema corrente possono essere apportati i seguenti miglioramenti:

  • Utilizzare GPU per la pubblicazione. La modalità del codificatore Universal Sentence può essere utile per l'esecuzione su un acceleratore. La libreria Annoy non supporta le GPU, ma una libreria come Faiss che supporta le GPU può migliorare il tempo di ricerca per l'indice di corrispondenza simile. Tuttavia, App Engine non supporta l'uso delle GPU, pertanto per utilizzare le GPU è necessario utilizzare Compute Engine o Google Kubernetes Engine (GKE) anziché App Engine.

  • Lettura dell'indice dal disco. Come tecnica di ottimizzazione dei costi, anziché utilizzare l'indice con un nodo a memoria elevata (nell'esempio, 26 GB di RAM), puoi utilizzare nodi di memoria più piccoli (ad esempio 4 GB di RAM) e leggere l'indice dal disco. Se leggi l'indice dal disco, devi specificare un SSD; in caso contrario il rendimento potrebbe non essere adeguato. Mantenere l'indice sul disco consente di aumentare il numero di nodi di pubblicazione, il che a sua volta aumenta la velocità effettiva del sistema. In questo modo, si riduce anche il costo del sistema. Tuttavia, se vuoi mantenere l'indice su disco, devi utilizzare Compute Engine o GKE, perché App Engine non supporta l'SSD per i dischi permanenti.

  • Aggiornamento dell'indice nel sistema pubblicato. Man mano che vengono ricevuti nuovi dati (nell'esempio, nuovi articoli di Wikipedia), l'indice deve essere aggiornato. Generalmente, questa operazione viene eseguita come processo batch in esecuzione ogni giorno o ogni settimana. Al termine dell'aggiornamento, l'app di ricerca deve essere aggiornata per utilizzare il nuovo indice, senza tempi di inattività.

Passaggi successivi