Traitement parallèle avec l'opération "ParDo"

ParDo correspond à l'opération de traitement parallèle principale dans les SDK Dataflow. ParDo est utilisé pour le traitement parallèle générique. Le style de traitement ParDo est semblable à la classe "Mapper" d'un algorithme de style "Map/Shuffle/Reduce" : l'opération ParDo extrait chaque élément dans une classe PCollection d'entrée, effectue une fonction de traitement sur cet élément, puis émet aucun, un, ou plusieurs éléments dans une classe PCollection de sortie.

Vous fournissez la fonction que l'opération ParDo exécute sur chacun des éléments de la classe PCollection d'entrée. La fonction fournie est appelée indépendamment, et en parallèle, sur plusieurs instances de nœuds de calcul dans la tâche Dataflow.

L'opération ParDo est utile pour diverses opérations de traitement de données, y compris les opérations suivantes :

  • Filtrer un ensemble de données : vous pouvez utiliser ParDo pour prendre en compte chaque élément dans une classe PCollection et générer cet élément dans une nouvelle collection ou le supprimer.
  • Formater ou convertir le type de chaque élément dans un ensemble de données : vous pouvez utiliser ParDo pour formater les éléments dans la classe PCollection, comme par exemple pour formater des paires clé/valeur en chaînes imprimables.
  • Extraire des parties de chaque élément d'un ensemble de données : vous pouvez utiliser ParDo pour n'extraire qu'une partie de chaque élément dans la classe PCollection. Cela peut être particulièrement utile pour extraire des champs individuels à partir des lignes d'une table BigQuery.
  • Effectuer des calculs sur chaque élément d'un ensemble de données : vous pouvez utiliser ParDo pour effectuer des calculs simples ou complexes sur chaque élément ou certains éléments d'une classe PCollection.

L'opération ParDo est également une étape intermédiaire courante dans un pipeline. Par exemple, vous pouvez utiliser ParDo pour affecter des clés à chaque élément d'une classe PCollection en créant des paires clé/valeur. Vous pouvez regrouper les paires ultérieurement à l'aide d'une transformation GroupByKey.

Appliquer une transformation "ParDo"

Pour utiliser l'opération ParDo, vous l'appliquez à la classe PCollection que vous souhaitez transformer et enregistrez la valeur de renvoi sous la forme d'une classe PCollection de type approprié.

L'argument que vous fournissez à ParDo doit être une sous-classe d'un type spécifique fourni par le SDK Dataflow, appelé DoFn. Pour plus d'informations sur DoFn, consultez la section Créer et spécifier une logique de traitement plus loin dans ce document.

L'exemple de code suivant montre un élément ParDo de base appliqué à une classe PCollection de chaînes afin de transmettre une fonction basée sur DoFn pour calculer la longueur de chaque chaîne et générer les longueurs de chaîne en une classe PCollection d'entiers.

