Apache Beam simplifie le workflow d'enrichissement des données en fournissant une transformation d'enrichissement clé en main que vous pouvez ajouter à votre pipeline. Cette page explique comment utiliser la transformation d'enrichissement Apache Beam pour enrichir vos flux de données.
Lorsque vous enrichissez des données, vous augmentez les données brutes d'une source en ajoutant des données associées provenant d'une deuxième source. Les données supplémentaires peuvent provenir de diverses sources, telles que Bigtable ou BigQuery. La transformation d'enrichissement Apache Beam utilise une recherche clé-valeur pour associer les données supplémentaires aux données brutes.
Les exemples suivants fournissent des cas dans lesquels l'enrichissement des données est utile :
- Vous souhaitez créer un pipeline d'e-commerce qui capture les activités de l'utilisateur à partir d'un site Web ou d'une application, et fournit des recommandations personnalisées. La transformation intègre les activités dans les données de votre pipeline afin que vous puissiez fournir des recommandations personnalisées.
- Vous disposez de données sur l'utilisateur que vous souhaitez associer à des données géographiques pour effectuer des analyses basées sur la zone géographique.
- Vous souhaitez créer un pipeline qui collecte des données à partir d'appareils IoT (Internet des objets) qui envoient des événements de télémétrie.
Avantages
La transformation d'enrichissement présente les avantages suivants :
- Transforme vos données sans que vous ayez à écrire de code complexe ni à gérer les bibliothèques sous-jacentes.
- Fournit des gestionnaires de sources intégrés.
- Utilisez le gestionnaire
BigTableEnrichmentHandler
pour enrichir vos données à l'aide d'une source Bigtable sans transmettre d'informations de configuration. - Utilisez le gestionnaire
BigQueryEnrichmentHandler
pour enrichir vos données à l'aide d'une source BigQuery sans transmettre d'informations de configuration. - Utilisez le gestionnaire
VertexAIFeatureStoreEnrichmentHandler
avec Vertex AI Feature Store et la livraison en ligne Bigtable.
- Utilisez le gestionnaire
- Utilise une limitation côté client pour gérer la limitation du débit des requêtes. Les requêtes sont sauvegardées de manière exponentielle avec une stratégie de nouvelles tentatives par défaut. Vous pouvez configurer la limitation du débit en fonction de votre cas d'utilisation.
Compatibilité et limites
La transformation d'enrichissement présente les exigences suivantes :
- Disponible pour les pipelines de traitement par lot et par flux.
- Le gestionnaire
BigTableEnrichmentHandler
est disponible dans le SDK Apache Beam pour Python versions 2.54.0 et ultérieures. - Le gestionnaire
BigQueryEnrichmentHandler
est disponible dans le SDK Apache Beam pour Python versions 2.57.0 et ultérieures. - Le gestionnaire
VertexAIFeatureStoreEnrichmentHandler
est disponible dans le SDK Apache Beam pour Python versions 2.55.0 et ultérieures. - Lorsque vous utilisez le SDK Apache Beam pour Python versions 2.55.0 et ultérieures, vous devez également installer le client Python pour Redis.
- Les jobs Dataflow doivent utiliser Runner v2.
Utiliser la transformation d'enrichissement
Pour utiliser la transformation d'enrichissement, incluez le code suivant dans votre pipeline :
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Étant donné que la transformation d'enrichissement effectue une jointure croisée par défaut, concevez la jointure personnalisée pour enrichir les données d'entrée. Cette conception garantit que la jointure n'inclut que les champs spécifiés.
Dans l'exemple suivant, left
est l'élément d'entrée de la transformation d'enrichissement et right
est les données extraites d'un service externe pour cet élément d'entrée.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Paramètres
Pour utiliser la transformation d'enrichissement, le paramètre EnrichmentHandler
est obligatoire.
Vous pouvez également utiliser un paramètre de configuration pour spécifier une fonction lambda
pour une fonction de jointure, un délai avant expiration, un régulateur ou un répéteur (stratégie de nouvelle tentative). Les paramètres de configuration suivants sont disponibles :
join_fn
: fonctionlambda
utilisant des dictionnaires en entrée et renvoyant une ligne enrichie (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
). La ligne enrichie spécifie comment joindre les données extraites de l'API. Il s'agit par défaut d'une jointure croisée.timeout
: nombre de secondes d'attente pour que la requête soit terminée par l'API avant l'expiration du délai. La valeur par défaut est de 30 secondes.throttler
: spécifie le mécanisme de limitation. La seule option compatible est la limitation adaptative côté client par défaut.repeater
: spécifie la stratégie de nouvelle tentative lorsque des erreurs telles queTooManyRequests
etTimeoutException
se produisent. La valeur par défaut estExponentialBackOffRepeater
.
Étapes suivantes
- Pour plus d'exemples, consultez la section Transformation d'enrichissement dans le catalogue de transformations Apache Beam.
- Utilisez Apache Beam et Bigtable pour enrichir les données.
- Utilisez Apache Beam et BigQuery pour enrichir les données.
- Utilisez Apache Beam et Vertex AI Feature Store pour enrichir les données.