Spanner to Vertex AI Vector Search 模板

Spanner to Vertex AI Vector Search files on Cloud Storage 模板会创建一个批处理流水线,可采用 JSON 格式将 Spanner 表中的向量嵌入数据导出到 Cloud Storage。使用模板参数指定要将向量嵌入导出到其中的 Cloud Storage 文件夹。Cloud Storage 文件夹包含导出的 .json 文件列表,这些文件以 Vertex AI Vector Search 索引支持的格式表示向量嵌入。

如需了解详情,请参阅输入数据格式和结构

流水线要求

  • Spanner 数据库必须已存在。
  • 用于输出数据的 Cloud Storage 存储桶必须已存在。
  • 除了运行 Dataflow 作业所需的 Identity and Access Management (IAM) 角色之外,您还需要具有读取 Spanner 数据并写入 Cloud Storage 存储桶的必需 IAM 角色

模板参数

必需参数

  • spannerProjectId:Spanner 实例的项目 ID。
  • spannerInstanceId:要从中导出向量嵌入的 Spanner 实例的 ID。
  • spannerDatabaseId:要从中导出向量嵌入的 Spanner 数据库的 ID。
  • spannerTable:要从中读取数据的 Spanner 表。
  • spannerColumnsToExport:Vertex AI Vector Search 索引的必需列的英文逗号分隔列表。Vector Search 需要 ID 和嵌入列。如果列名称与 Vertex AI Vector Search 索引输入结构不匹配,请使用别名创建列映射。如果列名称与 Vertex AI 预期的格式不匹配,请使用“from:to”表示法。例如,如果您有名为 id 和 my_embedding 的列,请指定 id, my_embedding:embedding。
  • gcsOutputFolder:用于写入输出文件的 Cloud Storage 文件夹。该路径应以斜杠结尾。例如 gs://your-bucket/folder1/
  • gcsOutputFilePrefix:用于写入输出文件的文件名前缀。例如 vector-embeddings

可选参数

