Exemple de pipeline WordCount

Si ce n'est pas déjà fait, veuillez suivre les étapes décrites dans le Guide de démarrage rapide avant de poursuivre.

Les exemples WordCount montrent comment configurer un pipeline de traitement capable de lire du texte, de segmenter les lignes de texte en mots individuels et de calculer la fréquence de chacun de ces mots. Les SDK Dataflow contiennent une série de quatre exemples de code WordCount, chacun plus détaillé que le précédent et s'appuyant les uns sur les autres. Le texte d'entrée pour tous les exemples est une collection des œuvres de Shakespeare.

Chaque exemple WordCount introduit des concepts différents du SDK Dataflow.

  • Minimal WordCount illustre les principes de base impliqués dans la construction d'un pipeline Dataflow.
  • WordCount présente certaines des bonnes pratiques les plus courantes en matière de création de pipelines faciles à réutiliser et à maintenir.
  • Debugging WordCount présente les pratiques de journalisation et de débogage.
  • Windowed WordCount montre comment utiliser le modèle de programmation de Dataflow pour gérer des ensembles de données limités et illimités.

Commencez par comprendre Minimal WordCount, qui est le plus simple de ces exemples. Une fois que vous êtes à l'aise avec les principes de base de la création d'un pipeline, poursuivez votre apprentissage avec les bonnes pratiques en matière d'écriture de programmes Dataflow illustrées dans l'exemple WordCount. Lisez ensuite l'exemple Debugging WordCount pour comprendre comment exploiter les pratiques courantes pour la journalisation et le débogage. Enfin, découvrez comment utiliser le même schéma de calcul pour des ensembles de données finis et infinis avec l'exemple Windowed WordCount.

MinimalWordCount

Minimal WordCount illustre un pipeline simple capable de lire un bloc de texte à partir d'un fichier hébergé dans Google Cloud Storage, d'appliquer des transformations pour segmenter et compter les mots, ainsi que d'écrire les données résultantes dans un fichier de sortie stocké dans un bucket Cloud Storage. Cet exemple code en dur les emplacements de ses fichiers d'entrée et de sortie et n'effectue aucune vérification d'erreur. Il sert uniquement à vous présenter les "très grandes lignes" de la création d'un pipeline Dataflow. Dans les prochains exemples, nous inclurons des paramètres pour les sources d'entrée et de sortie du pipeline et nous vous présenterons d'autres bonnes pratiques.

Java

Concepts clés
  1. Créer le pipeline
  2. Appliquer des transformations au pipeline
    • Lecture de l'entrée (dans cet exemple : lecture de fichiers texte)
    • Application de transformations ParDo
    • Application de transformations fournies par le SDK (dans cet exemple : Count)
    • Écriture de la sortie (dans cet exemple : écriture dans Google Cloud Storage)
  3. Exécuter le pipeline

Les sections suivantes expliquent en détail ces concepts et présente des extraits du code correspondant dans le pipeline Minimal WordCount.

Créer le pipeline

La première étape de la création d'un pipeline Cloud Dataflow consiste à créer un objet Pipeline Options. Cet objet nous permet de définir diverses options pour notre pipeline, telles que le gestionnaire de pipeline chargé de son exécution, l'ID de notre projet et l'emplacement de stockage intermédiaire utilisé par le pipeline pour ses fichiers de travail (utilisé pour rendre vos fichiers jar accessibles dans le Cloud). Dans cet exemple, nous définissons ces options en dur dans le programme, mais le plus souvent, on utilise des arguments de ligne de commande pour définir les options du pipeline.

Dans notre exemple, nous avons spécifié BlockingDataflowPipelineRunner en tant que gestionnaire PipelineRunner, afin que notre pipeline soit exécuté dans le cloud au moyen du service Google Cloud Dataflow. Il existe d'autres options que vous pouvez définir pour l'exécution de votre pipeline dans le cloud. Vous pouvez également omettre complètement cette option, auquel cas le gestionnaire par défaut exécutera votre pipeline en local. Ces différentes options sont illustrées dans les deux exemples WordCount suivants et sont décrites plus en détail dans la section Spécifier des paramètres d'exécution.

