Bonnes pratiques pour les pipelines Dataflow

Cette page décrit les bonnes pratiques à suivre lors du développement de vos pipelines Dataflow. L'application de ces bonnes pratiques présente les avantages suivants :

  • Améliorer l'observabilité et les performances des pipelines
  • Améliorer la productivité des développeurs
  • Améliorer la testabilité des pipelines

Les exemples de code Apache Beam de cette page utilisent Java, mais le contenu s'applique aux SDK Apache Beam Java, Python et Go.

Éléments à prendre en compte

Lorsque vous concevez votre pipeline, posez-vous les questions suivantes :

  • Où les données d'entrée de votre pipeline sont-elles stockées ? De combien d'ensembles de données d'entrée disposez-vous ?
  • Comment vos données se présentent-elles ?
  • Que voulez-vous faire de vos données ?
  • Où vont les données de sortie de votre pipeline ?
  • Votre job Dataflow utilise-t-il Assured Workloads ?

Utiliser des modèles

Pour accélérer le développement du pipeline, au lieu de créer un pipeline en écrivant du code Apache Beam, utilisez si possible un modèle Dataflow. Les modèles présentent les avantages suivants :

  • Les modèles sont réutilisables.
  • Les modèles vous permettent de personnaliser chaque job en modifiant des paramètres de pipeline spécifiques.
  • Toute personne disposant d'autorisations peut ensuite utiliser le modèle pour déployer le pipeline. Par exemple, un développeur peut créer un job à partir d'un modèle, et un data scientist de l'organisation peut déployer ce modèle ultérieurement.

Vous pouvez utiliser un modèle fourni par Google ou créer votre propre modèle. Certains modèles fournis par Google vous permettent d'ajouter une logique personnalisée en tant qu'étape de pipeline. Par exemple, le modèle Pub/Sub vers BigQuery fournit un paramètre permettant d'exécuter une fonction JavaScript définie par l'utilisateur (UDF) stockée dans Cloud Storage.

Les modèles fournis par Google étant Open Source sous la licence Apache 2.0, vous pouvez les utiliser comme base pour de nouveaux pipelines. Les modèles sont également utiles comme exemples de code. Consultez le code du modèle dans le dépôt GitHub.

Assured Workloads

Assured Workloads permet d'appliquer des exigences de sécurité et de conformité pour les clients Google Cloud. Par exemple, la page Régions de l'UE et assistance pour les contrôles de souveraineté permet d'appliquer des garanties de résidence et de souveraineté des données pour les clients basés dans l'UE. Pour fournir ces fonctionnalités, certaines fonctionnalités Dataflow sont restreintes ou limitées. Si vous utilisez Assured Workloads avec Dataflow, toutes les ressources auxquelles votre pipeline accède doivent être situées dans le projet ou dossier Assured Workloads de votre organisation. Voici notamment ce que vous y trouverez :

  • Buckets Cloud Storage
  • Ensembles de données BigQuery
  • Sujets et abonnements Pub/Sub
  • Ensembles de données Firestore
  • Connecteurs d'E/S

Dans Dataflow, pour les jobs de traitement par flux créées après le 7 mars 2024, toutes les données utilisateur sont chiffrées avec des CMEK.

Pour les jobs de traitement par flux créés avant le 7 mars 2024, les clés de données utilisées dans les opérations basées sur des clés (telles que le fenêtrage, le regroupement et la jointure) ne sont pas protégées par le chiffrement CMEK. Pour activer ce chiffrement pour vos jobs, drainez ou annulez le job, puis redémarrez-le. Pour en savoir plus, consultez la section Chiffrement des artefacts associés aux états du pipeline.

Partager des données entre plusieurs pipelines

Aucun mécanisme de communication spécifique à Dataflow ne permet de partager des données ni de traiter du contexte entre plusieurs pipelines. Vous pouvez toutefois recourir à un service de stockage durable tel que Cloud Storage ou à un cache en mémoire tel que App Engine pour partager des données entre plusieurs instances de pipeline.

Planifier des jobs

Vous pouvez automatiser l'exécution des pipelines de différentes manières :

Bonnes pratiques pour écrire le code du pipeline

Les sections suivantes décrivent les bonnes pratiques à suivre lorsque vous créez des pipelines en écrivant du code Apache Beam.

Structurer votre code Apache Beam

