Bigtable change streams to Vector Search 템플릿

이 템플릿은 스트리밍 파이프라인을 생성하여 Dataflow Runner V2를 사용해 Bigtable 데이터 변경 레코드를 스트리밍하고 Vertex AI 벡터 검색에 기록합니다.

파이프라인 요구사항

  • Bigtable 소스 인스턴스가 있어야 합니다.
  • Bigtable 소스 테이블이 있어야 하며 테이블에 변경 내역이 사용 설정되어 있어야 합니다.
  • Bigtable 애플리케이션 프로필이 있어야 합니다.
  • 벡터 검색 색인 경로가 있어야 합니다.

템플릿 매개변수

매개변수 설명
embeddingColumn 임베딩이 저장되는 정규화된 열 이름입니다. cf:col 형식입니다.
embeddingByteSize 임베딩 배열에 있는 각 항목의 바이트 크기입니다. 부동 소수점 수에는 4를, 실수에는 8을 사용합니다. 기본값은 4입니다.
vectorSearchIndex 변경사항을 스트리밍할 벡터 검색 색인의 형식은 'projects/{projectID}/locations/{region}/indexes/{indexID}'(선행 또는 후행 공백 없음)입니다. 예를 들면 projects/123/locations/us-east1/indexes/456입니다.
bigtableChangeStreamAppProfile Bigtable에서 워크로드를 구분하는 데 사용되는 애플리케이션 프로필입니다.
bigtableReadInstanceId 테이블을 포함하는 Bigtable 인스턴스의 ID입니다.
bigtableReadTableId 읽을 Bigtable 테이블입니다.
bigtableMetadataTableTableId 선택사항: 생성된 메타데이터 테이블의 ID입니다. 설정하지 않으면 Bigtable에서 ID를 생성합니다.
crowdingTagColumn 선택사항: 크라우딩 태그가 저장되는 정규화된 열 이름(cf:col 형식)입니다.
allowRestrictsMappings 선택사항: allow 제한사항으로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 각 열 이름은 cf:col->alias 형식이어야 합니다.
denyRestrictsMappings 선택사항: deny 제한사항으로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 각 열 이름은 cf:col->alias 형식이어야 합니다.
intNumericRestrictsMappings (선택사항): 정수 numeric_restricts로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 각 열 이름은 cf:col->alias 형식이어야 합니다.
floatNumericRestrictsMappings 선택사항: 부동 소수점 수(4바이트) numeric_restricts로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 각 열 이름은 cf:col->alias 형식이어야 합니다.
doubleNumericRestrictsMappings (선택사항): 실수(8바이트) numeric_restricts로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 각 열 이름은 cf:col->alias 형식이어야 합니다.
upsertMaxBatchSize 선택사항: 배치를 벡터 검색 색인에 삽입/업데이트(upsert)하기 전에 버퍼링할 최대 삽입/업데이트(upsert) 수입니다. 배치는 upsertBatchSize 레코드가 준비되었을 때 전송됩니다. 예: 10
upsertMaxBufferDuration 선택사항: 삽입/업데이트(upsert) 배치가 벡터 검색으로 전송되기까지의 최대 지연 시간입니다. 배치는 upsertBatchSize 레코드가 준비되었을 때 전송됩니다. 허용되는 형식은 Ns(초 단위, 예: 5s), Nm(분 단위, 예:12m), Nh(시간 단위, 예:2h)입니다. 기본값: 10s
deleteMaxBatchSize 선택사항: 벡터 검색 색인에서 일괄 삭제하기 전에 버퍼링할 최대 삭제 수입니다. 배치는 deleteBatchSize 레코드가 준비되었을 때 전송됩니다. 예를 들면 10입니다.
deleteMaxBufferDuration 선택사항: 일괄 삭제가 벡터 검색에 전송되기까지의 최대 지연 시간입니다. 배치는 deleteBatchSize 레코드가 준비되었을 때 전송됩니다. 허용되는 형식은 Ns(초 단위, 예: 5s), Nm(분 단위, 예:12m), Nh(시간 단위, 예:2h)입니다. 기본값: 10s
dlqDirectory 선택사항: 메시지를 처리할 수 없는 이유와 함께 처리되지 않은 모든 레코드가 저장되는 경로입니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 기본값은 대부분의 시나리오에 적합합니다.
bigtableChangeStreamMetadataInstanceId 선택사항: 변경 내역 커넥터 메타데이터 테이블에 사용할 Bigtable 인스턴스입니다. 기본값은 빈 값입니다.
bigtableChangeStreamMetadataTableTableId 선택사항: 사용할 Bigtable 변경 내역 커넥터 메타데이터 테이블 ID입니다. 제공하지 않으면 파이프라인 흐름 중에 Bigtable 변경 내역 커넥터 메타데이터 테이블이 자동으로 생성됩니다. 기본값은 빈 값입니다.
bigtableChangeStreamCharset 선택사항: 값 및 column qualifier를 읽을 때 Bigtable 변경 내역 문자 집합 이름입니다. 기본값은 UTF-8입니다.
bigtableChangeStreamStartTimestamp 선택사항: 변경 내역을 읽는 데 사용할 시작 DateTime(https://tools.ietf.org/html/rfc3339)입니다. 예를 들면 2022-05-05T07:59:59Z입니다. 기본값은 파이프라인이 시작되는 시점의 타임스탬프입니다.
bigtableChangeStreamIgnoreColumnFamilies 선택사항: 캡처할 수 없는 column family 이름 변경사항의 쉼표로 구분된 목록입니다. 기본값은 빈 값입니다.
bigtableChangeStreamIgnoreColumns 선택사항: 캡처할 수 없는 되지 않는 열 이름 변경사항의 쉼표로 구분된 목록입니다. 기본값은 빈 값입니다.
bigtableChangeStreamName 선택사항: 클라이언트 파이프라인의 고유한 이름입니다. 이 매개변수를 사용하면 이전에 실행 중인 파이프라인이 중지된 지점에서부터 처리를 재개할 수 있습니다. 기본값은 자동으로 생성된 이름입니다. 사용된 값은 Dataflow 작업 로그를 참고하세요.
bigtableChangeStreamResume

선택사항: true로 설정하면 이전에 실행 중이던 동일한 이름의 파이프라인이 중지된 지점부터 새 파이프라인이 처리를 재개합니다. 해당 이름의 파이프라인이 이전에 실행된 적이 없으면 새 파이프라인이 시작되지 않습니다. bigtableChangeStreamName 매개변수를 사용하여 파이프라인 선을 지정합니다.

false로 설정하면 새 파이프라인이 시작됩니다. 지정진 소스에 대해 bigtableChangeStreamName과 같은 이름의 파이프라인이 과거에 이미 실행된 경우 새 파이프라인이 시작되지 않습니다.

기본값은 false입니다.

bigtableReadProjectId 선택사항: Bigtable 데이터를 읽을 프로젝트입니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.

템플릿 실행

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Bigtable Change Streams to Vector Search template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • EMBEDDING_COLUMN: 임베딩 열
  • EMBEDDING_BYTE_SIZE: 임베딩 배열의 바이트 크기입니다. 4 또는 8일 수 있습니다.
  • VECTOR_SEARCH_INDEX: 벡터 검색 색인 경로
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: Bigtable 애플리케이션 프로필 ID
  • BIGTABLE_READ_INSTANCE_ID: 소스 Bigtable 인스턴스 ID
  • BIGTABLE_READ_TABLE_ID: 소스 Bigtable 테이블 ID

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • EMBEDDING_COLUMN: 임베딩 열
  • EMBEDDING_BYTE_SIZE: 임베딩 배열의 바이트 크기입니다. 4 또는 8일 수 있습니다.
  • VECTOR_SEARCH_INDEX: 벡터 검색 색인 경로
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: Bigtable 애플리케이션 프로필 ID
  • BIGTABLE_READ_INSTANCE_ID: 소스 Bigtable 인스턴스 ID
  • BIGTABLE_READ_TABLE_ID: 소스 Bigtable 테이블 ID
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstovectorsearch;

import com.google.cloud.Timestamp;
import com.google.cloud.aiplatform.v1.IndexDatapoint;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadChangeStreamOptions;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadOptions;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToVectorSearchOptions;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Template(
    name = "Bigtable_Change_Streams_to_Vector_Search",
    category = TemplateCategory.STREAMING,
    displayName = "Bigtable Change Streams to Vector Search",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into Vertex AI Vector Search using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamsToVectorSearchOptions.class,
    optionsOrder = {
      BigtableChangeStreamsToVectorSearchOptions.class,
      ReadChangeStreamOptions.class,
      ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-change-streams-to-vector-search",
    flexContainerName = "bigtable-changestreams-to-vector-search",
    contactInformation = "https://cloud.google.com/support",
    streaming = true)
public final class BigtableChangeStreamsToVectorSearch {
  private static final Logger LOG =
      LoggerFactory.getLogger(BigtableChangeStreamsToVectorSearch.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) throws Exception {
    LOG.info("Starting replication from Cloud Bigtable Change Streams to Vector Search");

    BigtableChangeStreamsToVectorSearchOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamsToVectorSearchOptions.class);

    run(options);
  }

  public static PipelineResult run(BigtableChangeStreamsToVectorSearchOptions options)
      throws IOException {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    String bigtableProjectId = getBigtableProjectId(options);

    LOG.info("  - startTimestamp {}", startTimestamp);
    LOG.info("  - bigtableReadInstanceId {}", options.getBigtableReadInstanceId());
    LOG.info("  - bigtableReadTableId {}", options.getBigtableReadTableId());
    LOG.info("  - bigtableChangeStreamAppProfile {}", options.getBigtableChangeStreamAppProfile());
    LOG.info("  - embeddingColumn {}", options.getEmbeddingColumn());
    LOG.info("  - crowdingTagColumn {}", options.getCrowdingTagColumn());
    LOG.info("  - project {}", options.getProject());
    LOG.info("  - indexName {}", options.getVectorSearchIndex());

    String indexName = options.getVectorSearchIndex();

    String vertexRegion = Utils.extractRegionFromIndexName(indexName);
    String vertexEndpoint = vertexRegion + "-aiplatform.googleapis.com:443";

    final Pipeline pipeline = Pipeline.create(options);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                    : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProjectId)
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withMetadataTableTableId(options.getBigtableMetadataTableTableId())
            .withStartTime(startTimestamp);

    PCollectionTuple results =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply("Create Values", Values.create())
            .apply(
                "Converting to Vector Search Datapoints",
                ParDo.of(
                        new ChangeStreamMutationToDatapointOperationFn(
                            options.getEmbeddingColumn(),
                            options.getEmbeddingByteSize(),
                            options.getCrowdingTagColumn(),
                            Utils.parseColumnMapping(options.getAllowRestrictsMappings()),
                            Utils.parseColumnMapping(options.getDenyRestrictsMappings()),
                            Utils.parseColumnMapping(options.getIntNumericRestrictsMappings()),
                            Utils.parseColumnMapping(options.getFloatNumericRestrictsMappings()),
                            Utils.parseColumnMapping(options.getDoubleNumericRestrictsMappings())))
                    .withOutputTags(
                        ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG,
                        TupleTagList.of(
                            ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)));
    results
        .get(ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG)
        .apply("Add placeholer keys", WithKeys.of("placeholder"))
        .apply(
            "Batch Contents",
            GroupIntoBatches.<String, IndexDatapoint>ofSize(
                    bufferSizeOption(options.getUpsertMaxBatchSize()))
                .withMaxBufferingDuration(
                    bufferDurationOption(options.getUpsertMaxBufferDuration())))
        .apply("Map to Values", Values.create())
        .apply(
            "Upsert Datapoints to VectorSearch",
            ParDo.of(new UpsertDatapointsFn(vertexEndpoint, indexName)))
        .apply(
            "Write errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    results
        .get(ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)
        .apply("Add placeholder keys", WithKeys.of("placeholer"))
        .apply(
            "Batch Contents",
            GroupIntoBatches.<String, String>ofSize(
                    bufferSizeOption(options.getDeleteMaxBatchSize()))
                .withMaxBufferingDuration(
                    bufferDurationOption(options.getDeleteMaxBufferDuration())))
        .apply("Map to Values", Values.create())
        .apply(
            "Remove Datapoints From VectorSearch",
            ParDo.of(new RemoveDatapointsFn(vertexEndpoint, indexName)))
        .apply(
            "Write errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static String getBigtableProjectId(BigtableChangeStreamsToVectorSearchOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static int bufferSizeOption(int size) {
    if (size < 1) {
      size = 1;
    }

    return size;
  }

  private static Duration bufferDurationOption(String duration) {
    if (duration.isEmpty()) {
      return Duration.standardSeconds(1);
    }

    return DurationUtils.parseDuration(duration);
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamsToVectorSearchOptions options) {
    String dlqDirectory = options.getDlqDirectory();
    if (dlqDirectory.isEmpty()) {
      LOG.info("Falling back to temp dir for DLQ");

      String tempLocation = options.as(DataflowPipelineOptions.class).getTempLocation();

      LOG.info("Have temp location {}", tempLocation);
      if (tempLocation == null || tempLocation.isEmpty()) {
        tempLocation = "/";
      } else if (!tempLocation.endsWith("/")) {
        tempLocation += "/";
      }

      dlqDirectory = tempLocation + "dlq";
    }

    LOG.info("Writing dead letter queue to: {}", dlqDirectory);

    return DeadLetterQueueManager.create(dlqDirectory, 1);
  }
}