Java

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET-YOUR-PROJECT-ID-HERE");
// The 'gs' URI means that this is a Google Cloud Storage path
options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");

L'étape suivante consiste à créer un objet Pipeline avec les options que nous venons de construire. L'objet Pipeline construit le graphe des transformations à exécuter qui est associé à ce pipeline spécifique.

Java

Pipeline p = Pipeline.create(options);

Consultez la section Pipelines pour une discussion détaillée sur l'objet pipeline et son fonctionnement.

Appliquer des transformations de pipeline

Le pipeline Minimal WordCount contient plusieurs transformations permettant de lire des données dans le pipeline, de les manipuler ou de les transformer, et d'écrire les résultats. Chaque transformation représente une opération dans le pipeline.

Chaque transformation absorbe une entrée sous quelque forme que ce soit (données ou autre) et produit des données de sortie. Les données d'entrée et de sortie sont représentées par la classe PCollection du SDK. PCollection est une classe spéciale fournie par le SDK Dataflow que vous pouvez utiliser pour représenter un ensemble de données de pratiquement n'importe quelle taille, y compris un ensemble de données de taille infinie.

La figure 1 illustre le flux de données dans le pipeline :

Le pipeline utilise une transformation TextIO.Read pour créer une PCollection à partir de données stockées dans un fichier de données d'entrée. La transformation CountWords produit une PCollection de décomptes de mots à partir de la PCollection de texte brut. TextIO.Write écrit le nombre de mots mis en forme dans un fichier de données en sortie.
Figure 1 : Flux des données dans le pipeline.

Le pipeline Minimal WordCount comporte cinq transformations :

  1. Une transformation de lecture de fichier texte, Read, est appliquée à l'objet Pipeline lui-même et génère une PCollection en sortie. Chaque élément de la PCollection de sortie représente une ligne de texte du fichier d'entrée.
  2. Java

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
    
  3. Une transformation ParDo qui appelle une fonction DoFn (définie de manière intégrée comme une classe anonyme) sur chaque élément et segmente les lignes de texte en mots individuels. L'entrée de cette transformation est la PCollection de lignes de texte générée par la transformation précédente TextIO.Read. La transformation ParDo produit une nouvelle PCollection dans laquelle chaque élément représente un mot individuel dans le texte.
  4. Java

    Vous pouvez attribuer à votre transformation un nom qui apparaîtra dans l'interface de surveillance de Dataflow. Pour cela, utilisez, l'opération .named() comme nous l'avons fait dans cet exemple. Lorsque le service Dataflow exécute votre pipeline, l'interface de surveillance indique le moment où s'exécute chaque transformation ParDo.

      .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
           for (String word : c.element().split("[^a-zA-Z']+")) {
             if (!word.isEmpty()) {
               c.output(word);
             }
           }
         }
      }))
    
  5. La transformation Count fournie par le SDK est une transformation générique qui absorbe une PCollection de n'importe quel type et renvoie une PCollection de paires clé/valeur. Chaque clé représente un élément unique de la collection d'entrée, et chaque valeur représente le nombre d'occurrences de cette clé dans la collection d'entrée.

    Dans ce pipeline, l'entrée de Count est la PCollection de mots individuels générée par la transformation ParDo précédente. La sortie est une PCollection de paires clé/valeur, chaque clé représentant un mot unique dans le texte, la valeur associée étant le nombre d'occurrences de cette clé dans le texte.
  6. Java

      .apply(Count.<String>perElement())
    
  7. L'étape suivante est une transformation qui met en forme chacune des paires clé/valeur (mot unique et nombre d'occurrences de ce mot) en tant que chaîne imprimable adaptée à l'écriture dans un fichier de sortie.
  8. Java

    MapElements est une transformation composite de niveau supérieur qui encapsule une simple transformation ParDo. MapElements applique une fonction qui produit exactement un élément en sortie pour chaque élément de la PCollection d'entrée. Cette fonction MapElements appelle une fonction SimpleFunction (définie de manière intégrée comme une classe anonyme) qui assure la mise en forme. Cette fonction MapElements prend en entrée une PCollection de paires clés/valeurs générée par Count et produit en sortie une nouvelle PCollection de chaînes imprimables.

      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         @Override
         public String apply(KV<String, Long> element) {
           return element.getKey() + ": " + element.getValue();
         }
      }))
    
  9. Une opération d'écriture de fichier texte, Write. Cette transformation prend en entrée la PCollection finale de Strings mis en forme et écrit chaque élément dans un fichier texte en sortie. Chaque élément de la PCollection d'entrée représente une ligne de texte dans le fichier de sortie résultant.
  10. Java

      .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
    

    Notez que la transformation Write produit une valeur de résultat triviale de type PDone qui, dans ce cas, est ignorée.