Pour créer des pipelines, il est courant d'utiliser la transformation générique de traitement parallèle Apache Beam ParDo. Lorsque vous appliquez une transformation ParDo, vous fournissez le code sous la forme d'un objet DoFn. DoFn est une classe du SDK Apache Beam qui définit une fonction de traitement distribué.

Vous pouvez considérer votre code DoFn comme de petites entités indépendantes : de nombreuses instances peuvent potentiellement être exécutées sur des machines différentes, sans se connaître. Ainsi, nous vous recommandons de créer des fonctions pures, qui sont parfaites pour la nature parallèle et distribuée des éléments DoFn. Les fonctions pures présentent les caractéristiques suivantes :

  • Les fonctions pures ne dépendent pas d'un état masqué ou externe.
  • Elles n'ont aucun effet secondaire observable.
  • Elles sont déterministes.

Le modèle de fonction pure n'est pas strictement rigide. Lorsque votre code ne dépend pas d'éléments non garantis par le service Dataflow, les informations d'état ou les données d'initialisation externes peuvent être valides pour DoFn et d'autres objets de fonction.

Lors de la structuration de vos transformations ParDo et de la création de vos éléments DoFn, tenez compte des instructions suivantes :

  • Lorsque vous utilisez un Traitement de type "exactement une fois", le service Dataflow garantit que chaque élément de votre entrée PCollection est traité par une instance DoFn exactement une fois.
  • Le service Dataflow ne garantit pas le nombre de fois qu'une fonction DoFn est appelée.
  • Le service Dataflow ne garantit pas la manière exacte dont les éléments distribués sont regroupés. Il ne garantit pas quels éléments sont traités ensemble, le cas échéant.
  • Le service Dataflow ne garantit pas le nombre exact d'instances DoFn créées au cours de l'exécution d'un pipeline.
  • Le service Dataflow est tolérant aux pannes et peut réessayer votre code plusieurs fois si les nœuds de calcul rencontrent des problèmes.
  • Le service Dataflow peut créer des copies de sauvegarde de votre code. Des problèmes peuvent survenir avec des effets secondaires manuels, par exemple si votre code s'appuie sur des fichiers temporaires avec des noms qui ne sont pas uniques ou s'il en crée.
  • Le service Dataflow sérialise le traitement des éléments par instance DoFn. Votre code n'a pas besoin d'être strictement thread-safe, mais tout état partagé entre plusieurs instances DoFn doit être thread-safe.

Créer des bibliothèques de transformations réutilisables

Le modèle de programmation Apache Beam vous permet de réutiliser des transformations. En créant une bibliothèque partagée de transformations courantes, vous pouvez améliorer la réutilisation, la testabilité et la propriété du code par différentes équipes.

Prenez les deux exemples de code Java suivants, qui lisent tous les deux les événements de paiement. En supposant que les deux pipelines effectuent le même traitement, ils peuvent utiliser les mêmes transformations via une bibliothèque partagée pour les étapes de traitement restantes.

Le premier exemple provient d'une source Pub/Sub illimitée :

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
        PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

Le second exemple provient d'une source de base de données relationnelle limitée :

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments =
    p.apply(
        "Read from database table",
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

La manière dont vous mettez en œuvre les bonnes pratiques de réutilisation de code varie en fonction du langage de programmation et de l'outil de création. Par exemple, si vous utilisez Maven, vous pouvez séparer le code de transformation dans son propre module. Vous pouvez ensuite l'inclure en tant que sous-module dans des projets multimodules plus volumineux pour différents pipelines, comme illustré dans l'exemple de code suivant:

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

Pour plus d'informations, consultez les pages de documentation Apache Beam suivantes :

Utiliser des files d'attente de lettres mortes pour le traitement des erreurs

Parfois, votre pipeline ne peut pas traiter d'éléments. Les problèmes de données sont une cause fréquente. Par exemple, un élément contenant un format JSON mal formaté peut entraîner des échecs d'analyse.

Bien que vous puissiez repérer les exceptions dans la méthode DoFn.ProcessElement, consigner l'erreur et supprimer l'élément, cette approche entraîne la perte des données et empêche leur inspection ultérieure à des fins de traitement manuel ou de dépannage.

Utilisez plutôt un modèle appelé file d'attente de lettres mortes (file d'attente de messages non traités). Repérez les exceptions dans la méthode DoFn.ProcessElement et consignez les erreurs. Au lieu de supprimer l'élément défaillant, vous utilisez des résultats avec branchements pour écrire les éléments défaillants dans un objet PCollection distinct. Ces éléments sont ensuite écrits dans un récepteur de données en vue d'une inspection et d'un traitement ultérieurs avec une transformation distincte.

