Exemple de pipeline WordCount

Si ce n'est pas déjà fait, veuillez suivre les étapes décrites dans le guide de démarrage rapide avant de poursuivre.

Les exemples WordCount montrent comment configurer un pipeline de traitement capable de lire du texte, de segmenter les lignes de texte en mots individuels et de calculer la fréquence de chacun de ces mots. Les SDK Dataflow contiennent une série de quatre exemples de code WordCount, chacun plus détaillé que le précédent et s'appuyant les uns sur les autres. Le texte d'entrée pour tous les exemples est une collection des œuvres de Shakespeare.

Chaque exemple WordCount introduit des concepts différents du SDK Dataflow.

  • Minimal WordCount illustre les principes de base impliqués dans la construction d'un pipeline Dataflow.
  • WordCount présente certaines des bonnes pratiques les plus courantes pour la création de pipelines faciles à réutiliser et à maintenir.
  • Debugging WordCount présente les pratiques de journalisation et de débogage.
  • Windowed WordCount montre comment utiliser le modèle de programmation de Dataflow pour gérer des ensembles de données limités et illimités.

Commencez par comprendre Minimal WordCount, qui est le plus simple de ces exemples. Une fois que vous êtes à l'aise avec les principes de base de la création d'un pipeline, poursuivez votre apprentissage avec les bonnes pratiques d'écriture de programmes Dataflow illustrées dans l'exemple WordCount. Lisez ensuite l'exemple Debugging WordCount pour comprendre comment exploiter les pratiques courantes pour la journalisation et le débogage. Enfin, découvrez comment utiliser le même schéma de calcul pour des ensembles de données finis et infinis avec l'exemple Windowed WordCount.

Minimal WordCount

Minimal WordCount illustre un pipeline simple capable de lire un bloc de texte à partir d'un fichier hébergé dans Google Cloud Storage, d'appliquer des transformations pour segmenter et compter les mots, ainsi que d'écrire les données résultantes dans un fichier de sortie stocké dans un bucket Cloud Storage. Cet exemple code en dur les emplacements de ses fichiers d'entrée et de sortie et n'effectue aucune vérification d'erreur. Il sert uniquement à vous présenter les "très grandes lignes" de la création d'un pipeline Dataflow. Dans les prochains exemples, nous inclurons des paramètres pour les sources d'entrée et de sortie du pipeline et nous vous présenterons d'autres bonnes pratiques.

Java

Concepts clés
  1. Créer le pipeline
  2. Appliquer des transformations au pipeline
    • Lecture de l'entrée (dans cet exemple : lecture de fichiers texte)
    • Application des transformations ParDo
    • Application des transformations fournies par le SDK (dans cet exemple : Count)
    • Écriture de la sortie (dans cet exemple : écriture dans Google Cloud Storage)
  3. Exécuter le pipeline

Les sections suivantes expliquent en détail ces concepts et présentent des extraits du code correspondant dans le pipeline Minimal WordCount.

Créer le pipeline

La première étape de la création d'un pipeline Cloud Dataflow consiste à créer un objet Pipeline Options. Cet objet nous permet de définir diverses options pour notre pipeline, telles que le gestionnaire de pipeline chargé de son exécution, l'ID de notre projet et l'emplacement de stockage intermédiaire utilisé par le pipeline pour ses fichiers de travail (utilisé pour rendre vos fichiers jar accessibles dans le Cloud). Dans cet exemple, nous définissons ces options de manière automatisée, mais le plus souvent, les options du pipeline sont définies à l'aide d'arguments de ligne de commande.

Dans notre exemple, nous avons spécifié BlockingDataflowPipelineRunner en tant que PipelineRunner, afin que notre pipeline soit exécuté dans le cloud au moyen du service Google Cloud Dataflow. Il existe d'autres options que vous pouvez définir pour l'exécution de votre pipeline dans le cloud. Vous pouvez également omettre complètement cette option, auquel cas le gestionnaire par défaut exécutera votre pipeline en local. Ces différentes options sont illustrées dans les deux exemples WordCount suivants et sont décrites plus en détail sur la page Spécifier les paramètres d'exécution.

