Créer des transformations composites

Les transformations du SDK Dataflow peuvent disposer d'une structure imbriquée permettant de composer une transformation complexe à partir de plusieurs transformations simples. Dans ce cas, la transformation peut comprendre plusieurs autres opérations de transformation (et peut donc exécuter plus d'une opération ParDo, Combine ou GroupByKey). Ces transformations sont appelées transformations composites. Elles s'avèrent utiles lorsque vous souhaitez créer une transformation réutilisable composée de plusieurs étapes.

L'imbrication de plusieurs transformations au sein d'une même transformation composite peut offrir de nombreux avantages à votre pipeline Dataflow :

  • Les transformations composites peuvent rendre votre code plus modulaire et plus facile à comprendre, favorisant ainsi sa réutilisation.
  • L'interface de surveillance de Dataflow peut se référer aux transformations composites par leur nom, ce qui vous aide à suivre et à comprendre la progression de votre pipeline au moment de son exécution.

Exemple de transformation composite

La plupart des transformations prédéfinies du SDK Dataflow sont des transformations composites.

La transformation CountWords de l'exemple de programme WordCount du SDK Dataflow constitue un exemple de transformation composite. CountWords est une sous-classe PTransform composée de plusieurs transformations imbriquées.

Dans sa méthode apply, la transformation CountWords applique les opérations de transformation suivantes :

  1. Elle applique une opération ParDo sur la PCollection d'entrée contenant les lignes de texte, produisant ainsi une PCollection de sortie constituée de mots individuels.
  2. Elle applique l'opération Count* de transformation de la bibliothèque du SDK Dataflow sur la PCollection de mots, produisant ainsi une PCollection de paires clé/valeur. Chaque clé représente un mot du texte, tandis que chaque valeur correspond au nombre d'occurrences de ce mot présentes dans les données d'origine.
  3. Elle applique une opération ParDo finale à la PCollection de paires clé/valeur, produisant ainsi une PCollection de chaînes imprimables adaptées à l'écriture dans un fichier de sortie.

La figure 1 décrit la façon dont le pipeline contenant CountWords est structuré à l'aide de transformations composites.

La transformation CountWords est une transformation composite qui utilise deux opérations ParDo et exploite la transformation fournie par le SDK, appelée Count.
Figure 1 : Détail de la transformation composite CountWords

Java

Les paramètres et la valeur de renvoi de votre transformation composite doivent correspondre au type d'entrée initial et au type de renvoi final dans l'ensemble de la transformation. Par exemple, CountWords.apply accepte une PCollection<String> d'entrée et renvoie une PCollection<String>, même si les données intermédiaires de la transformation changent à plusieurs reprises :

  static class CountWords
      extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> apply(PCollection<String> lines) {
      PCollection<String> words = lines.apply(
          ParDo
          .named("ExtractWords")
          .of(new ExtractWordsFn()));

      PCollection<KV<String, Integer>> wordCounts =
          words.apply(Count.<String>perElement());

      PCollection<String> results = wordCounts.apply(
          ParDo
          .named("FormatCounts")
          .of(new DoFn<KV<String, Integer>, String>() {
              @Override
              public void processElement(ProcessContext c) {
                c.output(c.element().getKey() + ": " + c.element().getValue());
              }
            }));

      return results;
    }
  }

Créer une transformation composite

Vous pouvez créer votre propre transformation composite en créant une sous-classe de la classe Ptransform du SDK Dataflow, puis en remplaçant la méthode "apply" afin de spécifier la logique de traitement réelle. Vous pouvez ensuite utiliser cette transformation de la même manière qu'une transformation intégrée au SDK.

Java

Pour les paramètres de type de la classe PTransform, vous transmettez les types de PCollection que votre transformation exploite en tant qu'entrée et produit en tant que sortie. Pour utiliser plusieurs PCollections en tant qu'entrée ou produire plusieurs PCollections en tant que sortie, utilisez l'un des types à collections multiples pour le paramètre de type approprié.

L'exemple de code suivant vous montre comment déclarer une PTransform qui accepte une PCollection de chaînes String en tant qu'entrée et une PCollection d'entiers Integer :

  static class ComputeWordLengths
    extends PTransform<PCollection<String>, PCollection<Integer>> {
    ...
  }

Remplacer la méthode "Apply"

Dans votre sous-classe PTransform, vous devez remplacer la méthode apply. La méthode apply vous permet d'ajouter la logique de traitement pour PTransform. Votre remplacement de la méthode apply doit accepter le type approprié de PCollection d'entrée en tant que paramètre et spécifier la PCollection de sortie en tant que valeur de renvoi.

Java

L'exemple de code suivant vous montre comment remplacer la méthode apply pour la classe ComputeWordLengths déclarée dans l'exemple précédent :

  static class ComputeWordLengths
      extends PTransform<PCollection<String>, PCollection<Integer>> {
    @Override
    public PCollection<Integer> apply(PCollection<String>) {
      ...
      // transform logic goes here
      ...
    }

Tant que vous remplacez la méthode apply dans votre sous-classe PTransform afin d'accepter la ou les PCollection(s) d'entrée appropriées et renvoyer la ou les PCollection(s) de sortie correspondantes, vous pouvez inclure autant de transformations que vous le souhaitez. Il peut s'agir de transformations de base, de transformations composites ou bien des transformations incluses dans les bibliothèques du SDK Dataflow.

Java

La méthode apply de PTransform n'est pas conçue pour être appelée directement par l'utilisateur d'une transformation. Vous devez à la place appeler la méthode apply sur la PCollection elle-même en spécifiant la transformation en tant qu'argument. Cela permet d'imbriquer les transformations dans la structure de votre pipeline.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.