Exécuter le pipeline

Exécutez le pipeline en appelant la méthode run, qui l'envoie pour exécution au gestionnaire que vous avez spécifié lors de la création du pipeline.

Java

p.run();

Exemple WordCount

Cet exemple WordCount présente quelques bonnes pratiques de programmation qui peuvent faciliter la lecture, l'écriture et la maintenance de votre pipeline. Bien que cela ne soit pas explicitement requis, ces éléments peuvent rendre l'exécution de votre pipeline plus flexible, vous aider à le tester et rendre son code réutilisable.

Cette section part du principe que vous maîtrisez bien les concepts de base de la construction d'un pipeline. Si vous estimez que vous n'en êtes pas encore à ce stade, lisez la section ci-dessus intitulée Minimal WordCount.

Java

Nouveaux concepts :
  1. Appliquer une fonction ParDo avec une DoFn explicite
  2. Créer des transformations composites
  3. Utiliser des PipelineOptions paramétrables

Les sections suivantes décrivent ces concepts clés en détail et décomposent le code du pipeline en différentes sections.

Spécifier des DoFns explicites

Lorsque vous utilisez des transformations ParDo, vous devez spécifier l'opération de traitement à appliquer à chaque élément de la PCollection d'entrée. Cette opération de traitement est une sous-classe de la classe DoFn du SDK. L'exemple de pipeline de la section précédente (Minimal WordCount) crée les sous-classes DoFn pour chaque ParDo intégrée sous forme de classe interne anonyme.

Toutefois, il est souvent plus judicieux de définir la DoFn au niveau global, ce qui facilite les tests unitaires et rend le code de la fonction ParDo plus lisible.

Comme indiqué dans l'exemple précédent (Minimal WordCount), lorsque vous exécutez votre pipeline, l'interface de surveillance de Dataflow indique le moment où s'exécute chaque transformation ParDo. Le service Dataflow génère automatiquement des noms pour les transformations ParDo à partir du nom que vous avez fourni pour la fonction DoFn. Par exemple, la fonction ParDo qui applique FormatAsTextFn() apparaît dans l'interface de surveillance en tant que ParDo(FormatAsText).

Java

Dans cet exemple, les DoFns sont définies en tant que classes statiques :

/** A DoFn that converts a Word and Count into a printable string. */
static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  ...

  @Override
  public void processElement(ProcessContext c) {
    ...
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform.
  p.apply(...)
   .apply(...)
   .apply(ParDo.of(new FormatAsTextFn()))
   ...
}

Pour en savoir plus sur la création et la spécification de sous-classes DoFn pour vos transformations ParDo, consultez la section Traitement parallèle avec l'opération "ParDo".

Créer des transformations composites

Si une opération de traitement doit être constituée de plusieurs transformations ou étapes ParDo, vous pouvez la créer en tant que sous-classe de PTransform. Créer une sous-classe PTransform vous permet de créer des transformations complexes réutilisables, de rendre la structure de votre pipeline plus claire et plus modulaire, et de faciliter les tests unitaires.

Rendre explicite la structure logique de votre pipeline à l'aide de sous-classes PTransform peut également faciliter la surveillance de votre pipeline. Lorsque le service Dataflow crée la structure finale optimisée de votre pipeline, l'interface de surveillance Dataflow utilise les transformations que vous avez créées pour refléter plus précisément la structure de votre pipeline.

Java

Dans cet exemple, deux transformations sont encapsulées dans la sous-classe PTransform CountWords. CountWords contient la fonction ParDo qui exécute ExtractWordsFn et la transformation Count fournie par le SDK.

