Streaming ingestion


Streaming ingestion lets you make real-time updates to feature values. This method is useful when having the latest available data for online serving is a priority. For example, you can ingest streaming event data and, within a few seconds, Vertex AI Feature Store makes that data available for online serving scenarios.

If you must backfill data or if you compute feature values in batch, use batch ingestion. Compared to streaming ingestion requests, batch ingestion requests can handle larger payloads but take longer to complete.

For information about the oldest feature value timestamp that you can ingest, see Vertex AI Feature Store in Quotas and limits.

Example use case

An online retail organization might provide a personalized shopping experience by using the current activity of a user. As users navigate through the website, you can capture their activity into a featurestore and then, soon after, serve that information for online predictions. This real-time ingestion and serving can help you show useful and relevant recommendations to customers during their shopping session.

Online storage node usage

Writing feature values to an online store uses the featurestore's CPU resources (online storage nodes). Monitor your CPU usage to verify that demand doesn't exceed supply, which can lead to serving errors. We recommend keeping usage at roughly 70% or lower to avoid these errors. If you regularly exceed that value, you can update your featurestore to increase the number of nodes or use autoscaling. For more information, see Manage featurestores.
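As a back-of-the-envelope estimate of the node count needed to stay under the recommended 70% usage rate, you can assume load scales roughly linearly with node count (a simplification; actual scaling behavior and quotas still apply, and `nodes_needed` is a hypothetical helper, not part of any SDK):

```python
import math


def nodes_needed(current_nodes: int, cpu_utilization: float,
                 target: float = 0.70) -> int:
    """Estimate the minimum node count that keeps CPU usage at or
    below `target`, assuming load scales linearly with node count."""
    if not 0 < target <= 1:
        raise ValueError("target must be in (0, 1]")
    # Total load expressed in node-equivalents of CPU.
    load = current_nodes * cpu_utilization
    return max(1, math.ceil(load / target))


# With 2 nodes at 95% CPU, about 3 nodes keep usage under 70%.
print(nodes_needed(2, 0.95))  # → 3
```

For sustained or spiky traffic growth, autoscaling (described in Manage featurestores) avoids having to run this calculation manually.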

Streaming ingestion

Streaming ingestion writes a value to a particular feature. The feature value must be included as part of the ingestion request; you can't stream data directly from a data source. You can simultaneously write up to 10 different entity types per project per region. This limit includes all streaming ingestion requests to all featurestores in a given project and region. If you surpass this limit, Vertex AI Feature Store might not write all of your data to the offline store, and it doesn't surface any errors when this happens.

If you're writing to recently created features, wait a few minutes before writing because the new features might not have propagated yet. Otherwise, you might see a resource not found error.
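One way to handle this propagation window is to retry the write with backoff instead of waiting a fixed amount of time. The sketch below is a hypothetical generic helper, not part of the SDK; in practice you would pass `google.api_core.exceptions.NotFound` as `retry_on` and wrap your `write_feature_values` call:

```python
import time


def write_with_retry(write_fn, retry_on=Exception, max_attempts=5,
                     base_delay=2.0, sleep=time.sleep):
    """Call write_fn, retrying with exponential backoff on retry_on errors.

    write_fn is any zero-argument callable that performs the write, e.g.
        lambda: my_entity_type.write_feature_values(instances=my_data)
    For Vertex AI, retry_on would typically be
    google.api_core.exceptions.NotFound.
    """
    for attempt in range(max_attempts):
        try:
            return write_fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # Propagate after the final attempt.
            sleep(base_delay * 2 ** attempt)  # Back off: 2s, 4s, 8s, ...
```

This keeps ingestion code free of hard-coded waits while still surfacing the error if the feature genuinely doesn't exist.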

You can ingest feature values for only one entity per write.

REST

To ingest feature values for existing features, send a POST request by using the featurestores.entityTypes.writeFeatureValues method. If the names of the source data columns and the destination feature IDs are different, include the sourceField parameter. Note that featurestores.entityTypes.writeFeatureValues lets you ingest feature values for only one entity at a time.

Before using any of the request data, make the following replacements:

  • LOCATION: Region where the featurestore is created. For example, us-central1.
  • PROJECT: Your project ID.
  • FEATURESTORE_ID: ID of the featurestore.
  • ENTITY_TYPE_ID: ID of the entity type.
  • FEATURE_ID: ID of an existing feature in the featurestore to write values for.
  • VALUE_TYPE: The value type of the feature.
  • VALUE: Value for the feature.
  • TIME_STAMP (optional): The time at which the feature was generated. The timestamp must be in the RFC3339 UTC format.

HTTP method and URL:

POST https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT/locations/LOCATION/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:writeFeatureValues

Request JSON body:

{
  "payloads": [
    {
      "entityId": "ENTITY_ID",
      "featureValues": {
        "FEATURE_ID": {
          "VALUE_TYPE": VALUE,
          "metadata": {"generate_time": "TIME_STAMP"}
        }
      }
    }
  ]
}
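If you build this request body in code, a small helper can fill in the template. The sketch below is a hypothetical convenience function, not part of the API; the value-type key (for example, `doubleValue`) and field names follow the template above:

```python
import json


def build_write_payload(entity_id, feature_values, timestamp=None):
    """Build the JSON body for a writeFeatureValues request.

    feature_values maps feature IDs to (value_type, value) pairs, e.g.
    {"average_rating": ("doubleValue", 4.7)}. build_write_payload is a
    hypothetical helper for illustration only.
    """
    values = {}
    for feature_id, (value_type, value) in feature_values.items():
        entry = {value_type: value}
        if timestamp:
            # generate_time must be an RFC3339 UTC timestamp.
            entry["metadata"] = {"generate_time": timestamp}
        values[feature_id] = entry
    return {"payloads": [{"entityId": entity_id, "featureValues": values}]}


body = build_write_payload(
    "movie_01",
    {"average_rating": ("doubleValue", 4.7)},
    timestamp="2021-08-20T20:44:11.000Z",
)
print(json.dumps(body, indent=2))
```

You can then save the resulting JSON as `request.json` and send it with one of the commands below.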

To send your request, choose one of these options:

curl

Save the request body in a file called request.json, and execute the following command:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT/locations/LOCATION/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:writeFeatureValues"

PowerShell

Save the request body in a file called request.json, and execute the following command:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT/locations/LOCATION/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:writeFeatureValues" | Select-Object -Expand Content

You should receive a successful status code (2xx) and an empty response.

Vertex AI SDK for Python

To learn how to install and use the Vertex AI SDK for Python, see Use the Vertex AI SDK for Python. For more information, see the Vertex AI SDK for Python API reference documentation.

from google.cloud import aiplatform


def write_feature_values_sample(
    project: str, location: str, entity_type_id: str, featurestore_id: str
):
    # Initialize the Vertex AI SDK for the given project and region.
    aiplatform.init(project=project, location=location)

    # Reference an existing entity type in the featurestore.
    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    # Feature values for the entity "movie_01", keyed by feature ID.
    my_data = {
        "movie_01": {
            "title": "The Shawshank Redemption",
            "average_rating": 4.7,
            "genre": "Drama",
        },
    }

    # Write the feature values to the online store.
    my_entity_type.write_feature_values(instances=my_data)

Additional languages

You can install and use the following Vertex AI client libraries to call the Vertex AI API. Cloud Client Libraries provide an optimized developer experience by using the natural conventions and styles of each supported language.

What's next