运行模板

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Spanner to Vertex AI Vector Search files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       spannerProjectId=SPANNER_PROJECT_ID,\
       spannerInstanceId=SPANNER_INSTANCE_ID,\
       spannerDatabaseId=SPANNER_DATABASE_ID,\
       spannerTable=SPANNER_TABLE,\
       spannerColumnsToExport=SPANNER_COLUMNS_TO_EXPORT,\
       gcsOutputFolder=GCS_OUTPUT_FOLDER,\
       gcsOutputFilePrefix=GCS_OUTPUT_FILE_PREFIX,\

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_PROJECT_ID:Spanner 项目 ID
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE_ID:Spanner 数据库 ID
  • SPANNER_TABLE:Spanner 表
  • SPANNER_COLUMNS_TO_EXPORT:要从 Spanner 表中导出的列
  • GCS_OUTPUT_FOLDER:要将文件输出到的 Cloud Storage 文件夹
  • GCS_OUTPUT_FILE_PREFIX:Cloud Storage 中的输出文件前缀

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage
{
   "jobName": "JOB_NAME",
   "parameters": {
     "spannerProjectId": "SPANNER_PROJECT_ID",
     "spannerInstanceId": "SPANNER_INSTANCE_ID",
     "spannerDatabaseId": "SPANNER_DATABASE_ID",
     "spannerTable": "SPANNER_TABLE",
     "spannerColumnsToExport": "SPANNER_COLUMNS_TO_EXPORT",
     "gcsOutputFolder": "GCS_OUTPUT_FOLDER",
     "gcsOutputFilePrefix": "GCS_OUTPUT_FILE_PREFIX",
   },
   "environment": { "maxWorkers": "10" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_PROJECT_ID:Spanner 项目 ID
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE_ID:Spanner 数据库 ID
  • SPANNER_TABLE:Spanner 表
  • SPANNER_COLUMNS_TO_EXPORT:要从 Spanner 表中导出的列
  • GCS_OUTPUT_FOLDER:要将文件输出到的 Cloud Storage 文件夹
  • GCS_OUTPUT_FILE_PREFIX:Cloud Storage 中的输出文件前缀
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerVectorEmbeddingExport.SpannerToVectorEmbeddingJsonOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.VectorSearchStructValidator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which export vector embeddings from Spanner to GCS in json format. It exports a
 * Spanner table using <a
 * href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
 * creates multiple workers in parallel for better performance. The result is written to a JSON file
 * in Google Cloud Storage.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Spanner_to_Vector_Embedding.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Spanner_vectors_to_Cloud_Storage",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Spanner vectors to Cloud Storage for Vertex Vector Search",
    optionsClass = SpannerToVectorEmbeddingJsonOptions.class,
    description = {
      "The Cloud Spanner to Vector Embeddings on Cloud Storage template is a batch pipeline that exports vector embeddings data from Cloud Spanner's table to Cloud Storage in JSON format. "
          + "Vector embeddings are exported to a Cloud Storage folder specified by the user in the template parameters."
          + " The Cloud Storage folder will contain the list of exported `.json` files representing vector embeddings in a format supported by Vertex AI Vector Search Index.\n",
      "Check <a href=\"https://cloud.google.com/vertex-ai/docs/vector-search/setup/format-structure#json\">Vector Search Format Structure</a> for additional details."
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-vertex-vector-search",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner database must exist.",
      "The output Cloud Storage bucket must exist.",
      "In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the <a href=\"https://cloud.google.com/spanner/docs/export#iam\">appropriate IAM roles</a> for reading your Cloud Spanner data and writing to your Cloud Storage bucket."
    })
@SuppressWarnings("unused")
public class SpannerVectorEmbeddingExport {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerVectorEmbeddingExport.class);

  /** Custom PipelineOptions. */
  public interface SpannerToVectorEmbeddingJsonOptions extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 10,
        groupName = "Source",
        description = "Cloud Spanner Project Id",
        helpText = "The project ID of the Spanner instance.")
    ValueProvider<String> getSpannerProjectId();

    void setSpannerProjectId(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 20,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]*[a-z0-9]"},
        description = "Cloud Spanner instance ID",
        helpText = "The ID of the Spanner instance to export the vector embeddings from.")
    ValueProvider<String> getSpannerInstanceId();

    void setSpannerInstanceId(ValueProvider<String> spannerInstanceId);

    @TemplateParameter.Text(
        order = 30,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9_\\-]*[a-z0-9]"},
        description = "Cloud Spanner database ID",
        helpText = "The ID of the Spanner database to export the vector embeddings from.")
    ValueProvider<String> getSpannerDatabaseId();

    void setSpannerDatabaseId(ValueProvider<String> spannerDatabaseId);

    @TemplateParameter.Text(
        order = 40,
        groupName = "Source",
        regexes = {"^.+$"},
        description = "Spanner Table",
        helpText = "The Spanner table to read from.")
    ValueProvider<String> getSpannerTable();

    void setSpannerTable(ValueProvider<String> table);

    @TemplateParameter.Text(
        order = 50,
        groupName = "Source",
        description = "Columns to Export from Spanner Table",
        helpText =
            "A comma-separated list of required columns for the Vertex AI Vector Search index. The ID and embedding columns are required by Vector Search. If your column names don't match the Vertex AI Vector Search index input structure, create column mappings by using aliases. If the column names don't match the format expected by Vertex AI, use the notation from:to. For example, if you have columns named id and my_embedding, specify id, my_embedding:embedding.")
    ValueProvider<String> getSpannerColumnsToExport();

    void setSpannerColumnsToExport(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 60,
        groupName = "Target",
        description = "Output files folder in Cloud Storage",
        helpText =
            "The Cloud Storage folder to write output files to. The path must end with a slash.",
        example = "gs://your-bucket/folder1/")
    ValueProvider<String> getGcsOutputFolder();

    void setGcsOutputFolder(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 70,
        groupName = "Target",
        description = "Output files prefix in Cloud Storage",
        helpText = "The filename prefix for writing output files.",
        example = "vector-embeddings")
    ValueProvider<String> getGcsOutputFilePrefix();

    void setGcsOutputFilePrefix(ValueProvider<String> textWritePrefix);

    @TemplateParameter.Text(
        order = 80,
        groupName = "Source",
        optional = true,
        description = "Cloud Spanner Endpoint to call",
        helpText =
            "The Spanner endpoint to call in the template. The default value is https://batch-spanner.googleapis.com.",
        example = "https://batch-spanner.googleapis.com")
    @Default.String("https://batch-spanner.googleapis.com")
    ValueProvider<String> getSpannerHost();

    void setSpannerHost(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 90,
        groupName = "Source",
        optional = true,
        regexes = {
          "^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):(([0-9]{2})(\\.[0-9]+)?)Z$"
        },
        description = "Timestamp to read stale data from a version in the past.",
        helpText =
            "If set, specifies the time when the database version must be taken. The value is a string in the RFC-3339 date format in Unix epoch time. For example: `1990-12-31T23:59:60Z`. The timestamp must be in the past, and maximum timestamp staleness (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) applies. If not set, a strong bound (https://cloud.google.com/spanner/docs/timestamp-bounds#strong) is used to read the latest data. Defaults to `empty`.",
        example = "1990-12-31T23:59:60Z")
    @Default.String(value = "")
    ValueProvider<String> getSpannerVersionTime();

    void setSpannerVersionTime(ValueProvider<String> value);

    @TemplateParameter.Boolean(
        order = 100,
        groupName = "Source",
        optional = true,
        description = "Use independent compute resource (Spanner DataBoost).",
        helpText =
            "When set to `true`, the template uses Spanner on-demand compute. The export job runs on independent compute resources that don't impact current Spanner workloads. Using this option incurs additional charges in Spanner. For more information, see Spanner Data Boost overview (https://cloud.google.com/spanner/docs/databoost/databoost-overview). Defaults to: `false`.")
    @Default.Boolean(false)
    ValueProvider<Boolean> getSpannerDataBoostEnabled();

    void setSpannerDataBoostEnabled(ValueProvider<Boolean> value);

    @TemplateParameter.Enum(
        order = 110,
        groupName = "Source",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Spanner calls. The allowed values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  /**
   * Runs a pipeline which reads in vector embeddings records from Spanner, and writes the JSON to
   * TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToVectorEmbeddingJsonOptions.class);

    SpannerToVectorEmbeddingJsonOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SpannerToVectorEmbeddingJsonOptions.class);

    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(options.getSpannerHost())
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withRpcPriority(options.getSpannerPriority())
            .withDataBoostEnabled(options.getSpannerDataBoostEnabled());

    ValueProvider<String> gcsOutputFilePrefix = options.getGcsOutputFilePrefix();

    // Concatenating cloud storage folder with file prefix to get complete path
    ValueProvider<String> gcsOutputFilePathWithPrefix =
        ValueProvider.NestedValueProvider.of(
            options.getGcsOutputFolder(),
            (SerializableFunction<String, String>)
                folder -> {
                  if (!folder.endsWith("/")) {
                    // Appending the slash if not provided by user
                    folder = folder + "/";
                  }
                  return folder + gcsOutputFilePrefix.get();
                });

    PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
        SpannerConverters.ExportTransformFactory.create(
            options.getSpannerTable(),
            spannerConfig,
            gcsOutputFilePathWithPrefix,
            options.getSpannerVersionTime(),
            options.getSpannerColumnsToExport(),
            ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false));

    /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        pipeline
            .apply("Setup for Transaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(
                    new CreateTransactionFnWithTimestamp(
                        spannerConfig, options.getSpannerVersionTime())))
            .apply("As PCollectionView", View.asSingleton());

    PCollection<String> json =
        pipeline
            .apply("Create export", spannerExport)
            // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
            // because ValueProvider parameters such as table name required for
            // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
            // type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
            // these parameters at the pipeline execution time.
            .apply(
                "Read all records",
                LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
            .apply(
                "Struct To JSON",
                MapElements.into(TypeDescriptors.strings())
                    .via(
                        struct ->
                            (new SpannerConverters.StructJSONPrinter(
                                    new VectorSearchStructValidator()))
                                .print(struct)));

    json.apply(
        "Write to storage", TextIO.write().to(gcsOutputFilePathWithPrefix).withSuffix(".json"));

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}

后续步骤