Java

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET-YOUR-PROJECT-ID-HERE");
// The 'gs' URI means that this is a Google Cloud Storage path
options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");

L'étape suivante consiste à créer un objet Pipeline avec les options que nous venons de créer. L'objet Pipeline construit le graphe des transformations à exécuter, associées à ce pipeline spécifique.

Java

Pipeline p = Pipeline.create(options);

Consultez la section Pipelines pour une discussion détaillée sur l'objet pipeline et son fonctionnement.

Appliquer des transformations de pipeline

Le pipeline Minimal WordCount contient plusieurs transformations permettant de lire des données dans le pipeline, de les manipuler ou de les transformer, et d'écrire les résultats. Chaque transformation représente une opération dans le pipeline.

Chaque transformation absorbe une entrée sous quelque forme que ce soit (données ou autre) et produit des données de sortie. Les données d'entrée et de sortie sont représentées par la classe PCollection du SDK. PCollection est une classe spéciale fournie par le SDK Dataflow que vous pouvez utiliser pour représenter un ensemble de données de pratiquement n'importe quelle taille, y compris un ensemble de données de taille infinie.

La figure 1 illustre le flux de données dans le pipeline :

Le pipeline utilise une transformation TextIO.Read pour créer une PCollection à partir de données stockées dans un fichier de données d'entrée. La transformation CountWords produit une PCollection de décomptes de mots à partir de la PCollection de texte brut. TextIO.Write écrit les décomptes de mots mis en forme dans un fichier de données de sortie.
Figure 1 : Flux des données dans le pipeline.

Le pipeline Minimal WordCount comporte cinq transformations :

  1. Une transformation Read de fichier texte est appliquée à l'objet Pipeline lui-même et génère une PCollection de sortie. Chaque élément de la PCollection de sortie représente une ligne de texte du fichier d'entrée.
  2. Java

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
    
  3. Une transformation ParDo qui appelle une classe DoFn (définie de manière intégrée en tant que classe anonyme) sur chaque élément, qui segmente les lignes de texte en mots individuels. L'entrée de cette transformation est la PCollection des lignes de texte générées par la transformation TextIO.Read précédente. La transformation ParDo génère une nouvelle PCollection, où chaque élément représente un mot individuel dans le texte.
  4. Java

    Vous pouvez attribuer à votre transformation un nom qui apparaîtra dans l'interface de surveillance Dataflow. Pour ce faire, utilisez l'opération .named() comme nous l'avons fait dans cet exemple. Lorsque le service Dataflow exécute votre pipeline, l'interface de surveillance indique le moment où chaque transformation ParDo est exécutée.

      .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
           for (String word : c.element().split("[^a-zA-Z']+")) {
             if (!word.isEmpty()) {
               c.output(word);
             }
           }
         }
      }))
    
  5. La transformation Count fournie par le SDK est une transformation générique qui absorbe une PCollection de n'importe quel type et renvoie une PCollection de paires clé/valeur. Chaque clé représente un élément unique de la collection d'entrée, et chaque valeur représente le nombre d'occurrences de cette clé dans la collection d'entrée.

    Dans ce pipeline, l'entrée de la transformation Count est la PCollection des mots individuels générés par la transformation ParDo précédente, et la sortie est une PCollection de paires clé/valeur, où chaque clé représente un mot unique dans le texte et la valeur associée est le nombre d'occurrences de chaque mot.
  6. Java

      .apply(Count.<String>perElement())
    
  7. L'étape suivante est une transformation qui met en forme chacune des paires clé/valeur (mot unique et nombre d'occurrences de ce mot) en tant que chaîne imprimable adaptée à l'écriture dans un fichier de sortie.
  8. Java

    MapElements est une transformation composite de niveau supérieur qui encapsule une simple transformation ParDo. MapElements applique une fonction qui produit exactement un élément de sortie pour chaque élément de la PCollection d'entrée. Cette transformation MapElements appelle une fonction SimpleFunction (définie de manière intégrée en tant que classe anonyme) qui assure la mise en forme. Cette transformation MapElements prend en entrée une PCollection de paires clé/valeur générées par Count et produit une nouvelle PCollection de chaînes imprimables.

      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         @Override
         public String apply(KV<String, Long> element) {
           return element.getKey() + ": " + element.getValue();
         }
      }))
    
  9. Une opération d'écriture de fichier texte, Write. Cette transformation prend en entrée la PCollection finale d'éléments String mis en forme et écrit chaque élément dans un fichier texte de sortie. Chaque élément de la PCollection d'entrée représente une ligne de texte dans le fichier de sortie obtenu.
  10. Java

      .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
    

    Notez que la transformation Write produit une valeur de résultat triviale de type PDone qui, dans ce cas, est ignorée.