L'exemple de code Java suivant montre comment mettre en œuvre le modèle de file d'attente de lettres mortes.

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @Override
      void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.severe("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          c.sideOutput(deadLetterTag, c.element());
        }
      }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

Utilisez Cloud Monitoring pour appliquer différentes règles de surveillance et d'alerte à la file d'attente des lettres mortes de votre pipeline. Par exemple, vous pouvez visualiser le nombre et la taille des éléments traités par votre transformation de lettres mortes, et configurer des alertes à déclencher lorsque certaines conditions de seuil sont remplies.

Gérer les mutations de schéma

Vous pouvez gérer les données comportant des schémas inattendus mais valides à l'aide d'un modèle de lettres mortes, qui écrit les éléments défaillants dans un objet PCollection distinct. Dans certains cas, vous souhaitez gérer automatiquement les éléments qui reflètent un schéma muté sous la forme d'éléments valides. Par exemple, si le schéma d'un élément reflète une mutation comme l'ajout de nouveaux champs, vous pouvez adapter le schéma du récepteur de données pour gérer les mutations.

La mutation de schéma automatique repose sur l'approche de résultat avec branchement utilisée par le modèle de lettres mortes. Cependant, dans ce cas, elle déclenche une transformation qui modifie le schéma de destination chaque fois que des schémas additifs apparaissent. Pour obtenir un exemple de cette approche, consultez l'article du blog Google Cloud Gérer les schémas JSON en mutation dans un pipeline par flux avec Square Enix.

Choisir comment associer des ensembles de données

L'association d'ensembles de données est un cas d'utilisation courant pour les pipelines de données. Vous pouvez utiliser des entrées secondaires ou la transformation CoGroupByKey pour effectuer des jointures dans le pipeline. Chaque méthode présente des avantages et des inconvénients.

Les entrées secondaires constituent un moyen flexible de résoudre les problèmes courants de traitement des données, tels que l'enrichissement des données et les recherches avec clés. Contrairement aux objets PCollection, les entrées secondaires sont modifiables et peuvent être déterminées au moment de l'exécution. Par exemple, les valeurs d'une entrée secondaire peuvent être calculées par une autre branche de votre pipeline ou déterminées en appelant un service distant.

Dataflow accepte les entrées secondaires en conservant les données dans un espace de stockage persistant, semblable à un disque partagé. Cette configuration rend l'entrée secondaire complète disponible pour tous les nœuds de calcul.

Toutefois, les tailles des entrées secondaires peuvent être très volumineuses et ne pas rentrer dans la mémoire des nœuds de calcul. Les opérations de lecture à partir d'une entrée secondaire volumineuse peut entraîner des problèmes de performances si les nœuds de calcul doivent constamment lire les données à partir d'un espace de stockage persistant.

La transformation CoGroupByKey est une transformation Apache Beam de base qui fusionne (aplatit) plusieurs objets PCollection et regroupe des éléments ayant une clé commune. Contrairement à une entrée secondaire, qui rend l'intégralité des données d'entrée secondaires disponibles pour chaque nœud de calcul, CoGroupByKey effectue une opération "shuffle" (regroupement) pour répartir les données entre les nœuds de calcul. CoGroupByKey est donc idéal lorsque les objets PCollection que vous souhaitez joindre sont très volumineux et ne rentrent pas dans la mémoire des nœuds de calcul.

Suivez ces consignes pour décider si vous souhaitez utiliser des entrées secondaires ou CoGroupByKey :

  • Utilisez des entrées secondaires lorsque l'un des objets PCollection que vous joignez est disproportionné (plus petit) par rapport aux autres et que l'objet PCollection plus petit s'intègre à la mémoire du nœud de calcul. La mise en cache complète de l'entrée secondaire dans la mémoire permet de récupérer rapidement et efficacement les éléments.
  • Utilisez les entrées secondaires lorsque vous disposez d'un objet PCollection qui doit être joint plusieurs fois dans votre pipeline. Au lieu d'utiliser plusieurs transformations CoGroupByKey, créez une entrée secondaire unique pouvant être réutilisée par plusieurs transformations ParDo.
  • Utilisez CoGroupByKey si vous avez besoin de récupérer une grande partie d'un objet PCollection qui dépasse de manière significative la mémoire de calcul.

