Modifiche in tempo reale di Bigtable al modello di ricerca vettoriale

Questo modello crea una pipeline di streaming per trasmettere i record delle modifiche dei dati di Bigtable e scriverli in Vertex AI Vector Search utilizzando Dataflow Runner 2.

Requisiti della pipeline

  • L'istanza di origine Bigtable deve esistere.
  • La tabella di origine Bigtable deve esistere e deve avere gli stream di variazioni abilitati.
  • Il profilo dell'applicazione Bigtable deve esistere.
  • Il percorso dell'indice Vector Search deve esistere.

Parametri del modello

Parametro Descrizione
embeddingColumn Il nome completo della colonna in cui sono archiviati gli embedding. Nel formato cf:col.
embeddingByteSize La dimensione in byte di ogni voce nell'array di incorporamenti. Utilizza 4 per float e 8 per double. Il valore predefinito è 4.
vectorSearchIndex L'indice di ricerca vettoriale in cui verranno trasmesse le modifiche, nel formato 'projects/{projectID}/locations/{region}/indexes/{indexID}' (senza spazi iniziali o finali). Ad esempio: projects/123/locations/us-east1/indexes/456.
bigtableChangeStreamAppProfile Il profilo dell'applicazione utilizzato per distinguere i carichi di lavoro in Bigtable.
bigtableReadInstanceId L'ID dell'istanza Bigtable che contiene la tabella.
bigtableReadTableId La tabella Bigtable da cui leggere.
bigtableMetadataTableTableId (Facoltativo) ID della tabella dei metadati creata. Se non è impostato, Bigtable genera un ID.
crowdingTagColumn (Facoltativo) Il nome completo della colonna in cui è memorizzato il tag di affollamento, nel formato cf:col.
allowRestrictsMappings (Facoltativo) I nomi completi delle colonne separati da virgole da utilizzare come limitazioni allow, oltre ai relativi alias. Ogni nome di colonna deve essere nel formato cf:col->alias.
denyRestrictsMappings (Facoltativo) I nomi completi delle colonne separati da virgole da utilizzare come limitazioni deny, oltre ai relativi alias. Ogni nome di colonna deve essere nel formato cf:col->alias.
intNumericRestrictsMappings (Facoltativo) I nomi completi delle colonne separati da virgole da utilizzare come interi numeric_restricts, oltre ai relativi alias. Ogni nome di colonna deve essere nel formato cf:col->alias.
floatNumericRestrictsMappings (Facoltativo) I nomi completi delle colonne separati da virgole da utilizzare come numeri in virgola mobile (4 byte) numeric_restricts, oltre ai relativi alias. Ogni nome di colonna deve essere nel formato cf:col->alias
doubleNumericRestrictsMappings (Facoltativo) I nomi delle colonne completi separati da virgole da utilizzare come doppi (8 byte) numeric_restricts, oltre ai relativi alias. Ogni nome di colonna deve essere nel formato cf:col->alias
upsertMaxBatchSize (Facoltativo) Il numero massimo di upsert da mettere in buffer prima di eseguire l'upsert del batch nell'indice di Vector Search. I batch vengono inviati quando sono disponibili upsertBatchSize record. Esempio: 10.
upsertMaxBufferDuration (Facoltativo) Il ritardo massimo prima che un batch di upsert venga inviato a Vector Search. I batch vengono inviati quando sono disponibili upsertBatchSize record. I formati consentiti sono: Ns per i secondi (ad es. 5s), Nm per i minuti (ad es. 12m) e Nh per le ore (ad es. 2h). Valore predefinito: 10s.
deleteMaxBatchSize (Facoltativo) Il numero massimo di eliminazioni da mettere in coda prima di eliminare il batch dall'indice Vector Search. I batch vengono inviati quando sono disponibili deleteBatchSize record. Ad esempio: 10.
deleteMaxBufferDuration (Facoltativo) Il ritardo massimo prima che un batch di eliminazioni venga inviato a Vector Search. I batch vengono inviati quando sono disponibili almeno deleteBatchSize record. I formati consentiti sono: Ns per i secondi (ad es. 5s), Nm per i minuti (ad es. 12m) e Nh per le ore (ad es. 2h). Valore predefinito: 10s.
dlqDirectory (Facoltativo) Il percorso per archiviare i record non elaborati con il motivo dell'errore di elaborazione. Il valore predefinito è una directory nella posizione temporanea del job Dataflow. Il valore predefinito è appropriato per la maggior parte degli scenari.
bigtableChangeStreamMetadataInstanceId (Facoltativo) L'istanza Bigtable da utilizzare per la tabella dei metadati del connettore delle modifiche in tempo reale. Il valore predefinito è vuoto.
bigtableChangeStreamMetadataTableTableId (Facoltativo) L'ID della tabella dei metadati del connettore delle modifiche in tempo reale di Bigtable da utilizzare. Se non viene fornita, una tabella dei metadati del connettore degli stream di modifiche Bigtable viene creata automaticamente durante il flusso della pipeline. Il valore predefinito è vuoto.
bigtableChangeStreamCharset (Facoltativo) Il nome del set di caratteri dei flussi di modifiche Bigtable quando si leggono i valori e i qualificatori di colonna. Il valore predefinito è UTF-8.
bigtableChangeStreamStartTimestamp (Facoltativo) La data e l'ora di inizio, incluse, da utilizzare per la lettura dei flussi di modifiche (https://tools.ietf.org/html/rfc3339). Ad esempio, 2022-05-05T07:59:59Z. Il valore predefinito è il timestamp dell'avvio della pipeline.
bigtableChangeStreamIgnoreColumnFamilies (Facoltativo) Un elenco di nomi di famiglie di colonne separati da virgole di cui non verranno acquisite le modifiche. Il valore predefinito è vuoto.
bigtableChangeStreamIgnoreColumns (Facoltativo) Un elenco separato da virgole di nomi di colonne di cui non verranno acquisite le modifiche. Il valore predefinito è vuoto.
bigtableChangeStreamName (Facoltativo) Un nome univoco per la pipeline del client. Questo parametro consente di riprendere l'elaborazione dal punto in cui si è interrotta una pipeline in esecuzione in precedenza. Il valore predefinito è un nome generato automaticamente. Controlla i log dei job Dataflow per il valore utilizzato.
bigtableChangeStreamResume

