批量导入特征值

通过批量导入,您可以从有效数据源批量导入特征值。在批量导入请求中,您可以为一种实体类型导入最多 100 个特征的值。请注意,每种实体类型只能运行一个批量导入作业,以避免冲突。

在批量导入请求中,指定源数据的位置及其映射到特征存储区中特征的方式。由于每个批量导入请求针对单个实体类型,因此源数据也必须针对单个实体类型。

导入顺利完成后,后续读取操作可以使用特征值。

导入作业性能

Vertex AI Feature Store(旧版)提供高吞吐量导入,但最短延迟时间可能需要几分钟。对 Vertex AI Feature Store(旧版)的每个请求都会启动一个作业以完成相关工作。即使您要导入单个记录,导入作业也需要几分钟才能完成。

如果要调整作业的性能,请更改下面两个变量:

  • 特征存储区在线传送节点的数量。
  • 用于导入作业的工作器数量。工作器处理数据并写入特征存储区。

推荐的工作器数量是特征存储区中每 10 个在线传送节点一个工作器。如果在线传送负载较低,您可以提高工作器数量。您最多可以指定 100 个工作器。如需了解详情,请参阅相应地监控和调优资源以优化批量导入

如果在线传送集群预配不足,则导入作业可能会失败。如果失败,请在在线传送负载较低时重试导入请求,或者增加特征存储区的节点数,然后重试请求。

如果特征存储区没有在线存储区(零在线传送节点),则导入作业仅将数据写入离线存储区,并且作业的性能仅取决于导入工作器的数量。

数据一致性

如果源数据在导入过程中被修改,则可能会出现不一致。确保在启动导入作业之前完成所有源数据修改。此外,重复的特征值可能会导致在线请求和批量请求之间传送不同的值。确保每个实体 ID 和时间戳对都有一个特征值。

如果导入操作失败,则特征存储区可能只包含部分数据,这可能导致在线传送请求和批量传送请求之间返回不一致的值。为避免这种不一致,请再次重试同一导入请求,并等到请求成功完成为止。

Null 值和空数组

在导入期间,Vertex AI Feature Store(旧版)会将 null 标量值或空数组视为空值。其中包括 CSV 列中的空值。Vertex AI Feature Store(旧版)不支持非标量 null 值,例如数组中的 null 值。

当在线传送和批量传送时,Vertex AI Feature Store(旧版)会返回该特征的最新非 null 或非空值。如果特征的历史值不可用,则 Vertex AI Feature Store(旧版)会返回 null

NaN 值

Vertex AI Feature Store(旧版)支持 DoubleDoubleArray 中的 NaN(非数字)值。在注入期间,您可以在传送输入 CSV 文件中输入 NaN 来表示 NaN 值。当在线传送和批量传送时,Vertex AI Feature Store(旧版)会对 NaN 值返回 NaN

批量导入

将值批量导入到单个实体类型的一个或多个特征的特征存储区。

网页界面

  1. 在 Google Cloud 控制台的“Vertex AI”部分,转到特征页面。

    转到“特征”页面

  2. 区域下拉列表中选择一个区域。
  3. 在特征表中,查看实体类型列,并找到包含您要导入其值的特征的实体类型。
  4. 点击实体类型的名称。
  5. 在操作栏中,点击注入值
  6. 数据源部分,选择以下选项之一:
    • Cloud Storage CSV 文件:选择此选项可从 Cloud Storage 的多个 CSV 文件导入数据。指定 CSV 文件的路径和名称。如需指定更多文件,请点击添加其他文件
    • Cloud Storage AVRO 文件:选择此选项可从 Cloud Storage 的 AVRO 文件导入数据。指定 AVRO 的路径和名称。
    • BigQuery 表:选择此选项可从 BigQuery 表或 BigQuery 视图导入数据。浏览并选择要使用的表或视图,其格式如下:PROJECT_ID.DATASET_ID.TABLE_ID
  7. 点击继续
  8. 将列映射到特征部分,指定源数据中的哪些列会映射到特征存储区中的实体和特征。
    1. 指定源数据中包含实体 ID 的列名称。
    2. 对于时间戳,在源数据中指定时间戳列,或指定与您导入的所有特征值关联的单个时间戳。
    3. 在特征列表中,输入映射到每个特征的源数据列名称。默认情况下,Vertex AI Feature Store(旧版)假定特征名称和列名称匹配。
  9. 点击注入

REST

如需导入现有特征的特征值,请使用 featurestores.entityTypes.importFeatureValues 方法发送 POST 请求。请注意,如果源数据列的名称和目标特征 ID 不同,请添加 sourceField 参数。

