Bigtable 変更ストリームからベクトル検索テンプレートへ

このテンプレートでは、Bigtable データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して Vertex AI Vector Search に書き込むストリーミング パイプラインを作成します。

パイプラインの要件

  • Bigtable ソース インスタンスが存在している必要があります。
  • Bigtable ソーステーブルが存在し、そのテーブルで変更ストリームが有効になっている必要があります。
  • Bigtable アプリケーション プロファイルが存在している必要があります。
  • ベクトル検索インデックスのパスが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
embeddingColumn エンベディングが保存される列の完全修飾名。形式は cf:col。
embeddingByteSize エンベディング配列内の各エントリのバイトサイズ。浮動小数点数には 4、倍精度には 8 を使用します。デフォルトは 4 です。
vectorSearchIndex 変更がストリーミングされるベクトル検索インデックス。形式は 'projects/{projectID}/locations/{region}/indexes/{indexID}' です(先頭または末尾にスペースを入れないでください)。例: projects/123/locations/us-east1/indexes/456
bigtableChangeStreamAppProfile Bigtable でワークロードを区別するために使用するアプリケーション プロファイル。
bigtableReadInstanceId テーブルが含まれている Bigtable インスタンスの ID。
bigtableReadTableId 読み取り元の Bigtable テーブル。
bigtableMetadataTableTableId 省略可: 作成されるメタデータ テーブルの ID。設定されていない場合、Bigtable で ID を生成します。
crowdingTagColumn 省略可: クラウディング タグが保存される列の完全修飾名(形式は cf:col)。
allowRestrictsMappings 省略可: allow の制限として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります。
denyRestrictsMappings 省略可: deny の制限として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります。
intNumericRestrictsMappings 省略可: 整数 numeric_restricts として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります。
floatNumericRestrictsMappings 省略可: 浮動小数点数(4 バイト)numeric_restricts として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります
doubleNumericRestrictsMappings 省略可: 倍精度(8 バイト)numeric_restricts として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります
upsertMaxBatchSize 省略可: バッチをベクトル検索インデックスに upsert する前にバッファに格納する upsert の最大数。バッチは、いずれかの upsertBatchSize レコードの準備が整ったときに送信されます。例: 10
upsertMaxBufferDuration 省略可: upsert のバッチがベクトル検索に送信されるまでの最大遅延。バッチは、いずれかの upsertBatchSize レコードの準備が整ったときに送信されます。指定できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時が Nh(例: 2h)です。デフォルト: 10s
deleteMaxBatchSize 省略可: ベクトル検索インデックスからバッチを削除する前にバッファに格納する削除の最大数。バッチは、いずれかの deleteBatchSize レコードの準備が整ったときに送信されます。例: 10
deleteMaxBufferDuration 省略可: 削除のバッチがベクトル検索に送信されるまでの最大遅延。バッチは、いずれかの deleteBatchSize レコードの準備が整ったときに送信されます。指定できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時が Nh(例: 2h)です。デフォルト: 10s
dlqDirectory 省略可: 処理できなかった理由とともに、未処理のレコードを保存するパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。デフォルト値はほとんどのシナリオに適しています。
bigtableChangeStreamMetadataInstanceId 省略可: 変更ストリーム コネクタのメタデータ テーブルに使用する Bigtable インスタンス。デフォルトは空です。
bigtableChangeStreamMetadataTableTableId 省略可: 使用する Bigtable 変更ストリーム コネクタのメタデータ テーブルの ID。指定しない場合、パイプライン フロー中に Bigtable 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。デフォルトは空です。
bigtableChangeStreamCharset 省略可: 値と列修飾子を読み取るときの Bigtable 変更ストリームの文字セット名。デフォルトは UTF-8 です。
bigtableChangeStreamStartTimestamp 省略可: 変更ストリームの読み取りに使用される開始日時(この値を含む)(https://tools.ietf.org/html/rfc3339)。例: 2022-05-05T07:59:59Z。デフォルトは、パイプライン開始時のタイムスタンプです。
bigtableChangeStreamIgnoreColumnFamilies 省略可: キャプチャされない列ファミリー名における変更のカンマ区切りのリスト。デフォルトは空です。
bigtableChangeStreamIgnoreColumns 省略可: キャプチャされない列名における変更のカンマ区切りのリスト。デフォルトは空です。
bigtableChangeStreamName 省略可: クライアント パイプラインの一意の名前。このパラメータを使用すると、以前に実行していたパイプラインが停止した時点から処理を再開できます。デフォルトは自動生成された名前です。使用される値については、Dataflow ジョブのログをご覧ください。
bigtableChangeStreamResume

省略可: true に設定すると、同じ名前で以前に実行していたパイプラインが停止した時点から、新しいパイプラインで処理を再開します。その名前のパイプラインが過去に実行されていない場合、新しいパイプラインを開始することはできません。bigtableChangeStreamName パラメータを使用して、パイプライン ラインを指定します。

false に設定すると、新しいパイプラインが開始されます。指定したソースに対して bigtableChangeStreamName と同じ名前のパイプラインが過去に実行されていた場合、新しいパイプラインを開始することはできません。

デフォルトは false です。

bigtableReadProjectId 省略可: Bigtable データの読み取り元のプロジェクト。このパラメータのデフォルトは、Dataflow パイプラインが実行されているプロジェクトです。

テンプレートを実行する

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Bigtable Change Streams to Vector Search template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • EMBEDDING_COLUMN: エンベディング列
  • EMBEDDING_BYTE_SIZE: エンベディング配列のバイトサイズ。4 または 8 のいずれかです。
  • VECTOR_SEARCH_INDEX: ベクトル検索インデックスのパス
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: Bigtable アプリケーション プロファイル ID
  • BIGTABLE_READ_INSTANCE_ID: ソース Bigtable インスタンス ID
  • BIGTABLE_READ_TABLE_ID: ソース Bigtable テーブル ID

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • EMBEDDING_COLUMN: エンベディング列
  • EMBEDDING_BYTE_SIZE: エンベディング配列のバイトサイズ。4 または 8 のいずれかです。
  • VECTOR_SEARCH_INDEX: ベクトル検索インデックスのパス
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: Bigtable アプリケーション プロファイル ID
  • BIGTABLE_READ_INSTANCE_ID: ソース Bigtable インスタンス ID
  • BIGTABLE_READ_TABLE_ID: ソース Bigtable テーブル ID
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstovectorsearch;

import com.google.cloud.Timestamp;
import com.google.cloud.aiplatform.v1.IndexDatapoint;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadChangeStreamOptions;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadOptions;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToVectorSearchOptions;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Template(
    name = "Bigtable_Change_Streams_to_Vector_Search",
    category = TemplateCategory.STREAMING,
    displayName = "Bigtable Change Streams to Vector Search",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into Vertex AI Vector Search using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamsToVectorSearchOptions.class,
    optionsOrder = {
      BigtableChangeStreamsToVectorSearchOptions.class,
      ReadChangeStreamOptions.class,
      ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-change-streams-to-vector-search",
    flexContainerName = "bigtable-changestreams-to-vector-search",
    contactInformation = "https://cloud.google.com/support",
    streaming = true)
public final class BigtableChangeStreamsToVectorSearch {
  private static final Logger LOG =
      LoggerFactory.getLogger(BigtableChangeStreamsToVectorSearch.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) throws Exception {
    LOG.info("Starting replication from Cloud Bigtable Change Streams to Vector Search");

    BigtableChangeStreamsToVectorSearchOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamsToVectorSearchOptions.class);

    run(options);
  }

  public static PipelineResult run(BigtableChangeStreamsToVectorSearchOptions options)
      throws IOException {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    String bigtableProjectId = getBigtableProjectId(options);

    LOG.info("  - startTimestamp {}", startTimestamp);
    LOG.info("  - bigtableReadInstanceId {}", options.getBigtableReadInstanceId());
    LOG.info("  - bigtableReadTableId {}", options.getBigtableReadTableId());
    LOG.info("  - bigtableChangeStreamAppProfile {}", options.getBigtableChangeStreamAppProfile());
    LOG.info("  - embeddingColumn {}", options.getEmbeddingColumn());
    LOG.info("  - crowdingTagColumn {}", options.getCrowdingTagColumn());
    LOG.info("  - project {}", options.getProject());
    LOG.info("  - indexName {}", options.getVectorSearchIndex());

    String indexName = options.getVectorSearchIndex();

    String vertexRegion = Utils.extractRegionFromIndexName(indexName);
    String vertexEndpoint = vertexRegion + "-aiplatform.googleapis.com:443";

    final Pipeline pipeline = Pipeline.create(options);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                    : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProjectId)
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withMetadataTableTableId(options.getBigtableMetadataTableTableId())
            .withStartTime(startTimestamp);

    PCollectionTuple results =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply("Create Values", Values.create())
            .apply(
                "Converting to Vector Search Datapoints",
                ParDo.of(
                        new ChangeStreamMutationToDatapointOperationFn(
                            options.getEmbeddingColumn(),
                            options.getEmbeddingByteSize(),
                            options.getCrowdingTagColumn(),
                            Utils.parseColumnMapping(options.getAllowRestrictsMappings()),
                            Utils.parseColumnMapping(options.getDenyRestrictsMappings()),
                            Utils.parseColumnMapping(options.getIntNumericRestrictsMappings()),
                            Utils.parseColumnMapping(options.getFloatNumericRestrictsMappings()),
                            Utils.parseColumnMapping(options.getDoubleNumericRestrictsMappings())))
                    .withOutputTags(
                        ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG,
                        TupleTagList.of(
                            ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)));
    results
        .get(ChangeStreamMutationToDatapointOperationFn.UPSERT_DATAPOINT_TAG)
        .apply("Add placeholer keys", WithKeys.of("placeholder"))
        .apply(
            "Batch Contents",
            GroupIntoBatches.<String, IndexDatapoint>ofSize(
                    bufferSizeOption(options.getUpsertMaxBatchSize()))
                .withMaxBufferingDuration(
                    bufferDurationOption(options.getUpsertMaxBufferDuration())))
        .apply("Map to Values", Values.create())
        .apply(
            "Upsert Datapoints to VectorSearch",
            ParDo.of(new UpsertDatapointsFn(vertexEndpoint, indexName)))
        .apply(
            "Write errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    results
        .get(ChangeStreamMutationToDatapointOperationFn.REMOVE_DATAPOINT_TAG)
        .apply("Add placeholder keys", WithKeys.of("placeholer"))
        .apply(
            "Batch Contents",
            GroupIntoBatches.<String, String>ofSize(
                    bufferSizeOption(options.getDeleteMaxBatchSize()))
                .withMaxBufferingDuration(
                    bufferDurationOption(options.getDeleteMaxBufferDuration())))
        .apply("Map to Values", Values.create())
        .apply(
            "Remove Datapoints From VectorSearch",
            ParDo.of(new RemoveDatapointsFn(vertexEndpoint, indexName)))
        .apply(
            "Write errors to DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static String getBigtableProjectId(BigtableChangeStreamsToVectorSearchOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static int bufferSizeOption(int size) {
    if (size < 1) {
      size = 1;
    }

    return size;
  }

  private static Duration bufferDurationOption(String duration) {
    if (duration.isEmpty()) {
      return Duration.standardSeconds(1);
    }

    return DurationUtils.parseDuration(duration);
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamsToVectorSearchOptions options) {
    String dlqDirectory = options.getDlqDirectory();
    if (dlqDirectory.isEmpty()) {
      LOG.info("Falling back to temp dir for DLQ");

      String tempLocation = options.as(DataflowPipelineOptions.class).getTempLocation();

      LOG.info("Have temp location {}", tempLocation);
      if (tempLocation == null || tempLocation.isEmpty()) {
        tempLocation = "/";
      } else if (!tempLocation.endsWith("/")) {
        tempLocation += "/";
      }

      dlqDirectory = tempLocation + "dlq";
    }

    LOG.info("Writing dead letter queue to: {}", dlqDirectory);

    return DeadLetterQueueManager.create(dlqDirectory, 1);
  }
}