Modèle Spanner vers Vertex AI Vector Search

Le modèle des fichiers de Spanner vers Vertex AI Vector Search sur Cloud Storage crée un pipeline par lots qui exporte les données des embeddings d'une table Spanner vers Cloud Storage au format JSON. Utilisez les paramètres du modèle pour spécifier le dossier Cloud Storage vers lequel exporter les embeddings vectoriels. Le dossier Cloud Storage contient la liste des fichiers .json exportés, qui représentent les embeddings vectoriels dans un format compatible avec l'index de Vertex AI Vector Search.

Pour en savoir plus, consultez la section Format et structure des données d'entrée.

Conditions requises pour ce pipeline

  • La base de données Spanner doit exister.
  • Le bucket Cloud Storage pour la sortie des données doit exister.
  • En plus des rôles Identity and Access Management (IAM) nécessaires à l'exécution des jobs Dataflow, vous devez disposer des rôles IAM requis pour lire vos données Spanner et écrire dans votre bucket Cloud Storage.

Paramètres de modèle

Paramètres obligatoires

  • spannerProjectId: ID de projet de l'instance Spanner.
  • spannerInstanceId: ID de l'instance Spanner à partir de laquelle exporter les embeddings vectoriels.
  • spannerDatabaseId: ID de la base de données Spanner à partir de laquelle exporter les embeddings vectoriels.
  • spannerTable: table Spanner à lire.
  • spannerColumnsToExport: liste de colonnes séparées par des virgules pour l'index de Vertex AI Vector Search. Les colonnes "ID" et "Embedding" sont obligatoires pour Vector Search. Si les noms de vos colonnes ne correspondent pas à la structure de saisie de l'index de Vertex AI Vector Search, créez des mappages de colonnes à l'aide d'alias. Si les noms de colonne ne correspondent pas au format attendu par Vertex AI, utilisez la notation de:à. Par exemple, si vous avez des colonnes nommées "id" et "my_embedding", spécifiez "id, my_embedding:embedding".
  • gcsOutputFolder: dossier Cloud Storage dans lequel écrire les fichiers de sortie. Le chemin d'accès doit se terminer par une barre oblique. Exemple :gs://your-bucket/folder1/
  • gcsOutputFilePrefix: préfixe de nom de fichier pour l'écriture de fichiers de sortie. Exemple :vector-embeddings