Lorsque CountWords est définie, nous spécifions son entrée et sa sortie ultimes. L'entrée est l'objet PCollection<String> pour l'opération d'extraction et la sortie est la PCollection<KV<String, Long>> générée par l'opération de comptage.

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}

Utiliser des PipelineOptions paramétrables

Dans l'exemple précédent, Minimal WordCount, nous avons défini diverses options d'exécution à la création de notre pipeline. Dans cet exemple, nous définissons nos propres options de configuration personnalisées par extension de PipelineOptions.

Vous pouvez ajouter vos propres arguments à traiter par l'analyseur de ligne de commande et spécifier des valeurs par défaut. Vous pouvez ensuite accéder aux valeurs des options dans le code de votre pipeline.

Dans l'exemple Minimal WordCount, nous avons codé en dur les options de pipeline. Cependant, la méthode la plus courante pour construire PipelineOptions consiste à analyser les arguments en ligne de commande.

Java

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}

Exemple Debugging WordCount

L'exemple Debugging WordCount illustre certaines des bonnes pratiques pour instrumenter le code de votre pipeline. Vous pouvez exploiter l'interface de surveillance et les agrégateurs de Dataflow pour obtenir une visibilité supplémentaire sur votre pipeline au cours de son exécution.

Java

Vous pouvez également utiliser la fonction DataflowAssert du SDK pour tester la sortie de vos transformations à différentes étapes du pipeline.

Java

Nouveaux concepts :
  1. Afficher les journaux dans l'interface de surveillance de Dataflow
  2. Contrôler les niveaux de journalisation des nœuds de calcul Dataflow
  3. Créer des Aggregators
  4. Tester votre pipeline via DataflowAssert

Les sections suivantes décrivent ces concepts clés en détail et décomposent le code du pipeline en différentes sections.

Afficher les journaux dans l'interface de surveillance de Dataflow

La journalisation Google Cloud agrège les journaux de tous les nœuds de calcul responsables de votre tâche Dataflow dans un emplacement unique au sein de la console Google Cloud Platform. Vous pouvez utiliser l'interface de surveillance de Dataflow pour afficher les journaux de toutes les instances Compute Engine générées par Dataflow pour traiter votre tâche. Vous pouvez ajouter des instructions de journalisation dans les instances DoFn de votre pipeline, qui apparaîtront dans l'interface de surveillance lors de son exécution.

Java

L'enregistreur SLF4J ci-dessous utilise le nom de classe complet de FilterTextFn en tant que nom d'enregistreur. Toutes les entrées de journal émises par cet enregistreur seront référencées sous ce nom et seront visibles dans l'interface de surveillance Dataflow, du moment qu'elles correspondent aux paramètres de niveau de journalisation.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebuggingWordCount {
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...
    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
    ...

    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        // Log at the "DEBUG" level each element that we match. When executing this pipeline
        // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI
        // only if the log level is set to "DEBUG" or lower.
        LOG.debug("Matched: " + c.element().getKey());
        ...
      } else {
        // Log at the "TRACE" level each element that is not matched. Different log levels
        // can be used to control the verbosity of logging providing an effective mechanism
        // to filter less important information.
        LOG.trace("Did not match: " + c.element().getKey());
        ...
      }
    }
  }
}

Contrôler les niveaux de journalisation des nœuds de calcul Dataflow

Java

Les nœuds de calcul Dataflow qui exécutent le code utilisateur sont configurés pour se connecter à Cloud Logging avec un niveau de journalisation par défaut équivalent à INFO et supérieur. Vous pouvez remplacer les niveaux de journalisation pour des espaces de noms de journalisation spécifiques en renseignant :

--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}

Par exemple, en spécifiant :

--workerLogLevelOverrides={"com.example":"DEBUG"}

Lorsque vous exécutez ce pipeline à l'aide du service Dataflow, l'interface de surveillance n'affiche que les entrées de journal de niveau DEBUG ou supérieur pour le package com.example, en plus des entrées de journal de niveau INFO ou supérieur qui y figurent par défaut.

En outre, vous pouvez remplacer la configuration par défaut de la journalisation des nœuds de calcul Dataflow en spécifiant :

--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>

Par exemple, en spécifiant :

--defaultWorkerLogLevel=DEBUG