Java

  // The input PCollection of Strings.
  PCollection<String> words = ...;

  // The DoFn to perform on each element in the input PCollection.
  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

  // Apply a ParDo to the PCollection "words" to compute lengths for each word.
  PCollection<Integer> wordLengths = words.apply(
      ParDo
      .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                              // we define above.

Dans cet exemple, le code appelle l'élément apply sur la collection d'entrée (appelée "words"). L'opération ParDo représente l'argument PTransform. L'opération .of vous permet de spécifier l'argument DoFn à effectuer sur chaque élément, appelé dans ce cas, ComputeWordLengthFn().

Créer et spécifier une logique de traitement

La logique de traitement que vous fournissez pour l'opération ParDo doit être d'un type spécifique requis par le SDK Dataflow que vous utilisez pour créer le pipeline.

Java

Vous devez créer une sous-classe de la classe du SDK DoFn.

La fonction que vous fournissez est appelée indépendamment et sur plusieurs instances de Google Compute Engine.

De plus, l'argument DoFn ne doit s'appuyer sur aucun état persistant d'appel à appel. Toute instance donnée de la fonction de traitement dans Cloud Platform peut ne pas avoir accès aux informations d'état de toute autre instance de cette fonction.

Remarque : Le SDK Dataflow fournit une variante de l'opération ParDo que vous pouvez utiliser pour transmettre des données persistantes immuables à chaque appel du code utilisateur en tant qu'entrée secondaire.

Java

Un argument DoFn traite un élément à la fois à partir de la classe PCollection d'entrée. Lorsque vous créez une sous-classe de l'argument DoFn, vous spécifiez le type d'élément d'entrée et le type d'élément(s) de sortie en tant que paramètres de type. L'exemple de code suivant montre comment définir la fonction ComputeWordLengthFn() à partir de l'exemple précédent, qui accepte une entrée String et génère une sortie Integer :

  static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

La sous-classe de l'argument DoFn doit remplacer la méthode de traitement d'élément, processElement, dans laquelle vous fournissez le code permettant de travailler avec l'élément d'entrée. L'exemple de code suivant montre la fonction ComputeWordLengthFn() complète :

  static class ComputeWordLengthFn extends DoFn<String, Integer> {
    @Override
    public void processElement(ProcessContext c) {
      String word = c.element();
      c.output(word.length());
    }
  }

Vous n'avez pas besoin d'extraire manuellement les éléments de la collection d'entrée. Le SDK Dataflow pour Java gère l'extraction de chaque élément et le transmet à la sous-classe de l'argument DoFn. Lorsque vous remplacez la méthode processElement, la nouvelle méthode utilisée doit accepter un objet de type ProcessContext qui vous permet d'accéder à l'élément que vous souhaitez traiter. Vous accédez à l'élément transmis à l'argument DoFn à l'aide de la méthode ProcessContext.element().

Si les éléments de la classe PCollection sont des paires clé/valeur, vous pouvez accéder à la clé à l'aide de la commande ProcessContext.element().getKey() et à la valeur à l'aide de la commande ProcessContext.element().getValue().

Java

Le SDK Dataflow pour Java gère automatiquement la collecte des éléments de sortie dans un résultat PCollection. Vous utilisez l'objet ProcessContext pour générer l'élément obtenu à partir de la méthode processElement dans la collection de sortie. Pour générer un élément de la collection de résultats, utilisez la méthode ProcessContext.output().

Arguments "DoFn" simples

Les SDK Dataflow fournissent des méthodes propres à un langage pour simplifier la distribution de la mise en œuvre DoFn.

Java

Il est souvent possible de créer un argument DoFn simple pour l'opération ParDo en tant qu'instance de classe interne anonyme. Si l'argument DoFn ne contient que quelques lignes, il peut être plus facile de le spécifier de manière intégrée. L'exemple de code suivant montre comment appliquer l'opération ParDo avec la fonction ComputeWordLengthFn en tant qu'argument DoFn anonyme :

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a ParDo with an anonymous DoFn to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    ParDo
      .named("ComputeWordLengths")            // the transform name
      .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().length());
        }
      }));