(Facoltativo) Se impostato su true, una nuova pipeline riprende l'elaborazione dal punto in cui è stata interrotta una pipeline in esecuzione precedente con lo stesso nome. Se una pipeline con questo nome non è mai stata eseguita in passato, la nuova pipeline non riesce a essere avviata. Utilizza il parametro bigtableChangeStreamName per specificare la riga della pipeline.

Se impostato su false, viene avviata una nuova pipeline. Se una pipeline con lo stesso nome di bigtableChangeStreamName è già stata eseguita in passato per l'origine specificata, la nuova pipeline non riesce a avviarsi.

Il valore predefinito è false.

bigtableReadProjectId (Facoltativo) Progetto da cui leggere i dati di Bigtable. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Bigtable Change Streams to Vector Search template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • EMBEDDING_COLUMN: la colonna Embedding
  • EMBEDDING_BYTE_SIZE: la dimensione in byte dell'array di embedding. Può essere 4 o 8.
  • VECTOR_SEARCH_INDEX: il percorso dell'indice Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: l'ID profilo dell'applicazione Bigtable
  • BIGTABLE_READ_INSTANCE_ID: l'ID istanza Bigtable di origine
  • BIGTABLE_READ_TABLE_ID: l'ID tabella Bigtable di origine

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • EMBEDDING_COLUMN: la colonna Embedding
  • EMBEDDING_BYTE_SIZE: la dimensione in byte dell'array di embedding. Può essere 4 o 8.
  • VECTOR_SEARCH_INDEX: il percorso dell'indice Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: l'ID profilo dell'applicazione Bigtable
  • BIGTABLE_READ_INSTANCE_ID: l'ID istanza Bigtable di origine
  • BIGTABLE_READ_TABLE_ID: l'ID tabella Bigtable di origine
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstovectorsearch;

