特徴値を一括インポートする

一括インポートにより、有効なデータソースから特徴値を一括でインポートできます。一括インポート リクエストでは、エンティティ タイプごとに最大 100 個の特徴に値をインポートできます。競合を避けるため、一括インポート ジョブはエンティティ タイプごとに 1 つだけ実行します。

一括インポート リクエストでは、ソースデータの場所と、そのデータを featurestore の特徴にマッピングする方法を指定します。各一括インポート リクエストは 1 つのエンティティ タイプを対象としているため、ソースデータも単一エンティティ タイプにする必要があります。

インポートが正常に完了すると、後続の読み取りオペレーションで特徴値を使用できます。

取り込みジョブのパフォーマンス

Vertex AI Feature Store(従来版)は高スループットの取り込みを行いますが、最小レイテンシが数分になる場合があります。Vertex AI Feature Store(従来版)に対するリクエストによって、処理を完了するためのジョブが開始します。単一のレコードをインポートしている場合でも、取り込みジョブが完了するまで数分かかります。

ジョブのパフォーマンスを調整する場合は、次の 2 つの変数を変更します。

  • featurestore オンライン サービング ノードの数。
  • 取り込みジョブに使用されるワーカーの数。ワーカーは、データを処理して featurestore に書き込みます。

ワーカーの推奨数は、featurestore のオンライン サービング ノード 10 個につき 1 個です。オンライン サービングの負荷が低い場合は、さらに増やすことができます。最大 100 個のワーカーを指定できます。詳細については、リソースのモニタリングと調整で一括インポートを最適化するをご覧ください。

オンライン サービング クラスタがプロビジョニングされていないと、インポート ジョブが失敗する可能性があります。障害が発生した場合は、オンライン サービングの負荷が低いときにインポート リクエストを再試行するか、featurestore のノード数を増やしてからリクエストを再試行します。

featurestore にオンライン ストアがない場合(オンライン サービング ノードが 0)、インポート ジョブはオフライン ストアにのみ書き込みを行い、ジョブのパフォーマンスはインポート ワーカーの数にのみ依存します。

データの整合性

インポート中にソースデータが変更されると、不整合が生じる可能性があります。インポート ジョブを開始する前に、ソースデータの変更が完了していることを確認してください。また、特徴値が重複していると、オンライン リクエストとバッチ リクエストの間で異なる値が処理される可能性があります。エンティティ ID とタイムスタンプのペアごとに 1 つの特徴値があることを確認します。

インポート オペレーションが失敗すると、featurestore に部分的なデータが保存され、オンライン サービング リクエストとバッチ サービング リクエストの間で整合性のない値が返される可能性があります。この不一致を回避するには、同じインポート リクエストを再試行し、リクエストが正常に完了するまで待ちます。

null 値と空の配列

インポート中、Vertex AI Feature Store(従来版)は null スカラー値または空の配列を空の値と見なします。これには、CSV 列の空の値も含まれます。Vertex AI Feature Store(従来版)は、配列内の null 値などの非スカラー null 値をサポートしていません。

オンライン サービングとバッチ サービングで、Vertex AI Feature Store(従来版)は特徴の最新の非 null 値または空ではない値を返します。特徴の過去の値が利用できない場合、Vertex AI Feature Store(従来版)は null を返します。

NaN 値

Vertex AI Feature Store(従来型)は、DoubleDoubleArray の(数値以外の)NaN 値をサポートしています。インポート中、サービング入力 CSV ファイルに NaN を入力して、NaN 値を表すことができます。オンライン サービングとバッチ サービングでは、Vertex AI Feature Store(従来版)は NaN 値に対して NaN を返します。

バッチ インポート

単一エンティティ タイプの 1 つ以上の特徴を featurestore に一括でインポートします。

