Modèles de tâche Java

L'exemple d'application d'e-commerce présente les bonnes pratiques pour utiliser Dataflow afin de mettre en œuvre l'analyse de données en streaming et l'IA en temps réel. L'exemple contient des modèles de tâche qui illustrent le meilleur moyen d'effectuer des tâches de programmation Java. Ces tâches sont généralement nécessaires pour créer des applications d'e-commerce.

L'application contient les modèles de tâche Java suivants :

Utiliser des schémas Apache Beam pour traiter des données structurées

Vous pouvez utiliser des schémas Apache Beam pour simplifier le traitement des données structurées.

La conversion de vos objets en lignes vous permet de produire du code Java très propre, ce qui facilite la création de graphes orientés acycliques (DAG). Vous pouvez également référencer des propriétés d'objet en tant que champs dans les instructions analytiques que vous créez, plutôt que d'avoir à appeler des méthodes.

Exemple

CountViewsPerProduct.java

Utiliser JsonToRow pour convertir des données JSON

Il est courant de devoir traiter des chaînes JSON dans Dataflow Par exemple, des chaînes JSON sont traitées lors du streaming d'informations relatives au flux de clics capturées à partir d'applications Web. Pour traiter des chaînes JSON, celles-ci doivent être converties en lignes ou en POJO (Plain Old Java Objects, anciens objets Java standards) pendant le traitement du pipeline.

Vous pouvez utiliser la transformation intégrée JsonToRow d'Apache Beam pour convertir des chaînes JSON en lignes. Toutefois, si vous souhaitez ajouter une file d'attente pour traiter les messages en échec, vous devez la créer séparément. Consultez la section Mettre en file d'attente les données impossibles à traiter pour une analyse plus approfondie.

Si vous devez convertir une chaîne JSON en POJO à l'aide d'AutoValue, enregistrez un schéma pour le type avec l'annotation @DefaultSchema(AutoValueSchema.class), puis utilisez la classe utilitaire Convert (Convertir). Le code obtenu ressemble à ce qui suit :

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Pour en savoir plus, y compris sur les différents types Java à partir desquels vous pouvez déduire des schémas, consultez la section concernant la création de schémas.

Si JsonToRow ne fonctionne pas avec vos données, Gson constitue une alternative. Le traitement par défaut des données de Gson est plutôt souple, ce qui peut vous obliger à intégrer plus de validations dans le processus de conversion des données.

Exemples

Utiliser le générateur de code AutoValue pour générer des POJO

Les schémas Apache Beam constituent souvent la meilleure façon de représenter les objets d'un pipeline grâce à la manière dont ils vous permettent d'utiliser des données structurées. Néanmoins, vous avez parfois besoin d'un POJO (Plain Old Java Object, ancien objet Java standard), par exemple pour traiter des objets clé-valeur ou gérer l'état des objets. La création manuelle de POJO nécessite de coder des dérogations pour les méthodes equals() et hashcode(), ce qui peut être long et source d'erreur, et occasionner ainsi un comportement d'application incohérent, voire une perte de données.

Pour générer des POJO, utilisez le générateur de classe AutoValue. Cela garantit l'établissement des dérogations nécessaires et empêche les éventuelles erreurs. AutoValue est très utilisé dans le codebase Apache Beam. Il est donc utile de savoir utiliser ce compilateur de classe si vous souhaitez développer des pipelines Apache Beam sur Dataflow à l'aide de Java.

Vous pouvez également utiliser AutoValue avec des schémas Apache Beam si vous ajoutez une annotation @DefaultSchema(AutoValueSchema.class). Pour en savoir plus, consultez la section décrivant la création de schémas.

Pour en savoir plus sur AutoValue, consultez le document Pourquoi utiliser AutoValue? ? ainsi que la documentation concernant AutoValue.

Exemple

Clickstream.java

Mettre en file d'attente les données impossibles à traiter pour une analyse plus approfondie

Dans les systèmes de production, il est important de gérer les données problématiques. Si possible, vous validez et corrigez les données en temps réel. Lorsque la correction n'est pas possible, enregistrez la valeur dans une file d'attente de messages non traités, parfois appelée file d'attente de lettres mortes, en vue d'une analyse ultérieure. La conversion de données d'un format à un autre est une source courante de problèmes, par exemple lors de la conversion de chaînes JSON en lignes.

