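Spanner to Vertex AI Vector Search template

The Spanner to Vertex AI Vector Search files on Cloud Storage template creates a batch pipeline that exports vector embeddings data from a Spanner table to Cloud Storage in JSON format. Use the template parameters to specify the Cloud Storage folder to export the vector embeddings to. The Cloud Storage folder contains the list of exported .json files, which represent the vector embeddings in a format supported by the Vertex AI Vector Search index.

For more information, see Input data format and structure.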
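As a rough illustration of the output format, the following Java sketch formats one record as a single JSON line with an id field and an embedding array, the two fields that Vector Search requires. The class and method names are hypothetical, and the code is not the template's own StructJSONPrinter; it assumes a simple string ID and finite numeric values.

```java
import java.util.StringJoiner;

public class EmbeddingRecordSketch {

  // Formats one record as a JSON line: {"id":"...","embedding":[...]}.
  // Assumption: only backslashes and double quotes in the ID need escaping.
  public static String toJsonLine(String id, double[] embedding) {
    StringJoiner values = new StringJoiner(",", "[", "]");
    for (double value : embedding) {
      values.add(String.valueOf(value));
    }
    String escapedId = id.replace("\\", "\\\\").replace("\"", "\\\"");
    return "{\"id\":\"" + escapedId + "\",\"embedding\":" + values + "}";
  }

  public static void main(String[] args) {
    // Prints {"id":"42","embedding":[0.1,0.2,0.3]}
    System.out.println(toJsonLine("42", new double[] {0.1, 0.2, 0.3}));
  }
}
```

Each exported .json file would hold records of this general shape, one per line.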

Pipeline requirements

  • The Spanner database must exist.
  • The output Cloud Storage bucket must exist.
  • In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you need the IAM roles required to read your Spanner data and write to your Cloud Storage bucket.

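Template parameters

Required parameters

  • spannerProjectId: The project ID of the Spanner instance.
  • spannerInstanceId: The ID of the Spanner instance to export the vector embeddings from.
  • spannerDatabaseId: The ID of the Spanner database to export the vector embeddings from.
  • spannerTable: The Spanner table to read from.
  • spannerColumnsToExport: A comma-separated list of required columns for the Vertex AI Vector Search index. Vector Search requires the ID and embedding columns. If your column names don't match the Vertex AI Vector Search index input structure, create column mappings with aliases by using the from:to notation. For example, if you have columns named id and my_embedding, specify id, my_embedding:embedding.
  • gcsOutputFolder: The Cloud Storage folder to write output files to. The path should end with a slash; the template source code shown later on this page appends one if it is missing. For example, gs://your-bucket/folder1/.
  • gcsOutputFilePrefix: The filename prefix for writing output files. For example, vector-embeddings.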
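To make the from:to notation concrete, here is a minimal Java sketch of how a spannerColumnsToExport value could be split into a mapping from Spanner column name to output field name. The class and method names are hypothetical and the parsing rules are an assumption for illustration; the template's actual parsing may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ColumnMappingSketch {

  // Splits a comma-separated column list into source-column -> output-name pairs.
  // A column without an alias maps to its own name.
  public static Map<String, String> parse(String columnsToExport) {
    Map<String, String> mapping = new LinkedHashMap<>();
    for (String entry : columnsToExport.split(",")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue; // Skip empty entries such as a trailing comma.
      }
      String[] parts = trimmed.split(":", 2);
      String from = parts[0].trim();
      String to = parts.length == 2 ? parts[1].trim() : from;
      mapping.put(from, to);
    }
    return mapping;
  }

  public static void main(String[] args) {
    // Prints {id=id, my_embedding=embedding}
    System.out.println(parse("id, my_embedding:embedding"));
  }
}
```

With the example value from the parameter description, the id column keeps its name and my_embedding is exported as embedding.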

Optional parameters

  • spannerHost: The Spanner endpoint to call in the template. The default value is https://batch-spanner.googleapis.com.
  • spannerVersionTime: If set, specifies the time when the database version must be taken. The value is a string in RFC 3339 date format in UTC, ending in Z. For example, 1990-12-31T23:59:60Z. The timestamp must be in the past, and maximum timestamp staleness (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) applies. If not set, a strong bound (https://cloud.google.com/spanner/docs/timestamp-bounds#strong) is used to read the latest data. The default value is empty.
  • spannerDataBoostEnabled: When set to true, the template uses Spanner on-demand compute. The export job runs on independent compute resources that don't impact current Spanner workloads. Using this option incurs additional charges in Spanner. For more information, see Spanner Data Boost overview (https://cloud.google.com/spanner/docs/databoost/databoost-overview). The default value is false.
  • spannerPriority: The request priority for Spanner calls. The allowed values are HIGH, MEDIUM, and LOW. The default value is MEDIUM.
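Before you launch a job, you might want to check that a spannerVersionTime value has the expected shape. The following Java sketch reuses the regular expression declared for this parameter in the template source code on this page; the class and method names are hypothetical. It checks only the format. It does not verify that the timestamp is in the past or within the staleness limit.

```java
import java.util.regex.Pattern;

public class VersionTimeSketch {

  // Same pattern as the regexes attribute of the spannerVersionTime parameter.
  private static final Pattern VERSION_TIME =
      Pattern.compile(
          "^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):(([0-9]{2})(\\.[0-9]+)?)Z$");

  // Returns true when the value matches the pattern (shape check only).
  public static boolean isWellFormed(String value) {
    return VERSION_TIME.matcher(value).matches();
  }

  public static void main(String[] args) {
    System.out.println(isWellFormed("1990-12-31T23:59:60Z"));      // true
    System.out.println(isWellFormed("2024-01-15T10:30:00.123Z"));  // true
    System.out.println(isWellFormed("2024-01-15T10:30:00+09:00")); // false: no Z suffix
    System.out.println(isWellFormed("2024-01-15"));                // false: no time part
  }
}
```

Because the pattern requires the Z suffix, timestamps with a numeric UTC offset need to be converted to UTC first.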

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Spanner to Vertex AI Vector Search files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
spannerProjectId=SPANNER_PROJECT_ID,\
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabaseId=SPANNER_DATABASE_ID,\
spannerTable=SPANNER_TABLE,\
spannerColumnsToExport=SPANNER_COLUMNS_TO_EXPORT,\
gcsOutputFolder=GCS_OUTPUT_FOLDER,\
gcsOutputFilePrefix=GCS_OUTPUT_FILE_PREFIX

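Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use (for example, latest)
  • REGION_NAME: the region where you want to deploy your Dataflow job (for example, us-central1)
  • SPANNER_PROJECT_ID: the Spanner project ID
  • SPANNER_INSTANCE_ID: the Spanner instance ID
  • SPANNER_DATABASE_ID: the Spanner database ID
  • SPANNER_TABLE: the Spanner table
  • SPANNER_COLUMNS_TO_EXPORT: the columns to export from the Spanner table
  • GCS_OUTPUT_FOLDER: the Cloud Storage folder to output files to
  • GCS_OUTPUT_FILE_PREFIX: the output file prefix in Cloud Storage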

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Spanner_vectors_to_Cloud_Storage
{
   "jobName": "JOB_NAME",
   "parameters": {
     "spannerProjectId": "SPANNER_PROJECT_ID",
     "spannerInstanceId": "SPANNER_INSTANCE_ID",
     "spannerDatabaseId": "SPANNER_DATABASE_ID",
     "spannerTable": "SPANNER_TABLE",
     "spannerColumnsToExport": "SPANNER_COLUMNS_TO_EXPORT",
     "gcsOutputFolder": "GCS_OUTPUT_FOLDER",
     "gcsOutputFilePrefix": "GCS_OUTPUT_FILE_PREFIX"
   },
   "environment": { "maxWorkers": "10" }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use (for example, latest)
  • LOCATION: the region where you want to deploy your Dataflow job (for example, us-central1)
  • SPANNER_PROJECT_ID: the Spanner project ID
  • SPANNER_INSTANCE_ID: the Spanner instance ID
  • SPANNER_DATABASE_ID: the Spanner database ID
  • SPANNER_TABLE: the Spanner table
  • SPANNER_COLUMNS_TO_EXPORT: the columns to export from the Spanner table
  • GCS_OUTPUT_FOLDER: the Cloud Storage folder to output files to
  • GCS_OUTPUT_FILE_PREFIX: the output file prefix in Cloud Storage
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerVectorEmbeddingExport.SpannerToVectorEmbeddingJsonOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.VectorSearchStructValidator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template that exports vector embeddings from Spanner to GCS in JSON format. It exports a
 * Spanner table using <a
 * href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
 * creates multiple workers in parallel for better performance. The result is written to a JSON file
 * in Google Cloud Storage.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Spanner_to_Vector_Embedding.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Spanner_vectors_to_Cloud_Storage",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Spanner vectors to Cloud Storage for Vertex Vector Search",
    optionsClass = SpannerToVectorEmbeddingJsonOptions.class,
    description = {
      "The Cloud Spanner to Vector Embeddings on Cloud Storage template is a batch pipeline that exports vector embeddings data from Cloud Spanner's table to Cloud Storage in JSON format. "
          + "Vector embeddings are exported to a Cloud Storage folder specified by the user in the template parameters."
          + " The Cloud Storage folder will contain the list of exported `.json` files representing vector embeddings in a format supported by Vertex AI Vector Search Index.\n",
      "Check <a href=\"https://cloud.google.com/vertex-ai/docs/vector-search/setup/format-structure#json\">Vector Search Format Structure</a> for additional details."
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-vertex-vector-search",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner database must exist.",
      "The output Cloud Storage bucket must exist.",
      "In addition to the Identity and Access Management (IAM) roles necessary to run Dataflow jobs, you must also have the <a href=\"https://cloud.google.com/spanner/docs/export#iam\">appropriate IAM roles</a> for reading your Cloud Spanner data and writing to your Cloud Storage bucket."
    })
@SuppressWarnings("unused")
public class SpannerVectorEmbeddingExport {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerVectorEmbeddingExport.class);

  /** Custom PipelineOptions. */
  public interface SpannerToVectorEmbeddingJsonOptions extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 10,
        groupName = "Source",
        description = "Cloud Spanner Project Id",
        helpText = "The project ID of the Spanner instance.")
    ValueProvider<String> getSpannerProjectId();

    void setSpannerProjectId(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 20,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]*[a-z0-9]"},
        description = "Cloud Spanner instance ID",
        helpText = "The ID of the Spanner instance to export the vector embeddings from.")
    ValueProvider<String> getSpannerInstanceId();

    void setSpannerInstanceId(ValueProvider<String> spannerInstanceId);

    @TemplateParameter.Text(
        order = 30,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9_\\-]*[a-z0-9]"},
        description = "Cloud Spanner database ID",
        helpText = "The ID of the Spanner database to export the vector embeddings from.")
    ValueProvider<String> getSpannerDatabaseId();

    void setSpannerDatabaseId(ValueProvider<String> spannerDatabaseId);

    @TemplateParameter.Text(
        order = 40,
        groupName = "Source",
        regexes = {"^.+$"},
        description = "Spanner Table",
        helpText = "The Spanner table to read from.")
    ValueProvider<String> getSpannerTable();

    void setSpannerTable(ValueProvider<String> table);

    @TemplateParameter.Text(
        order = 50,
        groupName = "Source",
        description = "Columns to Export from Spanner Table",
        helpText =
            "A comma-separated list of required columns for the Vertex AI Vector Search index. The ID and embedding columns are required by Vector Search. If your column names don't match the Vertex AI Vector Search index input structure, create column mappings by using aliases. If the column names don't match the format expected by Vertex AI, use the notation from:to. For example, if you have columns named id and my_embedding, specify id, my_embedding:embedding.")
    ValueProvider<String> getSpannerColumnsToExport();

    void setSpannerColumnsToExport(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 60,
        groupName = "Target",
        description = "Output files folder in Cloud Storage",
        helpText =
            "The Cloud Storage folder to write output files to. The path must end with a slash.",
        example = "gs://your-bucket/folder1/")
    ValueProvider<String> getGcsOutputFolder();

    void setGcsOutputFolder(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 70,
        groupName = "Target",
        description = "Output files prefix in Cloud Storage",
        helpText = "The filename prefix for writing output files.",
        example = "vector-embeddings")
    ValueProvider<String> getGcsOutputFilePrefix();

    void setGcsOutputFilePrefix(ValueProvider<String> textWritePrefix);

    @TemplateParameter.Text(
        order = 80,
        groupName = "Source",
        optional = true,
        description = "Cloud Spanner Endpoint to call",
        helpText =
            "The Spanner endpoint to call in the template. The default value is https://batch-spanner.googleapis.com.",
        example = "https://batch-spanner.googleapis.com")
    @Default.String("https://batch-spanner.googleapis.com")
    ValueProvider<String> getSpannerHost();

    void setSpannerHost(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 90,
        groupName = "Source",
        optional = true,
        regexes = {
          "^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):(([0-9]{2})(\\.[0-9]+)?)Z$"
        },
        description = "Timestamp to read stale data from a version in the past.",
        helpText =
            "If set, specifies the time when the database version must be taken. The value is a string in the RFC-3339 date format in Unix epoch time. For example: `1990-12-31T23:59:60Z`. The timestamp must be in the past, and maximum timestamp staleness (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) applies. If not set, a strong bound (https://cloud.google.com/spanner/docs/timestamp-bounds#strong) is used to read the latest data. Defaults to `empty`.",
        example = "1990-12-31T23:59:60Z")
    @Default.String(value = "")
    ValueProvider<String> getSpannerVersionTime();

    void setSpannerVersionTime(ValueProvider<String> value);

    @TemplateParameter.Boolean(
        order = 100,
        groupName = "Source",
        optional = true,
        description = "Use independent compute resource (Spanner DataBoost).",
        helpText =
            "When set to `true`, the template uses Spanner on-demand compute. The export job runs on independent compute resources that don't impact current Spanner workloads. Using this option incurs additional charges in Spanner. For more information, see Spanner Data Boost overview (https://cloud.google.com/spanner/docs/databoost/databoost-overview). Defaults to: `false`.")
    @Default.Boolean(false)
    ValueProvider<Boolean> getSpannerDataBoostEnabled();

    void setSpannerDataBoostEnabled(ValueProvider<Boolean> value);

    @TemplateParameter.Enum(
        order = 110,
        groupName = "Source",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Spanner calls. The allowed values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  /**
   * Runs a pipeline which reads in vector embeddings records from Spanner, and writes the JSON to
   * TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToVectorEmbeddingJsonOptions.class);

    SpannerToVectorEmbeddingJsonOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SpannerToVectorEmbeddingJsonOptions.class);

    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(options.getSpannerHost())
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withRpcPriority(options.getSpannerPriority())
            .withDataBoostEnabled(options.getSpannerDataBoostEnabled());

    ValueProvider<String> gcsOutputFilePrefix = options.getGcsOutputFilePrefix();

    // Concatenating cloud storage folder with file prefix to get complete path
    ValueProvider<String> gcsOutputFilePathWithPrefix =
        ValueProvider.NestedValueProvider.of(
            options.getGcsOutputFolder(),
            (SerializableFunction<String, String>)
                folder -> {
                  if (!folder.endsWith("/")) {
                    // Appending the slash if not provided by user
                    folder = folder + "/";
                  }
                  return folder + gcsOutputFilePrefix.get();
                });

    PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
        SpannerConverters.ExportTransformFactory.create(
            options.getSpannerTable(),
            spannerConfig,
            gcsOutputFilePathWithPrefix,
            options.getSpannerVersionTime(),
            options.getSpannerColumnsToExport(),
            ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false));

    /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        pipeline
            .apply("Setup for Transaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(
                    new CreateTransactionFnWithTimestamp(
                        spannerConfig, options.getSpannerVersionTime())))
            .apply("As PCollectionView", View.asSingleton());

    PCollection<String> json =
        pipeline
            .apply("Create export", spannerExport)
            // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
            // because ValueProvider parameters such as table name required for
            // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
            // type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
            // these parameters at the pipeline execution time.
            .apply(
                "Read all records",
                LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
            .apply(
                "Struct To JSON",
                MapElements.into(TypeDescriptors.strings())
                    .via(
                        struct ->
                            (new SpannerConverters.StructJSONPrinter(
                                    new VectorSearchStructValidator()))
                                .print(struct)));

    json.apply(
        "Write to storage", TextIO.write().to(gcsOutputFilePathWithPrefix).withSuffix(".json"));

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}

What's next