ウェブ UI

  1. Google Cloud Console の [Vertex AI] セクションで、[特徴] ページに移動します。

    [特徴] ページに移動

  2. [リージョン] プルダウン リストからリージョンを選択します。
  3. 特徴テーブルの [エンティティ タイプ] 列で、値を取り込む特徴を含むエンティティ タイプを見つけます。
  4. エンティティ タイプの名前をクリックします。
  5. アクションバーで [値を取り込む] をクリックします。
  6. [データソース] で、次のいずれかを選択します。
    • Cloud Storage CSV ファイル: Cloud Storage の複数の CSV ファイルからデータをインポートするには、このオプションを選択します。CSV ファイルのパスと名前を指定します。追加のファイルを指定するには、[別のファイルを追加] をクリックします。
    • Cloud Storage AVRO ファイル: Cloud Storage の AVRO ファイルからデータをインポートするには、このオプションを選択します。AVRO ファイルのパスと名前を指定します。
    • BigQuery テーブル: BigQuery テーブルまたは BigQuery ビューからデータをインポートするには、このオプションを選択します。使用するテーブルまたはビューを参照して選択します。形式は PROJECT_ID.DATASET_ID.TABLE_ID です。
  7. [続行] をクリックします。
  8. 特徴にマッピングする列の場合、ソースデータのどの列を featurestore のエンティティと特徴にマッピングするかを指定します。
    1. エンティティ ID を含むソースデータ内の列名を指定します。
    2. タイムスタンプには、ソースデータのタイムスタンプ列を指定するか、インポートするすべての特徴値に関連付けられた単一のタイムスタンプを指定します。
    3. 特徴のリストに、各特徴にマッピングするソースデータの列名を入力します。デフォルトでは、Vertex AI Feature Store(従来版)は特徴名と列名が一致していることを前提とします。
  9. [取り込み] をクリックします。

REST

既存の特徴の特徴値をインポートするには、featurestores.entityTypes.importFeatureValues メソッドを使用して POST リクエストを送信します。ソースデータ列の名前と宛先の特徴 ID が異なる場合は、sourceField パラメータを含めます。

リクエストのデータを使用する前に、次のように置き換えます。

  • LOCATION_ID: featurestore が作成されるリージョン。例: us-central1
  • PROJECT_ID: 実際のプロジェクト ID
  • FEATURESTORE_ID: featurestore の ID。
  • ENTITY_TYPE_ID: エンティティ タイプの ID。
  • ENTITY_SOURCE_COLUMN_ID: エンティティ ID を含むソース列の ID。
  • FEATURE_TIME_ID: 特徴値のタイムスタンプを含むソース列の ID。
  • FEATURE_ID: 値を読み込む既存の特徴の ID。
  • FEATURE_SOURCE_COLUMN_ID: エンティティの特徴値を含むソース列の ID。
  • SOURCE_DATA_DETAILS: ソースデータのロケーション。BigQuery テーブルや BigQuery ビューの "bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" } などの形式も示されます。
  • WORKER_COUNT: featurestore へのデータの書き込みに使用するワーカー数。

HTTP メソッドと URL:

POST https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

リクエストの本文(JSON):

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}

リクエストを送信するには、次のいずれかのオプションを選択します。

curl

リクエスト本文を request.json という名前のファイルに保存して、次のコマンドを実行します。

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

リクエスト本文を request.json という名前のファイルに保存して、次のコマンドを実行します。

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

出力は次のようになります。レスポンスの OPERATION_ID を使用して、オペレーションのステータスを取得できます。

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}

Python

Python をインストールまたは更新する方法については、Vertex AI SDK for Python をインストールするをご覧ください。詳細については、Python API リファレンス ドキュメントをご覧ください。

import datetime
from typing import List, Union

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    location: str,
    entity_type_id: str,
    featurestore_id: str,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    gcs_source_uris: Union[str, List[str]],
    gcs_source_type: str,
):

    aiplatform.init(project=project, location=location)

    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    my_entity_type.ingest_from_gcs(
        feature_ids=feature_ids,
        feature_time=feature_time,
        gcs_source_uris=gcs_source_uris,
        gcs_source_type=gcs_source_type,
    )

Python