Exécuter le pipeline

Exécutez le pipeline en appelant la méthode run, qui l'envoie pour exécution au gestionnaire que vous avez spécifié lors de la création du pipeline.

Java

p.run();

Exemple WordCount

Cet exemple WordCount présente quelques bonnes pratiques de programmation qui peuvent faciliter la lecture, l'écriture et la maintenance de votre pipeline. Bien que cela ne soit pas explicitement requis, ces éléments peuvent rendre l'exécution de votre pipeline plus flexible, vous aider à le tester et rendre son code réutilisable.

Cette section part du principe que vous maîtrisez bien les concepts de base de la construction d'un pipeline. Si vous estimez que vous n'en êtes pas encore à ce stade, lisez la section ci-dessus intitulée Minimal WordCount.

Java

Nouveaux concepts :
  1. Appliquer ParDo avec une classe DoFn explicite
  2. Créer des transformations composites
  3. Utiliser des PipelineOptions paramétrables

Les sections suivantes décrivent ces concepts clés en détail et décomposent le code du pipeline en différentes sections.

Spécifier des DoFns explicites

Lorsque vous utilisez des transformations ParDo, vous devez spécifier l'opération de traitement à appliquer à chaque élément de la PCollection d'entrée. Cette opération de traitement est une sous-classe de la classe DoFn du SDK. L'exemple de pipeline de la section précédente (Minimal WordCount) crée les sous-classes DoFn pour chaque transformation ParDo intégrée, sous forme d'instance de classe interne anonyme.

Toutefois, il est souvent judicieux de définir la classe DoFn au niveau global, ce qui facilite les tests unitaires et peut rendre le code ParDo plus lisible.

Comme indiqué dans l'exemple précédent (Minimal WordCount), lorsque vous exécutez votre pipeline, l'interface de surveillance Dataflow indique à quel moment chaque transformation ParDo est exécutée. Le service Dataflow génère automatiquement des noms pour les transformations ParDo à partir du nom de la classe DoFn que vous transmettez. Par exemple, la transformation ParDo qui applique l'opération FormatAsTextFn() apparaît dans l'interface de surveillance sous la forme ParDo(FormatAsText).

Java

Dans cet exemple, les classes DoFn sont définies en tant que classes statiques :

/** A DoFn that converts a Word and Count into a printable string. */
static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  ...

  @Override
  public void processElement(ProcessContext c) {
    ...
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform.
  p.apply(...)
   .apply(...)
   .apply(ParDo.of(new FormatAsTextFn()))
   ...
}

Pour en savoir plus sur la création et la spécification de sous-classes DoFn pour vos transformations ParDo, consultez la page Traitement parallèle avec l'opération "ParDo".

Créer des transformations composites

Si une opération de traitement doit être constituée de plusieurs transformations ou étapes ParDo, vous pouvez la créer en tant que sous-classe de PTransform. La création d'une sous-classe PTransform vous permet de créer des transformations complexes réutilisables, de rendre la structure de votre pipeline plus claire et plus modulaire, et de faciliter les tests unitaires.

Rendre la structure logique de votre pipeline explicite à l'aide des sous-classes PTransform peut également vous aider à surveiller votre pipeline. Lorsque le service Dataflow crée la structure finale optimisée de votre pipeline, l'interface de surveillance Dataflow utilise les transformations que vous avez créées pour refléter plus précisément la structure de votre pipeline.

Java

Dans cet exemple, deux transformations sont encapsulées en tant que sous-classe PTransform CountWords. CountWords contient la transformation ParDo qui exécute ExtractWordsFn et la transformation Count fournie par le SDK.

