Bigtable to Vertex AI Vector Search 템플릿

Cloud Storage의 Bigtable to Vertex AI Vector Search 파일 템플릿은 Bigtable 테이블에서 데이터를 읽고 JSON 형식으로 Cloud Storage 버킷에 쓰는 일괄 파이프라인을 만듭니다. 벡터 임베딩에 이 템플릿을 사용합니다.

파이프라인 요구사항

  • Bigtable 테이블이 있어야 합니다.
  • 파이프라인을 실행하기 전에 출력 Cloud Storage 버킷이 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • bigtableProjectId: 데이터를 읽으려는 Bigtable 인스턴스가 포함된 Google Cloud 프로젝트의 ID입니다.
  • bigtableInstanceId: 테이블을 포함하는 Bigtable 인스턴스의 ID입니다.
  • bigtableTableId: 읽어올 Bigtable 테이블의 ID입니다.
  • outputDirectory: 출력 JSON 파일이 저장되는 Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/your-path/입니다.
  • idColumn: ID가 저장되는 정규화된 열 이름입니다. 형식은 cf:col 또는 _key입니다.
  • embeddingColumn: 임베딩이 저장되는 정규화된 열 이름입니다. 형식은 cf:col 또는 _key입니다.

선택적 매개변수

  • filenamePrefix: JSON 파일 이름의 접두사입니다. 예를 들면 table1-입니다. 값이 제공되지 않은 경우 기본값은 part입니다.
  • crowdingTagColumn: 크라우딩 태그가 저장되는 정규화된 열 이름입니다. 형식은 cf:col 또는 _key입니다.
  • embeddingByteSize: 임베딩 배열에 있는 각 항목의 바이트 크기입니다. 부동 소수점 수의 경우 4 값을 사용합니다. 실수의 경우 8 값을 사용합니다. 기본값은 4입니다.
  • allowRestrictsMappings: allow restricts로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 형식은 cf:col->alias입니다.
  • denyRestrictsMappings: deny restricts로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 형식은 cf:col->alias입니다.
  • intNumericRestrictsMappings: 정수 numeric_restricts로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 형식은 cf:col->alias입니다.
  • floatNumericRestrictsMappings: 부동 소수점 수(4바이트) numeric_restricts로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 형식은 cf:col->alias입니다.
  • doubleNumericRestrictsMappings: double(8바이트) numeric_restricts로 사용할 열의 쉼표로 구분된 정규화된 열 이름입니다(별칭 포함). 형식은 cf:col->alias입니다.
  • bigtableAppProfileId: 내보내기에 사용될 Cloud Bigtable 앱 프로필의 ID입니다. 기본값은 default입니다.

