Vorlage für die Vektorsuche von Spanner zu Vertex AI

Die Vorlage für Dateien der Vektorsuche von Spanner zu Vertex AI in Cloud Storage erstellt eine Batchpipeline, die Daten aus Vektoreinbettungen aus einer Spanner-Tabelle im JSON-Format in Cloud Storage exportiert. Verwenden Sie Vorlagenparameter, um den Cloud Storage-Ordner anzugeben, in den die Vektoreinbettungen exportiert werden sollen. Der Cloud Storage-Ordner enthält die Liste der exportierten .json-Dateien, die die Vektoreinbettungen in einem vom Vertex AI Vector Search-Index unterstützten Format darstellen.

Weitere Informationen finden Sie unter Format und Struktur der Eingabedaten.

Pipelineanforderungen

  • Die Spanner-Datenbank muss vorhanden sein.
  • Der Cloud Storage-Bucket für die Ausgabe von Daten muss vorhanden sein.
  • Zusätzlich zu den IAM-Rollen (Identity and Access Management), die zum Ausführen von Dataflow-Jobs erforderlich sind, benötigen Sie die erforderlichen IAM-Rollen zum Lesen Ihrer Spanner-Daten und Schreiben in Ihren Cloud Storage-Bucket.

Vorlagenparameter

Erforderliche Parameter

  • spannerProjectId: Die Projekt-ID der Spanner-Instanz.
  • spannerInstanceId: Die ID der Spanner-Instanz, aus der die Vektoreinbettungen exportiert werden sollen.
  • spannerDatabaseId: Die ID der Spanner-Datenbank, aus der die Vektoreinbettungen exportiert werden sollen.
  • spannerTable: Die Spanner-Tabelle, aus der gelesen werden soll.
  • spannerColumnsToExport: Eine durch Kommas getrennte Liste der erforderlichen Spalten für den Vertex AI-Vektorsuchindex. Die ID- und Einbettungsspalten sind für die Vektorsuche erforderlich. Wenn Ihre Spaltennamen nicht mit der Eingabestruktur des Vertex AI Vektorsuchindex übereinstimmen, erstellen Sie mithilfe von Aliassen Spaltenzuordnungen. Wenn die Spaltennamen nicht mit dem von Vertex AI erwarteten Format übereinstimmen, verwenden Sie die Notation "from:to". Wenn Sie beispielsweise Spalten mit den Namen "id" und "my_embedding" haben, geben Sie "id", "my_embedding:embedding" an.
  • gcsOutputFolder: Der Cloud Storage-Ordner, in den Ausgabedateien geschrieben werden sollen. Der Pfad muss mit einem Schrägstrich enden. Beispiel: gs://your-bucket/folder1/.
  • gcsOutputFilePrefix: Das Dateinamenpräfix zum Schreiben von Ausgabedateien. Beispiel: vector-embeddings.

