Enabling real-time AI with Streaming Ingestion in Vertex AI

September 20, 2022
Erwin Huizenga

Machine Learning Engineer, Google

Kaz Sato

Developer Advocate, Cloud AI

Many machine learning (ML) use cases, like fraud detection, ad targeting, and recommendation engines, require near real-time predictions. The performance of these predictions is heavily dependent on access to the most up-to-date data, with delays of even a few seconds making all the difference. But it’s difficult to set up the infrastructure needed to support high-throughput updates and low-latency retrieval of data. 

Starting this month, Vertex AI Matching Engine and Feature Store will support real-time Streaming Ingestion as Preview features. With Streaming Ingestion for Matching Engine, a fully managed vector database for vector similarity search, items in an index are updated continuously and reflected in similarity search results immediately. With Streaming Ingestion for Feature Store, you can retrieve the latest feature values with low latency for highly accurate predictions, and extract real-time datasets for training. 

For example, Digits is taking advantage of Vertex AI Matching Engine Streaming Ingestion to help power their product, Boost, a tool that saves accountants time by automating manual quality control work. “Vertex AI Matching Engine Streaming Ingestion has been key to Digits Boost being able to deliver features and analysis in real-time. Before Matching Engine, transactions were classified on a 24 hour batch schedule, but now with Matching Engine Streaming Ingestion, we can perform near real time incremental indexing - activities like inserting, updating or deleting embeddings on an existing index, which helped us speed up the process. Now feedback to customers is immediate, and we can handle more transactions, more quickly,” said Hannes Hapke, Machine Learning Engineer at Digits.

This blog post covers how these new features can improve predictions and enable near real-time use cases, such as recommendations, content personalization, and cybersecurity monitoring.

https://storage.googleapis.com/gweb-cloudblog-publish/original_images/Streaming_Ingestion_animation.gif
Streaming Ingestion enables you to serve valuable data to millions of users in real time.

Streaming Ingestion enables real-time AI

As organizations recognize the potential business impact of better predictions based on up-to-date data, more real-time AI use cases are being implemented. Here are some examples:

  • Real-time recommendations and a real-time marketplace: By adding Streaming Ingestion to their existing Matching Engine-based product recommendations, Mercari is creating a real-time marketplace where users can browse products based on their specific interests, and where results are updated instantly when sellers add new products. Once it’s fully implemented, the experience will be like visiting an early-morning farmer's market, with fresh food being brought in as you shop. By combining Streaming Ingestion with Matching Engine’s filtering capability, Mercari can specify whether or not an item should be included in the search results, based on tags such as "online/offline" or "instock/nostock."

https://storage.googleapis.com/gweb-cloudblog-publish/original_images/Mercari_Shops.gif
Mercari Shops: Streaming Ingestion enables real-time shopping experimen

  • Large-scale personalized content streaming: For any stream of content representable with feature vectors (including text, images, or documents), you can design pub-sub channels to pick up valuable content for each subscriber's specific interests. Because Matching Engine is scalable (i.e., it can process millions of queries each second), you can support millions of online subscribers for content streaming, serving a wide variety of topics that change dynamically. With Matching Engine’s filtering capability, you also have real-time control over what content should be included, by assigning tags such as "explicit" or "spam" to each object. You can use Feature Store as a central repository for storing and serving the feature vectors of the content in near real time.

  • Monitoring: Content streaming can also be used for monitoring events or signals from IT infrastructure, IoT devices, manufacturing production lines, and security systems, among other commercial use cases. For example, you can extract signals from millions of sensors and devices and represent them as feature vectors. Matching Engine can be used to continuously update a list of "the top 100 devices with possible defective signals," or "top 100 sensor events with outliers," all in near real time.

  • Threat/spam detection: If you are monitoring signals from security threat signatures or spam activity patterns, you can use Matching Engine to instantly identify possible attacks from millions of monitoring points. In contrast, security threat identification based on batch processing often involves potentially significant lag, leaving the company vulnerable. With real-time data, your models are better able to catch threats or spam as they happen in your enterprise network, web services, online games, etc.
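
To make the combination of streaming updates and tag-based filtering in these examples more concrete, here is a minimal in-memory sketch in Python. It is not the Matching Engine API: the `ToyIndex` class, its method names, and the brute-force dot-product search are hypothetical stand-ins assumed purely for illustration. The sketch shows how an upserted item becomes visible to the very next query, and how tags such as "instock" and "nostock" can exclude items from the results.

```python
from dataclasses import dataclass, field


@dataclass
class ToyIndex:
    """Hypothetical in-memory stand-in for a streaming vector index."""

    # Maps item ID -> (feature vector, set of tags).
    items: dict = field(default_factory=dict)

    def upsert(self, item_id, vector, tags=()):
        # Insert a new item, or overwrite the vector and tags of an existing one.
        self.items[item_id] = (list(vector), set(tags))

    def remove(self, item_id):
        # Delete an item if present; unknown IDs are ignored.
        self.items.pop(item_id, None)

    def search(self, query, k=3, deny_tags=()):
        # Brute-force dot-product search that skips items carrying a denied tag.
        deny = set(deny_tags)
        scored = [
            (sum(q * v for q, v in zip(query, vec)), item_id)
            for item_id, (vec, tags) in self.items.items()
            if not (tags & deny)
        ]
        scored.sort(reverse=True)
        return [item_id for _, item_id in scored[:k]]


index = ToyIndex()
index.upsert("mug", [0.9, 0.1], tags={"instock"})
index.upsert("lamp", [0.2, 0.8], tags={"instock"})
before = index.search([1.0, 0.0], k=2, deny_tags={"nostock"})

# A seller lists a new item; the next query already sees it.
index.upsert("teapot", [1.0, 0.0], tags={"instock"})
after = index.search([1.0, 0.0], k=2, deny_tags={"nostock"})

# The mug sells out; re-tagging it removes it from filtered results.
index.upsert("mug", [0.9, 0.1], tags={"nostock"})
sold_out = index.search([1.0, 0.0], k=2, deny_tags={"nostock"})
```

In a real deployment the managed service would replace the brute-force scan with approximate nearest-neighbor search; the point here is only the update-then-query behavior.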

Implementing streaming use cases

Let’s take a closer look at how you can implement some of these use cases. 

Real-time recommendations for retail

Mercari built a feature extraction pipeline with Streaming Ingestion.

https://storage.googleapis.com/gweb-cloudblog-publish/images/3_Mercaris_real-time_feature_extraction_pi.max-2000x2000.jpg
Mercari’s real-time feature extraction pipeline

The feature extraction pipeline is defined with Vertex AI Pipelines, and is periodically invoked by Cloud Scheduler and Cloud Functions to initiate the following process:

  1. Get item data: The pipeline issues a query to fetch the updated item data from BigQuery.

  2. Extract feature vector: The pipeline runs predictions on the data with the word2vec model to extract feature vectors.

  3. Update index: The pipeline calls Matching Engine APIs to add the feature vectors to the vector index. The vectors are also saved to Cloud Bigtable (and can be replaced with Feature Store in the future).
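
The three steps above can be sketched in plain Python as follows. This is not Mercari's code: `run_pipeline`, `embed_title`, `fake_query`, and the tiny `TOY_WORD_VECTORS` table are hypothetical names invented for this illustration, and plain dictionaries stand in for BigQuery, the word2vec model, the Matching Engine index, and Cloud Bigtable.

```python
from typing import Callable, Dict, Iterable, List

Vector = List[float]

# Hypothetical toy word vectors standing in for a trained word2vec model.
TOY_WORD_VECTORS: Dict[str, Vector] = {
    "red": [1.0, 0.0],
    "blue": [0.0, 1.0],
    "mug": [0.5, 0.5],
}


def embed_title(title: str) -> Vector:
    # Average the vectors of known words; unknown words are skipped.
    words = title.lower().split()
    vectors = [TOY_WORD_VECTORS[w] for w in words if w in TOY_WORD_VECTORS]
    if not vectors:
        return [0.0, 0.0]
    return [sum(dim) / len(vectors) for dim in zip(*vectors)]


def run_pipeline(
    fetch_updated_items: Callable[[], Iterable[dict]],
    index: Dict[str, Vector],
    vector_store: Dict[str, Vector],
) -> int:
    """Runs one scheduled pass: fetch items, embed them, update index and store."""
    count = 0
    for item in fetch_updated_items():        # 1. Get item data
        vector = embed_title(item["title"])   # 2. Extract feature vector
        index[item["id"]] = vector            # 3. Update index (upsert by ID)
        vector_store[item["id"]] = vector     #    and keep a copy of the vector
        count += 1
    return count


def fake_query() -> List[dict]:
    # Stand-in for the query that returns recently updated items.
    return [{"id": "a1", "title": "Red mug"}, {"id": "b2", "title": "Blue mug"}]


index: Dict[str, Vector] = {}
store: Dict[str, Vector] = {}
updated = run_pipeline(fake_query, index, store)
```

Passing the fetch function in as a parameter keeps the sketch testable without any cloud services; a scheduler would simply call `run_pipeline` on each tick.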

"We have been evaluating the Matching Engine Streaming Ingestion and couldn't believe the super short latency of the index update for the first time. We would like to introduce the functionality to our production service as soon as it becomes GA," said Nogami Wakana, Software Engineer at Souzoh (a Mercari group company).

This architecture design can also be applied to any retail business that needs real-time updates for product recommendations.

Ad targeting

Ad recommender systems benefit significantly from real-time features and item matching with the most up-to-date information. Let's see how Vertex AI can help build a real-time ad targeting system.

https://storage.googleapis.com/gweb-cloudblog-publish/images/4_Real-time_ad_recommendation_system.max-2000x2000.jpg
Real-time ad recommendation system

The first step is generating a set of candidates from the ad corpus. This is challenging because you must generate relevant candidates in milliseconds and ensure they are up to date. Here you can use Vertex AI Matching Engine to perform low-latency vector similarity matching, generate suitable candidates, and use Streaming Ingestion to ensure that your index is up-to-date with the latest ads. 

Next is reranking the candidate selection using a machine learning model to ensure that you have a relevant order of ad candidates. For the model to use the latest data, you can use Feature Store Streaming Ingestion to import the latest features and use online serving to serve feature values at low latency to improve accuracy.  

After reranking the ad candidates, you can apply final optimizations, such as applying the latest business logic. You can implement the optimization step using a Cloud Function or Cloud Run.
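
The three stages described above (candidate generation, reranking, and final optimization) can be sketched as a small Python pipeline. Everything in it is an assumption made for illustration: the ad IDs, the `ctr_last_hour` feature, the paused-ad rule, and the function names are hypothetical, and in-memory dictionaries stand in for Matching Engine, Feature Store online serving, and the business-logic service.

```python
from typing import Dict, List

# Hypothetical ad embeddings (stand-in for the vector index).
AD_VECTORS: Dict[str, List[float]] = {
    "ad_shoes": [0.9, 0.1],
    "ad_boots": [0.8, 0.2],
    "ad_phone": [0.1, 0.9],
}
# Hypothetical fresh feature values per ad (stand-in for online serving).
LATEST_FEATURES: Dict[str, Dict[str, float]] = {
    "ad_shoes": {"ctr_last_hour": 0.02},
    "ad_boots": {"ctr_last_hour": 0.05},
    "ad_phone": {"ctr_last_hour": 0.04},
}
# Hypothetical business rule input: ads that must not be shown right now.
PAUSED_ADS = {"ad_boots"}


def generate_candidates(user_vector: List[float], k: int) -> List[str]:
    # Stage 1: nearest neighbors by dot product (brute force in this sketch).
    def score(ad_id: str) -> float:
        return sum(u * a for u, a in zip(user_vector, AD_VECTORS[ad_id]))

    return sorted(AD_VECTORS, key=score, reverse=True)[:k]


def rerank(candidates: List[str]) -> List[str]:
    # Stage 2: order candidates with a toy "model" that reads the latest features.
    return sorted(
        candidates,
        key=lambda ad: LATEST_FEATURES[ad]["ctr_last_hour"],
        reverse=True,
    )


def apply_business_logic(ranked: List[str]) -> List[str]:
    # Stage 3: final optimization, here simply dropping paused ads.
    return [ad for ad in ranked if ad not in PAUSED_ADS]


candidates = generate_candidates([1.0, 0.0], k=2)
ranked = rerank(candidates)
final_ads = apply_business_logic(ranked)
```

Note how stale data at either stage would change the outcome: an outdated index could miss a new ad in stage 1, and outdated feature values could misorder candidates in stage 2.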

What’s Next?

Interested? The documentation for Streaming Ingestion is available and you can try it out now. Using the new feature is easy: for example, when you create an index on Matching Engine with the REST API, you can set the indexUpdateMethod attribute to STREAM_UPDATE.
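
As a rough illustration, a request body for creating such an index could be assembled in Python like this. Only `indexUpdateMethod` and `STREAM_UPDATE` come from the text above. The other field names (`displayName`, `contentsDeltaUri`, `dimensions`, `approximateNeighborsCount`, `distanceMeasureType`, `treeAhConfig`) and all values are assumptions about the index resource that you should verify against the current REST reference. The helper name `build_create_index_body`, the index name, and the bucket path are hypothetical, and the sketch only builds the JSON; it does not call the service.

```python
import json


def build_create_index_body(display_name: str, dimensions: int, contents_uri: str) -> dict:
    """Builds a JSON body for an index-creation request (hypothetical helper)."""
    return {
        "displayName": display_name,
        "metadata": {
            # Assumed field: Cloud Storage folder holding the initial vectors.
            "contentsDeltaUri": contents_uri,
            "config": {
                "dimensions": dimensions,          # length of each feature vector
                "approximateNeighborsCount": 150,  # illustrative value
                "distanceMeasureType": "DOT_PRODUCT_DISTANCE",
                "algorithmConfig": {
                    "treeAhConfig": {
                        "leafNodeEmbeddingCount": 10000,  # illustrative value
                        "leafNodesToSearchPercent": 20,   # illustrative value
                    }
                },
            },
        },
        # The attribute named in the text: selects streaming updates for this index.
        "indexUpdateMethod": "STREAM_UPDATE",
    }


body = build_create_index_body(
    "my-streaming-index", dimensions=128, contents_uri="gs://my-bucket/init/"
)
payload = json.dumps(body, indent=2)
```

The resulting `payload` string is what you would send as the body of the create request, using whichever HTTP client and authentication method you normally use.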


After deploying the index, you can update or rebuild the index (feature vectors) with the following format. If the data point ID already exists in the index, the data point is updated; otherwise, a new data point is inserted.
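
Here is a hedged Python sketch of what such an update payload and its update-or-insert behavior might look like. The field names `datapoints`, `datapoint_id`, and `feature_vector` are assumptions about the request format and should be checked against the REST reference. The helpers `build_upsert_body` and `apply_upsert` are hypothetical, and `apply_upsert` only models the described semantics on a local dictionary rather than calling the service.

```python
from typing import Dict, List


def build_upsert_body(vectors: Dict[str, List[float]]) -> dict:
    """Builds a JSON body listing data points to insert or update (hypothetical helper)."""
    return {
        "datapoints": [
            {"datapoint_id": dp_id, "feature_vector": list(vec)}
            for dp_id, vec in vectors.items()
        ]
    }


def apply_upsert(index: Dict[str, List[float]], body: dict) -> None:
    # Local model of the described behavior:
    # existing IDs are updated, new IDs are inserted.
    for dp in body["datapoints"]:
        index[dp["datapoint_id"]] = dp["feature_vector"]


local_index = {"item-1": [0.1, 0.2]}
body = build_upsert_body({"item-1": [0.3, 0.4], "item-2": [0.5, 0.6]})
apply_upsert(local_index, body)
```

After the call, `item-1` holds its new vector and `item-2` has been added, which mirrors the update-or-insert rule described above.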


The index can handle data point insertions and updates at high throughput with low latency. New data point values are reflected in new queries within a few seconds or milliseconds (the latency varies depending on conditions).

Streaming Ingestion is powerful and easy to use. You don't need to build and operate your own streaming data pipeline for real-time indexing and storage, yet it adds significant value to your business with its real-time responsiveness.

To learn more about Matching Engine and Feature Store concepts and use cases, take a look at the following blog posts:
