Modifiche in tempo reale di Bigtable al modello di ricerca vettoriale

Questo modello crea una pipeline di streaming per trasmettere i record delle modifiche dei dati di Bigtable e scriverli in Vertex AI Vector Search utilizzando Dataflow Runner V2.

Requisiti della pipeline

  • L'istanza di origine Bigtable deve esistere.
  • La tabella di origine Bigtable deve esistere e deve avere gli modifiche in tempo reale abilitati.
  • Deve esistere il profilo dell'applicazione Bigtable.
  • Il percorso dell'indice Vector Search deve esistere.

Parametri del modello

Parametri obbligatori

  • embeddingColumn: il nome completo della colonna in cui sono memorizzati gli incorporamenti. Nel formato cf:col.
  • embeddingByteSize: le dimensioni in byte di ogni voce dell'array di incorporamenti. Utilizza 4 per Float e 8 per Double. Il valore predefinito è 4.
  • vectorSearchIndex: l'indice Vector Search in cui verranno trasmessi in streaming le modifiche, nel formato "projects/{projectID}/locations/{region}/indexes/{indexID}" (senza spazi iniziali o finali). Ad esempio, projects/123/locations/us-east1/indexes/456.
  • bigtableChangeStreamAppProfile: l'ID profilo dell'applicazione Bigtable. Il profilo dell'applicazione deve utilizzare il routing a un singolo cluster e consentire le transazioni su riga singola.
  • bigtableReadInstanceId: l'ID dell'istanza Bigtable di origine.
  • bigtableReadTableId: l'ID della tabella Bigtable di origine.

Parametri facoltativi

  • bigtableMetadataTableTableId: ID tabella utilizzato per creare la tabella dei metadati.
  • crowdingTagColumn: il nome completo della colonna in cui è memorizzato il tag di affollamento. Nel formato cf:col.
  • allowRestrictsMappings: i nomi di colonna completi separati da virgole delle colonne da utilizzare come restrizioni allow, con il relativo alias. Nel formato cf:col->alias.
  • denyRestrictsMappings: i nomi di colonna completi separati da virgole delle colonne che devono essere utilizzate come restrizioni deny, con il relativo alias. Nel formato cf:col->alias.
  • intNumericRestrictsMappings: i nomi di colonna completi separati da virgole delle colonne da utilizzare come numeric_restricts intero, con il relativo alias. Nel formato cf:col->alias.
  • floatNumericRestrictsMappings: i nomi di colonna completi separati da virgole delle colonne da utilizzare come float (4 byte) numeric_restricts, con il relativo alias. Nel formato cf:col->alias.
  • doubleNumericRestrictsMappings: i nomi di colonna completi separati da virgole delle colonne da utilizzare come numeric_restricts double (8 byte), con il relativo alias. Nel formato cf:col->alias.
  • upsertMaxBatchSize: il numero massimo di upsert da memorizzare nel buffer prima di eseguire l'upsert del batch nell'indice Vector Search. I batch verranno inviati quando sono pronti upsertBatchSize record o quando è trascorso il tempo di attesa upsertBatchDelay per qualsiasi record. Ad esempio, 10. Il valore predefinito è 10.
  • upsertMaxBufferDuration: il ritardo massimo prima che un batch di upsert venga inviato a Vector Search.I batch vengono inviati quando sono pronti upsertBatchSize record o quando è trascorso il tempo di attesa upsertBatchDelay per qualsiasi record. I formati consentiti sono: Ns (per i secondi, ad esempio 5s), Nm (per i minuti, ad esempio 12m), Nh (per le ore, ad esempio 2h). Ad esempio, 10s. Il valore predefinito è 10 secondi.
  • deleteMaxBatchSize: il numero massimo di eliminazioni da memorizzare nel buffer prima di eliminare il batch dall'indice Vector Search. I batch vengono inviati quando sono pronti deleteBatchSize record o quando è trascorso il tempo di attesa deleteBatchDelay per qualsiasi record. Ad esempio, 10. Il valore predefinito è 10.
  • deleteMaxBufferDuration: il ritardo massimo prima che un batch di eliminazioni venga inviato a Vector Search.I batch vengono inviati quando sono pronti deleteBatchSize record o quando è trascorso il tempo di attesa deleteBatchDelay per un record. I formati consentiti sono: Ns (per i secondi, ad esempio 5s), Nm (per i minuti, ad esempio 12m), Nh (per le ore, ad esempio 2h). Ad esempio, 10s. Il valore predefinito è 10 secondi.
  • dlqDirectory: il percorso in cui archiviare i record non elaborati con il motivo per cui non sono stati elaborati. Il valore predefinito è una directory nella posizione temporanea del job Dataflow. Il valore predefinito è sufficiente nella maggior parte delle condizioni.
  • bigtableChangeStreamMetadataInstanceId: l'ID istanza dei metadati delle modifiche in tempo reale di Bigtable. Il valore predefinito è vuoto.
  • bigtableChangeStreamMetadataTableTableId: l'ID della tabella dei metadati del connettore Bigtable modifiche in tempo reale. Se non specificata, durante l'esecuzione della pipeline viene creata automaticamente una tabella di metadati del connettore di modifiche in tempo reale Bigtable. Il valore predefinito è vuoto.
  • bigtableChangeStreamCharset: il nome del set di caratteri dei modifiche in tempo reale Bigtable. Il valore predefinito è UTF-8.
  • bigtableChangeStreamStartTimestamp: il timestamp iniziale (https://tools.ietf.org/html/rfc3339), inclusivo, da utilizzare per la lettura dei modifiche in tempo reale. Ad esempio: 2022-05-05T07:59:59Z. Il valore predefinito è il timestamp dell'ora di inizio della pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: un elenco separato da virgole di modifiche ai nomi famiglia di colonne da ignorare. Il valore predefinito è vuoto.
  • bigtableChangeStreamIgnoreColumns: un elenco separato da virgole di modifiche ai nomi delle colonne da ignorare. Esempio: "cf1:col1,cf2:col2". Il valore predefinito è vuoto.
  • bigtableChangeStreamName: un nome univoco per la pipeline client. Consente di riprendere l'elaborazione dal punto in cui si è interrotta una pipeline in esecuzione in precedenza. Il valore predefinito è un nome generato automaticamente. Consulta i log dei job Dataflow per il valore utilizzato.
  • bigtableChangeStreamResume: se impostato su true, una nuova pipeline riprende l'elaborazione dal punto in cui si è interrotta una pipeline in esecuzione in precedenza con lo stesso valore bigtableChangeStreamName. Se la pipeline con il valore bigtableChangeStreamName specificato non è mai stata eseguita, non viene avviata una nuova pipeline. Se impostato su false, viene avviata una nuova pipeline. Se una pipeline con lo stesso valore di bigtableChangeStreamName è già stata eseguita per l'origine specificata, non viene avviata una nuova pipeline. Il valore predefinito è false.
  • bigtableReadChangeStreamTimeoutMs: il timeout per le richieste Bigtable ReadChangeStream in millisecondi.
  • bigtableReadProjectId: l'ID progetto Bigtable. Il valore predefinito è il progetto per il job Dataflow.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Bigtable Change Streams to Vector Search template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Interfaccia a riga di comando gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Sostituisci quanto segue:

  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • EMBEDDING_COLUMN: la colonna Embedding
  • EMBEDDING_BYTE_SIZE: la dimensione in byte dell'array di incorporamenti. Può essere 4 o 8.
  • VECTOR_SEARCH_INDEX: il percorso dell'indice Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: l'ID profilo di applicazione Bigtable
  • BIGTABLE_READ_INSTANCE_ID: l'ID dell'istanza Bigtable di origine
  • BIGTABLE_READ_TABLE_ID: l'ID della tabella Bigtable di origine

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • EMBEDDING_COLUMN: la colonna Embedding
  • EMBEDDING_BYTE_SIZE: la dimensione in byte dell'array di incorporamenti. Può essere 4 o 8.
  • VECTOR_SEARCH_INDEX: il percorso dell'indice Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: l'ID profilo di applicazione Bigtable
  • BIGTABLE_READ_INSTANCE_ID: l'ID dell'istanza Bigtable di origine
  • BIGTABLE_READ_TABLE_ID: l'ID della tabella Bigtable di origine