Cette page décrit les bonnes pratiques à suivre lors du développement de vos pipelines Dataflow. L'application de ces bonnes pratiques présente les avantages suivants :
- Améliorer l'observabilité et les performances des pipelines
- Améliorer la productivité des développeurs
- Améliorer la testabilité des pipelines
Les exemples de code Apache Beam de cette page utilisent Java, mais le contenu s'applique aux SDK Apache Beam Java, Python et Go.
Éléments à prendre en compte
Lorsque vous concevez votre pipeline, posez-vous les questions suivantes :
- Où les données d'entrée de votre pipeline sont-elles stockées ? De combien d'ensembles de données d'entrée disposez-vous ?
- Comment vos données se présentent-elles ?
- Que voulez-vous faire de vos données ?
- Où vont les données de sortie de votre pipeline ?
- Votre job Dataflow utilise-t-il Assured Workloads ?
Utiliser des modèles
Pour accélérer le développement du pipeline, au lieu de créer un pipeline en écrivant du code Apache Beam, utilisez si possible un modèle Dataflow. Les modèles présentent les avantages suivants :
- Les modèles sont réutilisables.
- Les modèles vous permettent de personnaliser chaque job en modifiant des paramètres de pipeline spécifiques.
- Toute personne disposant d'autorisations peut ensuite utiliser le modèle pour déployer le pipeline. Par exemple, un développeur peut créer un job à partir d'un modèle, et un data scientist de l'organisation peut déployer ce modèle ultérieurement.
Vous pouvez utiliser un modèle fourni par Google ou créer votre propre modèle. Certains modèles fournis par Google vous permettent d'ajouter une logique personnalisée en tant qu'étape de pipeline. Par exemple, le modèle Pub/Sub vers BigQuery fournit un paramètre permettant d'exécuter une fonction JavaScript définie par l'utilisateur (UDF) stockée dans Cloud Storage.
Les modèles fournis par Google étant Open Source sous la licence Apache 2.0, vous pouvez les utiliser comme base pour de nouveaux pipelines. Les modèles sont également utiles comme exemples de code. Consultez le code du modèle dans le dépôt GitHub.
Assured Workloads
Assured Workloads permet d'appliquer des exigences de sécurité et de conformité pour les clients Google Cloud. Par exemple, la page Régions de l'UE et assistance pour les contrôles de souveraineté permet d'appliquer des garanties de résidence et de souveraineté des données pour les clients basés dans l'UE. Pour fournir ces fonctionnalités, certaines fonctionnalités Dataflow sont restreintes ou limitées. Si vous utilisez Assured Workloads avec Dataflow, toutes les ressources auxquelles votre pipeline accède doivent être situées dans le projet ou dossier Assured Workloads de votre organisation. Voici notamment ce que vous y trouverez :
- Buckets Cloud Storage
- Ensembles de données BigQuery
- Sujets et abonnements Pub/Sub
- Ensembles de données Firestore
- Connecteurs d'E/S
Dans Dataflow, pour les jobs de traitement par flux créées après le 7 mars 2024, toutes les données utilisateur sont chiffrées avec des CMEK.
Pour les jobs de traitement par flux créés avant le 7 mars 2024, les clés de données utilisées dans les opérations basées sur des clés (telles que le fenêtrage, le regroupement et la jointure) ne sont pas protégées par le chiffrement CMEK. Pour activer ce chiffrement pour vos jobs, drainez ou annulez le job, puis redémarrez-le. Pour en savoir plus, consultez la section Chiffrement des artefacts associés aux états du pipeline.
Partager des données entre plusieurs pipelines
Aucun mécanisme de communication spécifique à Dataflow ne permet de partager des données ni de traiter du contexte entre plusieurs pipelines. Vous pouvez toutefois recourir à un service de stockage durable tel que Cloud Storage ou à un cache en mémoire tel que App Engine pour partager des données entre plusieurs instances de pipeline.
Planifier des jobs
Vous pouvez automatiser l'exécution des pipelines de différentes manières :
- Utiliser Cloud Scheduler.
- Utiliser l'opérateur Dataflow d'Apache Airflow (l'un des nombreux opérateurs Google Cloud) dans un workflow Cloud Composer.
- Exécuter des processus de job (Cron) personnalisés sur Compute Engine.
Bonnes pratiques pour écrire le code du pipeline
Les sections suivantes décrivent les bonnes pratiques à suivre lorsque vous créez des pipelines en écrivant du code Apache Beam.
Structurer votre code Apache Beam
Pour créer des pipelines, il est courant d'utiliser la transformation générique de traitement parallèle Apache Beam ParDo
.
Lorsque vous appliquez une transformation ParDo
, vous fournissez le code sous la forme d'un objet DoFn
. DoFn
est une classe du SDK Apache Beam qui définit une fonction de traitement distribué.
Vous pouvez considérer votre code DoFn
comme de petites entités indépendantes : de nombreuses instances peuvent potentiellement être exécutées sur des machines différentes, sans se connaître. Ainsi, nous vous recommandons de créer des fonctions pures, qui sont parfaites pour la nature parallèle et distribuée des éléments DoFn
.
Les fonctions pures présentent les caractéristiques suivantes :
- Les fonctions pures ne dépendent pas d'un état masqué ou externe.
- Elles n'ont aucun effet secondaire observable.
- Elles sont déterministes.
Le modèle de fonction pure n'est pas strictement rigide. Lorsque votre code ne dépend pas d'éléments non garantis par le service Dataflow, les informations d'état ou les données d'initialisation externes peuvent être valides pour DoFn
et d'autres objets de fonction.
Lors de la structuration de vos transformations ParDo
et de la création de vos éléments DoFn
, tenez compte des instructions suivantes :
- Lorsque vous utilisez un Traitement de type "exactement une fois", le service Dataflow garantit que chaque élément de votre entrée
PCollection
est traité par une instanceDoFn
exactement une fois. - Le service Dataflow ne garantit pas le nombre de fois qu'une fonction
DoFn
est appelée. - Le service Dataflow ne garantit pas la manière exacte dont les éléments distribués sont regroupés. Il ne garantit pas quels éléments sont traités ensemble, le cas échéant.
- Le service Dataflow ne garantit pas le nombre exact d'instances
DoFn
créées au cours de l'exécution d'un pipeline. - Le service Dataflow est tolérant aux pannes et peut réessayer votre code plusieurs fois si les nœuds de calcul rencontrent des problèmes.
- Le service Dataflow peut créer des copies de sauvegarde de votre code. Des problèmes peuvent survenir avec des effets secondaires manuels, par exemple si votre code s'appuie sur des fichiers temporaires avec des noms qui ne sont pas uniques ou s'il en crée.
- Le service Dataflow sérialise le traitement des éléments par instance
DoFn
. Votre code n'a pas besoin d'être strictement thread-safe, mais tout état partagé entre plusieurs instancesDoFn
doit être thread-safe.
Créer des bibliothèques de transformations réutilisables
Le modèle de programmation Apache Beam vous permet de réutiliser des transformations. En créant une bibliothèque partagée de transformations courantes, vous pouvez améliorer la réutilisation, la testabilité et la propriété du code par différentes équipes.
Prenez les deux exemples de code Java suivants, qui lisent tous les deux les événements de paiement. En supposant que les deux pipelines effectuent le même traitement, ils peuvent utiliser les mêmes transformations via une bibliothèque partagée pour les étapes de traitement restantes.
Le premier exemple provient d'une source Pub/Sub illimitée :
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
// Initial read transform
PCollection<PaymentEvent> payments =
p.apply("Read from topic",
PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
.apply("Parse strings into payment events",
ParDo.of(new ParsePaymentEventFn()));
Le second exemple provient d'une source de base de données relationnelle limitée :
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<PaymentEvent> payments =
p.apply(
"Read from database table",
JdbcIO.<PaymentEvent>read()
.withDataSourceConfiguration(...)
.withQuery(...)
.withRowMapper(new RowMapper<PaymentEvent>() {
...
}));
La manière dont vous mettez en œuvre les bonnes pratiques de réutilisation de code varie en fonction du langage de programmation et de l'outil de création. Par exemple, si vous utilisez Maven, vous pouvez séparer le code de transformation dans son propre module. Vous pouvez ensuite l'inclure en tant que sous-module dans des projets multimodules plus volumineux pour différents pipelines, comme illustré dans l'exemple de code suivant:
// Reuse transforms across both pipelines
payments
.apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
.apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
...
Pour plus d'informations, consultez les pages de documentation Apache Beam suivantes :
- Conditions requises pour écrire le code utilisateur pour les transformations Apache Beam
- Guide de style
PTransform
: guide de style pour les rédacteurs de nouvelles collectionsPTransform
réutilisables
Utiliser des files d'attente de lettres mortes pour le traitement des erreurs
Parfois, votre pipeline ne peut pas traiter d'éléments. Les problèmes de données sont une cause fréquente. Par exemple, un élément contenant un format JSON mal formaté peut entraîner des échecs d'analyse.
Bien que vous puissiez repérer les exceptions dans la méthode DoFn.ProcessElement
, consigner l'erreur et supprimer l'élément, cette approche entraîne la perte des données et empêche leur inspection ultérieure à des fins de traitement manuel ou de dépannage.
Utilisez plutôt un modèle appelé file d'attente de lettres mortes (file d'attente de messages non traités).
Repérez les exceptions dans la méthode DoFn.ProcessElement
et consignez les erreurs. Au lieu de supprimer l'élément défaillant, vous utilisez des résultats avec branchements pour écrire les éléments défaillants dans un objet PCollection
distinct. Ces éléments sont ensuite écrits dans un récepteur de données en vue d'une inspection et d'un traitement ultérieurs avec une transformation distincte.
L'exemple de code Java suivant montre comment mettre en œuvre le modèle de file d'attente de lettres mortes.
TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* ... */;
PCollectionTuple outputTuple =
input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead-letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
.apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...
Utilisez Cloud Monitoring pour appliquer différentes règles de surveillance et d'alerte à la file d'attente des lettres mortes de votre pipeline. Par exemple, vous pouvez visualiser le nombre et la taille des éléments traités par votre transformation de lettres mortes, et configurer des alertes à déclencher lorsque certaines conditions de seuil sont remplies.
Gérer les mutations de schéma
Vous pouvez gérer les données comportant des schémas inattendus mais valides à l'aide d'un modèle de lettres mortes, qui écrit les éléments défaillants dans un objet PCollection
distinct.
Dans certains cas, vous souhaitez gérer automatiquement les éléments qui reflètent un schéma muté sous la forme d'éléments valides. Par exemple, si le schéma d'un élément reflète une mutation comme l'ajout de nouveaux champs, vous pouvez adapter le schéma du récepteur de données pour gérer les mutations.
La mutation de schéma automatique repose sur l'approche de résultat avec branchement utilisée par le modèle de lettres mortes. Cependant, dans ce cas, elle déclenche une transformation qui modifie le schéma de destination chaque fois que des schémas additifs apparaissent. Pour obtenir un exemple de cette approche, consultez l'article du blog Google Cloud Gérer les schémas JSON en mutation dans un pipeline par flux avec Square Enix.
Choisir comment associer des ensembles de données
L'association d'ensembles de données est un cas d'utilisation courant pour les pipelines de données. Vous pouvez utiliser des entrées secondaires ou la transformation CoGroupByKey
pour effectuer des jointures dans le pipeline.
Chaque méthode présente des avantages et des inconvénients.
Les entrées secondaires constituent un moyen flexible de résoudre les problèmes courants de traitement des données, tels que l'enrichissement des données et les recherches avec clés. Contrairement aux objets PCollection
, les entrées secondaires sont modifiables et peuvent être déterminées au moment de l'exécution. Par exemple, les valeurs d'une entrée secondaire peuvent être calculées par une autre branche de votre pipeline ou déterminées en appelant un service distant.
Dataflow accepte les entrées secondaires en conservant les données dans un espace de stockage persistant, semblable à un disque partagé. Cette configuration rend l'entrée secondaire complète disponible pour tous les nœuds de calcul.
Toutefois, les tailles des entrées secondaires peuvent être très volumineuses et ne pas rentrer dans la mémoire des nœuds de calcul. Les opérations de lecture à partir d'une entrée secondaire volumineuse peut entraîner des problèmes de performances si les nœuds de calcul doivent constamment lire les données à partir d'un espace de stockage persistant.
La transformation CoGroupByKey
est une transformation Apache Beam de base qui fusionne (aplatit) plusieurs objets PCollection
et regroupe des éléments ayant une clé commune. Contrairement à une entrée secondaire, qui rend l'intégralité des données d'entrée secondaires disponibles pour chaque nœud de calcul, CoGroupByKey
effectue une opération "shuffle" (regroupement) pour répartir les données entre les nœuds de calcul. CoGroupByKey
est donc idéal lorsque les objets PCollection
que vous souhaitez joindre sont très volumineux et ne rentrent pas dans la mémoire des nœuds de calcul.
Suivez ces consignes pour décider si vous souhaitez utiliser des entrées secondaires ou CoGroupByKey
:
- Utilisez des entrées secondaires lorsque l'un des objets
PCollection
que vous joignez est disproportionné (plus petit) par rapport aux autres et que l'objetPCollection
plus petit s'intègre à la mémoire du nœud de calcul. La mise en cache complète de l'entrée secondaire dans la mémoire permet de récupérer rapidement et efficacement les éléments. - Utilisez les entrées secondaires lorsque vous disposez d'un objet
PCollection
qui doit être joint plusieurs fois dans votre pipeline. Au lieu d'utiliser plusieurs transformationsCoGroupByKey
, créez une entrée secondaire unique pouvant être réutilisée par plusieurs transformationsParDo
. - Utilisez
CoGroupByKey
si vous avez besoin de récupérer une grande partie d'un objetPCollection
qui dépasse de manière significative la mémoire de calcul.
Pour en savoir plus, consultez la section Résoudre les erreurs Dataflow de mémoire insuffisante.
Minimiser les opérations par élément coûteuses
Une instance DoFn
traite des lots d'éléments appelés groupes, qui sont des unités de travail atomiques composées de zéro ou plusieurs éléments. Les éléments individuels sont ensuite traités par la méthode DoFn.ProcessElement
, qui s'exécute pour chaque élément. Étant donné que la méthode DoFn.ProcessElement
est appelée pour chaque élément, toutes les opérations coûteuses ou chronophages qui sont appelées par cette méthode s'exécutent pour chaque élément traité par la méthode.
Si vous devez effectuer des opérations coûteuses une seule fois pour un lot d'éléments, incluez ces opérations dans la méthode DoFn.Setup
ou la méthode DoFn.StartBundle
plutôt que dans l'élément DoFn.ProcessElement
. Voici quelques exemples d'opérations :
Analyser un fichier de configuration qui contrôle certains aspects du comportement de l'instance
DoFn
. N'appelez cette action qu'une seule fois lorsque l'instanceDoFn
est initialisée à l'aide de la méthodeDoFn.Setup
.Instanciation d'un client de courte durée qui doit être réutilisé pour tous les éléments d'un groupe (par exemple, si tous les éléments du groupe doivent être envoyés via une seule connexion réseau). Appelez cette action une fois par lot à l'aide de la méthode
DoFn.StartBundle
.
Limiter les tailles de lot et les appels simultanés à des services externes
Lorsque vous appelez des services externes, vous pouvez réduire les frais par appel à l'aide de la transformation GroupIntoBatches
. Cette transformation crée des lots d'éléments d'une taille spécifiée.
Le traitement par lots envoie des éléments à un service externe sous la forme d'une charge utile au lieu de manière individuelle.
En combinaison avec le traitement par lots, limitez le nombre maximal d'appels parallèles (simultanés) au service externe en choisissant les clés appropriées pour partitionner les données entrantes. Le nombre de partitions détermine la parallélisation maximale. Par exemple, si chaque élément reçoit la même clé, une transformation en aval pour appeler le service externe ne s'exécute pas en parallèle.
Envisagez l'une des approches suivantes pour produire des clés pour les éléments :
- Choisissez un attribut de l'ensemble de données à utiliser comme clés de données, par exemple les ID utilisateur.
- Générez des clés de données pour répartir les éléments de manière aléatoire sur un nombre fixe de partitions, où le nombre de valeurs de clé possibles détermine le nombre de partitions. Vous devez créer suffisamment de partitions pour le chargement en parallèle.
Chaque partition doit comporter suffisamment d'éléments pour que la transformation
GroupIntoBatches
soit utile.
L'exemple de code Java suivant montre comment répartir de manière aléatoire des éléments sur dix partitions :
// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;
int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
sensitiveData
.apply("Assign data into partitions",
ParDo.of(new DoFn<String, KV<Long, String>>() {
Random random = new Random();
@ProcessElement
public void assignRandomPartition(ProcessContext context) {
context.output(
KV.of(randomPartitionNumber(), context.element()));
}
private static int randomPartitionNumber() {
return random.nextInt(numPartitions);
}
}))
.apply("Create batches of sensitive data",
GroupIntoBatches.<Long, String>ofSize(100L));
// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
.apply("Call Redaction API in batches", callRedactionApiOnBatch());
Identifier les problèmes de performances causés par des étapes fusionnées
Dataflow crée un graphique d'étapes qui représente votre pipeline en fonction des transformations et des données que vous avez utilisées lors de la construction. Ce graphique est appelé graphique d'exécution du pipeline.
Lorsque vous déployez votre pipeline, Dataflow peut modifier le graphique d'exécution de votre pipeline pour améliorer les performances. Par exemple, Dataflow peut fusionner certaines opérations, un processus appelé optimisation de la fusion, pour éviter l'impact sur les performances et les coûts de l'écriture de chaque objet PCollection
intermédiaire dans votre pipeline.
Dans certains cas, Dataflow peut déterminer de manière incorrecte la méthode optimale de fusionner des opérations dans le pipeline, ce qui peut limiter la capacité de votre job à utiliser tous les nœuds de calcul disponibles. Dans ce cas, vous pouvez empêcher la fusion des opérations.
Prenons l'exemple de code Apache Beam suivant. Une transformation GenerateSequence
crée un petit objet PCollection
limité, qui est ensuite traité par deux transformations ParDo
en aval.
La transformation Find Primes Less-than-N
peut être coûteuse en calcul et est susceptible de s'exécuter lentement pour de grands nombres. À l'inverse, la transformation Increment Number
se termine probablement rapidement.
import com.google.common.math.LongMath;
...
public class FusedStepsPipeline {
final class FindLowerPrimesFn extends DoFn<Long, String> {
@ProcessElement
public void processElement(ProcessContext c) {
Long n = c.element();
if (n > 1) {
for (long i = 2; i < n; i++) {
if (LongMath.isPrime(i)) {
c.output(Long.toString(i));
}
}
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(options);
PCollection<Long> sequence = p.apply("Generate Sequence",
GenerateSequence
.from(0)
.to(1000000));
// Pipeline branch 1
sequence.apply("Find Primes Less-than-N",
ParDo.of(new FindLowerPrimesFn()));
// Pipeline branch 2
sequence.apply("Increment Number",
MapElements.via(new SimpleFunction<Long, Long>() {
public Long apply(Long n) {
return ++n;
}
}));
p.run().waitUntilFinish();
}
}
Le schéma suivant illustre une représentation graphique du pipeline dans l'interface de surveillance Dataflow.
L'interface de surveillance de Dataflow montre que le même taux de traitement lent se produit pour les deux transformations, à savoir 13 éléments par seconde. Vous pourriez vous attendre à ce que la transformation Increment Number
traite rapidement les éléments, mais elle semble être liée au même taux de traitement que Find Primes Less-than-N
.
En effet, Dataflow a fusionné les étapes en une seule étape, ce qui les empêche de s'exécuter indépendamment. Vous pouvez utiliser la commande gcloud dataflow jobs describe
pour obtenir plus d'informations:
gcloud dataflow jobs describe --full job-id --format json
Dans le résultat obtenu, les étapes fusionnées sont décrites dans l'objet ExecutionStageSummary
du tableau ComponentTransform
:
...
"executionPipelineStage": [
{
"componentSource": [
...
],
"componentTransform": [
{
"name": "s1",
"originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
"userName": "Generate Sequence/Read(BoundedCountingSource)"
},
{
"name": "s2",
"originalTransform": "Find Primes Less-than-N",
"userName": "Find Primes Less-than-N"
},
{
"name": "s3",
"originalTransform": "Increment Number/Map",
"userName": "Increment Number/Map"
}
],
"id": "S01",
"kind": "PAR_DO_KIND",
"name": "F0"
}
...
Dans ce scénario, comme la transformation Find Primes Less-than-N
est l'étape lente, décomposer la fusion avant cette étape est une stratégie appropriée. Une méthode permettant de dissocier les étapes consiste à insérer une transformation GroupByKey
et à dissocier le groupe avant l'étape, comme indiqué dans l'exemple de code Java suivant.
sequence
.apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
public KV<Long, Void> apply(Long n) {
return KV.of(n, null);
}
}))
.apply("Group By Key", GroupByKey.<Long, Void>create())
.apply("Emit Keys", Keys.<Long>create())
.apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));
Vous pouvez également combiner ces étapes non fusionnées en une transformation composite réutilisable.
Une fois les étapes dissociées, lorsque vous exécutez le pipeline, Increment Number
s'effectue en quelques secondes et la transformation Find Primes Less-than-N
, dont l'exécution est beaucoup plus longue, s'effectue au cours d'une autre étape.
Cet exemple applique une opération d'association et de dissociation à des étapes non fusionnées.
Vous pouvez utiliser d'autres approches dans d'autres circonstances. Dans ce cas, la gestion du résultat en double n'est pas un problème, étant donné le résultat consécutif de la transformation GenerateSequence
.
Les objets KV
avec des clés en double sont dédupliqués vers une seule clé dans la transformation d'association (GroupByKey
) et la transformation de dissociation (Keys
). Pour conserver les doublons après les opérations d'association et de dissociation, créez des paires clé/valeur en procédant comme suit :
- Utilisez une clé aléatoire et l'entrée d'origine comme valeur.
- Créez l'association à l'aide de la clé aléatoire.
- Émettez les valeurs pour chaque clé en tant que sortie.
Vous pouvez également utiliser une transformation Reshuffle
pour éviter la fusion des transformations environnantes. Cependant, les effets secondaires de la transformation Reshuffle
ne sont pas transférables aux différents exécuteurs Apache Beam.
Pour plus d'informations sur le parallélisme et l'optimisation de la fusion, consultez la section Cycle de vie du pipeline.
Utiliser des métriques Apache Beam pour collecter des insights sur les pipelines
Les métriques Apache Beam sont une classe utilitaire qui produit des métriques pour la création de rapports sur les propriétés d'un pipeline en cours d'exécution. Lorsque vous utilisez Cloud Monitoring, les métriques Apache Beam sont disponibles en tant que métriques personnalisées Cloud Monitoring.
L'exemple suivant montre les métriques Counter
Apache Beam utilisées dans une sous-classe DoFn
.
L'exemple de code utilise deux compteurs. Un compteur suit les échecs d'analyse JSON (malformedCounter
) et l'autre surveille si le message JSON est valide, mais contient une charge utile vide (emptyCounter
). Dans Cloud Monitoring, les noms des métriques personnalisées sont custom.googleapis.com/dataflow/malformedJson
et custom.googleapis.com/dataflow/emptyPayload
. Vous pouvez utiliser les métriques personnalisées pour créer des visualisations et des règles d'alerte dans Cloud Monitoring.
final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};
final class ParseEventFn extends DoFn<String, MyObject> {
private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
private Gson gsonParser;
@Setup
public setup() {
gsonParser = new Gson();
}
@ProcessElement
public void processElement(ProcessContext c) {
try {
MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
if (myObj.getPayload() != null) {
// Output the element if non-empty payload
c.output(successTag, myObj);
}
else {
// Increment empty payload counter
emptyCounter.inc();
}
}
catch (JsonParseException e) {
// Increment malformed JSON counter
malformedCounter.inc();
// Output the element to dead-letter queue
c.output(errorTag, c.element());
}
}
}
En savoir plus
Les pages suivantes décrivent plus en détail comment structurer votre pipeline, comment choisir les transformations à appliquer à vos données et quels sont les éléments à prendre en compte lors du choix des méthodes d'entrée et de sortie de votre pipeline.
Pour plus d'informations sur la création de votre code utilisateur, consultez les prérequis pour les fonctions fournies par l'utilisateur.