Lorsque la sous-classe CountWords est définie, nous spécifions ses entrées et sorties ultimes. L'entrée est la PCollection<String> pour l'opération d'extraction, et la sortie est la PCollection<KV<String, Long>> générée par l'opération de décompte.

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}

Utiliser des PipelineOptions paramétrables

Dans l'exemple précédent, Minimal WordCount, nous avons défini diverses options d'exécution à la création de notre pipeline. Dans cet exemple, nous définissons nos propres options de configuration personnalisées en étendant des PipelineOptions.

Vous pouvez ajouter vos propres arguments à traiter par l'analyseur de ligne de commande et spécifier des valeurs par défaut. Vous pouvez ensuite accéder aux valeurs des options dans le code de votre pipeline.

Dans l'exemple Minimal WordCount, nous avons codé en dur les options de pipeline. Cependant, la méthode la plus courante pour construire des PipelineOptions consiste à analyser les arguments de ligne de commande.

Java

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}

Exemple Debugging WordCount

L'exemple Debugging WordCount illustre certaines des bonnes pratiques pour instrumenter le code de votre pipeline. Vous pouvez exploiter l'interface de surveillance et les agrégateurs de Dataflow pour bénéficier d'une visibilité accrue sur votre pipeline au cours de son exécution.

Java

Vous pouvez également utiliser la fonction DataflowAssert du SDK pour tester la sortie de vos transformations à différentes étapes du pipeline.

Java

Nouveaux concepts :
  1. Afficher les journaux dans l'interface de surveillance de Dataflow
  2. Contrôler les niveaux de journalisation des nœuds de calcul Dataflow
  3. Créer des Aggregators
  4. Tester votre pipeline via DataflowAssert

Les sections suivantes décrivent ces concepts clés en détail et décomposent le code du pipeline en différentes sections.

Afficher les journaux dans l'interface de surveillance de Dataflow

Google Cloud Logging agrège les journaux de tous les nœuds de calcul de votre tâche Dataflow dans un seul emplacement dans Google Cloud Console. Vous pouvez utiliser l'interface de surveillance Dataflow pour afficher les journaux de toutes les instances Compute Engine générées par Dataflow pour traiter votre tâche. Vous pouvez ajouter des instructions de journalisation dans les instances DoFn de votre pipeline, qui apparaîtront dans l'interface de surveillance lors de son exécution.

Java

L'enregistreur SLF4J suivant utilise le nom de classe complet de FilterTextFn en tant que nom d'enregistreur. Toutes les entrées de journal émises par cet enregistreur seront référencées sous ce nom et seront visibles dans l'interface de surveillance Dataflow, du moment qu'elles correspondent aux paramètres de niveau de journalisation.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebuggingWordCount {
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...
    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
    ...

    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        // Log at the "DEBUG" level each element that we match. When executing this pipeline
        // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI
        // only if the log level is set to "DEBUG" or lower.
        LOG.debug("Matched: " + c.element().getKey());
        ...
      } else {
        // Log at the "TRACE" level each element that is not matched. Different log levels
        // can be used to control the verbosity of logging providing an effective mechanism
        // to filter less important information.
        LOG.trace("Did not match: " + c.element().getKey());
        ...
      }
    }
  }
}

Contrôler les niveaux de journalisation des nœuds de calcul Dataflow

Java

Les nœuds de calcul Dataflow qui exécutent le code utilisateur sont configurés pour se connecter à Cloud Logging avec un niveau de journalisation par défaut équivalent à INFO et supérieur. Vous pouvez remplacer les niveaux de journalisation pour des espaces de noms de journalisation spécifiques en renseignant :

--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}

Par exemple, en spécifiant :

--workerLogLevelOverrides={"com.example":"DEBUG"}

lors de l'exécution de ce pipeline à l'aide du service Dataflow, l'interface de surveillance ne contiendra que des journaux de niveau DEBUG ou supérieur pour le package com.example, en plus des journaux de niveau INFO ou supérieur par défaut.

En outre, vous pouvez remplacer la configuration par défaut de la journalisation des nœuds de calcul Dataflow en spécifiant :

--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>

Par exemple, en spécifiant :

--defaultWorkerLogLevel=DEBUG