Pour en savoir plus, consultez la section Résoudre les erreurs Dataflow de mémoire insuffisante.

Minimiser les opérations par élément coûteuses

Une instance DoFn traite des lots d'éléments appelés groupes, qui sont des unités de travail atomiques composées de zéro ou plusieurs éléments. Les éléments individuels sont ensuite traités par la méthode DoFn.ProcessElement, qui s'exécute pour chaque élément. Étant donné que la méthode DoFn.ProcessElement est appelée pour chaque élément, toutes les opérations coûteuses ou chronophages qui sont appelées par cette méthode s'exécutent pour chaque élément traité par la méthode.

Si vous devez effectuer des opérations coûteuses une seule fois pour un lot d'éléments, incluez ces opérations dans la méthode DoFn.Setup ou la méthode DoFn.StartBundle plutôt que dans l'élément DoFn.ProcessElement. Voici quelques exemples d'opérations :

  • Analyser un fichier de configuration qui contrôle certains aspects du comportement de l'instance DoFn. N'appelez cette action qu'une seule fois lorsque l'instance DoFn est initialisée à l'aide de la méthode DoFn.Setup.

  • Instanciation d'un client de courte durée qui doit être réutilisé pour tous les éléments d'un groupe (par exemple, si tous les éléments du groupe doivent être envoyés via une seule connexion réseau). Appelez cette action une fois par lot à l'aide de la méthode DoFn.StartBundle.

Limiter les tailles de lot et les appels simultanés à des services externes

Lorsque vous appelez des services externes, vous pouvez réduire les frais par appel à l'aide de la transformation GroupIntoBatches. Cette transformation crée des lots d'éléments d'une taille spécifiée. Le traitement par lots envoie des éléments à un service externe sous la forme d'une charge utile au lieu de manière individuelle.

En combinaison avec le traitement par lots, limitez le nombre maximal d'appels parallèles (simultanés) au service externe en choisissant les clés appropriées pour partitionner les données entrantes. Le nombre de partitions détermine la parallélisation maximale. Par exemple, si chaque élément reçoit la même clé, une transformation en aval pour appeler le service externe ne s'exécute pas en parallèle.

Envisagez l'une des approches suivantes pour produire des clés pour les éléments :

  • Choisissez un attribut de l'ensemble de données à utiliser comme clés de données, par exemple les ID utilisateur.
  • Générez des clés de données pour répartir les éléments de manière aléatoire sur un nombre fixe de partitions, où le nombre de valeurs de clé possibles détermine le nombre de partitions. Vous devez créer suffisamment de partitions pour le chargement en parallèle. Chaque partition doit comporter suffisamment d'éléments pour que la transformation GroupIntoBatches soit utile.

L'exemple de code Java suivant montre comment répartir de manière aléatoire des éléments sur dix partitions :

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }
              private static int randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

Identifier les problèmes de performances causés par des étapes fusionnées

Dataflow crée un graphique d'étapes qui représente votre pipeline en fonction des transformations et des données que vous avez utilisées lors de la construction. Ce graphique est appelé graphique d'exécution du pipeline.

Lorsque vous déployez votre pipeline, Dataflow peut modifier le graphique d'exécution de votre pipeline pour améliorer les performances. Par exemple, Dataflow peut fusionner certaines opérations, un processus appelé optimisation de la fusion, pour éviter l'impact sur les performances et les coûts de l'écriture de chaque objet PCollection intermédiaire dans votre pipeline.

Dans certains cas, Dataflow peut déterminer de manière incorrecte la méthode optimale de fusionner des opérations dans le pipeline, ce qui peut limiter la capacité de votre job à utiliser tous les nœuds de calcul disponibles. Dans ce cas, vous pouvez empêcher la fusion des opérations.

Prenons l'exemple de code Apache Beam suivant. Une transformation GenerateSequence crée un petit objet PCollection limité, qui est ensuite traité par deux transformations ParDo en aval.