lors de l'exécution de ce pipeline avec le service Dataflow, l'interface de surveillance affichera toutes les entrées de journal de niveau DEBUG ou supérieur. Notez que passer le niveau de journalisation par défaut des nœuds de calcul à TRACE ou DEBUG augmente considérablement la quantité d'informations journalisées.

Obtenez plus d'informations sur la journalisation dans Cloud Dataflow.

Créer des agrégateurs

Un agrégateur personnalisé peut suivre les valeurs de votre pipeline en cours d'exécution. Ces valeurs seront affichées dans l'interface de surveillance de Dataflow lorsque le pipeline est exécuté à l'aide de ce service.

Il se peut que les agrégateurs ne deviennent visibles qu'à partir du moment où le système commence à exécuter la transformation ParDo qui les a créés et/ou que leur valeur initiale est modifiée. Ils apparaissent alors dans l'interface de surveillance en bas du Résumé des tâches.

Les agrégateurs personnalisés ci-dessous suivent le nombre de mots ayant fait l'objet ou non d'une correspondance (matched/unmatched).

Java

public class DebuggingWordCount {
  /** A DoFn that filters for a specific key based upon a regular expression. */
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    private final Aggregator<Long, Long> matchedWords =
        createAggregator("matchedWords", new Sum.SumLongFn());
    private final Aggregator<Long, Long> unmatchedWords =
      createAggregator("umatchedWords", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        ...
        matchedWords.addValue(1L);
        ...
      } else {
        ...
        unmatchedWords.addValue(1L);
      }
    }
  }
}

Agrégateurs dans les pipelines par lots et en flux continu

Les agrégateurs dans les pipelines par lots assurent la cohérence. Ils sont engagés une fois exactement pour les lots ayant été traités avec succès, et ne sont pas engagés pour les lots ayant échoué.

Dans les pipelines en flux continu, les agrégateurs fournissent une sémantique plus indulgente. La contribution des lots ayant été traités avec succès constitue un effort optimal et, pour les lots défaillants, elle peut être reflétée dans la valeur finale.

Tester votre pipeline via DataflowAssert

Java

DataflowAssert est un ensemble de transformations PTransforms pratiques dans le style des correspondances entre collections Hamcrest, que vous pouvez utiliser lorsque vous créez des tests au niveau du pipeline pour valider le contenu des PCollections. L'utilisation de DataflowAssert est optimale pour les tests unitaires sur de petits ensembles de données, mais nous le présentons ici comme un outil pédagogique.

Ci-dessous, nous vérifions que l'ensemble de mots filtrés correspond aux décomptes prévus. Notez que DataflowAssert ne fournit aucun résultat et que la réussite du pipeline implique que les attentes ont été satisfaites. Découvrez comment tester votre pipeline et consultez DebuggingWordCountTest pour obtenir un exemple de tests unitaires.

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}

WindowedWordCount

Java

Cet exemple, WindowedWordCount, compte les mots dans un texte tout comme les exemples précédents, mais introduit plusieurs concepts avancés. L'entrée de WindowedWordCount peut être un ensemble de données fixe (comme dans les exemples précédents) ou un flux de données illimité.

Le SDK Dataflow est pratique car il vous permet de créer un unique pipeline capable de gérer à la fois des types d’entrée limités et illimités. Si l'entrée est illimitée, alors toutes les PCollections du pipeline seront également illimitées. Il en va de même pour les entrées limitées.

Avant de lire cette section, assurez-vous de bien connaître les principes de base de la construction d’un pipeline.

Nouveaux concepts :
  1. Lire des entrées limitées et illimitées
  2. Ajouter des horodatages aux données
  3. Utiliser le fenêtrage
  4. Écrire une sortie limitée et illimitée

Les sections suivantes décrivent ces concepts clés en détail et décomposent le code du pipeline en différentes sections.

Lire des entrées limitées et illimitées

L'entrée pour WindowedWordCount peut être limitée ou illimitée. Si votre entrée comporte un nombre fixe d'éléments, elle est considérée comme un ensemble de données "limité". Si votre entrée est continuellement mise à jour, elle est considérée comme "illimitée". Consultez la section PCollections limitées et illimitées pour en savoir plus sur les types d'entrée.

