Enrich streaming data

Apache Beam simplifies the data enrichment workflow by providing a turnkey enrichment transform that you can add to your pipeline. This page explains how to use the Apache Beam enrichment transform to enrich your streaming data.

When you enrich data, you augment the raw data from one source by adding related data from a second source. The additional data can come from a variety of sources, such as Bigtable or BigQuery. The Apache Beam enrichment transform uses a key-value lookup to connect the additional data to the raw data.
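To illustrate the key-value lookup idea outside of a pipeline, the following is a minimal plain-Python sketch. It isn't how the enrichment transform is implemented. The `CUSTOMERS` table, the `enrich` function, and all field names are hypothetical and stand in for an external source such as Bigtable.

```python
from typing import Any, Dict, Iterable, Iterator

# Hypothetical reference data, standing in for an external key-value source.
CUSTOMERS: Dict[int, Dict[str, Any]] = {
    1: {"name": "Ada", "region": "EU"},
    2: {"name": "Lin", "region": "APAC"},
}


def enrich(
    events: Iterable[Dict[str, Any]],
    lookup: Dict[int, Dict[str, Any]],
    key_field: str,
) -> Iterator[Dict[str, Any]]:
    """Adds the looked-up fields to each raw event, matching on key_field."""
    for event in events:
        # An event whose key has no match passes through unchanged.
        extra = lookup.get(event[key_field], {})
        # If a field name appears in both, the raw event's value is kept.
        yield {**extra, **event}
```

Calling `list(enrich(events, CUSTOMERS, "customer_id"))` on a list of event dictionaries returns each event with the matching customer's `name` and `region` added.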

The following examples provide some cases where data enrichment is useful:

  • You want to create an ecommerce pipeline that captures user activities from a website or app and provides customized recommendations. The transform incorporates the activities into your pipeline data so that you can provide the customized recommendations.
  • You have user data that you want to join with geographical data to do geography-based analytics.
  • You want to create a pipeline that gathers data from internet-of-things (IoT) devices that send out telemetry events.

Benefits

The enrichment transform has the following benefits:

  • Transforms your data without requiring you to write complex code or manage underlying libraries.
  • Provides built-in source handlers.
  • Uses client-side throttling to rate limit requests. Under the default retry strategy, requests are retried with exponential backoff. You can configure rate limiting to suit your use case.
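For readers unfamiliar with exponential backoff, the following plain-Python sketch shows the general technique. It is an illustration under stated assumptions, not the code that the enrichment transform uses. The `call_with_backoff` function, its parameters, and its default values are all hypothetical.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    request: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Calls request(), retrying on TimeoutError with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return request()
        except TimeoutError:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last error.
            # The upper bound doubles on every attempt, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter: wait a random amount up to the bound so that many
            # clients don't retry at the same moment.
            sleep(random.uniform(0, delay))
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` argument is injectable only so that the wait can be replaced in tests. In normal use, the default `time.sleep` applies.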

Support and limitations

The following support details and requirements apply to the enrichment transform:

  • Available for batch and streaming pipelines.
  • The BigTableEnrichmentHandler handler is available in the Apache Beam Python SDK versions 2.54.0 and later.
  • The VertexAIFeatureStoreEnrichmentHandler handler is available in the Apache Beam Python SDK versions 2.55.0 and later.
  • When using the Apache Beam Python SDK versions 2.55.0 and later, you also need to install the Python client for Redis.
  • Dataflow jobs must use Runner v2.

Use the enrichment transform

To use the enrichment transform, include the following code in your pipeline:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

By default, the enrichment transform performs a cross join. To make sure that the output includes only the fields that you specify, write a custom join function.

In the following example, left is the input element of the enrichment transform, and right is data fetched from an external service for that input element.

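from typing import Any, Dict

import apache_beam as beam

def custom_join(left: Dict[str, Any], right: Dict[str, Any]) -> beam.Row:
  # Copy only the fields that you need from left (input) and right (fetched data).
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)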
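To make the field selection concrete without depending on a live service, the following sketch keeps the selection logic in a plain function that returns a dictionary. In a pipeline, a join function would wrap that result in beam.Row, as in the preceding example. The `select_fields` function and the field names `sale_id`, `quantity`, and `product_name` are hypothetical.

```python
from typing import Any, Dict


def select_fields(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]:
    """Builds the enriched record from explicitly chosen fields only."""
    return {
        # Fields taken from the input element.
        "sale_id": left["sale_id"],
        "quantity": left["quantity"],
        # Field taken from the fetched data; None if the lookup had no match.
        "product_name": right.get("product_name"),
    }
```

Any field that isn't listed, on either side, is left out of the result. That is the difference from a join that merges everything.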

Parameters

The enrichment transform requires an EnrichmentHandler parameter.

You can also use optional configuration parameters to specify a join function, a timeout, a throttler, or a repeater (retry strategy). The following configuration parameters are available:

  • join_fn: A function that takes two dictionaries as input, the input element and the data fetched from the API, and returns an enriched row (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). The function specifies how to join the fetched data with the input element. Defaults to a cross join.
  • timeout: The number of seconds to wait for the request to be completed by the API before timing out. Defaults to 30 seconds.
  • throttler: Specifies the throttling mechanism. The only supported option is default client-side adaptive throttling.
  • repeater: Specifies the retry strategy when errors like TooManyRequests and TimeoutException occur. Defaults to ExponentialBackOffRepeater.

What's next