Modifiche in tempo reale di Bigtable per il modello Vector Search

Questo modello crea una pipeline in modalità flusso per trasmettere in flusso i record delle modifiche dei dati di Bigtable e scriverli in Vertex AI Vector Search utilizzando Dataflow Runner V2.

Requisiti della pipeline

  • Deve esistere l'istanza di origine Bigtable.
  • La tabella di origine Bigtable deve esistere e la tabella deve avere le modifiche in tempo reale abilitate.
  • Il profilo dell'applicazione Bigtable deve esistere.
  • Il percorso dell'indice di Vector Search deve esistere.

Parametri del modello

Parametro Descrizione
embeddingColumn Il nome completo della colonna in cui sono archiviati gli incorporamenti. Nel formato cf:col.
embeddingByteSize La dimensione in byte di ogni voce nell'array degli incorporamenti. Usa 4 per float e 8 per il doppio. Il valore predefinito è 4.
vectorSearchIndex L'indice di Vector Search in cui verranno trasmesse le modifiche, nel formato "projects/{projectID}/locations/{region}/indexes/{indexID}" (senza spazi iniziali o finali). Ad esempio: projects/123/locations/us-east1/indexes/456.
bigtableChangeStreamAppProfile Il profilo dell'applicazione utilizzato per distinguere i carichi di lavoro in Bigtable.
bigtableReadInstanceId L'ID dell'istanza Bigtable che contiene la tabella.
bigtableReadTableId La tabella Bigtable da cui leggere.
bigtableMetadataTableTableId Facoltativo: ID per la tabella di metadati creata. Se il criterio non viene configurato, Bigtable genera un ID.
crowdingTagColumn (Facoltativo) Il nome completo della colonna in cui è archiviato il tag di crowding, nel formato cf:col.
allowRestrictsMappings Facoltativo: i nomi delle colonne completi e separati da virgole delle colonne da utilizzare come limitazioni di allow, oltre ai relativi alias. Ogni nome colonna deve essere nel formato cf:col->alias.
denyRestrictsMappings Facoltativo: i nomi delle colonne completi e separati da virgole delle colonne da utilizzare come limitazioni di deny, oltre ai relativi alias. Ogni nome colonna deve essere nel formato cf:col->alias.
intNumericRestrictsMappings Facoltativo: i nomi delle colonne completi e separati da virgole delle colonne da utilizzare come numero intero numeric_restricts, più i relativi alias. Ogni nome colonna deve essere nel formato cf:col->alias.
floatNumericRestrictsMappings Facoltativo: i nomi delle colonne completi e separati da virgole delle colonne da utilizzare come numeri in virgola mobile (4 byte) numeric_restricts, più i relativi alias. Ogni nome colonna deve essere nel formato cf:col->alias
doubleNumericRestrictsMappings Facoltativo: i nomi delle colonne completi e separati da virgole delle colonne da utilizzare come doppie (8 byte) numeric_restricts, più i relativi alias. Ogni nome colonna deve essere nel formato cf:col->alias
upsertMaxBatchSize (Facoltativo) Il numero massimo di upsert di cui eseguire il buffer prima di eseguire l'upsert del batch all'indice di Vector Search. I batch vengono inviati quando sono pronti upsertBatchSize record. Esempio: 10.
upsertMaxBufferDuration (Facoltativo) Il ritardo massimo prima che un batch di upsert venga inviato a Vector Search. I batch vengono inviati quando sono pronti upsertBatchSize record. I formati consentiti sono: Ns per secondi (ad esempio: 5 s), Nm per minuti (ad esempio: 12 min) e Nh per le ore (esempio: 2 h). Valore predefinito: 10s.
deleteMaxBatchSize (Facoltativo) Il numero massimo di eliminazioni nel buffer prima di eliminare il batch dall'indice di Vector Search. I batch vengono inviati quando sono pronti deleteBatchSize record. Ad esempio: 10.
deleteMaxBufferDuration (Facoltativo) Il ritardo massimo prima che un batch di eliminazioni venga inviato a Vector Search. I batch vengono inviati quando sono pronti deleteBatchSize record. I formati consentiti sono: Ns per i secondi (ad esempio: 5 sec), Nm per i minuti (ad esempio: 12 m) e Nh per le ore (ad esempio: 2 h). Valore predefinito: 10s.
dlqDirectory (Facoltativo) Il percorso in cui archiviare tutti i record non elaborati con il motivo della mancata elaborazione. Il valore predefinito è una directory all'interno della posizione temporanea del job Dataflow. Il valore predefinito è appropriato per la maggior parte degli scenari.
bigtableChangeStreamMetadataInstanceId (Facoltativo) L'istanza Bigtable da utilizzare per la tabella dei metadati del connettore delle modifiche in tempo reale. Il campo predefinito è vuoto.
bigtableChangeStreamMetadataTableTableId (Facoltativo) L'ID tabella dei metadati del connettore delle modifiche in tempo reale di Bigtable da utilizzare. Se non viene fornita, viene creata automaticamente una tabella di metadati del connettore delle modifiche in tempo reale di Bigtable durante il flusso della pipeline. Il campo predefinito è vuoto.
bigtableChangeStreamCharset (Facoltativo) Bigtable modifica il nome del set di caratteri durante la lettura dei valori e dei qualificatori di colonna. Il valore predefinito è UTF-8.
bigtableChangeStreamStartTimestamp Facoltativo: il valore DateTime di inizio, incluso, da utilizzare per la lettura delle modifiche in tempo reale (https://tools.ietf.org/html/rfc3339). Ad esempio, 2022-05-05T07:59:59Z. Il valore predefinito è il timestamp all'avvio della pipeline.
bigtableChangeStreamIgnoreColumnFamilies (Facoltativo) Un elenco separato da virgole di nomi di famiglia di colonne che non verranno acquisiti. Il campo predefinito è vuoto.
bigtableChangeStreamIgnoreColumns (Facoltativo) Un elenco separato da virgole di nomi di colonne che non verranno acquisiti. Il campo predefinito è vuoto.
bigtableChangeStreamName (Facoltativo) Un nome univoco per la pipeline client. Questo parametro consente di riprendere l'elaborazione dal punto in cui è stata arrestata una pipeline in esecuzione in precedenza. Il nome predefinito è generato automaticamente. Controlla i log del job Dataflow per il valore utilizzato.
bigtableChangeStreamResume

(Facoltativo) Se viene impostato su true, l'elaborazione di una nuova pipeline riprende dal punto in cui è stata arrestata una pipeline precedentemente in esecuzione con lo stesso nome. Se una pipeline con questo nome non è mai stata eseguita in passato, non viene avviata la nuova pipeline. Utilizza il parametro bigtableChangeStreamName per specificare la linea della pipeline.

Se il criterio viene impostato su false, viene avviata una nuova pipeline. Se una pipeline con lo stesso nome di bigtableChangeStreamName è già stata eseguita in passato per l'origine specificata, la nuova pipeline non viene avviata.

Il valore predefinito è false.

bigtableReadProjectId (Facoltativo) Progetto da cui leggere i dati Bigtable. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Bigtable Change Streams to Vector Search template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

Interfaccia a riga di comando gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • EMBEDDING_COLUMN: la colonna Incorporamento
  • EMBEDDING_BYTE_SIZE: la dimensione in byte dell'array degli incorporamenti. Può essere 4 o 8.
  • VECTOR_SEARCH_INDEX: il percorso dell'indice di Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: ID profilo applicazione Bigtable
  • BIGTABLE_READ_INSTANCE_ID: l'ID dell'istanza Bigtable di origine
  • BIGTABLE_READ_TABLE_ID: l'ID tabella Bigtable di origine

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • EMBEDDING_COLUMN: la colonna Incorporamento
  • EMBEDDING_BYTE_SIZE: la dimensione in byte dell'array degli incorporamenti. Può essere 4 o 8.
  • VECTOR_SEARCH_INDEX: il percorso dell'indice di Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: ID profilo applicazione Bigtable
  • BIGTABLE_READ_INSTANCE_ID: l'ID dell'istanza Bigtable di origine
  • BIGTABLE_READ_TABLE_ID: l'ID tabella Bigtable di origine