템플릿 실행

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Bigtable to Vector Embeddings template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_Vector_Embeddings \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       bigtableProjectId=BIGTABLE_PROJECT_ID,\
       bigtableInstanceId=BIGTABLE_INSTANCE_ID,\
       bigtableTableId=BIGTABLE_TABLE_ID,\
       filenamePrefix=FILENAME_PREFIX,\
       idColumn=ID_COLUMN,\
       embeddingColumn=EMBEDDING_COLUMN,\

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_PROJECT_ID: 프로젝트 ID
  • BIGTABLE_INSTANCE_ID: 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: 테이블 ID입니다.
  • FILENAME_PREFIX: JSON 파일 프리픽스입니다.
  • ID_COLUMN: ID 열입니다.
  • EMBEDDING_COLUMN: 임베딩 열입니다.

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_Vector_Embeddings
{
   "jobName": "JOB_NAME",
   "parameters": {
     "bigtableProjectId": "BIGTABLE_PROJECT_ID",
     "bigtableInstanceId": "BIGTABLE_INSTANCE_ID",
     "bigtableTableId": "BIGTABLE_TABLE_ID",
     "filenamePrefix": "FILENAME_PREFIX",
     "idColumn": "ID_COLUMN",
     "embeddingColumn": "EMBEDDING_COLUMN",
   },
   "environment": { "maxWorkers": "10" }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_PROJECT_ID: 프로젝트 ID
  • BIGTABLE_INSTANCE_ID: 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: 테이블 ID입니다.
  • FILENAME_PREFIX: JSON 파일 프리픽스입니다.
  • ID_COLUMN: ID 열입니다.
  • EMBEDDING_COLUMN: 임베딩 열입니다.
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
import com.google.cloud.teleport.bigtable.BigtableToVectorEmbeddings.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.gson.stream.JsonWriter;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to JSON files in GCS,
 * specifically for Vector Embedding purposes. Currently, filtering on Cloud Bigtable table is not
 * supported.
 *
 * <p>Check out <a href=
 * "https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_Vector_Embeddings.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_Vector_Embeddings",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to Vector Embeddings",
    description =
        "The Bigtable to Vector Embedding template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in JSON format, for vector embeddings",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-vector-embeddings",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToVectorEmbeddings {
  private static final Logger LOG = LoggerFactory.getLogger(BigtableToVectorEmbeddings.class);

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to read from.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Cloud Storage directory for storing JSON files",
        helpText = "The Cloud Storage path where the output JSON files are stored.",
        example = "gs://your-bucket/your-path/")
    @Required
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "JSON file prefix",
        helpText =
            "The prefix of the JSON filename. For example: `table1-`. If no value is provided, defaults to `part`.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Text(
        order = 6,
        description = "ID column",
        helpText =
            "The fully qualified column name where the ID is stored. In the format `cf:col` or `_key`.")
    ValueProvider<String> getIdColumn();

    @SuppressWarnings("unused")
    void setIdColumn(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 7,
        description = "Embedding column",
        helpText =
            "The fully qualified column name where the embeddings are stored. In the format `cf:col` or `_key`.")
    ValueProvider<String> getEmbeddingColumn();

    @SuppressWarnings("unused")
    void setEmbeddingColumn(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 8,
        optional = true,
        description = "Crowding tag column",
        helpText =
            "The fully qualified column name where the crowding tag is stored. In the format `cf:col` or `_key`.")
    ValueProvider<String> getCrowdingTagColumn();

    @SuppressWarnings("unused")
    void setCrowdingTagColumn(ValueProvider<String> value);

    @TemplateParameter.Integer(
        order = 9,
        optional = true,
        description = "The byte size of the embeddings array. Can be 4 or 8.",
        helpText =
            "The byte size of each entry in the embeddings array. For float, use the value `4`. For double, use the value `8`. Defaults to `4`.")
    @Default.Integer(4)
    ValueProvider<Integer> getEmbeddingByteSize();

    @SuppressWarnings("unused")
    void setEmbeddingByteSize(ValueProvider<Integer> value);

    @TemplateParameter.Text(
        order = 10,
        optional = true,
        description = "Allow restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names for the columns to use as the allow restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getAllowRestrictsMappings();

    @SuppressWarnings("unused")
    void setAllowRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 11,
        optional = true,
        description = "Deny restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names for the columns to use as the deny restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getDenyRestrictsMappings();

    @SuppressWarnings("unused")
    void setDenyRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 12,
        optional = true,
        description = "Integer numeric restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names of the columns to use as integer numeric_restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getIntNumericRestrictsMappings();

    @SuppressWarnings("unused")
    void setIntNumericRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 13,
        optional = true,
        description = "Float numeric restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names of the columns to use as float (4 bytes) numeric_restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getFloatNumericRestrictsMappings();

    @SuppressWarnings("unused")
    void setFloatNumericRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 14,
        optional = true,
        description = "Double numeric restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names of the columns to use as double (8 bytes) numeric_restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getDoubleNumericRestrictsMappings();

    @SuppressWarnings("unused")
    void setDoubleNumericRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 15,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        optional = true,
        description = "App Profile ID",
        helpText = "The ID of the Cloud Bigtable app profile to be used for the export")
    @Default.String("default")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> value);
  }

  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to JSON files in GCS in JSON format,
   * for use of Vertex Vector Search.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
    LOG.info("Completed pipeline setup");
  }

  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withAppProfileId(options.getBigtableAppProfileId())
            .withTableId(options.getBigtableTableId())
            .withRowFilter(RowFilter.newBuilder().setCellsPerColumnLimitFilter(1).build());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    // Concatenating cloud storage folder with file prefix to get complete path
    ValueProvider<String> filePathPrefix =
        DualInputNestedValueProvider.of(
            options.getOutputDirectory(),
            options.getFilenamePrefix(),
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                return FileSystems.matchNewResource(input.getX(), true)
                    .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
                    .toString();
              }
            });
    pipeline
        .apply("Read from Bigtable", read)
        .apply(
            "Transform to JSON",
            MapElements.via(
                new BigtableToVectorEmbeddingsFn(
                    options.getIdColumn(),
                    options.getEmbeddingColumn(),
                    options.getEmbeddingByteSize(),
                    options.getCrowdingTagColumn(),
                    options.getAllowRestrictsMappings(),
                    options.getDenyRestrictsMappings(),
                    options.getIntNumericRestrictsMappings(),
                    options.getFloatNumericRestrictsMappings(),
                    options.getDoubleNumericRestrictsMappings())))
        .apply("Write to storage", TextIO.write().to(filePathPrefix).withSuffix(".json"));

    return pipeline.run();
  }

  /** Translates Bigtable {@link Row} to Vector Embeddings JSON. */
  static class BigtableToVectorEmbeddingsFn extends SimpleFunction<Row, String> {
    private static final String ID_KEY = "id";
    private static final String EMBEDDING_KEY = "embedding";
    private static final String RESTRICTS_KEY = "restricts";
    private static final String NUMERIC_RESTRICTS_KEY = "numeric_restricts";
    private static final String CROWDING_TAG_KEY = "crowding_tag";

    private static final String NAMESPACE_KEY = "namespace";

    private static final String ALLOW_KEY = "allow";
    private static final String DENY_KEY = "deny";

    private static final String VALUE_INT_KEY = "value_int";
    private static final String VALUE_FLOAT_KEY = "value_float";
    private static final String VALUE_DOUBLE_KEY = "value_double";

    private String idColumn;
    private String embeddingsColumn;
    private Integer embeddingByteSize;
    private String crowdingTagColumn;
    private Map<String, String> allowRestricts;
    private Map<String, String> denyRestricts;
    private Map<String, String> intNumericRestricts;
    private Map<String, String> floatNumericRestricts;
    private Map<String, String> doubleNumericRestricts;

    private ValueProvider<Integer> embeddingByteSizeProvider;
    private ValueProvider<String> idColumnProvider;
    private ValueProvider<String> embeddingsColumnProvider;
    private ValueProvider<String> crowdingTagColumnProvider;
    private ValueProvider<String> allowRestrictsMappingsProvider;
    private ValueProvider<String> denyRestrictsMappingsProvider;
    private ValueProvider<String> intNumericRestrictsMappingsProvider;
    private ValueProvider<String> floatNumericRestrictsMappingsProvider;
    private ValueProvider<String> doubleNumericRestrictsMappingsProvider;

    public BigtableToVectorEmbeddingsFn(
        ValueProvider<String> idColumnProvider,
        ValueProvider<String> embeddingsColumnProvider,
        ValueProvider<Integer> embeddingByteSizeProvider,
        ValueProvider<String> crowdingTagColumnProvider,
        ValueProvider<String> allowRestrictsMappingsProvider,
        ValueProvider<String> denyRestrictsMappingsProvider,
        ValueProvider<String> intNumericRestrictsMappingsProvider,
        ValueProvider<String> floatNumericRestrictsMappingsProvider,
        ValueProvider<String> doubleNumericRestrictsMappingsProvider) {
      this.idColumnProvider = idColumnProvider;
      this.embeddingsColumnProvider = embeddingsColumnProvider;
      this.embeddingByteSizeProvider = embeddingByteSizeProvider;
      this.crowdingTagColumnProvider = crowdingTagColumnProvider;
      this.allowRestrictsMappingsProvider = allowRestrictsMappingsProvider;
      this.denyRestrictsMappingsProvider = denyRestrictsMappingsProvider;
      this.intNumericRestrictsMappingsProvider = intNumericRestrictsMappingsProvider;
      this.floatNumericRestrictsMappingsProvider = floatNumericRestrictsMappingsProvider;
      this.doubleNumericRestrictsMappingsProvider = doubleNumericRestrictsMappingsProvider;
    }

    @Override
    public String apply(Row row) {
      this.embeddingByteSize = this.embeddingByteSizeProvider.get();
      if (this.embeddingByteSize != 4 && this.embeddingByteSize != 8) {
        throw new RuntimeException("embeddingByteSize can be either 4 or 8");
      }
      this.idColumn = this.idColumnProvider.get();
      this.embeddingsColumn = this.embeddingsColumnProvider.get();
      this.crowdingTagColumn = this.crowdingTagColumnProvider.get();
      this.allowRestricts =
          Optional.ofNullable(this.allowRestricts)
              .orElse(extractColumnsAliases(this.allowRestrictsMappingsProvider));
      this.denyRestricts =
          Optional.ofNullable(this.denyRestricts)
              .orElse(extractColumnsAliases(this.denyRestrictsMappingsProvider));
      this.intNumericRestricts =
          Optional.ofNullable(this.intNumericRestricts)
              .orElse(extractColumnsAliases(this.intNumericRestrictsMappingsProvider));
      this.floatNumericRestricts =
          Optional.ofNullable(this.floatNumericRestricts)
              .orElse(extractColumnsAliases(this.floatNumericRestrictsMappingsProvider));
      this.doubleNumericRestricts =
          Optional.ofNullable(this.doubleNumericRestricts)
              .orElse(extractColumnsAliases(this.doubleNumericRestrictsMappingsProvider));

      StringWriter stringWriter = new StringWriter();
      JsonWriter jsonWriter = new JsonWriter(stringWriter);
      VectorEmbeddings vectorEmbeddings = buildObject(row);
      try {
        serialize(jsonWriter, vectorEmbeddings);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      return stringWriter.toString();
    }

    private void serialize(JsonWriter jsonWriter, VectorEmbeddings vectorEmbeddings)
        throws IOException {
      jsonWriter.beginObject();

      // Required fields.
      jsonWriter.name(ID_KEY).value(vectorEmbeddings.id);
      jsonWriter.name(EMBEDDING_KEY);
      jsonWriter.beginArray();
      if (this.embeddingByteSize == 4) {
        for (Float f : vectorEmbeddings.floatEmbeddings) {
          jsonWriter.value(f);
        }
      } else if (this.embeddingByteSize == 8) {
        for (Double d : vectorEmbeddings.doubleEmbeddings) {
          jsonWriter.value(d);
        }
      }
      jsonWriter.endArray();

      // Optional fields.
      if (vectorEmbeddings.crowdingTag != "") {
        jsonWriter.name(CROWDING_TAG_KEY).value(vectorEmbeddings.crowdingTag);
      }
      if (vectorEmbeddings.restricts != null && !vectorEmbeddings.restricts.isEmpty()) {
        jsonWriter.name(RESTRICTS_KEY);
        jsonWriter.beginArray();
        for (Restrict r : vectorEmbeddings.restricts) {
          jsonWriter.beginObject();
          jsonWriter.name(NAMESPACE_KEY).value(r.namespace);
          if (r.allow != null && !r.allow.isEmpty()) {
            jsonWriter.name(ALLOW_KEY);
            jsonWriter.beginArray();
            for (String a : r.allow) {
              jsonWriter.value(a);
            }
            jsonWriter.endArray();
          } else if (r.deny != null && !r.deny.isEmpty()) {
            jsonWriter.name(DENY_KEY);
            jsonWriter.beginArray();
            for (String d : r.deny) {
              jsonWriter.value(d);
            }
            jsonWriter.endArray();
          }
          jsonWriter.endObject();
        }
        jsonWriter.endArray();
      }
      if (vectorEmbeddings.numericRestricts != null
          && !vectorEmbeddings.numericRestricts.isEmpty()) {
        jsonWriter.name(NUMERIC_RESTRICTS_KEY);
        jsonWriter.beginArray();
        for (NumericRestrict numericRestrict : vectorEmbeddings.numericRestricts) {
          jsonWriter.beginObject();
          jsonWriter.name(NAMESPACE_KEY).value(numericRestrict.namespace);
          switch (numericRestrict.type) {
            case INT:
              jsonWriter.name(VALUE_INT_KEY).value(numericRestrict.valueInt);
              break;
            case FLOAT:
              jsonWriter.name(VALUE_FLOAT_KEY).value(numericRestrict.valueFloat);
              break;
            case DOUBLE:
              jsonWriter.name(VALUE_DOUBLE_KEY).value(numericRestrict.valueDouble);
              break;
          }
          jsonWriter.endObject();
        }
        jsonWriter.endArray();
      }
      jsonWriter.endObject();
    }

    private VectorEmbeddings buildObject(Row row) {
      VectorEmbeddings vectorEmbeddings = new VectorEmbeddings();

      maybeAddToObject(vectorEmbeddings, "_key", row.getKey());
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          for (Cell cell : column.getCellsList()) {
            maybeAddToObject(
                vectorEmbeddings,
                familyName + ":" + column.getQualifier().toStringUtf8(),
                cell.getValue());
          }
        }
      }

      // Assert fields
      if (StringUtils.isEmpty(vectorEmbeddings.id)) {
        throw new RuntimeException(
            String.format(
                "'%s' value is missing for row '%s'", ID_KEY, row.getKey().toStringUtf8()));
      }
      if (this.embeddingByteSize == 4
          && (vectorEmbeddings.floatEmbeddings == null
              || vectorEmbeddings.floatEmbeddings.isEmpty())) {
        throw new RuntimeException(
            String.format(
                "'%s' value is missing for row '%s'", EMBEDDING_KEY, row.getKey().toStringUtf8()));
      }
      if (this.embeddingByteSize == 8
          && (vectorEmbeddings.doubleEmbeddings == null
              || vectorEmbeddings.doubleEmbeddings.isEmpty())) {
        throw new RuntimeException(
            String.format(
                "'%s' value is missing for row '%s'", EMBEDDING_KEY, row.getKey().toStringUtf8()));
      }
      return vectorEmbeddings;
    }

    private void maybeAddToObject(
        VectorEmbeddings vectorEmbeddings, String columnQualifier, ByteString value) {
      if (columnQualifier.equals(this.idColumn)) {
        vectorEmbeddings.id = value.toStringUtf8();
      } else if (columnQualifier.equals(this.crowdingTagColumn)) {
        vectorEmbeddings.crowdingTag = value.toStringUtf8();
      } else if (columnQualifier.equals(this.embeddingsColumn)) {
        vectorEmbeddings.floatEmbeddings = new ArrayList<Float>();
        vectorEmbeddings.doubleEmbeddings = new ArrayList<Double>();

        byte[] bytes = value.toByteArray();
        for (int i = 0; i < bytes.length; i += embeddingByteSize) {
          if (embeddingByteSize == 4) {
            vectorEmbeddings.floatEmbeddings.add(Bytes.toFloat(bytes, i));
          } else if (embeddingByteSize == 8) {
            vectorEmbeddings.doubleEmbeddings.add(Bytes.toDouble(bytes, i));
          }
        }
      } else if (this.allowRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addRestrict(
            Restrict.allowRestrict(allowRestricts.get(columnQualifier), value));
      } else if (this.denyRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addRestrict(
            Restrict.denyRestrict(denyRestricts.get(columnQualifier), value));
      } else if (this.intNumericRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addNumericRestrict(
            NumericRestrict.intValue(intNumericRestricts.get(columnQualifier), value));
      } else if (this.floatNumericRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addNumericRestrict(
            NumericRestrict.floatValue(floatNumericRestricts.get(columnQualifier), value));
      } else if (this.doubleNumericRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addNumericRestrict(
            NumericRestrict.doubleValue(doubleNumericRestricts.get(columnQualifier), value));
      }
    }

    private Map<String, String> extractColumnsAliases(ValueProvider<String> restricts) {
      Map<String, String> columnsWithAliases = new HashMap<>();
      if (StringUtils.isBlank(restricts.get())) {
        return columnsWithAliases;
      }
      String[] columnsList = restricts.get().split(",");

      for (String columnsWithAlias : columnsList) {
        String[] columnWithAlias = columnsWithAlias.split("->");
        if (columnWithAlias.length == 2) {
          columnsWithAliases.put(columnWithAlias[0], columnWithAlias[1]);
        }
      }
      return columnsWithAliases;
    }
  }
}

