Plantilla de Búsqueda vectorial de Spanner a Vertex AI

La plantilla para Spanner a archivos de búsqueda vectorial de Vertex AI en Cloud Storage crea una canalización por lotes que exporta datos de incorporaciones vectoriales de una tabla de Spanner a Cloud Storage en formato JSON. Usa los parámetros de plantilla a fin de especificar la carpeta de Cloud Storage a la que se exportarán las incorporaciones de vectores. La carpeta de Cloud Storage contiene la lista de archivos .json exportados, que representan las incorporaciones de vector en un formato compatible con el índice de Vertex AI Vector Search.

Para obtener más información, consulta Formato y estructura de los datos de entrada.

Requisitos de la canalización

  • La base de datos de Spanner debe existir.
  • El bucket de Cloud Storage para los datos de salida debe existir.
  • Además de las funciones de administración de identidades y accesos (IAM) necesarias para ejecutar trabajos de Dataflow, necesitas lo siguiente: roles de IAM obligatorios para leer tus datos de Spanner y escribir en tu bucket de Cloud Storage.

Parámetros de la plantilla

Parámetros obligatorios

  • spannerProjectId: El ID del proyecto de la instancia de Spanner.
  • spannerInstanceId: Es el ID de la instancia de Spanner desde la que se exportarán las incorporaciones de vectores.
  • spannerDatabaseId: Es el ID de la base de datos de Spanner desde la que se exportarán las incorporaciones de vectores.
  • spannerTable: La tabla de Spanner desde la que se leerá.
  • spannerColumnsToExport: Una lista separada por comas de las columnas obligatorias para el índice de búsqueda de Vertex AI Vector. La búsqueda de vectores requiere las columnas de ID y de incorporación. Si los nombres de tus columnas no coinciden con la estructura de entrada del índice de la búsqueda de vectores de Vertex AI, crea asignaciones de columnas mediante alias. Si los nombres de las columnas no coinciden con el formato que espera Vertex AI, usa la notación de:a. Por ejemplo, si tienes columnas llamadas id y my_embedding, especifica id, my_embedding:embedding.
  • gcsOutputFolder: Es la carpeta de Cloud Storage en la que se escriben los archivos de salida. La ruta de acceso debe terminar con una barra diagonal. Por ejemplo, gs://your-bucket/folder1/
  • gcsOutputFilePrefix: Es el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, vector-embeddings

