Batch ingest feature values

Batch ingestion lets you ingest feature values in bulk from a valid data source. For each request, you can import values for up to 100 features for a single entity type. Note that you can run only one batch ingestion job per entity type at a time, which avoids collisions.

In a batch ingestion request, specify the location of your source data and how it maps to features in your featurestore. Because each batch ingestion request is for a single entity type, your source data must also be for a single entity type.

After the import has successfully completed, feature values are available to subsequent read operations.

Ingestion job performance

Feature Store provides high-throughput ingestion, but the minimum latency can be up to a few minutes. Because Feature Store spins up a job under the hood for each request, an ingestion job takes a few minutes to complete even if you are ingesting a single record.

If you want to adjust a job's performance, the two variables that you can change are the number of featurestore online serving nodes and the number of workers used for the ingestion job. The workers process and write data into the featurestore. As a starting point, the recommended number of workers is 1 worker for every 10 online serving nodes on the featurestore, though you can go higher if the online serving load is low. You can specify a maximum of 100 workers. For more guidance, see Monitor and tune resources accordingly to optimize batch ingestion.
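
As a rough illustration of this guidance, the following Python sketch (a hypothetical helper, not part of the Vertex AI SDK) derives a starting worker count from the number of online serving nodes:

def recommended_worker_count(online_serving_nodes: int, low_serving_load: bool = False) -> int:
    """Suggest a batch ingestion worker count: roughly 1 worker per 10
    online serving nodes, optionally higher when the online serving load
    is low, capped at the 100-worker maximum."""
    workers = max(1, online_serving_nodes // 10)
    if low_serving_load:
        workers *= 2  # assumption: a modest bump; tune based on monitoring
    return min(workers, 100)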

If the online serving cluster is underprovisioned, the ingestion job can fail. If that happens, retry the import request when the online serving load is low, or increase the fixedNodeCount value of your featurestore and then retry the request.
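
For example, here is a hedged sketch of increasing the node count through the featurestores.patch method; the placeholders follow the same conventions as the examples later on this page, and the value 4 is only illustrative:

curl -X PATCH \
-H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
-H "Content-Type: application/json; charset=utf-8" \
-d '{"onlineServingConfig": {"fixedNodeCount": 4}}' \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT/locations/LOCATION/featurestores/FEATURESTORE_ID?updateMask=online_serving_config.fixed_node_count"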

Data consistency

Inconsistencies can be introduced if source data is modified during an import. Make sure that any source data modifications are complete before you start an ingestion job. Also, duplicate feature values can result in different values being served between online and batch requests. Make sure that you have one feature value for each entity ID and timestamp pair.
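
As one way to check for duplicates before starting a job, the following Python sketch scans a CSV source; the file path and the entity_id and timestamp column names are illustrative assumptions:

import pandas as pd

# Load the source data and flag rows that share an (entity ID, timestamp) pair.
df = pd.read_csv("source_data.csv")  # illustrative source file
duplicates = df[df.duplicated(subset=["entity_id", "timestamp"], keep=False)]
if not duplicates.empty:
    print(f"Found {len(duplicates)} rows with duplicate (entity_id, timestamp) pairs:")
    print(duplicates)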

If an import operation fails, the featurestore might only have partial data, which can lead to inconsistent values being returned between online and batch serving requests. To avoid this inconsistency, retry the same import request again and wait until the request successfully completes.

Null/NaN values and empty arrays

Feature Store ignores scalar null values during ingestion, such as an empty value in a CSV column. Feature Store doesn't support non-scalar null values, such as a null value in an array. If your source data for a non-scalar feature has an empty array or a null value, those values are skipped. At serving time, Feature Store returns the previous non-null value, or null if there is no previous value.

Feature Store supports and ingests NaN (Not a Number) values for both scalars and non-scalars. For example, you can write the string NaN in a CSV file to represent a NaN value. For both online and batch serving, Feature Store returns NaN for NaN values.
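
To illustrate the difference, this Python sketch writes a CSV where one row has an empty cell (a scalar null, which ingestion ignores) and another has the string NaN (which is ingested as NaN); the file name and column names are illustrative:

import csv

with open("ratings.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["entity_id", "timestamp", "average_rating"])
    # Empty cell: a scalar null value, ignored during ingestion.
    writer.writerow(["movie_01", "2021-03-01T00:00:00Z", ""])
    # The literal string NaN: ingested and later served as NaN.
    writer.writerow(["movie_02", "2021-03-01T00:00:00Z", "NaN"])
    writer.writerow(["movie_03", "2021-03-01T00:00:00Z", 4.5])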

Batch ingestion

Import values in bulk into a featurestore for one or more features of a single entity type.

Web UI

  1. In the Vertex AI section of the Google Cloud Console, go to the Features page.

    Go to the Features page

  2. Select a region from the Region drop-down list.
  3. In the features table, view the Entity type column and find the entity type that contains the features that you want to ingest values for.
  4. Click the name of the entity type.
  5. From the action bar, click Ingest values.
  6. For Data source, select Cloud Storage (for Avro or CSV files) or BigQuery table.
    • For Cloud Storage, specify the path and name of your file.
    • For BigQuery, browse and select a table to use, which takes the following form: PROJECT_ID.DATASET_ID.TABLE_ID.
  7. Click Continue.
  8. For Map column to features, specify which columns in your source data map to entities and features in your featurestore.
    1. Specify the column name in your source data that contains the entity IDs.
    2. For the timestamp, specify a timestamp column stored in your source data, or specify a single timestamp to associate with all feature values that you ingest.
    3. In the list of features, enter the source data column name that maps to each feature. By default, Feature Store assumes the feature name and column name match.
  9. Click Ingest.

REST & CMD LINE

To ingest feature values for existing features, send a POST request by using the featurestores.entityTypes.importFeatureValues method. Note that if the names of the source data columns and the destination feature IDs are different, include the sourceField parameter.

Before using any of the request data, make the following replacements:

  • LOCATION: Region where the featurestore is created. For example, us-central1.
  • PROJECT: Your project ID or project number.
  • FEATURESTORE_ID: ID of the featurestore.
  • ENTITY_TYPE_ID: ID of the entity type.
  • ENTITY_SOURCE_COLUMN_ID: ID of source column that contains entity IDs.
  • FEATURE_TIME_ID: ID of source column that contains the feature timestamps for the feature values.
  • FEATURE_ID: ID of an existing feature in the featurestore to import values for.
  • FEATURE_SOURCE_COLUMN_ID: ID of source column that contains feature values for the entities.
  • SOURCE_DATA_DETAILS: The source data format and location, such as "bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" } for a BigQuery table.
  • WORKER_COUNT: The number of workers to use to write data to the featurestore.

HTTP method and URL:

POST https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT/locations/LOCATION/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

Request JSON body:

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}

To send your request, choose one of these options:

curl

Save the request body in a file called request.json, and execute the following command:

curl -X POST \
-H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT/locations/LOCATION/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

Save the request body in a file called request.json, and execute the following command:

$cred = gcloud auth application-default print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT/locations/LOCATION/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

You should see output similar to the following. You can use the OPERATION_ID in the response to get the status of the operation.

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}
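
For example, here is a hedged sketch of checking the operation's status with a GET request on the operation name returned in the response; the placeholders match the ones above:

curl -X GET \
-H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT/locations/LOCATION/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID"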

Python

from google.cloud import aiplatform_v1beta1 as aiplatform


def import_feature_values_sample(
    project: str,
    featurestore_id: str,
    entity_type_id: str,
    avro_gcs_uri: str,
    entity_id_field: str,
    feature_time_field: str,
    worker_count: int = 2,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    # The AI Platform services require regional API endpoints, which need to be
    # in the same region or multi-region overlap with the Feature Store location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.FeaturestoreServiceClient(client_options=client_options)
    entity_type = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
    avro_source = aiplatform.AvroSource(
        gcs_source=aiplatform.GcsSource(uris=[avro_gcs_uri])
    )
    feature_specs = [
        # Feature IDs to import values for. The source column names are
        # assumed to match; if they differ, set source_field on each spec.
        aiplatform.ImportFeatureValuesRequest.FeatureSpec(id="age"),
        aiplatform.ImportFeatureValuesRequest.FeatureSpec(id="gender"),
        aiplatform.ImportFeatureValuesRequest.FeatureSpec(id="liked_genres"),
    ]
    import_feature_values_request = aiplatform.ImportFeatureValuesRequest(
        entity_type=entity_type,
        avro_source=avro_source,
        feature_specs=feature_specs,
        entity_id_field=entity_id_field,
        feature_time_field=feature_time_field,
        worker_count=worker_count,
    )
    lro_response = client.import_feature_values(request=import_feature_values_request)
    print("Long running operation:", lro_response.operation.name)
    import_feature_values_response = lro_response.result(timeout=timeout)
    print("import_feature_values_response:", import_feature_values_response)

Additional languages

You can install and use other Vertex AI client libraries to call the Vertex AI API. Cloud Client Libraries provide an optimized developer experience by using each supported language's natural conventions and styles.

View ingestion jobs

Use the Cloud Console to view batch ingestion jobs in a Google Cloud project.

Web UI

  1. In the Vertex AI section of the Google Cloud Console, go to the Features page.

    Go to the Features page

  2. Select a region from the Region drop-down list.
  3. From the action bar, click View ingestion jobs to list ingestion jobs for all featurestores.
  4. Click the ID of an ingestion job to view its details such as its data source, number of ingested entities, and number of ingested feature values.

Overwrite existing data in a featurestore

You can overwrite existing feature values by re-importing values that have the same timestamps; you don't need to delete existing feature values first. For example, you might rely on underlying source data that was recently changed. To keep your featurestore consistent with that underlying data, import your feature values again. If the timestamps don't match, the imported values are considered unique and the old values continue to exist (they aren't overwritten). For example, re-importing a value for an entity at timestamp 2021-03-01T00:00:00Z replaces the value previously stored at that timestamp, while a value at 2021-03-02T00:00:00Z is stored in addition to it.

To ensure consistency between online and batch serving requests, wait until the ingestion job is complete before making any serving requests.

Backfill historical data

If you're backfilling data (that is, ingesting past feature values), disable online serving for your ingestion job. Online serving is for serving only the latest feature values, which backfilling doesn't include. Disabling online serving is useful because it eliminates load on your online serving nodes and increases throughput for your ingestion job, which can decrease its completion time.

You can disable online serving for ingestion jobs when you use the API or client libraries. For more information, see the disableOnlineServing field for the importFeatureValues method.
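
As a sketch, adding this to the earlier Python sample's request would look like the following; only the disable_online_serving line is new, and the other arguments are as defined in that sample:

import_feature_values_request = aiplatform.ImportFeatureValuesRequest(
    entity_type=entity_type,
    avro_source=avro_source,
    feature_specs=feature_specs,
    entity_id_field=entity_id_field,
    feature_time_field=feature_time_field,
    worker_count=worker_count,
    disable_online_serving=True,  # backfill: skip writing to the online store
)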

What's next