lors de l'exécution de ce pipeline avec le service Dataflow, l'interface de surveillance contiendra tous les journaux de niveau DEBUG ou supérieur. Notez que faire passer le niveau de journalisation par défaut des nœuds de calcul à TRACE ou DEBUG augmente considérablement la quantité d'informations journalisées.

Obtenez plus d'informations sur la journalisation dans Cloud Dataflow.

Créer des agrégateurs

Un agrégateur personnalisé peut suivre les valeurs de votre pipeline en cours d'exécution. Ces valeurs seront affichées dans l'interface de surveillance de Dataflow lorsque le pipeline est exécuté à l'aide de ce service.

Il se peut que les agrégateurs ne deviennent visibles qu'à partir du moment où le système commence à exécuter la transformation ParDo qui les a créés et/ou que leur valeur initiale est modifiée. Ils apparaissent alors dans l'interface de surveillance en bas du résumé des tâches.

Les agrégateurs personnalisés ci-dessous suivent le nombre de mots ayant fait l'objet ou non d'une correspondance (matched/unmatched).

Java

public class DebuggingWordCount {
  /** A DoFn that filters for a specific key based upon a regular expression. */
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    private final Aggregator<Long, Long> matchedWords =
        createAggregator("matchedWords", new Sum.SumLongFn());
    private final Aggregator<Long, Long> unmatchedWords =
      createAggregator("umatchedWords", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        ...
        matchedWords.addValue(1L);
        ...
      } else {
        ...
        unmatchedWords.addValue(1L);
      }
    }
  }
}

Agrégateurs dans les pipelines par lot et en flux continu

Les agrégateurs dans les pipelines par lot assurent la cohérence. Ils sont engagés une fois exactement pour les lots ayant été traités avec succès, et ne sont pas engagés pour les lots ayant échoué.

Dans les pipelines en flux continu, les agrégateurs fournissent une sémantique plus indulgente. La contribution des lots ayant abouti constitue un effort optimal et, pour les lots défaillants, elle peut être reflétée dans la valeur finale.

Tester votre pipeline via DataflowAssert

Java

DataflowAssert est un ensemble de transformations PTransforms pratiques dans le style des correspondances entre collections Hamcrest, que vous pouvez utiliser lorsque vous créez des tests au niveau du pipeline pour valider le contenu des PCollections. L'utilisation de DataflowAssert est optimale pour les tests unitaires sur de petits ensembles de données, mais nous le présentons ici comme un outil pédagogique.

Ci-dessous, nous vérifions que l'ensemble de mots filtrés correspond aux décomptes prévus. Notez que DataflowAssert ne fournit aucun résultat et que la réussite du pipeline implique que les attentes ont été satisfaites. Découvrez comment tester votre pipeline et consultez DebuggingWordCountTest pour obtenir un exemple de tests unitaires.

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}

Windowed WordCount

Java

Cet exemple, WindowedWordCount, compte les mots dans un texte tout comme les exemples précédents, mais il introduit plusieurs concepts avancés. L'entrée de WindowedWordCount peut être un ensemble de données fixe (comme dans les exemples précédents) ou un flux de données illimité.

Le SDK Dataflow est pratique car il vous permet de créer un unique pipeline capable de gérer à la fois des types d'entrée limités et illimités. Si l'entrée est illimitée, toutes les PCollections du pipeline le seront également. Il en va de même pour les entrées limitées.

Avant de lire cette section, assurez-vous de bien connaître les principes de base de la construction d'un pipeline.

Nouveaux concepts :
  1. Lire des entrées limitées et illimitées
  2. Ajouter des horodatages aux données
  3. Utiliser le fenêtrage
  4. Écrire une sortie limitée et illimitée

Les sections suivantes décrivent ces concepts clés en détail et décomposent le code du pipeline en différentes sections.

Lire des entrées limitées et illimitées

L'entrée pour WindowedWordCount peut être limitée ou illimitée. Si votre entrée comporte un nombre fixe d'éléments, elle est considérée comme un ensemble de données "limité". Si votre entrée se met continuellement à jour, elle est considérée comme "illimitée". Consultez la section PCollections limitées et illimitées pour en savoir plus sur les types d'entrée.