Vertex AI SDK for Python をインストールすると、Vertex AI のクライアント ライブラリが組み込まれます。Vertex AI SDK for Python のインストール方法については、Vertex AI SDK for Python をインストールするをご覧ください。詳細については、Vertex AI SDK for Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    featurestore_id: str,
    entity_type_id: str,
    avro_gcs_uri: str,
    entity_id_field: str,
    feature_time_field: str,
    worker_count: int = 2,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    # The AI Platform services require regional API endpoints, which need to be
    # in the same region or multi-region overlap with the Feature Store location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.FeaturestoreServiceClient(client_options=client_options)
    entity_type = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
    avro_source = aiplatform.gapic.AvroSource(
        gcs_source=aiplatform.gapic.GcsSource(uris=[avro_gcs_uri])
    )
    feature_specs = [
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="age"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="gender"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="liked_genres"),
    ]
    import_feature_values_request = aiplatform.gapic.ImportFeatureValuesRequest(
        entity_type=entity_type,
        avro_source=avro_source,
        feature_specs=feature_specs,
        entity_id_field=entity_id_field,
        feature_time_field=feature_time_field,
        worker_count=worker_count,
    )
    lro_response = client.import_feature_values(request=import_feature_values_request)
    print("Long running operation:", lro_response.operation.name)
    import_feature_values_response = lro_response.result(timeout=timeout)
    print("import_feature_values_response:", import_feature_values_response)

Java

このサンプルを試す前に、Vertex AI クイックスタート: クライアント ライブラリの使用にある Java の設定手順を完了してください。詳細については、Vertex AI Java API のリファレンス ドキュメントをご覧ください。

Vertex AI に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.aiplatform.v1.AvroSource;
import com.google.cloud.aiplatform.v1.EntityTypeName;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceClient;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceSettings;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest.FeatureSpec;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ImportFeatureValuesSample {

  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "YOUR_PROJECT_ID";
    String featurestoreId = "YOUR_FEATURESTORE_ID";
    String entityTypeId = "YOUR_ENTITY_TYPE_ID";
    String entityIdField = "YOUR_ENTITY_FIELD_ID";
    String featureTimeField = "YOUR_FEATURE_TIME_FIELD";
    String gcsSourceUri = "YOUR_GCS_SOURCE_URI";
    int workerCount = 2;
    String location = "us-central1";
    String endpoint = "us-central1-aiplatform.googleapis.com:443";
    int timeout = 300;
    importFeatureValuesSample(
        project,
        featurestoreId,
        entityTypeId,
        gcsSourceUri,
        entityIdField,
        featureTimeField,
        workerCount,
        location,
        endpoint,
        timeout);
  }

  static void importFeatureValuesSample(
      String project,
      String featurestoreId,
      String entityTypeId,
      String gcsSourceUri,
      String entityIdField,
      String featureTimeField,
      int workerCount,
      String location,
      String endpoint,
      int timeout)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    FeaturestoreServiceSettings featurestoreServiceSettings =
        FeaturestoreServiceSettings.newBuilder().setEndpoint(endpoint).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (FeaturestoreServiceClient featurestoreServiceClient =
        FeaturestoreServiceClient.create(featurestoreServiceSettings)) {
      List<FeatureSpec> featureSpecs = new ArrayList<>();

      featureSpecs.add(FeatureSpec.newBuilder().setId("title").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("genres").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("average_rating").build());
      ImportFeatureValuesRequest importFeatureValuesRequest =
          ImportFeatureValuesRequest.newBuilder()
              .setEntityType(
                  EntityTypeName.of(project, location, featurestoreId, entityTypeId).toString())
              .setEntityIdField(entityIdField)
              .setFeatureTimeField(featureTimeField)
              .addAllFeatureSpecs(featureSpecs)
              .setWorkerCount(workerCount)
              .setAvroSource(
                  AvroSource.newBuilder()
                      .setGcsSource(GcsSource.newBuilder().addUris(gcsSourceUri)))
              .build();
      OperationFuture<ImportFeatureValuesResponse, ImportFeatureValuesOperationMetadata>
          importFeatureValuesFuture =
              featurestoreServiceClient.importFeatureValuesAsync(importFeatureValuesRequest);
      System.out.format(
          "Operation name: %s%n", importFeatureValuesFuture.getInitialFuture().get().getName());
      System.out.println("Waiting for operation to finish...");
      ImportFeatureValuesResponse importFeatureValuesResponse =
          importFeatureValuesFuture.get(timeout, TimeUnit.SECONDS);
      System.out.println("Import Feature Values Response");
      System.out.println(importFeatureValuesResponse);
      featurestoreServiceClient.close();
    }
  }
}

Node.js