// Data model classes.
class Restrict {
  String namespace;
  List<String> allow;
  List<String> deny;

  static Restrict allowRestrict(String namespace, ByteString value) {
    Restrict restrict = new Restrict();
    restrict.namespace = namespace;
    restrict.allow = new ArrayList<String>();
    restrict.allow.add(value.toStringUtf8());
    return restrict;
  }

  static Restrict denyRestrict(String namespace, ByteString value) {
    Restrict restrict = new Restrict();
    restrict.namespace = namespace;
    restrict.deny = new ArrayList<String>();
    restrict.deny.add(value.toStringUtf8());
    return restrict;
  }
}

class NumericRestrict {
  enum Type {
    INT,
    FLOAT,
    DOUBLE
  };

  String namespace;
  Type type;
  Integer valueInt;
  Float valueFloat;
  Double valueDouble;

  static NumericRestrict intValue(String namespace, ByteString value) {
    NumericRestrict restrict = new NumericRestrict();
    restrict.namespace = namespace;
    restrict.valueInt = Bytes.toInt(value.toByteArray());
    restrict.type = Type.INT;
    return restrict;
  }

  static NumericRestrict floatValue(String namespace, ByteString value) {
    NumericRestrict restrict = new NumericRestrict();
    restrict.namespace = namespace;
    restrict.valueFloat = Bytes.toFloat(value.toByteArray());
    restrict.type = Type.FLOAT;
    return restrict;
  }

  static NumericRestrict doubleValue(String namespace, ByteString value) {
    NumericRestrict restrict = new NumericRestrict();
    restrict.namespace = namespace;
    restrict.valueDouble = Bytes.toDouble(value.toByteArray());
    restrict.type = Type.DOUBLE;
    return restrict;
  }
}

class VectorEmbeddings {
  String id;
  String crowdingTag;
  List<Float> floatEmbeddings;
  List<Double> doubleEmbeddings;
  List<Restrict> restricts;
  List<NumericRestrict> numericRestricts;

  void addRestrict(Restrict restrict) {
    if (this.restricts == null) {
      this.restricts = new ArrayList<Restrict>();
    }
    restricts.add(restrict);
  }

  void addNumericRestrict(NumericRestrict numericRestrict) {
    if (this.numericRestricts == null) {
      this.numericRestricts = new ArrayList<NumericRestrict>();
    }
    numericRestricts.add(numericRestrict);
  }
}

다음 단계