Optionale Parameter

  • spannerHost: Der Spanner-Endpunkt, der in der Vorlage aufgerufen werden soll. Der Standardwert ist https://batch-spanner.googleapis.com. Beispiel: https://batch-spanner.googleapis.com.
  • spannerVersionTime: Wird dieser Wert festgelegt, gibt er die Zeit an, zu der die Datenbankversion verwendet werden muss. Der Wert ist ein String im RFC-3339-Datumsformat in Unix-Epochen-Zeit. Beispiel: 1990-12-31T23:59:60Z Der Zeitstempel muss in der Vergangenheit liegen und die maximale Zeitstempelveralterung (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) gilt. Wenn nicht festgelegt, wird eine starke Grenze (https://cloud.google.com/spanner/docs/timestamp-bounds#strong) verwendet, um die neuesten Daten zu lesen. Die Standardeinstellung ist empty. Beispiel: 1990-12-31T23:59:60Z.
  • spannerDataBoostEnabled: Wenn dieser Wert auf true gesetzt ist, verwendet die Vorlage das On-Demand-Computing von Spanner. Der Exportjob wird auf unabhängigen Rechenressourcen ausgeführt, die sich nicht auf aktuelle Spanner-Arbeitslasten auswirken. Bei Verwendung dieser Option fallen in Spanner zusätzliche Gebühren an. Weitere Informationen finden Sie in der Data Boost-Übersicht (https://cloud.google.com/spanner/docs/databoost/databoost-overview). Standardeinstellung: false.
  • spannerPriority: Die Anfragepriorität für Spanner-Aufrufe. Zulässige Werte sind HIGH, MEDIUM und LOW. Der Standardwert ist MEDIUM.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Spanner to Vertex AI Vector Search files on Cloud Storage templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       spannerProjectId=SPANNER_PROJECT_ID,\
       spannerInstanceId=SPANNER_INSTANCE_ID,\
       spannerDatabaseId=SPANNER_DATABASE_ID,\
       spannerTable=SPANNER_TABLE,\
       spannerColumnsToExport=SPANNER_COLUMNS_TO_EXPORT,\
       gcsOutputFolder=GCS_OUTPUT_FOLDER,\
       gcsOutputFilePrefix=GCS_OUTPUT_FILE_PREFIX,\

Ersetzen Sie dabei Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_PROJECT_ID: Die Spanner-Projekt-ID.
  • SPANNER_INSTANCE_ID: Die Spanner-Instanz-ID
  • SPANNER_DATABASE_ID: Die Spanner-Datenbank-ID
  • SPANNER_TABLE: die Spanner-Tabelle
  • SPANNER_COLUMNS_TO_EXPORT: Die Spalten, die aus der Spanner-Tabelle exportiert werden sollen
  • GCS_OUTPUT_FOLDER: Der Cloud Storage-Ordner, in den Ausgabedateien geschrieben werden sollen.
  • GCS_OUTPUT_FILE_PREFIX: das Präfix der Ausgabedateien in Cloud Storage

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage
{
   "jobName": "JOB_NAME",
   "parameters": {
     "spannerProjectId": "SPANNER_PROJECT_ID",
     "spannerInstanceId": "SPANNER_INSTANCE_ID",
     "spannerDatabaseId": "SPANNER_DATABASE_ID",
     "spannerTable": "SPANNER_TABLE",
     "spannerColumnsToExport": "SPANNER_COLUMNS_TO_EXPORT",
     "gcsOutputFolder": "GCS_OUTPUT_FOLDER",
     "gcsOutputFilePrefix": "GCS_OUTPUT_FILE_PREFIX",
   },
   "environment": { "maxWorkers": "10" }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_PROJECT_ID: Die Spanner-Projekt-ID.
  • SPANNER_INSTANCE_ID: Die Spanner-Instanz-ID
  • SPANNER_DATABASE_ID: Die Spanner-Datenbank-ID
  • SPANNER_TABLE: die Spanner-Tabelle
  • SPANNER_COLUMNS_TO_EXPORT: Die Spalten, die aus der Spanner-Tabelle exportiert werden sollen
  • GCS_OUTPUT_FOLDER: Der Cloud Storage-Ordner, in den Ausgabedateien geschrieben werden sollen.
  • GCS_OUTPUT_FILE_PREFIX: das Präfix der Ausgabedateien in Cloud Storage
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerVectorEmbeddingExport.SpannerToVectorEmbeddingJsonOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.VectorSearchStructValidator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which export vector embeddings from Spanner to GCS in json format. It exports a
 * Spanner table using <a
 * href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
 * creates multiple workers in parallel for better performance. The result is written to a JSON file
 * in Google Cloud Storage.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Spanner_to_Vector_Embedding.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Spanner_vectors_to_Cloud_Storage",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Spanner vectors to Cloud Storage for Vertex Vector Search",
    optionsClass = SpannerToVectorEmbeddingJsonOptions.class,
    description = {
      "The Cloud Spanner to Vector Embeddings on Cloud Storage template is a batch pipeline that exports vector embeddings data from Cloud Spanner's table to Cloud Storage in JSON format. "
          + "Vector embeddings are exported to a Cloud Storage folder specified by the user in the template parameters."
          + " The Cloud Storage folder will contain the list of exported `.json` files representing vector embeddings in a format supported by Vertex AI Vector Search Index.\n",
      "Check <a href=\"https://cloud.google.com/vertex-ai/docs/vector-search/setup/format-structure#json\">Vector Search Format Structure</a> for additional details."
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-vertex-vector-search",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner database must exist.",
      "The output Cloud Storage bucket must exist.",
      "In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the <a href=\"https://cloud.google.com/spanner/docs/export#iam\">appropriate IAM roles</a> for reading your Cloud Spanner data and writing to your Cloud Storage bucket."
    })
@SuppressWarnings("unused")
public class SpannerVectorEmbeddingExport {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerVectorEmbeddingExport.class);

  /** Custom PipelineOptions. */
  public interface SpannerToVectorEmbeddingJsonOptions extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 10,
        groupName = "Source",
        description = "Cloud Spanner Project Id",
        helpText = "The project ID of the Spanner instance.")
    ValueProvider<String> getSpannerProjectId();

    void setSpannerProjectId(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 20,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]*[a-z0-9]"},
        description = "Cloud Spanner instance ID",
        helpText = "The ID of the Spanner instance to export the vector embeddings from.")
    ValueProvider<String> getSpannerInstanceId();

    void setSpannerInstanceId(ValueProvider<String> spannerInstanceId);

    @TemplateParameter.Text(
        order = 30,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9_\\-]*[a-z0-9]"},
        description = "Cloud Spanner database ID",
        helpText = "The ID of the Spanner database to export the vector embeddings from.")
    ValueProvider<String> getSpannerDatabaseId();

    void setSpannerDatabaseId(ValueProvider<String> spannerDatabaseId);

    @TemplateParameter.Text(
        order = 40,
        groupName = "Source",
        regexes = {"^.+$"},
        description = "Spanner Table",
        helpText = "The Spanner table to read from.")
    ValueProvider<String> getSpannerTable();

    void setSpannerTable(ValueProvider<String> table);

    @TemplateParameter.Text(
        order = 50,
        groupName = "Source",
        description = "Columns to Export from Spanner Table",
        helpText =
            "A comma-separated list of required columns for the Vertex AI Vector Search index. The ID and embedding columns are required by Vector Search. If your column names don't match the Vertex AI Vector Search index input structure, create column mappings by using aliases. If the column names don't match the format expected by Vertex AI, use the notation from:to. For example, if you have columns named id and my_embedding, specify id, my_embedding:embedding.")
    ValueProvider<String> getSpannerColumnsToExport();

    void setSpannerColumnsToExport(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 60,
        groupName = "Target",
        description = "Output files folder in Cloud Storage",
        helpText =
            "The Cloud Storage folder to write output files to. The path must end with a slash.",
        example = "gs://your-bucket/folder1/")
    ValueProvider<String> getGcsOutputFolder();

    void setGcsOutputFolder(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 70,
        groupName = "Target",
        description = "Output files prefix in Cloud Storage",
        helpText = "The filename prefix for writing output files.",
        example = "vector-embeddings")
    ValueProvider<String> getGcsOutputFilePrefix();

    void setGcsOutputFilePrefix(ValueProvider<String> textWritePrefix);

    @TemplateParameter.Text(
        order = 80,
        groupName = "Source",
        optional = true,
        description = "Cloud Spanner Endpoint to call",
        helpText =
            "The Spanner endpoint to call in the template. The default value is https://batch-spanner.googleapis.com.",
        example = "https://batch-spanner.googleapis.com")
    @Default.String("https://batch-spanner.googleapis.com")
    ValueProvider<String> getSpannerHost();

    void setSpannerHost(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 90,
        groupName = "Source",
        optional = true,
        regexes = {
          "^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):(([0-9]{2})(\\.[0-9]+)?)Z$"
        },
        description = "Timestamp to read stale data from a version in the past.",
        helpText =
            "If set, specifies the time when the database version must be taken. The value is a string in the RFC-3339 date format in Unix epoch time. For example: `1990-12-31T23:59:60Z`. The timestamp must be in the past, and maximum timestamp staleness (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) applies. If not set, a strong bound (https://cloud.google.com/spanner/docs/timestamp-bounds#strong) is used to read the latest data. Defaults to `empty`.",
        example = "1990-12-31T23:59:60Z")
    @Default.String(value = "")
    ValueProvider<String> getSpannerVersionTime();

    void setSpannerVersionTime(ValueProvider<String> value);

    @TemplateParameter.Boolean(
        order = 100,
        groupName = "Source",
        optional = true,
        description = "Use independent compute resource (Spanner DataBoost).",
        helpText =
            "When set to `true`, the template uses Spanner on-demand compute. The export job runs on independent compute resources that don't impact current Spanner workloads. Using this option incurs additional charges in Spanner. For more information, see Spanner Data Boost overview (https://cloud.google.com/spanner/docs/databoost/databoost-overview). Defaults to: `false`.")
    @Default.Boolean(false)
    ValueProvider<Boolean> getSpannerDataBoostEnabled();

    void setSpannerDataBoostEnabled(ValueProvider<Boolean> value);

    @TemplateParameter.Enum(
        order = 110,
        groupName = "Source",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Spanner calls. The allowed values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  /**
   * Runs a pipeline which reads in vector embeddings records from Spanner, and writes the JSON to
   * TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToVectorEmbeddingJsonOptions.class);

    SpannerToVectorEmbeddingJsonOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SpannerToVectorEmbeddingJsonOptions.class);

    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(options.getSpannerHost())
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withRpcPriority(options.getSpannerPriority())
            .withDataBoostEnabled(options.getSpannerDataBoostEnabled());

    ValueProvider<String> gcsOutputFilePrefix = options.getGcsOutputFilePrefix();

    // Concatenating cloud storage folder with file prefix to get complete path
    ValueProvider<String> gcsOutputFilePathWithPrefix =
        ValueProvider.NestedValueProvider.of(
            options.getGcsOutputFolder(),
            (SerializableFunction<String, String>)
                folder -> {
                  if (!folder.endsWith("/")) {
                    // Appending the slash if not provided by user
                    folder = folder + "/";
                  }
                  return folder + gcsOutputFilePrefix.get();
                });

    PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
        SpannerConverters.ExportTransformFactory.create(
            options.getSpannerTable(),
            spannerConfig,
            gcsOutputFilePathWithPrefix,
            options.getSpannerVersionTime(),
            options.getSpannerColumnsToExport(),
            ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false));

    /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        pipeline
            .apply("Setup for Transaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(
                    new CreateTransactionFnWithTimestamp(
                        spannerConfig, options.getSpannerVersionTime())))
            .apply("As PCollectionView", View.asSingleton());

    PCollection<String> json =
        pipeline
            .apply("Create export", spannerExport)
            // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
            // because ValueProvider parameters such as table name required for
            // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
            // type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
            // these parameters at the pipeline execution time.
            .apply(
                "Read all records",
                LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
            .apply(
                "Struct To JSON",
                MapElements.into(TypeDescriptors.strings())
                    .via(
                        struct ->
                            (new SpannerConverters.StructJSONPrinter(
                                    new VectorSearchStructValidator()))
                                .print(struct)));

    json.apply(
        "Write to storage", TextIO.write().to(gcsOutputFilePathWithPrefix).withSuffix(".json"));

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}

Nächste Schritte