Bigtable to Vertex AI Vector Search files on Cloud Storage 模板会创建一个批处理流水线,该流水线可从 Bigtable 表中读取数据并以 JSON 格式将其写入 Cloud Storage 存储桶。您可以将此模板用于向量嵌入。
流水线要求
- Bigtable 表必须已存在。
- 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。
模板参数
参数 | 说明 |
---|---|
bigtableProjectId |
您要从中读取数据的 Bigtable 实例所属的 Google Cloud 项目的 ID。 |
bigtableInstanceId |
表所属的 Bigtable 实例的 ID。 |
bigtableTableId |
要读取的 Bigtable 表的 ID。 |
filenamePrefix |
JSON 文件名的前缀。例如 table1- 。如果未提供值,则默认为 part 。 |
idColumn |
在其中存储 ID 的完全限定列名。采用 cf:col 或 _key 格式。 |
embeddingColumn |
在其中存储嵌入的完全限定列名。采用 cf:col 或 _key 格式。 |
outputDirectory |
可选:在其中存储输出 JSON 文件的 Cloud Storage 路径。例如:gs://your-bucket/your-path/ 。 |
crowdingTagColumn |
可选:在其中存储拥挤标记的完全限定列名。采用 cf:col 或 _key 格式。 |
embeddingByteSize |
可选:嵌入数组中每个条目的字节大小。对于浮点数,请使用值 4 。对于双精度数,请使用值 8 。默认值为 4 。 |
allowRestrictsMappings |
可选:要用作 allow 限制的列的以逗号分隔完全限定列名及其别名。格式为 cf:col->alias 。 |
denyRestrictsMappings |
可选:要用作 deny 限制的列的以逗号分隔完全限定列名及其别名。格式为 cf:col->alias 。 |
intNumericRestrictsMappings |
可选:要用作整数 numeric_restricts 的列的以逗号分隔完全限定列名及其别名。格式为 cf:col->alias 。 |
floatNumericRestrictsMappings |
可选:要用作浮点数(4 字节)numeric_restricts 的列的以逗号分隔完全限定列名及其别名。格式为 cf:col->alias 。 |
doubleNumericRestrictsMappings |
可选:要用作双精度(8 字节)numeric_restricts 的列的英文逗号分隔的完全限定列名称及其别名。格式为 cf:col->alias 。 |
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to Vector Embeddings template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud CLI
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_Vector_Embeddings \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ bigtableProjectId=BIGTABLE_PROJECT_ID,\ bigtableInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableTableId=BIGTABLE_TABLE_ID,\ filenamePrefix=FILENAME_PREFIX,\ idColumn=ID_COLUMN,\ embeddingColumn=EMBEDDING_COLUMN,\
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
BIGTABLE_PROJECT_ID
:项目 IDBIGTABLE_INSTANCE_ID
:实例 IDBIGTABLE_TABLE_ID
:表 IDFILENAME_PREFIX
:JSON 文件前缀ID_COLUMN
:ID 列EMBEDDING_COLUMN
:嵌入列
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_Vector_Embeddings { "jobName": "JOB_NAME", "parameters": { "bigtableProjectId": "BIGTABLE_PROJECT_ID", "bigtableInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableTableId": "BIGTABLE_TABLE_ID", "filenamePrefix": "FILENAME_PREFIX", "idColumn": "ID_COLUMN", "embeddingColumn": "EMBEDDING_COLUMN", }, "environment": { "maxWorkers": "10" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
BIGTABLE_PROJECT_ID
:项目 IDBIGTABLE_INSTANCE_ID
:实例 IDBIGTABLE_TABLE_ID
:表 IDFILENAME_PREFIX
:JSON 文件前缀ID_COLUMN
:ID 列EMBEDDING_COLUMN
:嵌入列
模板源代码
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.bigtable;
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
import com.google.cloud.teleport.bigtable.BigtableToVectorEmbeddings.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.gson.stream.JsonWriter;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow pipeline that exports data from a Cloud Bigtable table to JSON files in GCS,
* specifically for Vector Embedding purposes. Currently, filtering on Cloud Bigtable table is not
* supported.
*
* <p>Check out <a href=
* "https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_Vector_Embeddings.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_Bigtable_to_Vector_Embeddings",
category = TemplateCategory.BATCH,
displayName = "Cloud Bigtable to Vector Embeddings",
description =
"The Bigtable to Vector Embedding template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in JSON format, for vector embeddings",
optionsClass = Options.class,
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-vector-embeddings",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The Bigtable table must exist.",
"The output Cloud Storage bucket must exist before running the pipeline."
})
public class BigtableToVectorEmbeddings {
private static final Logger LOG = LoggerFactory.getLogger(BigtableToVectorEmbeddings.class);
/** Options for the export pipeline. */
public interface Options extends PipelineOptions {
@TemplateParameter.ProjectId(
order = 1,
description = "Project ID",
helpText =
"The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.")
ValueProvider<String> getBigtableProjectId();
@SuppressWarnings("unused")
void setBigtableProjectId(ValueProvider<String> projectId);
@TemplateParameter.Text(
order = 2,
regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
description = "Instance ID",
helpText = "The ID of the Bigtable instance that contains the table.")
ValueProvider<String> getBigtableInstanceId();
@SuppressWarnings("unused")
void setBigtableInstanceId(ValueProvider<String> instanceId);
@TemplateParameter.Text(
order = 3,
regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
description = "Table ID",
helpText = "The ID of the Bigtable table to read from.")
ValueProvider<String> getBigtableTableId();
@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> tableId);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "Cloud Storage directory for storing JSON files",
helpText = "The Cloud Storage path where the output JSON files are stored.",
example = "gs://your-bucket/your-path/")
ValueProvider<String> getOutputDirectory();
@SuppressWarnings("unused")
void setOutputDirectory(ValueProvider<String> outputDirectory);
@TemplateParameter.Text(
order = 5,
description = "JSON file prefix",
helpText =
"The prefix of the JSON filename. For example: \"table1-\". If no value is provided, defaults to \"part\".")
@Default.String("part")
ValueProvider<String> getFilenamePrefix();
@SuppressWarnings("unused")
void setFilenamePrefix(ValueProvider<String> filenamePrefix);
@TemplateParameter.Text(
order = 6,
description = "ID column",
helpText =
"The fully qualified column name where the ID is stored. In the format cf:col or _key.")
ValueProvider<String> getIdColumn();
@SuppressWarnings("unused")
void setIdColumn(ValueProvider<String> value);
@TemplateParameter.Text(
order = 7,
description = "Embedding column",
helpText =
"The fully qualified column name where the embeddings are stored. In the format cf:col or _key.")
ValueProvider<String> getEmbeddingColumn();
@SuppressWarnings("unused")
void setEmbeddingColumn(ValueProvider<String> value);
@TemplateParameter.Text(
order = 8,
optional = true,
description = "Crowding tag column",
helpText =
"The fully qualified column name where the crowding tag is stored. In the format cf:col or _key.")
ValueProvider<String> getCrowdingTagColumn();
@SuppressWarnings("unused")
void setCrowdingTagColumn(ValueProvider<String> value);
@TemplateParameter.Integer(
order = 9,
optional = true,
description = "The byte size of the embeddings array. Can be 4 or 8.",
helpText =
"The byte size of each entry in the embeddings array. For float, use the value 4. For double, use the value 8. Defaults to 4.")
@Default.Integer(4)
ValueProvider<Integer> getEmbeddingByteSize();
@SuppressWarnings("unused")
void setEmbeddingByteSize(ValueProvider<Integer> value);
@TemplateParameter.Text(
order = 10,
optional = true,
description = "Allow restricts mappings",
helpText =
"The comma-separated, fully qualified column names for the columns to use as the allow restricts, with their aliases. In the format cf:col->alias.")
ValueProvider<String> getAllowRestrictsMappings();
@SuppressWarnings("unused")
void setAllowRestrictsMappings(ValueProvider<String> value);
@TemplateParameter.Text(
order = 11,
optional = true,
description = "Deny restricts mappings",
helpText =
"The comma-separated, fully qualified column names for the columns to use as the deny restricts, with their aliases. In the format cf:col->alias.")
ValueProvider<String> getDenyRestrictsMappings();
@SuppressWarnings("unused")
void setDenyRestrictsMappings(ValueProvider<String> value);
@TemplateParameter.Text(
order = 12,
optional = true,
description = "Integer numeric restricts mappings",
helpText =
"The comma-separated, fully qualified column names of the columns to use as integer numeric_restricts, with their aliases. In the format cf:col->alias.")
ValueProvider<String> getIntNumericRestrictsMappings();
@SuppressWarnings("unused")
void setIntNumericRestrictsMappings(ValueProvider<String> value);
@TemplateParameter.Text(
order = 13,
optional = true,
description = "Float numeric restricts mappings",
helpText =
"The comma-separated, fully qualified column names of the columns to use as float (4 bytes) numeric_restricts, with their aliases. In the format cf:col->alias.")
ValueProvider<String> getFloatNumericRestrictsMappings();
@SuppressWarnings("unused")
void setFloatNumericRestrictsMappings(ValueProvider<String> value);
@TemplateParameter.Text(
order = 14,
optional = true,
description = "Double numeric restricts mappings",
helpText =
"The comma-separated, fully qualified column names of the columns to use as double (8 bytes) numeric_restricts, with their aliases. In the format cf:col->alias.")
ValueProvider<String> getDoubleNumericRestrictsMappings();
@SuppressWarnings("unused")
void setDoubleNumericRestrictsMappings(ValueProvider<String> value);
@TemplateParameter.Text(
order = 15,
regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
optional = true,
description = "App Profile ID",
helpText = "The ID of the Cloud Bigtable app profile to be used for the export")
@Default.String("default")
ValueProvider<String> getBigtableAppProfileId();
@SuppressWarnings("unused")
void setBigtableAppProfileId(ValueProvider<String> value);
}
/**
* Runs a pipeline to export data from a Cloud Bigtable table to JSON files in GCS in JSON format,
* for use of Vertex Vector Search.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
PipelineResult result = run(options);
// Wait for pipeline to finish only if it is not constructing a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
result.waitUntilFinish();
}
LOG.info("Completed pipeline setup");
}
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));
BigtableIO.Read read =
BigtableIO.read()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withAppProfileId(options.getBigtableAppProfileId())
.withTableId(options.getBigtableTableId())
.withRowFilter(RowFilter.newBuilder().setCellsPerColumnLimitFilter(1).build());
// Do not validate input fields if it is running as a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
read = read.withoutValidation();
}
// Concatenating cloud storage folder with file prefix to get complete path
ValueProvider<String> outputFilePrefix = options.getFilenamePrefix();
ValueProvider<String> outputFilePathWithPrefix =
ValueProvider.NestedValueProvider.of(
options.getOutputDirectory(),
(SerializableFunction<String, String>)
folder -> {
if (!folder.endsWith("/")) {
// Appending the slash if not provided by user
folder = folder + "/";
}
return folder + outputFilePrefix.get();
});
pipeline
.apply("Read from Bigtable", read)
.apply(
"Transform to JSON",
MapElements.via(
new BigtableToVectorEmbeddingsFn(
options.getIdColumn(),
options.getEmbeddingColumn(),
options.getEmbeddingByteSize(),
options.getCrowdingTagColumn(),
options.getAllowRestrictsMappings(),
options.getDenyRestrictsMappings(),
options.getIntNumericRestrictsMappings(),
options.getFloatNumericRestrictsMappings(),
options.getDoubleNumericRestrictsMappings())))
.apply("Write to storage", TextIO.write().to(outputFilePathWithPrefix).withSuffix(".json"));
return pipeline.run();
}
/** Translates Bigtable {@link Row} to Vector Embeddings JSON. */
static class BigtableToVectorEmbeddingsFn extends SimpleFunction<Row, String> {
private static final String ID_KEY = "id";
private static final String EMBEDDING_KEY = "embedding";
private static final String RESTRICTS_KEY = "restricts";
private static final String NUMERIC_RESTRICTS_KEY = "numeric_restricts";
private static final String CROWDING_TAG_KEY = "crowding_tag";
private static final String NAMESPACE_KEY = "namespace";
private static final String ALLOW_KEY = "allow";
private static final String DENY_KEY = "deny";
private static final String VALUE_INT_KEY = "value_int";
private static final String VALUE_FLOAT_KEY = "value_float";
private static final String VALUE_DOUBLE_KEY = "value_double";
private String idColumn;
private String embeddingsColumn;
private Integer embeddingByteSize;
private String crowdingTagColumn;
private Map<String, String> allowRestricts;
private Map<String, String> denyRestricts;
private Map<String, String> intNumericRestricts;
private Map<String, String> floatNumericRestricts;
private Map<String, String> doubleNumericRestricts;
private ValueProvider<Integer> embeddingByteSizeProvider;
private ValueProvider<String> idColumnProvider;
private ValueProvider<String> embeddingsColumnProvider;
private ValueProvider<String> crowdingTagColumnProvider;
private ValueProvider<String> allowRestrictsMappingsProvider;
private ValueProvider<String> denyRestrictsMappingsProvider;
private ValueProvider<String> intNumericRestrictsMappingsProvider;
private ValueProvider<String> floatNumericRestrictsMappingsProvider;
private ValueProvider<String> doubleNumericRestrictsMappingsProvider;
public BigtableToVectorEmbeddingsFn(
ValueProvider<String> idColumnProvider,
ValueProvider<String> embeddingsColumnProvider,
ValueProvider<Integer> embeddingByteSizeProvider,
ValueProvider<String> crowdingTagColumnProvider,
ValueProvider<String> allowRestrictsMappingsProvider,
ValueProvider<String> denyRestrictsMappingsProvider,
ValueProvider<String> intNumericRestrictsMappingsProvider,
ValueProvider<String> floatNumericRestrictsMappingsProvider,
ValueProvider<String> doubleNumericRestrictsMappingsProvider) {
this.idColumnProvider = idColumnProvider;
this.embeddingsColumnProvider = embeddingsColumnProvider;
this.embeddingByteSizeProvider = embeddingByteSizeProvider;
this.crowdingTagColumnProvider = crowdingTagColumnProvider;
this.allowRestrictsMappingsProvider = allowRestrictsMappingsProvider;
this.denyRestrictsMappingsProvider = denyRestrictsMappingsProvider;
this.intNumericRestrictsMappingsProvider = intNumericRestrictsMappingsProvider;
this.floatNumericRestrictsMappingsProvider = floatNumericRestrictsMappingsProvider;
this.doubleNumericRestrictsMappingsProvider = doubleNumericRestrictsMappingsProvider;
}
@Override
public String apply(Row row) {
this.embeddingByteSize = this.embeddingByteSizeProvider.get();
if (this.embeddingByteSize != 4 && this.embeddingByteSize != 8) {
throw new RuntimeException("embeddingByteSize can be either 4 or 8");
}
this.idColumn = this.idColumnProvider.get();
this.embeddingsColumn = this.embeddingsColumnProvider.get();
this.crowdingTagColumn = this.crowdingTagColumnProvider.get();
this.allowRestricts =
Optional.ofNullable(this.allowRestricts)
.orElse(extractColumnsAliases(this.allowRestrictsMappingsProvider));
this.denyRestricts =
Optional.ofNullable(this.denyRestricts)
.orElse(extractColumnsAliases(this.denyRestrictsMappingsProvider));
this.intNumericRestricts =
Optional.ofNullable(this.intNumericRestricts)
.orElse(extractColumnsAliases(this.intNumericRestrictsMappingsProvider));
this.floatNumericRestricts =
Optional.ofNullable(this.floatNumericRestricts)
.orElse(extractColumnsAliases(this.floatNumericRestrictsMappingsProvider));
this.doubleNumericRestricts =
Optional.ofNullable(this.doubleNumericRestricts)
.orElse(extractColumnsAliases(this.doubleNumericRestrictsMappingsProvider));
StringWriter stringWriter = new StringWriter();
JsonWriter jsonWriter = new JsonWriter(stringWriter);
VectorEmbeddings vectorEmbeddings = buildObject(row);
try {
serialize(jsonWriter, vectorEmbeddings);
} catch (IOException e) {
throw new RuntimeException(e);
}
return stringWriter.toString();
}
private void serialize(JsonWriter jsonWriter, VectorEmbeddings vectorEmbeddings)
throws IOException {
jsonWriter.beginObject();
// Required fields.
jsonWriter.name(ID_KEY).value(vectorEmbeddings.id);
jsonWriter.name(EMBEDDING_KEY);
jsonWriter.beginArray();
if (this.embeddingByteSize == 4) {
for (Float f : vectorEmbeddings.floatEmbeddings) {
jsonWriter.value(f);
}
} else if (this.embeddingByteSize == 8) {
for (Double d : vectorEmbeddings.doubleEmbeddings) {
jsonWriter.value(d);
}
}
jsonWriter.endArray();
// Optional fields.
if (vectorEmbeddings.crowdingTag != "") {
jsonWriter.name(CROWDING_TAG_KEY).value(vectorEmbeddings.crowdingTag);
}
if (vectorEmbeddings.restricts != null && !vectorEmbeddings.restricts.isEmpty()) {
jsonWriter.name(RESTRICTS_KEY);
jsonWriter.beginArray();
for (Restrict r : vectorEmbeddings.restricts) {
jsonWriter.beginObject();
jsonWriter.name(NAMESPACE_KEY).value(r.namespace);
if (r.allow != null && !r.allow.isEmpty()) {
jsonWriter.name(ALLOW_KEY);
jsonWriter.beginArray();
for (String a : r.allow) {
jsonWriter.value(a);
}
jsonWriter.endArray();
} else if (r.deny != null && !r.deny.isEmpty()) {
jsonWriter.name(DENY_KEY);
jsonWriter.beginArray();
for (String d : r.deny) {
jsonWriter.value(d);
}
jsonWriter.endArray();
}
jsonWriter.endObject();
}
jsonWriter.endArray();
}
if (vectorEmbeddings.numericRestricts != null
&& !vectorEmbeddings.numericRestricts.isEmpty()) {
jsonWriter.name(NUMERIC_RESTRICTS_KEY);
jsonWriter.beginArray();
for (NumericRestrict numericRestrict : vectorEmbeddings.numericRestricts) {
jsonWriter.beginObject();
jsonWriter.name(NAMESPACE_KEY).value(numericRestrict.namespace);
switch (numericRestrict.type) {
case INT:
jsonWriter.name(VALUE_INT_KEY).value(numericRestrict.valueInt);
break;
case FLOAT:
jsonWriter.name(VALUE_FLOAT_KEY).value(numericRestrict.valueFloat);
break;
case DOUBLE:
jsonWriter.name(VALUE_DOUBLE_KEY).value(numericRestrict.valueDouble);
break;
}
jsonWriter.endObject();
}
jsonWriter.endArray();
}
jsonWriter.endObject();
}
private VectorEmbeddings buildObject(Row row) {
VectorEmbeddings vectorEmbeddings = new VectorEmbeddings();
maybeAddToObject(vectorEmbeddings, "_key", row.getKey());
for (Family family : row.getFamiliesList()) {
String familyName = family.getName();
for (Column column : family.getColumnsList()) {
for (Cell cell : column.getCellsList()) {
maybeAddToObject(
vectorEmbeddings,
familyName + ":" + column.getQualifier().toStringUtf8(),
cell.getValue());
}
}
}
// Assert fields
if (StringUtils.isEmpty(vectorEmbeddings.id)) {
throw new RuntimeException(
String.format(
"'%s' value is missing for row '%s'", ID_KEY, row.getKey().toStringUtf8()));
}
if (this.embeddingByteSize == 4
&& (vectorEmbeddings.floatEmbeddings == null
|| vectorEmbeddings.floatEmbeddings.isEmpty())) {
throw new RuntimeException(
String.format(
"'%s' value is missing for row '%s'", EMBEDDING_KEY, row.getKey().toStringUtf8()));
}
if (this.embeddingByteSize == 8
&& (vectorEmbeddings.doubleEmbeddings == null
|| vectorEmbeddings.doubleEmbeddings.isEmpty())) {
throw new RuntimeException(
String.format(
"'%s' value is missing for row '%s'", EMBEDDING_KEY, row.getKey().toStringUtf8()));
}
return vectorEmbeddings;
}
private void maybeAddToObject(
VectorEmbeddings vectorEmbeddings, String columnQualifier, ByteString value) {
if (columnQualifier.equals(this.idColumn)) {
vectorEmbeddings.id = value.toStringUtf8();
} else if (columnQualifier.equals(this.crowdingTagColumn)) {
vectorEmbeddings.crowdingTag = value.toStringUtf8();
} else if (columnQualifier.equals(this.embeddingsColumn)) {
vectorEmbeddings.floatEmbeddings = new ArrayList<Float>();
vectorEmbeddings.doubleEmbeddings = new ArrayList<Double>();
byte[] bytes = value.toByteArray();
for (int i = 0; i < bytes.length; i += embeddingByteSize) {
if (embeddingByteSize == 4) {
vectorEmbeddings.floatEmbeddings.add(Bytes.toFloat(bytes, i));
} else if (embeddingByteSize == 8) {
vectorEmbeddings.doubleEmbeddings.add(Bytes.toDouble(bytes, i));
}
}
} else if (this.allowRestricts.containsKey(columnQualifier)) {
vectorEmbeddings.addRestrict(
Restrict.allowRestrict(allowRestricts.get(columnQualifier), value));
} else if (this.denyRestricts.containsKey(columnQualifier)) {
vectorEmbeddings.addRestrict(
Restrict.denyRestrict(denyRestricts.get(columnQualifier), value));
} else if (this.intNumericRestricts.containsKey(columnQualifier)) {
vectorEmbeddings.addNumericRestrict(
NumericRestrict.intValue(intNumericRestricts.get(columnQualifier), value));
} else if (this.floatNumericRestricts.containsKey(columnQualifier)) {
vectorEmbeddings.addNumericRestrict(
NumericRestrict.floatValue(floatNumericRestricts.get(columnQualifier), value));
} else if (this.doubleNumericRestricts.containsKey(columnQualifier)) {
vectorEmbeddings.addNumericRestrict(
NumericRestrict.doubleValue(doubleNumericRestricts.get(columnQualifier), value));
}
}
private Map<String, String> extractColumnsAliases(ValueProvider<String> restricts) {
Map<String, String> columnsWithAliases = new HashMap<>();
if (StringUtils.isBlank(restricts.get())) {
return columnsWithAliases;
}
String[] columnsList = restricts.get().split(",");
for (String columnsWithAlias : columnsList) {
String[] columnWithAlias = columnsWithAlias.split("->");
if (columnWithAlias.length == 2) {
columnsWithAliases.put(columnWithAlias[0], columnWithAlias[1]);
}
}
return columnsWithAliases;
}
}
}
// Data model classes.
class Restrict {
String namespace;
List<String> allow;
List<String> deny;
static Restrict allowRestrict(String namespace, ByteString value) {
Restrict restrict = new Restrict();
restrict.namespace = namespace;
restrict.allow = new ArrayList<String>();
restrict.allow.add(value.toStringUtf8());
return restrict;
}
static Restrict denyRestrict(String namespace, ByteString value) {
Restrict restrict = new Restrict();
restrict.namespace = namespace;
restrict.deny = new ArrayList<String>();
restrict.deny.add(value.toStringUtf8());
return restrict;
}
}
class NumericRestrict {
enum Type {
INT,
FLOAT,
DOUBLE
};
String namespace;
Type type;
Integer valueInt;
Float valueFloat;
Double valueDouble;
static NumericRestrict intValue(String namespace, ByteString value) {
NumericRestrict restrict = new NumericRestrict();
restrict.namespace = namespace;
restrict.valueInt = Bytes.toInt(value.toByteArray());
restrict.type = Type.INT;
return restrict;
}
static NumericRestrict floatValue(String namespace, ByteString value) {
NumericRestrict restrict = new NumericRestrict();
restrict.namespace = namespace;
restrict.valueFloat = Bytes.toFloat(value.toByteArray());
restrict.type = Type.FLOAT;
return restrict;
}
static NumericRestrict doubleValue(String namespace, ByteString value) {
NumericRestrict restrict = new NumericRestrict();
restrict.namespace = namespace;
restrict.valueDouble = Bytes.toDouble(value.toByteArray());
restrict.type = Type.DOUBLE;
return restrict;
}
}
class VectorEmbeddings {
String id;
String crowdingTag;
List<Float> floatEmbeddings;
List<Double> doubleEmbeddings;
List<Restrict> restricts;
List<NumericRestrict> numericRestricts;
void addRestrict(Restrict restrict) {
if (this.restricts == null) {
this.restricts = new ArrayList<Restrict>();
}
restricts.add(restrict);
}
void addNumericRestrict(NumericRestrict numericRestrict) {
if (this.numericRestricts == null) {
this.numericRestricts = new ArrayList<NumericRestrict>();
}
numericRestricts.add(numericRestrict);
}
}
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。