Organiza tus páginas con colecciones
Guarda y categoriza el contenido según tus preferencias.
Apache Beam simplifica el flujo de trabajo de enriquecimiento de datos, ya que proporciona una transformación de enriquecimiento lista para usar que puedes agregar a tu canalización. En esta página, se explica cómo usar la transformación de enriquecimiento de Apache Beam para enriquecer los datos de transmisión.
Cuando enriqueces los datos, aumentas los datos sin procesar de una fuente agregando datos relacionados de una segunda fuente. Los datos adicionales pueden provenir de una variedad de fuentes, como Bigtable o BigQuery. La transformación de enriquecimiento de Apache Beam usa una búsqueda de par clave-valor para conectar los datos adicionales con los datos sin procesar.
En los siguientes ejemplos, se proporcionan algunos casos en los que el enriquecimiento de datos es útil:
Cuando quieres crear una canalización de comercio electrónico que capture las actividades de los usuarios de un sitio web o una app y proporcione recomendaciones personalizadas. La transformación incorpora las actividades en tus datos de canalización para que puedas proporcionar las recomendaciones personalizadas.
Cuando tienes datos del usuario que quieres unir con datos geográficos para realizar estadísticas basadas en la ubicación geográfica.
Cuando quieres crear una canalización que recopile datos de dispositivos de la Internet de las cosas (IOT) que envíen eventos de telemetría.
Ventajas
La transformación de enriquecimiento tiene los siguientes beneficios:
Transforma los datos sin que tengas que escribir código complejo o administrar las bibliotecas subyacentes.
Proporciona controladores de código fuente integrados.
Usa el controlador BigTableEnrichmentHandler para enriquecer tus datos mediante una fuente de Bigtable sin pasar los detalles de configuración.
Usa el controlador BigQueryEnrichmentHandler para enriquecer tus datos mediante una fuente de BigQuery sin pasar los detalles de configuración.
Usa la limitación del cliente para administrar el límite de frecuencia de las solicitudes. Las solicitudes se respaldan de manera exponencial con una estrategia de reintento predeterminada. Puedes configurar el límite de frecuencia para que se adapte a tu caso práctico.
Asistencia y limitaciones
La transformación de enriquecimiento tiene los siguientes requisitos:
Disponible para canalizaciones por lotes y de transmisión.
El controlador BigTableEnrichmentHandler está disponible en las versiones 2.54.0 y posteriores del SDK de Apache Beam para Python.
El controlador BigQueryEnrichmentHandler está disponible en las versiones 2.57.0 y posteriores del SDK de Apache Beam para Python.
El controlador VertexAIFeatureStoreEnrichmentHandler está disponible en las versiones 2.55.0 y posteriores del SDK de Apache Beam para Python.
Cuando usas las versiones 2.55.0 y posteriores del SDK de Apache Beam para Python, también debes instalar el cliente de Python para Redis.
Para usar la transformación de enriquecimiento, incluye el siguiente código en tu canalización:
importapache_beamasbeamfromapache_beam.transforms.enrichmentimportEnrichmentfromapache_beam.transforms.enrichment_handlers.bigtableimportBigTableEnrichmentHandlerbigtable_handler=BigTableEnrichmentHandler(...)withbeam.Pipeline()asp:output=(p...|"Create" >> beam.Create(data)|"Enrich with Bigtable" >> Enrichment(bigtable_handler)...)
Debido a que la transformación de enriquecimiento realiza una unión cruzada de forma predeterminada, diseña la unión personalizada para enriquecer los datos de entrada. Este diseño garantiza que la unión incluya solo los campos especificados.
En el siguiente ejemplo, left es el elemento de entrada de la transformación de enriquecimiento y right son los datos recuperados de un servicio externo para ese elemento de entrada.
Para usar la transformación de enriquecimiento, se requiere el parámetro EnrichmentHandler.
También puedes usar un parámetro de configuración para especificar una función lambda para una función de unión, un tiempo de espera, un regulador o un repetidor (estrategia de reintento). Están disponibles los siguientes parámetros de configuración:
join_fn: Es una función lambda que toma diccionarios como entrada y muestra una fila enriquecida (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). La fila enriquecida especifica cómo unir los datos recuperados de la API.
La configuración predeterminada es una unión cruzada.
timeout: Es la cantidad de segundos que se espera hasta que la API complete la solicitud antes de que se agote el tiempo de espera. La configuración predeterminada es de 30 segundos.
throttler: Especifica el mecanismo de limitación. La única opción compatible es la limitación predeterminada del cliente.
repeater: Especifica la estrategia de reintento cuando se producen errores como TooManyRequests y TimeoutException. La configuración predeterminada es ExponentialBackOffRepeater.
[[["Fácil de comprender","easyToUnderstand","thumb-up"],["Resolvió mi problema","solvedMyProblem","thumb-up"],["Otro","otherUp","thumb-up"]],[["Difícil de entender","hardToUnderstand","thumb-down"],["Información o código de muestra incorrectos","incorrectInformationOrSampleCode","thumb-down"],["Faltan la información o los ejemplos que necesito","missingTheInformationSamplesINeed","thumb-down"],["Problema de traducción","translationIssue","thumb-down"],["Otro","otherDown","thumb-down"]],["Última actualización: 2025-09-04 (UTC)"],[[["\u003cp\u003eApache Beam's enrichment transform simplifies data enrichment workflows by allowing users to augment raw data with related data from various sources like Bigtable or BigQuery.\u003c/p\u003e\n"],["\u003cp\u003eThe enrichment transform offers benefits such as transforming data without writing complex code, providing built-in source handlers for Bigtable, BigQuery, and Vertex AI Feature Store, and using client-side throttling for rate limiting.\u003c/p\u003e\n"],["\u003cp\u003eTo utilize the enrichment transform, users need to include specific code in their pipeline using \u003ccode\u003eBigTableEnrichmentHandler\u003c/code\u003e, and ensure they have the correct Apache Beam Python SDK versions, among other requirements.\u003c/p\u003e\n"],["\u003cp\u003eThe transform enables data enrichment for use cases such as creating ecommerce pipelines with customized recommendations, joining user data with geographical data for analytics, or gathering data from IoT devices.\u003c/p\u003e\n"],["\u003cp\u003eThe transform defaults to cross join but can be configured using a join function, timeout, throttler or repeater for greater control over how the data is enriched.\u003c/p\u003e\n"]]],[],null,["# Enrich streaming data\n\nApache Beam simplifies the data enrichment workflow by providing a turnkey\nenrichment transform that you can add to your pipeline. This page explains how\nto use the Apache Beam enrichment transform to enrich your streaming data.\n\nWhen you enrich data, you augment the raw data from one source by adding related\ndata from a second source. The additional data can come from a variety of\nsources, such as [Bigtable](/bigtable/docs/overview) or\n[BigQuery](/bigquery/docs/introduction). The Apache Beam enrichment\ntransform uses a key-value lookup to connect the additional data to the raw data.\n\nThe following examples provide some cases where data enrichment is useful:\n\n- You want to create an ecommerce pipeline that captures user activities from a website or app and provides customized recommendations. The transform incorporates the activities into your pipeline data so that you can provide the customized recommendations.\n- You have user data that you want to join with geographical data to do geography-based analytics.\n- You want to create a pipeline that gathers data from internet-of-things (IOT) devices that send out telemetry events.\n\nBenefits\n--------\n\nThe enrichment transform has the following benefits:\n\n- Transforms your data without requiring you to write complex code or manage underlying libraries.\n- Provides built-in source handlers.\n - Use the [`BigTableEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.BigTableEnrichmentHandler) handler to enrich your data by using a Bigtable source without passing configuration details.\n - Use the [`BigQueryEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigquery.html#apache_beam.transforms.enrichment_handlers.bigquery.BigQueryEnrichmentHandler) handler to enrich your data by using a BigQuery source without passing configuration details.\n - Use the [`VertexAIFeatureStoreEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.html#apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.VertexAIFeatureStoreEnrichmentHandler) handler with [Vertex AI Feature Store](/vertex-ai/docs/featurestore/latest/overview) and [Bigtable online serving](/vertex-ai/docs/featurestore/latest/overview#online_serving).\n- Uses client-side throttling to manage rate limiting the requests. The requests are exponentially backed off with a default retry strategy. You can configure rate limiting to suit your use case.\n\nSupport and limitations\n-----------------------\n\nThe enrichment transform has the following requirements:\n\n- Available for batch and streaming pipelines.\n- The `BigTableEnrichmentHandler` handler is available in the Apache Beam Python SDK versions 2.54.0 and later.\n- The `BigQueryEnrichmentHandler` handler is available in the Apache Beam Python SDK versions 2.57.0 and later.\n- The `VertexAIFeatureStoreEnrichmentHandler` handler is available in the Apache Beam Python SDK versions 2.55.0 and later.\n- When using the Apache Beam Python SDK versions 2.55.0 and later, you also need to install the [Python client for Redis](https://pypi.org/project/redis/).\n- Dataflow jobs must use [Runner v2](/dataflow/docs/runner-v2).\n\nUse the enrichment transform\n----------------------------\n\nTo use the enrichment transform, include the following code in\nyour pipeline: \n\n import apache_beam as beam\n from apache_beam.transforms.enrichment import Enrichment\n from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler\n\n bigtable_handler = BigTableEnrichmentHandler(...)\n\n with beam.Pipeline() as p:\n output = (p\n ...\n | \"Create\" \u003e\u003e beam.Create(data)\n | \"Enrich with Bigtable\" \u003e\u003e Enrichment(bigtable_handler)\n ...\n )\n\nBecause the enrichment transform performs a cross join by default, design the\ncustom join to enrich the input data. This design ensures that the join includes\nonly the specified fields.\n\nIn the following example, `left` is the input element of the enrichment\ntransform, and `right` is data fetched from an external service for that input\nelement. \n\n def custom_join(left: Dict[str, Any], right: Dict[str, Any]):\n enriched = {}\n enriched['\u003cvar translate=\"no\"\u003eFIELD_NAME\u003c/var\u003e'] = left['\u003cvar translate=\"no\"\u003eFIELD_NAME\u003c/var\u003e']\n ...\n return beam.Row(**enriched)\n\n### Parameters\n\nTo use the enrichment transform, the `EnrichmentHandler` parameter is required.\n\nYou can also use a configuration parameter to specify a `lambda` function for a join\nfunction, a timeout, a throttler, or a repeater (retry strategy). The following\nconfiguration parameters are available:\n\n- `join_fn`: A `lambda` function that takes dictionaries as input and returns an enriched row (`Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]`). The enriched row specifies how to join the data fetched from the API. Defaults to a cross join.\n- `timeout`: The number of seconds to wait for the request to be completed by the API before timing out. Defaults to 30 seconds.\n- `throttler`: Specifies the throttling mechanism. The only supported option is default client-side adaptive throttling.\n- `repeater`: Specifies the retry strategy when errors like `TooManyRequests` and `TimeoutException` occur. Defaults to `ExponentialBackOffRepeater`.\n\nWhat's next\n-----------\n\n- For more examples, see [Enrichment transform](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment) in the Apache Beam transform catalog.\n- [Use Apache Beam and Bigtable to enrich data](/dataflow/docs/notebooks/bigtable_enrichment_transform).\n- [Use Apache Beam and BigQuery to enrich data](/dataflow/docs/notebooks/bigquery_enrichment_transform).\n- [Use Apache Beam and Vertex AI Feature Store to enrich data](/dataflow/docs/notebooks/vertex_ai_feature_store_enrichment)."]]