Modèle de flux de modifications Bigtable vers Vector Search

Ce modèle crée un pipeline de traitement en flux continu pour diffuser des enregistrements de modification de données Bigtable et les écrire dans Vertex AI Vector Search à l'aide de l'exécuteur Dataflow V2.

Conditions requises pour ce pipeline

  • L'instance source Bigtable doit exister.
  • La table source Bigtable doit exister et les flux de modifications doivent être activés.
  • Le profil d'application Bigtable doit exister.
  • Le chemin d'accès de l'index Vector Search doit exister.

Paramètres de modèle

Paramètre Description
embeddingColumn Nom complet de la colonne dans laquelle les embeddings sont stockés. Au format cf:col.
embeddingByteSize Taille en octets de chaque entrée du tableau des embeddings. Utilisez 4 pour les valeurs flottantes et 8 pour les valeurs doubles. La valeur par défaut est 4.
vectorSearchIndex Index Vector Search où les modifications seront diffusées, au format "projects/{projectID}/locations/{region}/indexes/{indexID}" (sans espaces de début ni de fin). Par exemple : projects/123/locations/us-east1/indexes/456.
bigtableChangeStreamAppProfile Profil d'application utilisé pour distinguer les charges de travail dans Bigtable.
bigtableReadInstanceId ID de l'instance Bigtable qui contient la table.
bigtableReadTableId Table Bigtable à partir de laquelle lire.
bigtableMetadataTableTableId Facultatif: ID de la table de métadonnées créée. Si cette valeur n'est pas définie, Bigtable génère un ID.
crowdingTagColumn Facultatif : Nom complet de la colonne dans laquelle le tag de regroupement est stocké, au format cf:col.
allowRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme restrictions allow, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
denyRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme restrictions deny, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
intNumericRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme entiers numeric_restricts, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
floatNumericRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme valeurs flottantes (4 octets) numeric_restricts, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
doubleNumericRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme valeurs doubles (8 octets) numeric_restricts, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
upsertMaxBatchSize Facultatif : Nombre maximal d'opérations d'upsert à mettre en mémoire tampon avant d'effectuer l'opération d'upsert sur le lot dans l'index Vector Search. Les lots sont envoyés lorsqu'il y a upsertBatchSize enregistrements prêts. Exemple : 10.
upsertMaxBufferDuration Facultatif : Délai maximal avant l'envoi d'un lot d'opérations d'upsert à Vector Search. Les lots sont envoyés lorsqu'il y a au moins upsertBatchSize enregistrements prêts. Les formats autorisés sont les suivants : Ns pour les secondes (exemple : 5 s), Nm pour les minutes (exemple : 12 min) et Nh pour les heures (exemple : 2 h). Valeur par défaut : 10s.
deleteMaxBatchSize Facultatif : Nombre maximal de suppressions à mettre en mémoire tampon avant de supprimer le lot de l'index Vector Search. Les lots sont envoyés lorsqu'il y a au moins deleteBatchSize enregistrements prêts. Exemple : 10.
deleteMaxBufferDuration Facultatif : Délai maximal avant l'envoi d'un lot de suppressions à Vector Search. Les lots sont envoyés lorsqu'il y a deleteBatchSize enregistrements prêts. Les formats autorisés sont les suivants : Ns pour les secondes (exemple : 5 s), Nm pour les minutes (exemple : 12 min) et Nh pour les heures (exemple : 2 h). Valeur par défaut : 10s.
dlqDirectory Facultatif : Chemin d'accès au fichier de stockage des enregistrements non traités et le motif pour lesquels ils n'ont pas pu être traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow. La valeur par défaut convient à la plupart des scénarios.
bigtableChangeStreamMetadataInstanceId Facultatif : Instance Bigtable à utiliser pour la table de métadonnées du connecteur de flux de modifications. La valeur par défaut est vide.
bigtableChangeStreamMetadataTableTableId Facultatif : ID de table de métadonnées du connecteur de flux de modifications Bigtable à utiliser. Si aucune valeur n'est fournie, une table des métadonnées du connecteur de flux de modifications Bigtable est automatiquement créée pendant le flux de pipeline. La valeur par défaut est vide.
bigtableChangeStreamCharset Facultatif : Nom du charset de flux de modifications Bigtable lors de la lecture des valeurs et des qualificatifs de colonnes. La valeur par défaut est UTF-8.
bigtableChangeStreamStartTimestamp Facultatif : Date et heure de début (incluses) à utiliser pour lire les flux de modifications (https://tools.ietf.org/html/rfc3339). Par exemple, 2020-01-16T16:56:00.000Z. La valeur par défaut est l'horodatage du démarrage du pipeline.
bigtableChangeStreamIgnoreColumnFamilies Facultatif : Liste des modifications de noms de familles de colonnes, séparées par une virgule, qui ne seront pas enregistrées. La valeur par défaut est vide.
bigtableChangeStreamIgnoreColumns Facultatif : Liste des modifications de noms de colonnes, séparées par une virgule, qui ne seront pas enregistrées. La valeur par défaut est vide.
bigtableChangeStreamName Facultatif : nom unique pour le pipeline client. Ce paramètre vous permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée.
bigtableChangeStreamResume

Facultatif : Lorsque ce paramètre est défini sur "true", un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec le même nom s'est arrêté. Si un pipeline portant ce nom n'a jamais été exécuté par le passé, le nouveau pipeline ne démarre pas. Utilisez le paramètre bigtableChangeStreamName pour spécifier la ligne du pipeline.

Lorsque ce paramètre est défini sur "false", un nouveau pipeline est démarré. Si un pipeline portant le même nom que bigtableChangeStreamName a déjà été exécuté pour la source donnée, le nouveau pipeline ne démarre pas.

Valeur par défaut : "false".

bigtableReadProjectId Facultatif: Projet à partir duquel lire les données Bigtable. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable Change Streams to Vector Search template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • EMBEDDING_COLUMN : colonne "Embedding".
  • EMBEDDING_BYTE_SIZE: taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.
  • VECTOR_SEARCH_INDEX : chemin d'accès de l'index Vector Search.
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE : ID de profil d'application Bigtable.
  • BIGTABLE_READ_INSTANCE_ID : ID de l'instance Bigtable source.
  • BIGTABLE_READ_TABLE_ID : ID de la table Bigtable source.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • EMBEDDING_COLUMN : colonne "Embedding".
  • EMBEDDING_BYTE_SIZE: taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.
  • VECTOR_SEARCH_INDEX : chemin d'accès de l'index Vector Search.
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE : ID de profil d'application Bigtable.
  • BIGTABLE_READ_INSTANCE_ID : ID de l'instance Bigtable source.
  • BIGTABLE_READ_TABLE_ID : ID de la table Bigtable source.
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstovectorsearch;

import com.google.cloud.Timestamp;
import com.google.cloud.aiplatform.v1.IndexDatapoint;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadChangeStreamOptions;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadOptions;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToVectorSearchOptions;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Template(
    name = "Bigtable_Change_Streams_to_Vector_Search",
    category = TemplateCategory.STREAMING,
    displayName = "Bigtable Change Streams to Vector Search",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into Vertex AI Vector Search using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamsToVectorSearchOptions.class,
    optionsOrder = {
      BigtableChangeStreamsToVectorSearchOptions.class,
      ReadChangeStreamOptions.class,
      ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-change-streams-to-vector-search",
    flexContainerName = "bigtable-changestreams-to-vector-search",
    contactInformation = "https://cloud.google.com/support",
    streaming = true)
public final class BigtableChangeStreamsToVectorSearch {
  private static final Logger LOG =
      LoggerFactory.getLogger(BigtableChangeStreamsToVectorSearch.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) throws Exception {
    LOG.info("Starting replication from Cloud Bigtable Change Streams to Vector Search");

    BigtableChangeStreamsToVectorSearchOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamsToVectorSearchOptions.class);

    run(options);
  }

  public static PipelineResult run(BigtableChangeStreamsToVectorSearchOptions options)
      throws IOException {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    String bigtableProjectId = getBigtableProjectId(options);

    LOG.info("  - startTimestamp {}", startTimestamp);
    LOG.info("  - bigtableReadInstanceId {}", options.getBigtableReadInstanceId());
    LOG.info("  - bigtableReadTableId {}", options.getBigtableReadTableId());
    LOG.info("  - bigtableChangeStreamAppProfile {}", options.getBigtableChangeStreamAppProfile());
    LOG.info("  - embeddingColumn {}", options.getEmbeddingColumn());
    LOG.info("  - crowdingTagColumn {}", options.getCrowdingTagColumn());
    LOG.info("  - project {}", options.getProject());
    LOG.info("  - indexName {}", options.getVectorSearchIndex());

    String indexName = options.getVectorSearchIndex();

    String vertexRegion = Utils.extractRegionFromIndexName(indexName);
    String vertexEndpoint = vertexRegion + "-aiplatform.googleapis.com:443";

    final Pipeline pipeline = Pipeline.create(options);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                    : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProjectId)
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withMetadataTableTableId(options.getBigtableMetadataTableTableId())
            .withStartTime(startTimestamp);

    PCollectionTuple results =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply("Create Values", Values.create())
            .apply(
                "Converting to Vector Search Datapoints",
                ParDo.of(
                        new ChangeStreamMutationToDatapointOperationFn(
                            options.getEmbeddingColumn(),
                            options.getEmbeddingByteSize(),
                            options.getCrowdingTagColumn(),
                            Utils.parseColumnMapping(options.getAllowRestrictsMappings()),
                            Utils.parseColumnMapping(options.getDenyRestrictsMappings()),
                            Utils.parseColumnMapping(options.getIntNumericRestrictsMappings()),
                            Utils.parseColumnMapping(options.getFloatNumericRestrictsMappings()),
                            Utils.parseColumnMapping(options.getDoubleNumericRestrictsMappings())))
                    .withOutputTags(
                        ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG,
                        TupleTagList.of(
                            ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)));
    results
        .get(ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG)
        .apply("Add placeholer keys", WithKeys.of("placeholder"))
        .apply(
            "Batch Contents",
            GroupIntoBatches.<String, IndexDatapoint>ofSize(
                    bufferSizeOption(options.getUpsertMaxBatchSize()))
                .withMaxBufferingDuration(
                    bufferDurationOption(options.getUpsertMaxBufferDuration())))
        .apply("Map to Values", Values.create())
        .apply(
            "Upsert Datapoints to VectorSearch",
            ParDo.of(new UpsertDatapointsFn(vertexEndpoint, indexName)))
        .apply(
            "Write errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    results
        .get(ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)
        .apply("Add placeholder keys", WithKeys.of("placeholer"))
        .apply(
            "Batch Contents",
            GroupIntoBatches.<String, String>ofSize(
                    bufferSizeOption(options.getDeleteMaxBatchSize()))
                .withMaxBufferingDuration(
                    bufferDurationOption(options.getDeleteMaxBufferDuration())))
        .apply("Map to Values", Values.create())
        .apply(
            "Remove Datapoints From VectorSearch",
            ParDo.of(new RemoveDatapointsFn(vertexEndpoint, indexName)))
        .apply(
            "Write errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static String getBigtableProjectId(BigtableChangeStreamsToVectorSearchOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static int bufferSizeOption(int size) {
    if (size < 1) {
      size = 1;
    }

    return size;
  }

  private static Duration bufferDurationOption(String duration) {
    if (duration.isEmpty()) {
      return Duration.standardSeconds(1);
    }

    return DurationUtils.parseDuration(duration);
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamsToVectorSearchOptions options) {
    String dlqDirectory = options.getDlqDirectory();
    if (dlqDirectory.isEmpty()) {
      LOG.info("Falling back to temp dir for DLQ");

      String tempLocation = options.as(DataflowPipelineOptions.class).getTempLocation();

      LOG.info("Have temp location {}", tempLocation);
      if (tempLocation == null || tempLocation.isEmpty()) {
        tempLocation = "/";
      } else if (!tempLocation.endsWith("/")) {
        tempLocation += "/";
      }

      dlqDirectory = tempLocation + "dlq";
    }

    LOG.info("Writing dead letter queue to: {}", dlqDirectory);

    return DeadLetterQueueManager.create(dlqDirectory, 1);
  }
}