Dans cet exemple, vous pouvez choisir si l’entrée sera limitée ou non. Rappelez-vous que l'entrée utilisée pour tous les exemples jusqu'à présent est une collection de textes de Shakespeare, c'est-à-dire une entrée finie, limitée. Cependant, pour expliquer les nouveaux concepts de cet exemple, nous utiliserons ici comme entrée une répétition des textes de Shakespeare.

Dans cet exemple, si votre entrée est illimitée, elle sera lue dans un sujet Google Cloud Pub/Sub. La transformation Read qui est alors appliquée au pipeline est une transformation PubSubIO.Read. Sinon, l'entrée sera lue à partir de Google Cloud Storage.

public static void main(String[] args) throws IOException {
    ...
    PCollection<String> input;
    if (options.isUnbounded()) {
      LOG.info("Reading from PubSub.");
      // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));

  } else {
      // Else, this is a bounded pipeline. Read from the Google Cloud Storage file.
      input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
      ...
    }
    ...
}

Ajouter des horodatages aux données

Chaque élément d'une PCollection comporte un horodatage associé. L'horodatage de chaque élément est assigné par la source qui crée la PCollection. Dans cet exemple, si vous choisissez pour votre pipeline une entrée illimitée, les horodatages proviendront de la source de données Pub/Sub. Si vous choisissez une entrée limitée, la méthode DoFn nommée AddTimestampsFn (invoquée par ParDo) définira un horodatage pour chaque élément de la PCollection.

public static void main(String[] args) throws IOException {
  ...
  input = pipeline
    .apply(...)
    // Add an element timestamp, using an artificial time.
    .apply(ParDo.of(new AddTimestampFn()));
}

Vous trouverez ci-dessous le code de AddTimestampsFn, une DoFn invoquée par ParDo, qui définit l'élément de données de l'horodatage en fonction de l'élément lui-même. Par exemple, si les éléments sont des lignes de journal, ce ParDo peut analyser le temps écoulé dans la chaîne de journal et le définir comme horodatage de l'élément. Il n'y a pas d'horodatage inhérent aux travaux de Shakespeare, nous avons donc créé des horodatages aléatoires pour illustrer le concept. Chaque ligne du texte d'entrée se voit associer un horodatage aléatoire au cours d'une période de 2 heures.

static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp;

  AddTimestampFn() {
    this.minTimestamp = new Instant(System.currentTimeMillis());
  }

  @Override
  public void processElement(ProcessContext c) {
    // Generate a timestamp that falls somewhere in the past 2 hours.
    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
        Instant randomTimestamp = minTimestamp.plus(randMillis);
    // Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}

Pour en savoir plus sur les horodatages, consultez le document Horodatage d'éléments de PCollection.

Fenêtrage

Le SDK Dataflow utilise un concept appelé fenêtrage pour subdiviser une PCollection en fonction de l'horodatage de ses éléments individuels. Les transformations Dataflow qui agrègent plusieurs éléments, traitent chaque PCollection comme une succession de plusieurs fenêtres finies, même si la collection entière elle-même peut avoir une taille infinie (illimitée).

L'exemple WindowingWordCount applique un fenêtrage à durée fixe, dans lequel chaque fenêtre représente un intervalle de temps fixe. La durée de fenêtre fixe pour cet exemple est par défaut de 1 minute (vous pouvez modifier cette valeur grâce à une option de ligne de commande). Le pipeline applique ensuite la transformation CountWords.

PCollection<KV<String, Long>> wordCounts = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
   .apply(new WordCount.CountWords());

Écrire une sortie limitée et illimitée

Puisque notre entrée peut être illimitée ou non, il en va de même pour notre PCollection de sortie. Nous devons donc nous assurer de choisir un récepteur approprié. Certains récepteurs de sortie ne prennent en charge qu'un seul type de sortie, limitée ou illimitée. Par exemple, un fichier texte est un récepteur ne pouvant recevoir que des données limitées. La source de sortie BigQuery est une source prenant en charge aussi bien les entrées limitées qu'illimitées.

Dans cet exemple, nous diffusons les résultats vers une table BigQuery. Les résultats sont ensuite mis en forme pour une table BigQuery, puis écrits dans BigQuery à l'aide de BigQueryIO.Write.

wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
  .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.