Parámetros opcionales

  • spannerHost: Es el extremo de Spanner al que se llamará en la plantilla. El valor predeterminado es https://batch-spanner.googleapis.com. Por ejemplo, https://batch-spanner.googleapis.com.
  • spannerVersionTime: Si se configura, especifica la hora en la que se debe tomar la versión de la base de datos. El valor es una string en formato de fecha RFC-3339 en hora de época Unix. Por ejemplo: 1990-12-31T23:59:60Z. La marca de tiempo debe ser del pasado y se aplica la máxima inactividad de la marca de tiempo (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness). Si no se establece, se usa un límite sólido (https://cloud.google.com/spanner/docs/timestamp-bounds#strong) para leer los datos más recientes. El valor predeterminado es empty. Por ejemplo, 1990-12-31T23:59:60Z.
  • spannerDataBoostEnabled: Cuando se establece en true, la plantilla usa el procesamiento a pedido de Spanner. El trabajo de exportación se ejecuta en recursos de procesamiento independientes que no afectan las cargas de trabajo actuales de Spanner. El uso de esta opción genera cargos adicionales en Spanner. Para obtener más información, consulta la descripción general de Data Boost (https://cloud.google.com/spanner/docs/databoost/databoost-overview). El valor predeterminado es false.
  • spannerPriority: La prioridad de solicitud para llamadas de Spanner. Los valores permitidos son HIGH, MEDIUM y LOW. El valor predeterminado es MEDIUM.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Spanner to Vertex AI Vector Search files on Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       spannerProjectId=SPANNER_PROJECT_ID,\
       spannerInstanceId=SPANNER_INSTANCE_ID,\
       spannerDatabaseId=SPANNER_DATABASE_ID,\
       spannerTable=SPANNER_TABLE,\
       spannerColumnsToExport=SPANNER_COLUMNS_TO_EXPORT,\
       gcsOutputFolder=GCS_OUTPUT_FOLDER,\
       gcsOutputFilePrefix=GCS_OUTPUT_FILE_PREFIX,\

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_PROJECT_ID: Es el ID del proyecto de Spanner.
  • SPANNER_INSTANCE_ID: Es el ID de la instancia de Spanner.
  • SPANNER_DATABASE_ID: Es el ID de la base de datos de Spanner.
  • SPANNER_TABLE: Es la tabla de Spanner.
  • SPANNER_COLUMNS_TO_EXPORT: Son las columnas que se exportarán desde la tabla de Spanner
  • GCS_OUTPUT_FOLDER: Es la carpeta de Cloud Storage a la que se exportarán los archivos
  • GCS_OUTPUT_FILE_PREFIX: Es el prefijo de los archivos de salida en Cloud Storage

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage
{
   "jobName": "JOB_NAME",
   "parameters": {
     "spannerProjectId": "SPANNER_PROJECT_ID",
     "spannerInstanceId": "SPANNER_INSTANCE_ID",
     "spannerDatabaseId": "SPANNER_DATABASE_ID",
     "spannerTable": "SPANNER_TABLE",
     "spannerColumnsToExport": "SPANNER_COLUMNS_TO_EXPORT",
     "gcsOutputFolder": "GCS_OUTPUT_FOLDER",
     "gcsOutputFilePrefix": "GCS_OUTPUT_FILE_PREFIX",
   },
   "environment": { "maxWorkers": "10" }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_PROJECT_ID: Es el ID del proyecto de Spanner.
  • SPANNER_INSTANCE_ID: Es el ID de la instancia de Spanner.
  • SPANNER_DATABASE_ID: Es el ID de la base de datos de Spanner.
  • SPANNER_TABLE: Es la tabla de Spanner.
  • SPANNER_COLUMNS_TO_EXPORT: Son las columnas que se exportarán desde la tabla de Spanner
  • GCS_OUTPUT_FOLDER: Es la carpeta de Cloud Storage a la que se exportarán los archivos
  • GCS_OUTPUT_FILE_PREFIX: Es el prefijo de los archivos de salida en Cloud Storage
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerVectorEmbeddingExport.SpannerToVectorEmbeddingJsonOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.VectorSearchStructValidator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which export vector embeddings from Spanner to GCS in json format. It exports a
 * Spanner table using <a
 * href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
 * creates multiple workers in parallel for better performance. The result is written to a JSON file
 * in Google Cloud Storage.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Spanner_to_Vector_Embedding.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Spanner_vectors_to_Cloud_Storage",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Spanner vectors to Cloud Storage for Vertex Vector Search",
    optionsClass = SpannerToVectorEmbeddingJsonOptions.class,
    description = {
      "The Cloud Spanner to Vector Embeddings on Cloud Storage template is a batch pipeline that exports vector embeddings data from Cloud Spanner's table to Cloud Storage in JSON format. "
          + "Vector embeddings are exported to a Cloud Storage folder specified by the user in the template parameters."
          + " The Cloud Storage folder will contain the list of exported `.json` files representing vector embeddings in a format supported by Vertex AI Vector Search Index.\n",
      "Check <a href=\"https://cloud.google.com/vertex-ai/docs/vector-search/setup/format-structure#json\">Vector Search Format Structure</a> for additional details."
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-vertex-vector-search",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner database must exist.",
      "The output Cloud Storage bucket must exist.",
      "In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the <a href=\"https://cloud.google.com/spanner/docs/export#iam\">appropriate IAM roles</a> for reading your Cloud Spanner data and writing to your Cloud Storage bucket."
    })
@SuppressWarnings("unused")
public class SpannerVectorEmbeddingExport {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerVectorEmbeddingExport.class);

  /** Custom PipelineOptions. */
  public interface SpannerToVectorEmbeddingJsonOptions extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 10,
        groupName = "Source",
        description = "Cloud Spanner Project Id",
        helpText = "The project ID of the Spanner instance.")
    ValueProvider<String> getSpannerProjectId();

    void setSpannerProjectId(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 20,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]*[a-z0-9]"},
        description = "Cloud Spanner instance ID",
        helpText = "The ID of the Spanner instance to export the vector embeddings from.")
    ValueProvider<String> getSpannerInstanceId();

    void setSpannerInstanceId(ValueProvider<String> spannerInstanceId);

    @TemplateParameter.Text(
        order = 30,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9_\\-]*[a-z0-9]"},
        description = "Cloud Spanner database ID",
        helpText = "The ID of the Spanner database to export the vector embeddings from.")
    ValueProvider<String> getSpannerDatabaseId();

    void setSpannerDatabaseId(ValueProvider<String> spannerDatabaseId);

    @TemplateParameter.Text(
        order = 40,
        groupName = "Source",
        regexes = {"^.+$"},
        description = "Spanner Table",
        helpText = "The Spanner table to read from.")
    ValueProvider<String> getSpannerTable();

    void setSpannerTable(ValueProvider<String> table);

    @TemplateParameter.Text(
        order = 50,
        groupName = "Source",
        description = "Columns to Export from Spanner Table",
        helpText =
            "A comma-separated list of required columns for the Vertex AI Vector Search index. The ID and embedding columns are required by Vector Search. If your column names don't match the Vertex AI Vector Search index input structure, create column mappings by using aliases. If the column names don't match the format expected by Vertex AI, use the notation from:to. For example, if you have columns named id and my_embedding, specify id, my_embedding:embedding.")
    ValueProvider<String> getSpannerColumnsToExport();

    void setSpannerColumnsToExport(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 60,
        groupName = "Target",
        description = "Output files folder in Cloud Storage",
        helpText =
            "The Cloud Storage folder to write output files to. The path must end with a slash.",
        example = "gs://your-bucket/folder1/")
    ValueProvider<String> getGcsOutputFolder();

    void setGcsOutputFolder(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 70,
        groupName = "Target",
        description = "Output files prefix in Cloud Storage",
        helpText = "The filename prefix for writing output files.",
        example = "vector-embeddings")
    ValueProvider<String> getGcsOutputFilePrefix();

    void setGcsOutputFilePrefix(ValueProvider<String> textWritePrefix);

    @TemplateParameter.Text(
        order = 80,
        groupName = "Source",
        optional = true,
        description = "Cloud Spanner Endpoint to call",
        helpText =
            "The Spanner endpoint to call in the template. The default value is https://batch-spanner.googleapis.com.",
        example = "https://batch-spanner.googleapis.com")
    @Default.String("https://batch-spanner.googleapis.com")
    ValueProvider<String> getSpannerHost();

    void setSpannerHost(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 90,
        groupName = "Source",
        optional = true,
        regexes = {
          "^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):(([0-9]{2})(\\.[0-9]+)?)Z$"
        },
        description = "Timestamp to read stale data from a version in the past.",
        helpText =
            "If set, specifies the time when the database version must be taken. The value is a string in the RFC-3339 date format in Unix epoch time. For example: `1990-12-31T23:59:60Z`. The timestamp must be in the past, and maximum timestamp staleness (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) applies. If not set, a strong bound (https://cloud.google.com/spanner/docs/timestamp-bounds#strong) is used to read the latest data. Defaults to `empty`.",
        example = "1990-12-31T23:59:60Z")
    @Default.String(value = "")
    ValueProvider<String> getSpannerVersionTime();

    void setSpannerVersionTime(ValueProvider<String> value);

    @TemplateParameter.Boolean(
        order = 100,
        groupName = "Source",
        optional = true,
        description = "Use independent compute resource (Spanner DataBoost).",
        helpText =
            "When set to `true`, the template uses Spanner on-demand compute. The export job runs on independent compute resources that don't impact current Spanner workloads. Using this option incurs additional charges in Spanner. For more information, see Spanner Data Boost overview (https://cloud.google.com/spanner/docs/databoost/databoost-overview). Defaults to: `false`.")
    @Default.Boolean(false)
    ValueProvider<Boolean> getSpannerDataBoostEnabled();

    void setSpannerDataBoostEnabled(ValueProvider<Boolean> value);

    @TemplateParameter.Enum(
        order = 110,
        groupName = "Source",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Spanner calls. The allowed values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  /**
   * Runs a pipeline which reads in vector embeddings records from Spanner, and writes the JSON to
   * TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToVectorEmbeddingJsonOptions.class);

    SpannerToVectorEmbeddingJsonOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SpannerToVectorEmbeddingJsonOptions.class);

    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(options.getSpannerHost())
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withRpcPriority(options.getSpannerPriority())
            .withDataBoostEnabled(options.getSpannerDataBoostEnabled());

    ValueProvider<String> gcsOutputFilePrefix = options.getGcsOutputFilePrefix();

    // Concatenating cloud storage folder with file prefix to get complete path
    ValueProvider<String> gcsOutputFilePathWithPrefix =
        ValueProvider.NestedValueProvider.of(
            options.getGcsOutputFolder(),
            (SerializableFunction<String, String>)
                folder -> {
                  if (!folder.endsWith("/")) {
                    // Appending the slash if not provided by user
                    folder = folder + "/";
                  }
                  return folder + gcsOutputFilePrefix.get();
                });

    PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
        SpannerConverters.ExportTransformFactory.create(
            options.getSpannerTable(),
            spannerConfig,
            gcsOutputFilePathWithPrefix,
            options.getSpannerVersionTime(),
            options.getSpannerColumnsToExport(),
            ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false));

    /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        pipeline
            .apply("Setup for Transaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(
                    new CreateTransactionFnWithTimestamp(
                        spannerConfig, options.getSpannerVersionTime())))
            .apply("As PCollectionView", View.asSingleton());

    PCollection<String> json =
        pipeline
            .apply("Create export", spannerExport)
            // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
            // because ValueProvider parameters such as table name required for
            // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
            // type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
            // these parameters at the pipeline execution time.
            .apply(
                "Read all records",
                LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
            .apply(
                "Struct To JSON",
                MapElements.into(TypeDescriptors.strings())
                    .via(
                        struct ->
                            (new SpannerConverters.StructJSONPrinter(
                                    new VectorSearchStructValidator()))
                                .print(struct)));

    json.apply(
        "Write to storage", TextIO.write().to(gcsOutputFilePathWithPrefix).withSuffix(".json"));

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}

¿Qué sigue?