Paramètres facultatifs

  • spannerHost: point de terminaison Spanner à appeler dans le modèle. La valeur par défaut est https://batch-spanner.googleapis.com. Par exemple, https://batch-spanner.googleapis.com.
  • spannerVersionTime: si ce paramètre est défini, il indique l'heure à laquelle la version de la base de données doit être choisie. La valeur est une chaîne au format RFC-3339 avec l'heure de l'epoch Unix. Exemple : 1990-12-31T23:59:60Z. L'horodatage doit être antérieur à la date et l'heure actuelles et l'Obsolescence maximale de l'horodatage (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) s'applique. S'il n'est pas défini, une limite forte (https://cloud.google.com/spanner/docs/timestamp-bounds#strong) est utilisée pour lire les dernières données. La valeur par défaut est empty. Par exemple, 1990-12-31T23:59:60Z.
  • spannerDataBoostEnabled: lorsque ce paramètre est défini sur true, le modèle utilise le calcul à la demande de Spanner. Le job d'exportation s'exécute sur des ressources de calcul indépendantes qui n'ont pas d'incidence sur les charges de travail Spanner actuelles. L'utilisation de cette option entraîne des frais supplémentaires dans Spanner. Pour en savoir plus, consultez la présentation de Spanner Data Boost (https://cloud.google.com/spanner/docs/databoost/databoost-overview). La valeur par défaut est: false.
  • spannerPriority: priorité des requêtes pour les appels Spanner. Les valeurs autorisées sont HIGH, MEDIUM et LOW. La valeur par défaut est MEDIUM.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Spanner to Vertex AI Vector Search files on Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       spannerProjectId=SPANNER_PROJECT_ID,\
       spannerInstanceId=SPANNER_INSTANCE_ID,\
       spannerDatabaseId=SPANNER_DATABASE_ID,\
       spannerTable=SPANNER_TABLE,\
       spannerColumnsToExport=SPANNER_COLUMNS_TO_EXPORT,\
       gcsOutputFolder=GCS_OUTPUT_FOLDER,\
       gcsOutputFilePrefix=GCS_OUTPUT_FILE_PREFIX,\

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_PROJECT_ID : ID du projet Spanner
  • SPANNER_INSTANCE_ID : ID d'instance Spanner
  • SPANNER_DATABASE_ID : ID de la base de données Spanner
  • SPANNER_TABLE : table Spanner
  • SPANNER_COLUMNS_TO_EXPORT : colonnes à exporter depuis la table Spanner
  • GCS_OUTPUT_FOLDER: dossier Cloud Storage dans lequel exporter les fichiers.
  • GCS_OUTPUT_FILE_PREFIX: préfixe des fichiers de sortie dans Cloud Storage

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage
{
   "jobName": "JOB_NAME",
   "parameters": {
     "spannerProjectId": "SPANNER_PROJECT_ID",
     "spannerInstanceId": "SPANNER_INSTANCE_ID",
     "spannerDatabaseId": "SPANNER_DATABASE_ID",
     "spannerTable": "SPANNER_TABLE",
     "spannerColumnsToExport": "SPANNER_COLUMNS_TO_EXPORT",
     "gcsOutputFolder": "GCS_OUTPUT_FOLDER",
     "gcsOutputFilePrefix": "GCS_OUTPUT_FILE_PREFIX",
   },
   "environment": { "maxWorkers": "10" }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_PROJECT_ID : ID du projet Spanner
  • SPANNER_INSTANCE_ID : ID d'instance Spanner
  • SPANNER_DATABASE_ID : ID de la base de données Spanner
  • SPANNER_TABLE : table Spanner
  • SPANNER_COLUMNS_TO_EXPORT : colonnes à exporter depuis la table Spanner
  • GCS_OUTPUT_FOLDER: dossier Cloud Storage dans lequel exporter les fichiers.
  • GCS_OUTPUT_FILE_PREFIX: préfixe des fichiers de sortie dans Cloud Storage
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerVectorEmbeddingExport.SpannerToVectorEmbeddingJsonOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.VectorSearchStructValidator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which export vector embeddings from Spanner to GCS in json format. It exports a
 * Spanner table using <a
 * href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
 * creates multiple workers in parallel for better performance. The result is written to a JSON file
 * in Google Cloud Storage.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Spanner_to_Vector_Embedding.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Spanner_vectors_to_Cloud_Storage",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Spanner vectors to Cloud Storage for Vertex Vector Search",
    optionsClass = SpannerToVectorEmbeddingJsonOptions.class,
    description = {
      "The Cloud Spanner to Vector Embeddings on Cloud Storage template is a batch pipeline that exports vector embeddings data from Cloud Spanner's table to Cloud Storage in JSON format. "
          + "Vector embeddings are exported to a Cloud Storage folder specified by the user in the template parameters."
          + " The Cloud Storage folder will contain the list of exported `.json` files representing vector embeddings in a format supported by Vertex AI Vector Search Index.\n",
      "Check <a href=\"https://cloud.google.com/vertex-ai/docs/vector-search/setup/format-structure#json\">Vector Search Format Structure</a> for additional details."
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-vertex-vector-search",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner database must exist.",
      "The output Cloud Storage bucket must exist.",
      "In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the <a href=\"https://cloud.google.com/spanner/docs/export#iam\">appropriate IAM roles</a> for reading your Cloud Spanner data and writing to your Cloud Storage bucket."
    })
@SuppressWarnings("unused")
public class SpannerVectorEmbeddingExport {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerVectorEmbeddingExport.class);

  /** Custom PipelineOptions. */
  public interface SpannerToVectorEmbeddingJsonOptions extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 10,
        groupName = "Source",
        description = "Cloud Spanner Project Id",
        helpText = "The project ID of the Spanner instance.")
    ValueProvider<String> getSpannerProjectId();

    void setSpannerProjectId(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 20,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]*[a-z0-9]"},
        description = "Cloud Spanner instance ID",
        helpText = "The ID of the Spanner instance to export the vector embeddings from.")
    ValueProvider<String> getSpannerInstanceId();

    void setSpannerInstanceId(ValueProvider<String> spannerInstanceId);

    @TemplateParameter.Text(
        order = 30,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9_\\-]*[a-z0-9]"},
        description = "Cloud Spanner database ID",
        helpText = "The ID of the Spanner database to export the vector embeddings from.")
    ValueProvider<String> getSpannerDatabaseId();

    void setSpannerDatabaseId(ValueProvider<String> spannerDatabaseId);

    @TemplateParameter.Text(
        order = 40,
        groupName = "Source",
        regexes = {"^.+$"},
        description = "Spanner Table",
        helpText = "The Spanner table to read from.")
    ValueProvider<String> getSpannerTable();

    void setSpannerTable(ValueProvider<String> table);

    @TemplateParameter.Text(
        order = 50,
        groupName = "Source",
        description = "Columns to Export from Spanner Table",
        helpText =
            "A comma-separated list of required columns for the Vertex AI Vector Search index. The ID and embedding columns are required by Vector Search. If your column names don't match the Vertex AI Vector Search index input structure, create column mappings by using aliases. If the column names don't match the format expected by Vertex AI, use the notation from:to. For example, if you have columns named id and my_embedding, specify id, my_embedding:embedding.")
    ValueProvider<String> getSpannerColumnsToExport();

    void setSpannerColumnsToExport(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 60,
        groupName = "Target",
        description = "Output files folder in Cloud Storage",
        helpText =
            "The Cloud Storage folder to write output files to. The path must end with a slash.",
        example = "gs://your-bucket/folder1/")
    ValueProvider<String> getGcsOutputFolder();

    void setGcsOutputFolder(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 70,
        groupName = "Target",
        description = "Output files prefix in Cloud Storage",
        helpText = "The filename prefix for writing output files.",
        example = "vector-embeddings")
    ValueProvider<String> getGcsOutputFilePrefix();

    void setGcsOutputFilePrefix(ValueProvider<String> textWritePrefix);

    @TemplateParameter.Text(
        order = 80,
        groupName = "Source",
        optional = true,
        description = "Cloud Spanner Endpoint to call",
        helpText =
            "The Spanner endpoint to call in the template. The default value is https://batch-spanner.googleapis.com.",
        example = "https://batch-spanner.googleapis.com")
    @Default.String("https://batch-spanner.googleapis.com")
    ValueProvider<String> getSpannerHost();

    void setSpannerHost(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 90,
        groupName = "Source",
        optional = true,
        regexes = {
          "^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):(([0-9]{2})(\\.[0-9]+)?)Z$"
        },
        description = "Timestamp to read stale data from a version in the past.",
        helpText =
            "If set, specifies the time when the database version must be taken. The value is a string in the RFC-3339 date format in Unix epoch time. For example: `1990-12-31T23:59:60Z`. The timestamp must be in the past, and maximum timestamp staleness (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) applies. If not set, a strong bound (https://cloud.google.com/spanner/docs/timestamp-bounds#strong) is used to read the latest data. Defaults to `empty`.",
        example = "1990-12-31T23:59:60Z")
    @Default.String(value = "")
    ValueProvider<String> getSpannerVersionTime();

    void setSpannerVersionTime(ValueProvider<String> value);

    @TemplateParameter.Boolean(
        order = 100,
        groupName = "Source",
        optional = true,
        description = "Use independent compute resource (Spanner DataBoost).",
        helpText =
            "When set to `true`, the template uses Spanner on-demand compute. The export job runs on independent compute resources that don't impact current Spanner workloads. Using this option incurs additional charges in Spanner. For more information, see Spanner Data Boost overview (https://cloud.google.com/spanner/docs/databoost/databoost-overview). Defaults to: `false`.")
    @Default.Boolean(false)
    ValueProvider<Boolean> getSpannerDataBoostEnabled();

    void setSpannerDataBoostEnabled(ValueProvider<Boolean> value);

    @TemplateParameter.Enum(
        order = 110,
        groupName = "Source",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Spanner calls. The allowed values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  /**
   * Runs a pipeline which reads in vector embeddings records from Spanner, and writes the JSON to
   * TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToVectorEmbeddingJsonOptions.class);

    SpannerToVectorEmbeddingJsonOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SpannerToVectorEmbeddingJsonOptions.class);

    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(options.getSpannerHost())
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withRpcPriority(options.getSpannerPriority())
            .withDataBoostEnabled(options.getSpannerDataBoostEnabled());

    ValueProvider<String> gcsOutputFilePrefix = options.getGcsOutputFilePrefix();

    // Concatenating cloud storage folder with file prefix to get complete path
    ValueProvider<String> gcsOutputFilePathWithPrefix =
        ValueProvider.NestedValueProvider.of(
            options.getGcsOutputFolder(),
            (SerializableFunction<String, String>)
                folder -> {
                  if (!folder.endsWith("/")) {
                    // Appending the slash if not provided by user
                    folder = folder + "/";
                  }
                  return folder + gcsOutputFilePrefix.get();
                });

    PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
        SpannerConverters.ExportTransformFactory.create(
            options.getSpannerTable(),
            spannerConfig,
            gcsOutputFilePathWithPrefix,
            options.getSpannerVersionTime(),
            options.getSpannerColumnsToExport(),
            ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false));

    /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        pipeline
            .apply("Setup for Transaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(
                    new CreateTransactionFnWithTimestamp(
                        spannerConfig, options.getSpannerVersionTime())))
            .apply("As PCollectionView", View.asSingleton());

    PCollection<String> json =
        pipeline
            .apply("Create export", spannerExport)
            // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
            // because ValueProvider parameters such as table name required for
            // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
            // type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
            // these parameters at the pipeline execution time.
            .apply(
                "Read all records",
                LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
            .apply(
                "Struct To JSON",
                MapElements.into(TypeDescriptors.strings())
                    .via(
                        struct ->
                            (new SpannerConverters.StructJSONPrinter(
                                    new VectorSearchStructValidator()))
                                .print(struct)));

    json.apply(
        "Write to storage", TextIO.write().to(gcsOutputFilePathWithPrefix).withSuffix(".json"));

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}

Étape suivante