このサンプルを試す前に、Vertex AI クライアント ライブラリをインストールするにある Node.js の設定手順を完了してください。詳細については、Vertex AI Node.js API のリファレンス ドキュメントをご覧ください。

Vertex AI に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.\
 * (Not necessary if passing values as arguments)
 */

// const project = 'YOUR_PROJECT_ID';
// const featurestoreId = 'YOUR_FEATURESTORE_ID';
// const entityTypeId = 'YOUR_ENTITY_TYPE_ID';
// const avroGcsUri = 'AVRO_FILE_IN_THE_GCS_URI';
// const entityIdField = 'ENTITY_ID_FIELD_IN_AVRO';
// const featureTimeField = 'TIMESTAMP_FIELD_IN_AVRO';
// const workerCount = <NO_OF_WORKERS_FOR_INGESTION_JOB>;
// const location = 'YOUR_PROJECT_LOCATION';
// const apiEndpoint = 'YOUR_API_ENDPOINT';
// const timeout = <TIMEOUT_IN_MILLI_SECONDS>;

// Imports the Google Cloud Featurestore Service Client library
const {FeaturestoreServiceClient} = require('@google-cloud/aiplatform').v1;

// Specifies the location of the api endpoint
const clientOptions = {
  apiEndpoint: apiEndpoint,
};

// Instantiates a client
const featurestoreServiceClient = new FeaturestoreServiceClient(
  clientOptions
);

async function importFeatureValues() {
  // Configure the entityType resource
  const entityType = `projects/${project}/locations/${location}/featurestores/${featurestoreId}/entityTypes/${entityTypeId}`;

  const avroSource = {
    gcsSource: {
      uris: [avroGcsUri],
    },
  };

  const featureSpecs = [{id: 'age'}, {id: 'gender'}, {id: 'liked_genres'}];

  const request = {
    entityType: entityType,
    avroSource: avroSource,
    entityIdField: entityIdField,
    featureSpecs: featureSpecs,
    featureTimeField: featureTimeField,
    workerCount: Number(workerCount),
  };

  // Import Feature Values Request
  const [operation] = await featurestoreServiceClient.importFeatureValues(
    request,
    {timeout: Number(timeout)}
  );
  const [response] = await operation.promise();

  console.log('Import feature values response');
  console.log('Raw response:');
  console.log(JSON.stringify(response, null, 2));
}
importFeatureValues();

インポート ジョブを表示する

Google Cloud コンソールを使用して、Google Cloud プロジェクトの一括インポート ジョブを表示します。

ウェブ UI

  1. Google Cloud Console の [Vertex AI] セクションで、[特徴] ページに移動します。

    [特徴] ページに移動

  2. [リージョン] プルダウン リストからリージョンを選択します。
  3. アクションバーで [取り込みジョブを表示] をクリックして、すべての featurestore のインポート ジョブを一覧表示します。
  4. インポート ジョブの ID をクリックすると、データソース、インポート エンティティの数、インポートされた特徴値の数などの詳細が表示されます。

featurestore 内の既存のデータを上書きする

既存の特徴値のタイムスタンプが同じ場合は、値を再度インポートして既存の特徴値を上書きできます。既存の特徴値を先に削除する必要はありません。たとえば、基になるソースデータが最近変更された場合、featurestore とそのデータとの整合性を保つため、特徴値を再度インポートします。タイムスタンプが一致しない場合、インポートされた値は一意とみなされ、古い値が残ります(上書きされません)。

オンライン サービング リクエストとバッチ サービング リクエストの整合性を保つため、インポート ジョブが完了するまでサービス リクエストを行わないでください。

過去のデータのバックフィル

データをバックフィルして過去の特徴値をインポートする場合は、取り込みジョブのオンライン サービングを無効にします。オンライン サービングでは最新の特徴値のみが提供されます。バックフィルは含まれません。オンライン サービングを無効にすると、オンライン サービング ノードの負荷が軽減され、インポート ジョブのスループットが向上し、ジョブの完了時間を短縮できます。

API またはクライアント ライブラリを使用している場合は、インポート ジョブのオンライン サービングを無効にできます。詳細については、importFeatureValue メソッドdisableOnlineServing フィールドをご覧ください。

次のステップ