Restez organisé à l'aide des collections
Enregistrez et classez les contenus selon vos préférences.
Apache Beam simplifie le workflow d'enrichissement des données en fournissant une transformation d'enrichissement clé en main que vous pouvez ajouter à votre pipeline. Cette page explique comment utiliser la transformation d'enrichissement Apache Beam pour enrichir vos flux de données.
Lorsque vous enrichissez des données, vous augmentez les données brutes d'une source en ajoutant des données associées provenant d'une deuxième source. Les données supplémentaires peuvent provenir de diverses sources, telles que Bigtable ou BigQuery. La transformation d'enrichissement Apache Beam utilise une recherche clé-valeur pour associer les données supplémentaires aux données brutes.
Les exemples suivants fournissent des cas dans lesquels l'enrichissement des données est utile :
Vous souhaitez créer un pipeline d'e-commerce qui capture les activités de l'utilisateur à partir d'un site Web ou d'une application, et fournit des recommandations personnalisées. La transformation intègre les activités dans les données de votre pipeline afin que vous puissiez fournir des recommandations personnalisées.
Vous disposez de données sur l'utilisateur que vous souhaitez associer à des données géographiques pour effectuer des analyses basées sur la zone géographique.
Vous souhaitez créer un pipeline qui collecte des données à partir d'appareils IoT (Internet des objets) qui envoient des événements de télémétrie.
Avantages
La transformation d'enrichissement présente les avantages suivants :
Transforme vos données sans que vous ayez à écrire de code complexe ni à gérer les bibliothèques sous-jacentes.
Fournit des gestionnaires de sources intégrés.
Utilisez le gestionnaire BigTableEnrichmentHandler pour enrichir vos données à l'aide d'une source Bigtable sans transmettre d'informations de configuration.
Utilisez le gestionnaire BigQueryEnrichmentHandler pour enrichir vos données à l'aide d'une source BigQuery sans transmettre d'informations de configuration.
Utilise une limitation côté client pour gérer la limitation du débit des requêtes. Les requêtes sont sauvegardées de manière exponentielle avec une stratégie de nouvelles tentatives par défaut. Vous pouvez configurer la limitation du débit en fonction de votre cas d'utilisation.
Compatibilité et limites
La transformation d'enrichissement présente les exigences suivantes :
Disponible pour les pipelines de traitement par lot et par flux.
Le gestionnaire BigTableEnrichmentHandler est disponible dans le SDK Apache Beam pour Python versions 2.54.0 et ultérieures.
Le gestionnaire BigQueryEnrichmentHandler est disponible dans le SDK Apache Beam pour Python versions 2.57.0 et ultérieures.
Le gestionnaire VertexAIFeatureStoreEnrichmentHandler est disponible dans le SDK Apache Beam pour Python versions 2.55.0 et ultérieures.
Lorsque vous utilisez le SDK Apache Beam pour Python versions 2.55.0 et ultérieures, vous devez également installer le client Python pour Redis.
Pour utiliser la transformation d'enrichissement, incluez le code suivant dans votre pipeline :
importapache_beamasbeamfromapache_beam.transforms.enrichmentimportEnrichmentfromapache_beam.transforms.enrichment_handlers.bigtableimportBigTableEnrichmentHandlerbigtable_handler=BigTableEnrichmentHandler(...)withbeam.Pipeline()asp:output=(p...|"Create" >> beam.Create(data)|"Enrich with Bigtable" >> Enrichment(bigtable_handler)...)
Étant donné que la transformation d'enrichissement effectue une jointure croisée par défaut, concevez la jointure personnalisée pour enrichir les données d'entrée. Cette conception garantit que la jointure n'inclut que les champs spécifiés.
Dans l'exemple suivant, left est l'élément d'entrée de la transformation d'enrichissement et right est les données extraites d'un service externe pour cet élément d'entrée.
Pour utiliser la transformation d'enrichissement, le paramètre EnrichmentHandler est obligatoire.
Vous pouvez également utiliser un paramètre de configuration pour spécifier une fonction lambda pour une fonction de jointure, un délai avant expiration, un régulateur ou un répéteur (stratégie de nouvelle tentative). Les paramètres de configuration suivants sont disponibles :
join_fn : fonction lambda utilisant des dictionnaires en entrée et renvoyant une ligne enrichie (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). La ligne enrichie spécifie comment joindre les données extraites de l'API.
Il s'agit par défaut d'une jointure croisée.
timeout : nombre de secondes d'attente pour que la requête soit terminée par l'API avant l'expiration du délai. La valeur par défaut est de 30 secondes.
throttler : spécifie le mécanisme de limitation. La seule option compatible est la limitation adaptative côté client par défaut.
repeater : spécifie la stratégie de nouvelle tentative lorsque des erreurs telles que TooManyRequests et TimeoutException se produisent. La valeur par défaut est ExponentialBackOffRepeater.
Étapes suivantes
Pour plus d'exemples, consultez la section Transformation d'enrichissement dans le catalogue de transformations Apache Beam.
Sauf indication contraire, le contenu de cette page est régi par une licence Creative Commons Attribution 4.0, et les échantillons de code sont régis par une licence Apache 2.0. Pour en savoir plus, consultez les Règles du site Google Developers. Java est une marque déposée d'Oracle et/ou de ses sociétés affiliées.
Dernière mise à jour le 2025/09/10 (UTC).
[[["Facile à comprendre","easyToUnderstand","thumb-up"],["J'ai pu résoudre mon problème","solvedMyProblem","thumb-up"],["Autre","otherUp","thumb-up"]],[["Difficile à comprendre","hardToUnderstand","thumb-down"],["Informations ou exemple de code incorrects","incorrectInformationOrSampleCode","thumb-down"],["Il n'y a pas l'information/les exemples dont j'ai besoin","missingTheInformationSamplesINeed","thumb-down"],["Problème de traduction","translationIssue","thumb-down"],["Autre","otherDown","thumb-down"]],["Dernière mise à jour le 2025/09/10 (UTC)."],[[["\u003cp\u003eApache Beam's enrichment transform simplifies data enrichment workflows by allowing users to augment raw data with related data from various sources like Bigtable or BigQuery.\u003c/p\u003e\n"],["\u003cp\u003eThe enrichment transform offers benefits such as transforming data without writing complex code, providing built-in source handlers for Bigtable, BigQuery, and Vertex AI Feature Store, and using client-side throttling for rate limiting.\u003c/p\u003e\n"],["\u003cp\u003eTo utilize the enrichment transform, users need to include specific code in their pipeline using \u003ccode\u003eBigTableEnrichmentHandler\u003c/code\u003e, and ensure they have the correct Apache Beam Python SDK versions, among other requirements.\u003c/p\u003e\n"],["\u003cp\u003eThe transform enables data enrichment for use cases such as creating ecommerce pipelines with customized recommendations, joining user data with geographical data for analytics, or gathering data from IoT devices.\u003c/p\u003e\n"],["\u003cp\u003eThe transform defaults to cross join but can be configured using a join function, timeout, throttler or repeater for greater control over how the data is enriched.\u003c/p\u003e\n"]]],[],null,["Apache Beam simplifies the data enrichment workflow by providing a turnkey\nenrichment transform that you can add to your pipeline. This page explains how\nto use the Apache Beam enrichment transform to enrich your streaming data.\n\nWhen you enrich data, you augment the raw data from one source by adding related\ndata from a second source. The additional data can come from a variety of\nsources, such as [Bigtable](/bigtable/docs/overview) or\n[BigQuery](/bigquery/docs/introduction). The Apache Beam enrichment\ntransform uses a key-value lookup to connect the additional data to the raw data.\n\nThe following examples provide some cases where data enrichment is useful:\n\n- You want to create an ecommerce pipeline that captures user activities from a website or app and provides customized recommendations. The transform incorporates the activities into your pipeline data so that you can provide the customized recommendations.\n- You have user data that you want to join with geographical data to do geography-based analytics.\n- You want to create a pipeline that gathers data from internet-of-things (IOT) devices that send out telemetry events.\n\nBenefits\n\nThe enrichment transform has the following benefits:\n\n- Transforms your data without requiring you to write complex code or manage underlying libraries.\n- Provides built-in source handlers.\n - Use the [`BigTableEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.BigTableEnrichmentHandler) handler to enrich your data by using a Bigtable source without passing configuration details.\n - Use the [`BigQueryEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigquery.html#apache_beam.transforms.enrichment_handlers.bigquery.BigQueryEnrichmentHandler) handler to enrich your data by using a BigQuery source without passing configuration details.\n - Use the [`VertexAIFeatureStoreEnrichmentHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.html#apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.VertexAIFeatureStoreEnrichmentHandler) handler with [Vertex AI Feature Store](/vertex-ai/docs/featurestore/latest/overview) and [Bigtable online serving](/vertex-ai/docs/featurestore/latest/overview#online_serving).\n- Uses client-side throttling to manage rate limiting the requests. The requests are exponentially backed off with a default retry strategy. You can configure rate limiting to suit your use case.\n\nSupport and limitations\n\nThe enrichment transform has the following requirements:\n\n- Available for batch and streaming pipelines.\n- The `BigTableEnrichmentHandler` handler is available in the Apache Beam Python SDK versions 2.54.0 and later.\n- The `BigQueryEnrichmentHandler` handler is available in the Apache Beam Python SDK versions 2.57.0 and later.\n- The `VertexAIFeatureStoreEnrichmentHandler` handler is available in the Apache Beam Python SDK versions 2.55.0 and later.\n- When using the Apache Beam Python SDK versions 2.55.0 and later, you also need to install the [Python client for Redis](https://pypi.org/project/redis/).\n- Dataflow jobs must use [Runner v2](/dataflow/docs/runner-v2).\n\nUse the enrichment transform\n\nTo use the enrichment transform, include the following code in\nyour pipeline: \n\n import apache_beam as beam\n from apache_beam.transforms.enrichment import Enrichment\n from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler\n\n bigtable_handler = BigTableEnrichmentHandler(...)\n\n with beam.Pipeline() as p:\n output = (p\n ...\n | \"Create\" \u003e\u003e beam.Create(data)\n | \"Enrich with Bigtable\" \u003e\u003e Enrichment(bigtable_handler)\n ...\n )\n\nBecause the enrichment transform performs a cross join by default, design the\ncustom join to enrich the input data. This design ensures that the join includes\nonly the specified fields.\n\nIn the following example, `left` is the input element of the enrichment\ntransform, and `right` is data fetched from an external service for that input\nelement. \n\n def custom_join(left: Dict[str, Any], right: Dict[str, Any]):\n enriched = {}\n enriched['\u003cvar translate=\"no\"\u003eFIELD_NAME\u003c/var\u003e'] = left['\u003cvar translate=\"no\"\u003eFIELD_NAME\u003c/var\u003e']\n ...\n return beam.Row(**enriched)\n\nParameters\n\nTo use the enrichment transform, the `EnrichmentHandler` parameter is required.\n\nYou can also use a configuration parameter to specify a `lambda` function for a join\nfunction, a timeout, a throttler, or a repeater (retry strategy). The following\nconfiguration parameters are available:\n\n- `join_fn`: A `lambda` function that takes dictionaries as input and returns an enriched row (`Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]`). The enriched row specifies how to join the data fetched from the API. Defaults to a cross join.\n- `timeout`: The number of seconds to wait for the request to be completed by the API before timing out. Defaults to 30 seconds.\n- `throttler`: Specifies the throttling mechanism. The only supported option is default client-side adaptive throttling.\n- `repeater`: Specifies the retry strategy when errors like `TooManyRequests` and `TimeoutException` occur. Defaults to `ExponentialBackOffRepeater`.\n\nWhat's next\n\n- For more examples, see [Enrichment transform](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment) in the Apache Beam transform catalog.\n- [Use Apache Beam and Bigtable to enrich data](/dataflow/docs/notebooks/bigtable_enrichment_transform).\n- [Use Apache Beam and BigQuery to enrich data](/dataflow/docs/notebooks/bigquery_enrichment_transform).\n- [Use Apache Beam and Vertex AI Feature Store to enrich data](/dataflow/docs/notebooks/vertex_ai_feature_store_enrichment)."]]