Dans cet exemple, vous pouvez choisir si l'entrée sera limitée ou non. Rappelez-vous que l'entrée utilisée pour tous les exemples jusqu'à présent est une collection de textes de Shakespeare, c'est-à-dire une entrée finie, limitée. Cependant, pour expliquer les nouveaux concepts de cet exemple, nous utiliserons ici comme entrée une répétition des textes de Shakespeare.

Dans cet exemple, si votre entrée est illimitée, elle sera lue dans un sujet Google Cloud Pub/Sub ; dans ce cas, la transformation Read appliquée au pipeline est une transformation PubSubIO.Read. Sinon, l'entrée sera lue à partir de Google Cloud Storage.

public static void main(String[] args) throws IOException {
    ...
    PCollection<String> input;
    if (options.isUnbounded()) {
      LOG.info("Reading from PubSub.");
      // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));

  } else {
      // Else, this is a bounded pipeline. Read from the Google Cloud Storage file.
      input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
      ...
    }
    ...
}

Ajouter des horodatages aux données

Chaque élément d'une PCollection est associé à un horodatage. L'horodatage de chaque élément est attribué par la source qui crée la PCollection. Dans cet exemple, si vous choisissez pour votre pipeline une entrée illimitée, les horodatages proviendront de la source de données Pub/Sub. Si vous choisissez une entrée limitée, la méthode DoFn nommée AddTimestampsFn (appelée par ParDo) définira un horodatage pour chaque élément dans la PCollection.

public static void main(String[] args) throws IOException {
  ...
  input = pipeline
    .apply(...)
    // Add an element timestamp, using an artificial time.
    .apply(ParDo.of(new AddTimestampFn()));
}

Vous trouverez ci-dessous le code pour AddTimestampsFn, une classe DoFn appelée par la transformation ParDo, qui définit l'élément de données de l'horodatage en fonction de l'élément lui-même. Par exemple, si les éléments sont des lignes de journal, cette transformation ParDo peut analyser le délai de la chaîne de journal et le définir comme horodatage de l'élément. Il n'y a pas d'horodatage inhérent aux travaux de Shakespeare, nous avons donc créé des horodatages aléatoires pour illustrer le concept. Chaque ligne du texte d'entrée se voit associer un horodatage aléatoire au cours d'une période de 2 heures.

static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp;

  AddTimestampFn() {
    this.minTimestamp = new Instant(System.currentTimeMillis());
  }

  @Override
  public void processElement(ProcessContext c) {
    // Generate a timestamp that falls somewhere in the past 2 hours.
    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
        Instant randomTimestamp = minTimestamp.plus(randMillis);
    // Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}

Pour en savoir plus sur les horodatages, consultez le document Horodatage d'éléments de PCollection.

Utiliser le fenêtrage

Le SDK Dataflow utilise un concept appelé fenêtrage pour subdiviser une PCollection en fonction des horodatages de ses éléments individuels. Les transformations Dataflow qui agrègent plusieurs éléments, traitent chaque PCollection comme une succession de plusieurs fenêtres finies, même si la collection entière elle-même peut avoir une taille infinie (illimitée).

L'exemple WindowingWordCount applique un fenêtrage à durée fixe, dans lequel chaque fenêtre représente un intervalle de temps fixe. La durée de fenêtre fixe pour cet exemple est par défaut de 1 minute (vous pouvez modifier cette valeur grâce à une option de ligne de commande). Le pipeline applique ensuite la transformation CountWords.

PCollection<KV<String, Long>> wordCounts = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
   .apply(new WordCount.CountWords());

Écrire une sortie limitée et illimitée

Puisque notre entrée peut être limitée ou non, il en va de même pour notre PCollection de sortie. Nous devons donc nous assurer de choisir un récepteur approprié. Certains récepteurs de sortie ne prennent en charge qu'un seul type de sortie, limitée ou illimitée. Par exemple, un fichier texte est un récepteur ne pouvant recevoir que des données limitées. La source de sortie BigQuery est une source prenant en charge aussi bien les entrées limitées qu'illimitées.

Dans cet exemple, nous diffusons les résultats vers une table BigQuery. Les résultats sont ensuite mis en forme pour une table BigQuery, puis écrits dans BigQuery avec BigQueryIO.Write.

wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
  .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));