La transformation Find Primes Less-than-N peut être coûteuse en calcul et est susceptible de s'exécuter lentement pour de grands nombres. À l'inverse, la transformation Increment Number se termine probablement rapidement.

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number",
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return ++n;
          }
        }));

    p.run().waitUntilFinish();
  }
}

Le schéma suivant illustre une représentation graphique du pipeline dans l'interface de surveillance Dataflow.

Représentation du flux de pipeline dans l&#39;interface Dataflow.

L'interface de surveillance de Dataflow montre que le même taux de traitement lent se produit pour les deux transformations, à savoir 13 éléments par seconde. Vous pourriez vous attendre à ce que la transformation Increment Number traite rapidement les éléments, mais elle semble être liée au même taux de traitement que Find Primes Less-than-N.

En effet, Dataflow a fusionné les étapes en une seule étape, ce qui les empêche de s'exécuter indépendamment. Vous pouvez utiliser la commande gcloud dataflow jobs describe pour obtenir plus d'informations:

gcloud dataflow jobs describe --full job-id --format json

Dans le résultat obtenu, les étapes fusionnées sont décrites dans l'objet ExecutionStageSummary du tableau ComponentTransform :

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

Dans ce scénario, comme la transformation Find Primes Less-than-N est l'étape lente, décomposer la fusion avant cette étape est une stratégie appropriée. Une méthode permettant de dissocier les étapes consiste à insérer une transformation GroupByKey et à dissocier le groupe avant l'étape, comme indiqué dans l'exemple de code Java suivant.

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

Vous pouvez également combiner ces étapes non fusionnées en une transformation composite réutilisable.

Une fois les étapes dissociées, lorsque vous exécutez le pipeline, Increment Number s'effectue en quelques secondes et la transformation Find Primes Less-than-N, dont l'exécution est beaucoup plus longue, s'effectue au cours d'une autre étape.

Cet exemple applique une opération d'association et de dissociation à des étapes non fusionnées. Vous pouvez utiliser d'autres approches dans d'autres circonstances. Dans ce cas, la gestion du résultat en double n'est pas un problème, étant donné le résultat consécutif de la transformation GenerateSequence. Les objets KV avec des clés en double sont dédupliqués vers une seule clé dans la transformation d'association (GroupByKey) et la transformation de dissociation (Keys). Pour conserver les doublons après les opérations d'association et de dissociation, créez des paires clé/valeur en procédant comme suit :

  1. Utilisez une clé aléatoire et l'entrée d'origine comme valeur.
  2. Créez l'association à l'aide de la clé aléatoire.
  3. Émettez les valeurs pour chaque clé en tant que sortie.

Vous pouvez également utiliser une transformation Reshuffle pour éviter la fusion des transformations environnantes. Cependant, les effets secondaires de la transformation Reshuffle ne sont pas transférables aux différents exécuteurs Apache Beam.

Pour plus d'informations sur le parallélisme et l'optimisation de la fusion, consultez la section Cycle de vie du pipeline.

Utiliser des métriques Apache Beam pour collecter des insights sur les pipelines

Les métriques Apache Beam sont une classe utilitaire qui produit des métriques pour la création de rapports sur les propriétés d'un pipeline en cours d'exécution. Lorsque vous utilisez Cloud Monitoring, les métriques Apache Beam sont disponibles en tant que métriques personnalisées Cloud Monitoring.

L'exemple suivant montre les métriques Counter Apache Beam utilisées dans une sous-classe DoFn.

L'exemple de code utilise deux compteurs. Un compteur suit les échecs d'analyse JSON (malformedCounter) et l'autre surveille si le message JSON est valide, mais contient une charge utile vide (emptyCounter). Dans Cloud Monitoring, les noms des métriques personnalisées sont custom.googleapis.com/dataflow/malformedJson et custom.googleapis.com/dataflow/emptyPayload. Vous pouvez utiliser les métriques personnalisées pour créer des visualisations et des règles d'alerte dans Cloud Monitoring.

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;

  @Setup
  public setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}

En savoir plus

Les pages suivantes décrivent plus en détail comment structurer votre pipeline, comment choisir les transformations à appliquer à vos données et quels sont les éléments à prendre en compte lors du choix des méthodes d'entrée et de sortie de votre pipeline.

Pour plus d'informations sur la création de votre code utilisateur, consultez les prérequis pour les fonctions fournies par l'utilisateur.