Pour résoudre ce problème, utilisez une transformation à sorties multiples afin de transférer les éléments contenant les données non traitées vers un autre objet PCollection en vue d'un traitement ultérieur. Ce traitement est une opération courante que vous pouvez appliquer à plusieurs endroits d'un pipeline. Essayez de rendre la transformation suffisamment générique pour pouvoir l'utiliser dans plusieurs emplacements. Pour commencer, créez un objet d'erreur pour encapsuler les propriétés communes, y compris les données d'origine. Ensuite, créez une transformation de récepteur comportant plusieurs options pour la destination.

Exemples

Appliquer des transformations de validation des données en série

Les données collectées à partir de systèmes externes nécessitent souvent un nettoyage. Structurez votre pipeline de manière à lui permettre de corriger les données problématiques en temps réel lorsque c'est possible. Envoyer les données à une File d'attente pour analyse approfondie si nécessaire.

Dans la mesure où un même message peut nécessiter la correction de plusieurs problèmes, planifiez bien le graphe orienté acyclique (DAG) requis. Si un élément contient des données présentant plusieurs défauts, vous devez vous assurer qu'il passe par toutes les transformations appropriées.

Par exemple, imaginons un élément possédant les valeurs suivantes, dont aucune ne doit être nulle :

{"itemA": null,"itemB": null}

Assurez-vous que l'élément passe par des transformations qui corrigent les deux problèmes potentiels :

badElements.apply(fixItemA).apply(fixItemB)

Votre pipeline peut comporter davantage d'étapes en série, mais la fusion vous aide à réduire les frais de traitement introduits par cette approche.

Exemple

ValidateAndCorrectCSEvt.java

Utiliser DoFn.StartBundle pour traiter les appels vers des services externes par micro-lots

Vous pouvez être amené à appeler des API externes dans le cadre de votre pipeline. Étant donné qu'un pipeline répartit le travail sur plusieurs ressources de calcul, le fait d'effectuer un appel unique pour chaque élément circulant via le système peut surcharger un point de terminaison de service externe. Ce problème est particulièrement courant lorsque vous n'avez appliqué aucune fonction de réduction.

Pour éviter ce problème, effectuez les appels à des systèmes externes sous forme d'appels par lots.

Vous pouvez effectuer des appels par lot à l'aide d'une transformation GroupByKey ou de l'API Timer Apache Beam. Toutefois, ces approches nécessitent toutes deux un brassage qui implique une surcharge de traitement et la nécessité de disposer d'un numéro magique pour déterminer l'espace clé.

Utilisez plutôt les éléments de gestion du cycle de vie StartBundle et FinishBundle pour traiter vos données par lot. Avec ces options, aucun brassage n'est nécessaire.

Un inconvénient mineur de cette méthode réside dans le fait que les tailles de lot sont déterminées dynamiquement par la mise en œuvre de l'exécuteur, en fonction de ce qui se passe actuellement dans le pipeline et dans ses nœuds de calcul. En mode flux, la taille des lots est souvent réduite. Le regroupement Dataflow est influencé par des facteurs du backend tels que la segmentation de l'utilisation, la quantité de données disponibles pour une clé spécifique et le débit du pipeline.

Exemple

EventItemCorrectionService.java

Utiliser un format d'entrée secondaire approprié pour l'enrichissement des données

Dans les applications d'analyse de flux, il est fréquent d'enrichir les données avec des informations supplémentaires qui peuvent s'avérer utiles lors d'une analyse plus approfondie. Par exemple, si vous disposez de l'identifiant d'un magasin pour une transaction, vous pouvez ajouter des informations sur l'emplacement de ce magasin. Ces informations supplémentaires sont souvent ajoutées en prenant un élément et en lui adjoignant des informations provenant d'un tableau de conversion.

Pour les tableaux de conversion à évolution lente et de petite taille, une solution qui fonctionne bien consiste à placer le tableau dans le pipeline en tant que classe unique qui met en œuvre l'interface Map<K,V>. Cela vous permet d'éviter que chaque élément effectue un appel d'API pour son tableau. Après avoir inclus la copie d'une table dans le pipeline, vous devez l'actualiser régulièrement pour la maintenir à jour.

Pour gérer la mise à jour lente des entrées secondaires, utilisez les modèles d'entrées secondaires Apache Beam.

Mise en cache

Les entrées secondaires sont chargées en mémoire et sont donc automatiquement mises en cache.

Vous pouvez définir la taille du cache à l'aide de l'option --setWorkerCacheMb.

Vous pouvez partager le cache entre des instances DoFn et utiliser des déclencheurs externes pour l'actualiser.

Exemple

SlowMovingStoreLocationDimension.java