在使用任何请求数据之前,请先进行以下替换:

  • LOCATION_ID:在其中创建特征存储区的区域。例如 us-central1
  • PROJECT_ID:您的项目 ID
  • FEATURESTORE_ID:特征存储区的 ID。
  • ENTITY_TYPE_ID:实体类型的 ID。
  • ENTITY_SOURCE_COLUMN_ID:包含实体 ID 的源列的 ID。
  • FEATURE_TIME_ID:包含特征值的特征时间戳的源列的 ID。
  • FEATURE_ID:待导入特征存储区中现有特征的 ID。
  • FEATURE_SOURCE_COLUMN_ID:包含实体特征值的源列的 ID。
  • SOURCE_DATA_DETAILS:源数据位置,还表示 BigQuery 表或 BigQuery 视图的格式,例如 "bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" }
  • WORKER_COUNT:将数据写入特征存储区的工作器数量。

HTTP 方法和网址:

POST https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

请求 JSON 正文:

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}

如需发送请求,请选择以下方式之一:

curl

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

您应该会看到类似如下所示的输出。您可以使用响应中的 OPERATION_ID获取操作的状态

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}

Python

如需了解如何安装或更新 Python,请参阅安装 Python 版 Vertex AI SDK。 如需了解详情,请参阅 Python API 参考文档

import datetime
from typing import List, Union

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    location: str,
    entity_type_id: str,
    featurestore_id: str,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    gcs_source_uris: Union[str, List[str]],
    gcs_source_type: str,
):

    aiplatform.init(project=project, location=location)

    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    my_entity_type.ingest_from_gcs(
        feature_ids=feature_ids,
        feature_time=feature_time,
        gcs_source_uris=gcs_source_uris,
        gcs_source_type=gcs_source_type,
    )

Python

Python 版 Vertex AI SDK 的安装包含 Vertex AI 客户端库。如需了解如何安装 Python 版 Vertex AI SDK,请参阅安装 Python 版 Vertex AI SDK。如需了解详情,请参阅 Python 版 Vertex AI SDK API 参考文档

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    featurestore_id: str,
    entity_type_id: str,
    avro_gcs_uri: str,
    entity_id_field: str,
    feature_time_field: str,
    worker_count: int = 2,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    # The AI Platform services require regional API endpoints, which need to be
    # in the same region or multi-region overlap with the Feature Store location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.FeaturestoreServiceClient(client_options=client_options)
    entity_type = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
    avro_source = aiplatform.gapic.AvroSource(
        gcs_source=aiplatform.gapic.GcsSource(uris=[avro_gcs_uri])
    )
    feature_specs = [
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="age"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="gender"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="liked_genres"),
    ]
    import_feature_values_request = aiplatform.gapic.ImportFeatureValuesRequest(
        entity_type=entity_type,
        avro_source=avro_source,
        feature_specs=feature_specs,
        entity_id_field=entity_id_field,
        feature_time_field=feature_time_field,
        worker_count=worker_count,
    )
    lro_response = client.import_feature_values(request=import_feature_values_request)
    print("Long running operation:", lro_response.operation.name)
    import_feature_values_response = lro_response.result(timeout=timeout)
    print("import_feature_values_response:", import_feature_values_response)

Java

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Java 设置说明执行操作。如需了解详情,请参阅 Vertex AI Java API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.aiplatform.v1.AvroSource;
import com.google.cloud.aiplatform.v1.EntityTypeName;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceClient;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceSettings;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest.FeatureSpec;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ImportFeatureValuesSample {

  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "YOUR_PROJECT_ID";
    String featurestoreId = "YOUR_FEATURESTORE_ID";
    String entityTypeId = "YOUR_ENTITY_TYPE_ID";
    String entityIdField = "YOUR_ENTITY_FIELD_ID";
    String featureTimeField = "YOUR_FEATURE_TIME_FIELD";
    String gcsSourceUri = "YOUR_GCS_SOURCE_URI";
    int workerCount = 2;
    String location = "us-central1";
    String endpoint = "us-central1-aiplatform.googleapis.com:443";
    int timeout = 300;
    importFeatureValuesSample(
        project,
        featurestoreId,
        entityTypeId,
        gcsSourceUri,
        entityIdField,
        featureTimeField,
        workerCount,
        location,
        endpoint,
        timeout);
  }

  static void importFeatureValuesSample(
      String project,
      String featurestoreId,
      String entityTypeId,
      String gcsSourceUri,
      String entityIdField,
      String featureTimeField,
      int workerCount,
      String location,
      String endpoint,
      int timeout)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    FeaturestoreServiceSettings featurestoreServiceSettings =
        FeaturestoreServiceSettings.newBuilder().setEndpoint(endpoint).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (FeaturestoreServiceClient featurestoreServiceClient =
        FeaturestoreServiceClient.create(featurestoreServiceSettings)) {
      List<FeatureSpec> featureSpecs = new ArrayList<>();

      featureSpecs.add(FeatureSpec.newBuilder().setId("title").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("genres").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("average_rating").build());
      ImportFeatureValuesRequest importFeatureValuesRequest =
          ImportFeatureValuesRequest.newBuilder()
              .setEntityType(
                  EntityTypeName.of(project, location, featurestoreId, entityTypeId).toString())
              .setEntityIdField(entityIdField)
              .setFeatureTimeField(featureTimeField)
              .addAllFeatureSpecs(featureSpecs)
              .setWorkerCount(workerCount)
              .setAvroSource(
                  AvroSource.newBuilder()
                      .setGcsSource(GcsSource.newBuilder().addUris(gcsSourceUri)))
              .build();
      OperationFuture<ImportFeatureValuesResponse, ImportFeatureValuesOperationMetadata>
          importFeatureValuesFuture =
              featurestoreServiceClient.importFeatureValuesAsync(importFeatureValuesRequest);
      System.out.format(
          "Operation name: %s%n", importFeatureValuesFuture.getInitialFuture().get().getName());
      System.out.println("Waiting for operation to finish...");
      ImportFeatureValuesResponse importFeatureValuesResponse =
          importFeatureValuesFuture.get(timeout, TimeUnit.SECONDS);
      System.out.println("Import Feature Values Response");
      System.out.println(importFeatureValuesResponse);
      featurestoreServiceClient.close();
    }
  }
}

