Enrichir les flux de données

Apache Beam simplifie le workflow d'enrichissement des données en fournissant une transformation d'enrichissement clé en main que vous pouvez ajouter à votre pipeline. Cette page explique comment utiliser la transformation d'enrichissement Apache Beam pour enrichir vos flux de données.

Lorsque vous enrichissez des données, vous augmentez les données brutes d'une source en ajoutant des données associées provenant d'une deuxième source. Les données supplémentaires peuvent provenir de diverses sources, telles que Bigtable ou BigQuery. La transformation d'enrichissement Apache Beam utilise une recherche clé-valeur pour associer les données supplémentaires aux données brutes.

Les exemples suivants fournissent des cas dans lesquels l'enrichissement des données est utile :

  • Vous souhaitez créer un pipeline d'e-commerce qui capture les activités de l'utilisateur à partir d'un site Web ou d'une application, et fournit des recommandations personnalisées. La transformation intègre les activités dans les données de votre pipeline afin que vous puissiez fournir des recommandations personnalisées.
  • Vous disposez de données sur l'utilisateur que vous souhaitez associer à des données géographiques pour effectuer des analyses basées sur la zone géographique.
  • Vous souhaitez créer un pipeline qui collecte des données à partir d'appareils IoT (Internet des objets) qui envoient des événements de télémétrie.

Avantages

La transformation d'enrichissement présente les avantages suivants :

  • Transforme vos données sans avoir à écrire de code complexe ni à gérer les bibliothèques sous-jacentes.
  • Fournit des gestionnaires de sources intégrés.
  • Utilise une limitation côté client pour gérer la limitation du débit des requêtes. Les requêtes sont sauvegardées de manière exponentielle avec une stratégie de nouvelle tentative par défaut. Vous pouvez configurer la limitation du débit en fonction de votre cas d'utilisation.

Compatibilité et limites

La transformation d'enrichissement présente les exigences suivantes :

  • Disponible pour les pipelines de traitement par lot et par flux.
  • Le gestionnaire BigTableEnrichmentHandler est disponible dans le SDK Apache Beam pour Python version 2.54.0 et ultérieure.
  • Le gestionnaire VertexAIFeatureStoreEnrichmentHandler est disponible dans le SDK Apache Beam pour Python version 2.55.0 et ultérieure.
  • Lorsque vous utilisez le SDK Apache Beam pour Python en version 2.55.0 et ultérieure, vous devez également installer le client Python pour Redis.
  • Les jobs Dataflow doivent utiliser Runner v2.

Utiliser la transformation d'enrichissement

Pour utiliser la transformation d'enrichissement, incluez le code suivant dans votre pipeline :

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Étant donné que la transformation d'enrichissement effectue une jointure croisée par défaut, concevez la jointure personnalisée pour enrichir les données d'entrée. Cette conception garantit que la jointure n'inclut que les champs spécifiés.

Dans l'exemple suivant, left est l'élément d'entrée de la transformation d'enrichissement et right correspond aux données extraites d'un service externe pour cet élément d'entrée.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Paramètres

Pour utiliser la transformation d'enrichissement, le paramètre EnrichmentHandler est obligatoire.

Vous pouvez également utiliser un paramètre de configuration pour spécifier une fonction lambda pour une fonction de jointure, un délai avant expiration, un régulateur ou un répéteur (stratégie de nouvelle tentative). Les paramètres de configuration suivants sont disponibles :

  • join_fn : fonction lambda utilisant des dictionnaires en entrée et renvoyant une ligne enrichie (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). La ligne enrichie spécifie comment joindre les données extraites de l'API. Il s'agit par défaut d'une jointure croisée.
  • timeout : nombre de secondes d'attente pour que la requête soit terminée par l'API avant l'expiration du délai. La valeur par défaut est de 30 secondes.
  • throttler : spécifie le mécanisme de limitation. La seule option compatible est la limitation adaptative côté client par défaut.
  • repeater : spécifie la stratégie de nouvelle tentative lorsque des erreurs telles que TooManyRequests et TimeoutException se produisent. La valeur par défaut est ExponentialBackOffRepeater.

Étapes suivantes