Dans le cadre de transformations comme celle indiquée ci-dessus appliquant une fonction à chaque élément de l'entrée pour générer une seule sortie par élément, vous pouvez utiliser la transformation MapElements de niveau supérieur. Il s'agit d'une transformation très concise dans Java 8, car l'opération MapElements accepte une fonction lambda.

  // The input PCollection.
  PCollection<String> words = ...;

  // Apply a MapElements with an anonymous lambda function to the PCollection words.
  // Save the result as the PCollection wordLengths.
  PCollection<Integer> wordLengths = words.apply(
    MapElements.via((String word) -> word.length())
        .withOutputType(new TypeDescriptor<Integer>() {});

De même, vous pouvez utiliser les fonctions lambda de Java 8 avec les transformations Filter, FlatMapElements et Partition. Pour plus d'informations sur ces transformations, consultez la section Transformations prédéfinies dans les SDK Dataflow.

Noms de transformation

Les noms de transformation apparaissent dans le graphique d'exécution lorsque vous affichez le pipeline dans l'interface de surveillance de Dataflow. Il est particulièrement important de spécifier un nom explicite pour votre transformation afin de la reconnaître dans le graphique.

Java

L'opération .named spécifie le nom de la transformation utilisée pour cette étape dans le pipeline. Les noms de transformation apparaissent dans le graphique d'exécution lorsque vous affichez le pipeline dans l'interface de surveillance de Dataflow. Il est particulièrement important de spécifier un nom explicite lorsque vous utilisez une instance DoFn anonyme avec ParDo afin d'afficher un nom facilement lisible associé à l'étape dans l'interface de surveillance.

Entrées secondaires

Outre la classe PCollection d'entrée principale, vous pouvez fournir des entrées supplémentaires à une transformation ParDo sous la forme d'entrées secondaires. Une entrée secondaire représente une entrée supplémentaire à laquelle l'argument DoFn peut accéder chaque fois qu'il traite un élément dans la classe PCollection d'entrée. Lorsque vous spécifiez une entrée secondaire, vous créez un affichage de certaines des autres données pouvant être lu à partir de la transformation ParDo de l'argument DoFn lors du traitement de chaque élément.

Les entrées secondaires sont utiles si l'opération ParDo doit injecter des données supplémentaires lors du traitement de chaque élément dans la classe PCollection d'entrée, mais que les données supplémentaires doivent être déterminées au moment de l'exécution (et non codées en dur). Ces valeurs peuvent être déterminées par les données d'entrée ou dépendre d'une autre branche du pipeline. Par exemple, vous pouvez obtenir une valeur d'un service à distance pendant que le pipeline est en cours d'exécution et utiliser cette valeur en tant qu'entrée secondaire. Vous pouvez également utiliser une valeur calculée par une branche distincte du pipeline et l'ajouter en tant qu'entrée secondaire à l'opération ParDo d'une autre branche.

Représenter une entrée secondaire

Java

Les entrées secondaires sont toujours de type PCollectionView. PCollectionView est un moyen de représenter une classe PCollection en tant qu'entité unique que vous pouvez ensuite transmettre en tant qu'entrée secondaire à une opération ParDo. Vous pouvez créer un élément PCollectionView qui exprime une classe PCollection en tant qu'un des types suivants :

Type d'affichage Utilisation
View.asSingleton Représente une classe PCollection en tant que valeur individuelle. Cet affichage est utilisé généralement après avoir combiné une classe PCollection à l'aide de la fonction Combine.globally. Utilisez cet affichage lorsque l'entrée secondaire est une valeur calculée unique. Vous créez généralement un affichage unique à l'aide de la fonction Combine.globally(...).asSingletonView().
View.asList Représente une classe PCollection en tant qu'élément List. Utilisez cet affichage lorsque l'entrée secondaire est une collection de valeurs individuelles.
View.asMap Représente une classe PCollection en tant qu'élément Map. Utilisez cet affichage lorsque l'entrée secondaire est constituée de paires clé/valeur (PCollection<K, V>) et comporte une valeur unique pour chaque clé.
View.asMultimap Représente une classe PCollection en tant qu'élément MultiMap. Utilisez cet affichage lorsque l'entrée secondaire est constituée de paires clé/valeur (PCollection<K, V>) et comporte plusieurs valeurs pour chaque clé.

Remarque : À l'instar des autres données de pipeline, l'affichage PCollectionView ne peut pas être modifié après sa création.

Transmettre des entrées secondaires à l'opération "ParDo"

Java

Vous transmettez des entrées secondaires à la transformation ParDo en appelant .withSideInputs. Dans l'argument DoFn, vous accédez à l'entrée secondaire à l'aide de la méthode DoFn.ProcessContext.sideInput.

L'exemple de code suivant crée une entrée secondaire unique à partir d'une classe PCollection<Integer> et la transmet à une opération ParDo ultérieure.

Dans l'exemple, nous disposons d'une classe PCollection<String> appelée words qui représente une collection de mots individuels et une classe PCollection<Integer> qui représente des longueurs de mot. Nous pouvons utiliser cette dernière pour calculer une limite de longueur de mot maximale en tant que valeur unique, puis transmettre cette valeur calculée en tant qu'entrée secondaire à une opération ParDo qui filtre les éléments words en fonction de la limite.

  // The input PCollection to ParDo.
  PCollection<String> words = ...;

  // A PCollection of word lengths that we'll combine into a single value.
  PCollection<Integer> wordLengths = ...; // Singleton PCollection

  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
  final PCollectionView<Integer> maxWordLengthCutOffView =
     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());

  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
    PCollection<String> wordsBelowCutOff =
    words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                      .of(new DoFn<String, String>() {
        public void processElement(ProcessContext c) {
          String word = c.element();
          // In our DoFn, access the side input.
          int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
          if (word.length() <= lengthCutOff) {
            c.output(word);
          }
    }}));
}

Entrées secondaires et fenêtrage

Lorsque vous créez un affichage PCollectionView d'une classe PCollectionView fenêtrée, qui peut être infinie et ne peut donc pas être compressée en une valeur unique (ou en une classe de collection unique), PCollection représente une entité unique par fenêtre. En d'autres termes, l'affichage PCollectionView représente une valeur unique par fenêtre, une liste unique par fenêtre, etc.

Dataflow utilise la ou les fenêtres de l'élément d'entrée principale pour rechercher la fenêtre appropriée pour l'élément d'entrée secondaire. Dataflow projette la fenêtre de l'élément d'entrée principale dans l'ensemble de fenêtres d'entrée secondaire, puis utilise l'entrée secondaire à partir de la fenêtre obtenue. Si l'entrée principale et les entrées secondaires disposent de fenêtres identiques, la projection fournit la fenêtre exacte correspondante. Cependant, si les entrées disposent de fenêtres différentes, Dataflow utilise la projection pour choisir la fenêtre d'entrée secondaire la plus appropriée.

Java

Par exemple, si l'entrée principale est fenêtrée à l'aide de plages fixes d'une minute et que l'entrée secondaire est fenêtrée à l'aide de plages fixes d'une heure, Dataflow projette la fenêtre d'entrée principale sur l'ensemble des fenêtres d'entrée secondaire. Dataflow sélectionne ensuite la valeur d'entrée secondaire à partir de la fenêtre d'entrée secondaire appropriée définie sur une plage fixe d'une heure.