Node.js

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。如需了解详情,请参阅 Vertex AI Node.js API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.\
 * (Not necessary if passing values as arguments)
 */

// const project = 'YOUR_PROJECT_ID';
// const featurestoreId = 'YOUR_FEATURESTORE_ID';
// const entityTypeId = 'YOUR_ENTITY_TYPE_ID';
// const avroGcsUri = 'AVRO_FILE_IN_THE_GCS_URI';
// const entityIdField = 'ENTITY_ID_FIELD_IN_AVRO';
// const featureTimeField = 'TIMESTAMP_FIELD_IN_AVRO';
// const workerCount = <NO_OF_WORKERS_FOR_INGESTION_JOB>;
// const location = 'YOUR_PROJECT_LOCATION';
// const apiEndpoint = 'YOUR_API_ENDPOINT';
// const timeout = <TIMEOUT_IN_MILLI_SECONDS>;

// Imports the Google Cloud Featurestore Service Client library
const {FeaturestoreServiceClient} = require('@google-cloud/aiplatform').v1;

// Specifies the location of the api endpoint
const clientOptions = {
  apiEndpoint: apiEndpoint,
};

// Instantiates a client
const featurestoreServiceClient = new FeaturestoreServiceClient(
  clientOptions
);

async function importFeatureValues() {
  // Configure the entityType resource
  const entityType = `projects/${project}/locations/${location}/featurestores/${featurestoreId}/entityTypes/${entityTypeId}`;

  const avroSource = {
    gcsSource: {
      uris: [avroGcsUri],
    },
  };

  const featureSpecs = [{id: 'age'}, {id: 'gender'}, {id: 'liked_genres'}];

  const request = {
    entityType: entityType,
    avroSource: avroSource,
    entityIdField: entityIdField,
    featureSpecs: featureSpecs,
    featureTimeField: featureTimeField,
    workerCount: Number(workerCount),
  };

  // Import Feature Values Request
  const [operation] = await featurestoreServiceClient.importFeatureValues(
    request,
    {timeout: Number(timeout)}
  );
  const [response] = await operation.promise();

  console.log('Import feature values response');
  console.log('Raw response:');
  console.log(JSON.stringify(response, null, 2));
}
importFeatureValues();

查看导入作业

使用 Google Cloud 控制台查看 Google Cloud 项目中的批量导入作业。

网页界面

  1. 在 Google Cloud 控制台的“Vertex AI”部分,转到特征页面。

    转到“特征”页面

  2. 区域下拉列表中选择一个区域。
  3. 在操作栏中,点击查看注入作业以列出所有特征存储区的导入作业。
  4. 点击导入作业的 ID 以查看其详细信息,例如其数据源、导入实体的数量,以及导入的特征值的数量。

覆盖特征存储区中的现有数据

如果现有特征值与导入值具有相同的时间戳,您可以通过重新导入值来覆盖现有特征值。您无需先删除现有特征值。例如,您可能依赖于最近更改的底层源数据。要使特征存储区与该底层数据保持一致,请再次导入特征值。如果时间戳不匹配,则导入的值会被视为唯一,旧值会继续存在(不会被覆盖)。

要确保在线和批量传送请求之间的一致性,请等待导入作业完成后再发出任何传送请求。

回填历史数据

如果您要回填数据(即导入过去的特征值),请停用导入作业的在线传送功能。在线传送仅用于传送回填不包含的最新特征值。停用在线传送非常有用,因为您可以消除在线传送节点上的任何负载,并提高导入作业的吞吐量,从而缩短其完成时间。

使用 API 或客户端库时,您可以停用导入作业的在线传送。如需了解详情,请参阅 importFeatureValue 方法disableOnlineServing 字段。

后续步骤