import com.google.cloud.Timestamp;
import com.google.cloud.aiplatform.v1.IndexDatapoint;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadChangeStreamOptions;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadOptions;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToVectorSearchOptions;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Template(
    name = "Bigtable_Change_Streams_to_Vector_Search",
    category = TemplateCategory.STREAMING,
    displayName = "Bigtable Change Streams to Vector Search",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into Vertex AI Vector Search using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamsToVectorSearchOptions.class,
    optionsOrder = {
      BigtableChangeStreamsToVectorSearchOptions.class,
      ReadChangeStreamOptions.class,
      ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-change-streams-to-vector-search",
    flexContainerName = "bigtable-changestreams-to-vector-search",
    contactInformation = "https://cloud.google.com/support",
    streaming = true)
public final class BigtableChangeStreamsToVectorSearch {
  private static final Logger LOG =
      LoggerFactory.getLogger(BigtableChangeStreamsToVectorSearch.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) throws Exception {
    LOG.info("Starting replication from Cloud Bigtable Change Streams to Vector Search");

    BigtableChangeStreamsToVectorSearchOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamsToVectorSearchOptions.class);

    run(options);
  }

  public static PipelineResult run(BigtableChangeStreamsToVectorSearchOptions options)
      throws IOException {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    String bigtableProjectId = getBigtableProjectId(options);

    LOG.info("  - startTimestamp {}", startTimestamp);
    LOG.info("  - bigtableReadInstanceId {}", options.getBigtableReadInstanceId());
    LOG.info("  - bigtableReadTableId {}", options.getBigtableReadTableId());
    LOG.info("  - bigtableChangeStreamAppProfile {}", options.getBigtableChangeStreamAppProfile());
    LOG.info("  - embeddingColumn {}", options.getEmbeddingColumn());
    LOG.info("  - crowdingTagColumn {}", options.getCrowdingTagColumn());
    LOG.info("  - project {}", options.getProject());
    LOG.info("  - indexName {}", options.getVectorSearchIndex());

    String indexName = options.getVectorSearchIndex();

    String vertexRegion = Utils.extractRegionFromIndexName(indexName);
    String vertexEndpoint = vertexRegion + "-aiplatform.googleapis.com:443";

    final Pipeline pipeline = Pipeline.create(options);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                    : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProjectId)
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withMetadataTableTableId(options.getBigtableMetadataTableTableId())
            .withStartTime(startTimestamp);

    PCollectionTuple results =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply("Create Values", Values.create())
            .apply(
                "Converting to Vector Search Datapoints",
                ParDo.of(
                        new ChangeStreamMutationToDatapointOperationFn(
                            options.getEmbeddingColumn(),
                            options.getEmbeddingByteSize(),
                            options.getCrowdingTagColumn(),
                            Utils.parseColumnMapping(options.getAllowRestrictsMappings()),
                            Utils.parseColumnMapping(options.getDenyRestrictsMappings()),
                            Utils.parseColumnMapping(options.getIntNumericRestrictsMappings()),
                            Utils.parseColumnMapping(options.getFloatNumericRestrictsMappings()),
                            Utils.parseColumnMapping(options.getDoubleNumericRestrictsMappings())))
                    .withOutputTags(
                        ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG,
                        TupleTagList.of(
                            ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)));
    results
        .get(ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG)
        .apply("Add placeholer keys", WithKeys.of("placeholder"))
        .apply(
            "Batch Contents",
            GroupIntoBatches.<String, IndexDatapoint>ofSize(
                    bufferSizeOption(options.getUpsertMaxBatchSize()))
                .withMaxBufferingDuration(
                    bufferDurationOption(options.getUpsertMaxBufferDuration())))
        .apply("Map to Values", Values.create())
        .apply(
            "Upsert Datapoints to VectorSearch",
            ParDo.of(new UpsertDatapointsFn(vertexEndpoint, indexName)))
        .apply(
            "Write errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    results
        .get(ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)
        .apply("Add placeholder keys", WithKeys.of("placeholer"))
        .apply(
            "Batch Contents",
            GroupIntoBatches.<String, String>ofSize(
                    bufferSizeOption(options.getDeleteMaxBatchSize()))
                .withMaxBufferingDuration(
                    bufferDurationOption(options.getDeleteMaxBufferDuration())))
        .apply("Map to Values", Values.create())
        .apply(
            "Remove Datapoints From VectorSearch",
            ParDo.of(new RemoveDatapointsFn(vertexEndpoint, indexName)))
        .apply(
            "Write errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static String getBigtableProjectId(BigtableChangeStreamsToVectorSearchOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static int bufferSizeOption(int size) {
    if (size < 1) {
      size = 1;
    }

    return size;
  }

  private static Duration bufferDurationOption(String duration) {
    if (duration.isEmpty()) {
      return Duration.standardSeconds(1);
    }

    return DurationUtils.parseDuration(duration);
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamsToVectorSearchOptions options) {
    String dlqDirectory = options.getDlqDirectory();
    if (dlqDirectory.isEmpty()) {
      LOG.info("Falling back to temp dir for DLQ");

      String tempLocation = options.as(DataflowPipelineOptions.class).getTempLocation();

      LOG.info("Have temp location {}", tempLocation);
      if (tempLocation == null || tempLocation.isEmpty()) {
        tempLocation = "/";
      } else if (!tempLocation.endsWith("/")) {
        tempLocation += "/";
      }

      dlqDirectory = tempLocation + "dlq";
    }

    LOG.info("Writing dead letter queue to: {}", dlqDirectory);

    return DeadLetterQueueManager.create(dlqDirectory, 1);
  }
}