Si l'entrée secondaire comporte plusieurs déclenchements, Dataflow utilise la valeur du dernier déclenchement. Ceci est particulièrement utile si vous utilisez une entrée secondaire avec une seule fenêtre globale et spécifiez un déclencheur.

Sorties secondaires

Bien que l'opération ParDo génère toujours une classe PCollection de sortie principale (comme la valeur de renvoi de apply), vous pouvez également demander à ParDo de générer un nombre quelconque de classes PCollection de sortie supplémentaires. Si vous choisissez de générer plusieurs sorties, l'opération ParDo renvoie toutes les classes PCollection de sortie (y compris la sortie principale) regroupées. Par exemple, sous Java, les classes PCollection de sortie sont regroupées dans un élément PCollectionTuple sécurisé.

Tags pour les sorties secondaires

Java

Pour émettre des éléments sur une classe PCollection de sortie secondaire, vous devez créer un objet TupleTag afin d'identifier chaque collection générée par l'opération ParDo. Par exemple, si ParDo génère trois classes PCollection de sortie (la sortie principale et deux sorties secondaires), vous devez créer trois objets TupleTag associés.

L'exemple de code suivant montre comment créer des objets TupleTag pour une opération ParDo avec une sortie principale et deux sorties secondaires :

  // Input PCollection to our ParDo.
  PCollection<String> words = ...;

  // The ParDo will filter words whose length is below a cutoff and add them to
  // the main output PCollection<String>.
  // If a word is above the cutoff, the ParDo will add the word length to a side output
  // PCollection<Integer>.
  // If a word starts with the string "MARKER", the ParDo will add that word to a different
  // side output PCollection<String>.
  final int wordLengthCutOff = 10;

  // Create the TupleTags for the main and side outputs.
  // Main output.
  final TupleTag<String> wordsBelowCutOffTag =
      new TupleTag<String>(){};
  // Word lengths side output.
  final TupleTag<Integer> wordLengthsAboveCutOffTag =
      new TupleTag<Integer>(){};
  // "MARKER" words side output.
  final TupleTag<String> markedWordsTag =
      new TupleTag<String>(){};

Transmettre des tags de sortie à l'opération "ParDo"

Une fois que vous avez spécifié les objets TupleTag pour chacune de vos sorties ParDo, vous devez transmettre ces tags à l'opération ParDo en appelant .withOutputTags. Vous transmettez d'abord le tag pour la sortie principale, puis les tags pour toutes les sorties secondaires dans un objet TupleTagList.

Sur la base de l'exemple précédent, vous pouvez transmettre les trois objets TupleTag (un pour la sortie principale et deux pour les sorties secondaires) à l'opération ParDo comme suit :

  PCollectionTuple results =
      words.apply(
          ParDo
          // Specify the tag for the main output, wordsBelowCutoffTag.
          .withOutputTags(wordsBelowCutOffTag,
          // Specify the tags for the two side outputs as a TupleTagList.
                          TupleTagList.of(wordLengthsAboveCutOffTag)
                                      .and(markedWordsTag))
          .of(new DoFn<String, String>() {
            // DoFn continues here.
            ...
          }

Notez que toutes les sorties (y compris la classe PCollection de sortie principale) sont regroupées dans les classes PCollectionTuple renvoyées appelées results..

Émettre des sorties secondaires dans l'argument "DoFn"

Java

Au sein de l'argument DoFn de l'opération ParDo, vous pouvez émettre un élément vers une sortie secondaire en utilisant la méthode ProcessContext.sideOutput. Vous devez transmettre l'objet TupleTag approprié pour la collection de sortie secondaire cible lorsque vous appelez ProcessContext.sideOutput.

Sur la base de l'exemple précédent, vous pouvez émettre la sortie principale et des sorties secondaires dans l'argument DoFn comme suit :

  .of(new DoFn<String, String>() {
     public void processElement(ProcessContext c) {
       String word = c.element();
       if (word.length() <= wordLengthCutOff) {
         // Emit this short word to the main output.
         c.output(word);
       } else {
         // Emit this long word's length to a side output.
         c.sideOutput(wordLengthsAboveCutOffTag, word.length());
       }
       if (word.startsWith("MARKER")) {
         // Emit this word to a different side output.
         c.sideOutput(markedWordsTag, word);
       }
     }}));

Une fois l'opération ParDo exécutée, vous devez extraire les classes PCollection de sortie principale et de sortie secondaire obtenues à partir de l'objet PCollectionTuple renvoyé. Consultez la section relative aux objets PCollectionTuple pour obtenir des exemples montrant comment extraire des classes PCollection individuelles d'un tuple.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.