Batch ingest feature values

Batch ingestion lets you import feature values in bulk from a valid data source. In a batch ingestion request, you can import values for up to 100 features for an entity type. To avoid collisions, only one batch ingestion job can run per entity type at a time.

In a batch ingestion request, specify the location of your source data and how it maps to features in your featurestore. Because each batch ingestion request is for a single entity type, your source data must also be for a single entity type.

After the import has successfully completed, feature values are available to subsequent read operations.

Ingestion job performance

Vertex AI Feature Store provides high-throughput ingestion, but each job has a minimum latency of a few minutes. Each request to Vertex AI Feature Store starts a job to complete the work, so an ingestion job takes a few minutes to complete even if you are ingesting a single record.

If you want to make adjustments to how a job performs, change the following two variables:

  • The number of featurestore online serving nodes.
  • The number of workers used for the ingestion job. Workers process and write data into the featurestore.

The recommended number of workers is one worker for every 10 online serving nodes on the featurestore. You can go higher if the online serving load is low. You can specify a maximum of 100 workers. For more information, see how to monitor and tune resources to optimize batch ingestion.
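
As a rough illustration of the sizing rule above, the following Python sketch derives a starting worker count from the number of online serving nodes. The function name is hypothetical, and rounding up and using at least one worker are assumptions of this sketch, not documented behavior.

```python
import math

MAX_WORKERS = 100       # maximum number of workers stated above
NODES_PER_WORKER = 10   # one worker for every 10 online serving nodes


def recommended_worker_count(online_serving_nodes: int) -> int:
    """Return a starting worker count for a batch ingestion job."""
    if online_serving_nodes < 0:
        raise ValueError("online_serving_nodes must be non-negative")
    # Assumption: round up, and always use at least one worker.
    workers = math.ceil(online_serving_nodes / NODES_PER_WORKER)
    return max(1, min(workers, MAX_WORKERS))
```

Treat the result as a starting point only; if the online serving load is low, a higher value might be appropriate.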

If the online serving cluster is under-provisioned, the ingestion job might fail. In the event of a failure, retry the import request when the online serving load is low, or increase the node count of your featurestore and then retry the request.

If the featurestore doesn't have an online store (zero online serving nodes), the ingestion job writes only to the offline store, and the performance of the job depends solely on the number of ingestion workers.

Data consistency

Inconsistencies can be introduced if the source data is modified during import. Ensure that any source data modifications are complete before you start an ingestion job. Also, duplicate feature values can result in different values being served between online and batch requests. Ensure that you have one feature value for each entity ID and timestamp pair.
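
Before starting a job, you might check the source data for duplicate entity ID and timestamp pairs. The following Python sketch assumes the rows are already loaded as dictionaries; the function name and the column names in the usage note are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def find_duplicate_keys(
    rows: Iterable[Dict[str, str]],
    entity_id_field: str,
    feature_time_field: str,
) -> List[Tuple[str, str]]:
    """Return (entity ID, timestamp) pairs that appear more than once."""
    counts = Counter(
        (row[entity_id_field], row[feature_time_field]) for row in rows
    )
    return sorted(key for key, count in counts.items() if count > 1)
```

For example, calling find_duplicate_keys(rows, "user_id", "update_time") on clean source data returns an empty list.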

If an import operation fails, the featurestore might have only partial data, which can lead to inconsistent values being returned between online and batch serving requests. To avoid this inconsistency, retry the same import request and wait until it completes successfully.

Null values and empty arrays

During ingestion, Vertex AI Feature Store considers null scalar values or empty arrays as empty values. These include empty values in a CSV column. Vertex AI Feature Store doesn't support non-scalar null values, such as a null value in an array.

During online serving and batch serving, Vertex AI Feature Store returns the latest non-null or non-empty value of the feature. If a historical value of the feature isn't available, then Vertex AI Feature Store returns null.
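
The serving rule above can be modeled with a short Python sketch. This is only an illustration of the described behavior, not the service's implementation; the function name and the (timestamp, value) representation are assumptions.

```python
from typing import Any, List, Optional, Tuple


def latest_non_empty(history: List[Tuple[int, Any]]) -> Optional[Any]:
    """Return the most recent value that is neither None nor an empty list.

    history holds (timestamp, value) pairs in any order.
    """
    # Walk the history from newest to oldest timestamp.
    for _, value in sorted(history, key=lambda item: item[0], reverse=True):
        if value is not None and value != []:
            return value
    # No non-empty historical value exists.
    return None
```

For instance, if the newest value is null but an older value exists, the older value is returned.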

NaN values

Vertex AI Feature Store supports NaN (Not a Number) values in Double and DoubleArray. During ingestion, you can enter NaN in the serving input CSV file to represent a NaN value. During online serving and batch serving, Vertex AI Feature Store returns NaN for NaN values.
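
When generating a CSV file from Python, note that the csv module writes a float NaN as nan. The following sketch writes the literal NaN instead, as described above. The helper names are hypothetical, and this sketch does not verify whether the service also accepts other spellings.

```python
import csv
import io
import math
from typing import Any, List, Sequence


def format_feature_value(value: Any) -> Any:
    """Replace a float NaN with the literal string NaN; pass other values through."""
    if isinstance(value, float) and math.isnan(value):
        return "NaN"
    return value


def rows_to_csv(header: Sequence[str], rows: List[Sequence[Any]]) -> str:
    """Serialize rows to CSV text, spelling NaN values as NaN."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(header)
    for row in rows:
        writer.writerow([format_feature_value(value) for value in row])
    return buffer.getvalue()
```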

Batch ingestion

Import values in bulk into a featurestore for one or more features of a single entity type.

Web UI

  1. In the Vertex AI section of the Google Cloud console, go to the Features page.
  2. Select a region from the Region drop-down list.
  3. In the features table, view the Entity type column and find the entity type that contains the features that you want to ingest values for.
  4. Click the name of the entity type.
  5. From the action bar, click Ingest values.
  6. For Data source, select Cloud Storage for AVRO and CSV files or BigQuery table.
    • For Cloud Storage, specify the path and name of your file.
    • For BigQuery, browse and select a table to use, which takes the following form: PROJECT_ID.DATASET_ID.TABLE_ID.
  7. Click Continue.
  8. For Map column to features, specify which columns in your source data map to entities and features in your featurestore.
    1. Specify the column name in your source data that contains the entity IDs.
    2. For the timestamp, specify a timestamp column in your source data or specify a single timestamp associated with all feature values that you ingest.
    3. In the list of features, enter the source data column name that maps to each feature. By default, Vertex AI Feature Store assumes that the feature name and column name match.
  9. Click Ingest.

REST

To ingest feature values for existing features, send a POST request by using the featurestores.entityTypes.importFeatureValues method. Note that if the names of the source data columns and the destination feature IDs are different, include the sourceField parameter.

Before using any of the request data, make the following replacements:

  • LOCATION_ID: Region where the featurestore is created. For example, us-central1.
  • PROJECT_ID: Your project ID.
  • FEATURESTORE_ID: ID of the featurestore.
  • ENTITY_TYPE_ID: ID of the entity type.
  • ENTITY_SOURCE_COLUMN_ID: ID of the source column that contains entity IDs.
  • FEATURE_TIME_ID: ID of the source column that contains the feature timestamps for the feature values.
  • FEATURE_ID: ID of an existing feature in the featurestore to import values for.
  • FEATURE_SOURCE_COLUMN_ID: ID of the source column that contains feature values for the entities.
  • SOURCE_DATA_DETAILS: The source data format and location, such as "bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" } for a BigQuery table.
  • WORKER_COUNT: The number of workers to use to write data to the featurestore.

HTTP method and URL:

POST https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

Request JSON body:

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}
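
As a minimal sketch, the request body above could also be assembled in Python before you save it as request.json. The function name and parameters are hypothetical; the JSON field names come from the request body shown above, and a BigQuery source is assumed. The sourceField entry is included only when the column name differs from the feature ID.

```python
import json
from typing import Dict, Optional


def build_import_request(
    entity_id_field: str,
    feature_time_field: str,
    bigquery_uri: str,
    feature_to_column: Dict[str, Optional[str]],
    worker_count: int,
) -> dict:
    """Build an importFeatureValues request body for a BigQuery source."""
    feature_specs = []
    for feature_id, column in feature_to_column.items():
        spec = {"id": feature_id}
        # sourceField is needed only when the source column name differs.
        if column and column != feature_id:
            spec["sourceField"] = column
        feature_specs.append(spec)
    return {
        "entityIdField": entity_id_field,
        "featureTimeField": feature_time_field,
        "bigquerySource": {"inputUri": bigquery_uri},
        "featureSpecs": feature_specs,
        "workerCount": worker_count,
    }


if __name__ == "__main__":
    # Example values are placeholders, not real resources.
    body = build_import_request(
        "user_id", "update_time", "bq://test.dataset.sourcetable",
        {"age": None, "liked_genres": "genres"}, 2,
    )
    print(json.dumps(body, indent=2))
```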

To send your request, choose one of these options:

curl

Save the request body in a file called request.json, and execute the following command:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

Save the request body in a file called request.json, and execute the following command:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

You should see output similar to the following. You can use the OPERATION_ID in the response to get the status of the operation.

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}
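
To check the operation status later, you need the OPERATION_ID from the name field. The following Python sketch extracts it, assuming the name has the form shown in the response above; the function name is hypothetical.

```python
def operation_id_from_name(operation_name: str) -> str:
    """Return the trailing operation ID from a long-running operation name."""
    parts = operation_name.split("/")
    # Expect the name to end with ".../operations/OPERATION_ID".
    if len(parts) < 2 or parts[-2] != "operations" or not parts[-1]:
        raise ValueError(f"Unexpected operation name: {operation_name!r}")
    return parts[-1]
```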

Vertex AI SDK for Python

To learn how to install and use the Vertex AI SDK for Python, see Use the Vertex AI SDK for Python. For more information, see the Vertex AI SDK for Python API reference documentation.

import datetime
from typing import List, Union

from google.cloud import aiplatform


def import_feature_values_sample(
    project: str,
    location: str,
    entity_type_id: str,
    featurestore_id: str,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    gcs_source_uris: Union[str, List[str]],
    gcs_source_type: str,
):

    aiplatform.init(project=project, location=location)

    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    my_entity_type.ingest_from_gcs(
        feature_ids=feature_ids,
        feature_time=feature_time,
        gcs_source_uris=gcs_source_uris,
        gcs_source_type=gcs_source_type,
    )

Python

The client library for Vertex AI is included when you install the Vertex AI SDK for Python. To learn how to install and use the Vertex AI SDK for Python, see Use the Vertex AI SDK for Python. For more information, see the Vertex AI SDK for Python API reference documentation.

from google.cloud import aiplatform


def import_feature_values_sample(
    project: str,
    featurestore_id: str,
    entity_type_id: str,
    avro_gcs_uri: str,
    entity_id_field: str,
    feature_time_field: str,
    worker_count: int = 2,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    # Vertex AI requires a regional API endpoint. The endpoint must be in the same
    # region as the featurestore, or in a multi-region that overlaps its location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.FeaturestoreServiceClient(client_options=client_options)
    entity_type = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
    avro_source = aiplatform.gapic.AvroSource(
        gcs_source=aiplatform.gapic.GcsSource(uris=[avro_gcs_uri])
    )
    # Example feature IDs; replace them with IDs of features in your entity type.
    feature_specs = [
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="age"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="gender"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="liked_genres"),
    ]
    import_feature_values_request = aiplatform.gapic.ImportFeatureValuesRequest(
        entity_type=entity_type,
        avro_source=avro_source,
        feature_specs=feature_specs,
        entity_id_field=entity_id_field,
        feature_time_field=feature_time_field,
        worker_count=worker_count,
    )
    lro_response = client.import_feature_values(request=import_feature_values_request)
    print("Long running operation:", lro_response.operation.name)
    import_feature_values_response = lro_response.result(timeout=timeout)
    print("import_feature_values_response:", import_feature_values_response)

Java

To learn how to install and use the client library for Vertex AI, see Vertex AI client libraries. For more information, see the Vertex AI Java API reference documentation.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.aiplatform.v1.AvroSource;
import com.google.cloud.aiplatform.v1.EntityTypeName;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceClient;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceSettings;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest.FeatureSpec;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ImportFeatureValuesSample {

  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "YOUR_PROJECT_ID";
    String featurestoreId = "YOUR_FEATURESTORE_ID";
    String entityTypeId = "YOUR_ENTITY_TYPE_ID";
    String entityIdField = "YOUR_ENTITY_FIELD_ID";
    String featureTimeField = "YOUR_FEATURE_TIME_FIELD";
    String gcsSourceUri = "YOUR_GCS_SOURCE_URI";
    int workerCount = 2;
    String location = "us-central1";
    String endpoint = "us-central1-aiplatform.googleapis.com:443";
    int timeout = 300;
    importFeatureValuesSample(
        project,
        featurestoreId,
        entityTypeId,
        gcsSourceUri,
        entityIdField,
        featureTimeField,
        workerCount,
        location,
        endpoint,
        timeout);
  }

  static void importFeatureValuesSample(
      String project,
      String featurestoreId,
      String entityTypeId,
      String gcsSourceUri,
      String entityIdField,
      String featureTimeField,
      int workerCount,
      String location,
      String endpoint,
      int timeout)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    FeaturestoreServiceSettings featurestoreServiceSettings =
        FeaturestoreServiceSettings.newBuilder().setEndpoint(endpoint).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. The try-with-resources statement closes the
    // client automatically and cleans up any remaining background resources.
    try (FeaturestoreServiceClient featurestoreServiceClient =
        FeaturestoreServiceClient.create(featurestoreServiceSettings)) {
      List<FeatureSpec> featureSpecs = new ArrayList<>();

      // Example feature IDs; replace them with IDs of features in your entity type.
      featureSpecs.add(FeatureSpec.newBuilder().setId("title").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("genres").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("average_rating").build());
      ImportFeatureValuesRequest importFeatureValuesRequest =
          ImportFeatureValuesRequest.newBuilder()
              .setEntityType(
                  EntityTypeName.of(project, location, featurestoreId, entityTypeId).toString())
              .setEntityIdField(entityIdField)
              .setFeatureTimeField(featureTimeField)
              .addAllFeatureSpecs(featureSpecs)
              .setWorkerCount(workerCount)
              .setAvroSource(
                  AvroSource.newBuilder()
                      .setGcsSource(GcsSource.newBuilder().addUris(gcsSourceUri)))
              .build();
      OperationFuture<ImportFeatureValuesResponse, ImportFeatureValuesOperationMetadata>
          importFeatureValuesFuture =
              featurestoreServiceClient.importFeatureValuesAsync(importFeatureValuesRequest);
      System.out.format(
          "Operation name: %s%n", importFeatureValuesFuture.getInitialFuture().get().getName());
      System.out.println("Waiting for operation to finish...");
      ImportFeatureValuesResponse importFeatureValuesResponse =
          importFeatureValuesFuture.get(timeout, TimeUnit.SECONDS);
      System.out.println("Import Feature Values Response");
      System.out.println(importFeatureValuesResponse);
    }
  }
}

Node.js

To learn how to install and use the client library for Vertex AI, see Vertex AI client libraries. For more information, see the Vertex AI Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 * (Not necessary if passing values as arguments)
 */

// const project = 'YOUR_PROJECT_ID';
// const featurestoreId = 'YOUR_FEATURESTORE_ID';
// const entityTypeId = 'YOUR_ENTITY_TYPE_ID';
// const avroGcsUri = 'AVRO_FILE_IN_THE_GCS_URI';
// const entityIdField = 'ENTITY_ID_FIELD_IN_AVRO';
// const featureTimeField = 'TIMESTAMP_FIELD_IN_AVRO';
// const workerCount = <NO_OF_WORKERS_FOR_INGESTION_JOB>;
// const location = 'YOUR_PROJECT_LOCATION';
// const apiEndpoint = 'YOUR_API_ENDPOINT';
// const timeout = <TIMEOUT_IN_MILLI_SECONDS>;

// Imports the Google Cloud Featurestore Service Client library
const {FeaturestoreServiceClient} = require('@google-cloud/aiplatform').v1;

// Specifies the location of the api endpoint
const clientOptions = {
  apiEndpoint: apiEndpoint,
};

// Instantiates a client
const featurestoreServiceClient = new FeaturestoreServiceClient(
  clientOptions
);

async function importFeatureValues() {
  // Configure the entityType resource
  const entityType = `projects/${project}/locations/${location}/featurestores/${featurestoreId}/entityTypes/${entityTypeId}`;

  const avroSource = {
    gcsSource: {
      uris: [avroGcsUri],
    },
  };

  // Example feature IDs; replace them with IDs of features in your entity type.
  const featureSpecs = [{id: 'age'}, {id: 'gender'}, {id: 'liked_genres'}];

  const request = {
    entityType: entityType,
    avroSource: avroSource,
    entityIdField: entityIdField,
    featureSpecs: featureSpecs,
    featureTimeField: featureTimeField,
    workerCount: Number(workerCount),
  };

  // Import Feature Values Request
  const [operation] = await featurestoreServiceClient.importFeatureValues(
    request,
    {timeout: Number(timeout)}
  );
  const [response] = await operation.promise();

  console.log('Import feature values response');
  console.log('Raw response:');
  console.log(JSON.stringify(response, null, 2));
}
importFeatureValues();

View ingestion jobs

Use the Google Cloud console to view batch ingestion jobs in a Google Cloud project.

Web UI

  1. In the Vertex AI section of the Google Cloud console, go to the Features page.
  2. Select a region from the Region drop-down list.
  3. From the action bar, click View ingestion jobs to list ingestion jobs for all featurestores.
  4. Click the ID of an ingestion job to view its details such as its data source, number of ingested entities, and number of ingested feature values.

Overwrite existing data in a featurestore

You can re-import values to overwrite existing feature values if the new and existing values have the same timestamps. You don't need to delete existing feature values first. For example, you might rely on underlying source data that was recently changed. To keep your featurestore consistent with that underlying data, import your feature values again. If the timestamps don't match, the imported values are considered unique and the old values continue to exist (they aren't overwritten).

To ensure consistency between online and batch serving requests, wait until the ingestion job is complete before making any serving requests.
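
The overwrite rule in this section can be pictured as a store keyed by entity ID, feature ID, and timestamp. The following Python sketch is a simplified model under that assumption, not the service's storage design; all names are hypothetical.

```python
from typing import Any, Dict, Iterable, Tuple

Key = Tuple[str, str, str]  # (entity ID, feature ID, timestamp)
Row = Tuple[str, str, str, Any]  # key fields followed by the feature value


def apply_import(store: Dict[Key, Any], rows: Iterable[Row]) -> None:
    """Write rows into the store; a matching key overwrites the old value."""
    for entity_id, feature_id, timestamp, value in rows:
        store[(entity_id, feature_id, timestamp)] = value
```

Re-importing a row with the same timestamp replaces the stored value, while a row with a different timestamp is stored alongside the old one.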

Backfill historical data

If you're backfilling data (ingesting past feature values), disable online serving for your ingestion job. Online serving is for serving the latest feature values only, which backfilling doesn't include. Disabling online serving eliminates any load on your online serving nodes and increases throughput for your ingestion job, which can decrease its completion time.

You can disable online serving for ingestion jobs when you use the API or client libraries. For more information, see the disableOnlineServing field for the importFeatureValues method.
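
For a REST request, one way to apply this is to set the disableOnlineServing field in the JSON request body. The following Python sketch adds the field to a copy of an existing body; the function name is hypothetical, and the body passed in is assumed to be an otherwise valid import request.

```python
import copy


def with_online_serving_disabled(request_body: dict) -> dict:
    """Return a copy of the request body with online serving disabled."""
    updated = copy.deepcopy(request_body)
    # Field name taken from the disableOnlineServing field mentioned above.
    updated["disableOnlineServing"] = True
    return updated
```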

What's next