Fenêtrage

Les SDK Dataflow utilisent un concept appelé fenêtrage pour subdiviser une classe PCollection suivant l'horodatage de ses éléments individuels. Les transformations Dataflow qui regroupent plusieurs éléments, tels que GroupByKey et Combine, fonctionnent de manière implicite sur chaque fenêtre. En d'autres termes, ces transformations traitent chaque classe PCollection comme une succession de plusieurs fenêtres finies, bien que la collection entière puisse avoir une taille illimitée ou infinie.

Les SDK Dataflow utilisent un concept associé appelé déclencheurs afin de déterminer le moment approprié pour "fermer" chaque fenêtre finie à l'arrivée des données illimitées. L'utilisation d'un déclencheur peut vous aider à affiner la stratégie de fenêtrage de la classe PCollection afin de gérer les données tardives ou de fournir des résultats préliminaires. Pour plus d'informations, consultez la page Déclencheurs.

Principes de base du fenêtrage

Le fenêtrage est particulièrement utile avec une classe PCollection illimitée, en d'autres termes, un ensemble de données continuellement mis à jour de taille inconnue/illimitée (par exemple, des données en flux continu). Certaines transformations Dataflow, telles que GroupByKey et Combine, regroupent plusieurs éléments selon une clé commune. Normalement, cette opération regroupe tous les éléments ayant la même clé dans l'ensemble de données complet. Dans le cadre d'un ensemble de données illimité, il est impossible de collecter tous les éléments, car de nouveaux éléments sont constamment ajoutés.

Dans le modèle Dataflow, toute classe PCollection peut être subdivisée en fenêtres logiques. Chaque élément dans une classe PCollection est attribué à une ou plusieurs fenêtres selon la fonction de fenêtrage associée à la classe PCollection ; chaque fenêtre contient un nombre fini d'éléments. Les transformations de regroupement considèrent ensuite chaque élément de la classe PCollection par fenêtre. GroupByKey, par exemple, regroupe implicitement les éléments d'une classe PCollection par clé et par fenêtre. Dataflow ne regroupe que les données d'une même fenêtre et ne regroupe pas les données dans d'autres fenêtres.

Contraintes liées au fenêtrage

Une fois que vous avez défini la fonction de fenêtrage pour une classe PCollection, les fenêtres des éléments sont utilisées lors de la prochaine application d'une transformation de regroupement à la classe PCollection. Dataflow effectue le regroupement réel de la fenêtre selon les besoins. Si vous définissez une fonction de fenêtrage à l'aide de la transformation Window, chaque élément est attribué à une fenêtre, mais les fenêtres ne sont pas prises en compte tant que vous ne regroupez pas la classe PCollection à l'aide des fonctions GroupByKey ou Combine. Cela peut avoir différents effets sur le pipeline.

Prenons l'exemple du pipeline de la figure 1 ci-dessous :

Pipeline appliquant dans l'ordre : le fenêtrage, une opération "ParDo" et une fonction "GroupByKey"
Figure 1 : Pipeline appliquant la fonction de fenêtrage

Dans le pipeline ci-dessus, nous créons une classe PCollection illimitée en lisant un ensemble de paires clé/valeur à l'aide de PubsubIO, puis nous appliquons une fonction de fenêtrage à cette collection à l'aide de la transformation Window. Nous appliquons ensuite une opération ParDo à la collection, puis regroupons ultérieurement le résultat de cette opération ParDo à l'aide de la fonction GroupByKey. La fonction de fenêtrage n'a aucun effet sur la transformation ParDo, car les fenêtres ne sont réellement utilisées que lorsqu'elles sont nécessaires pour GroupByKey.

Les transformations ultérieures, cependant, sont appliquées au résultat de GroupByKey, c'est-à-dire aux données regroupées par clé et par fenêtre.

Utiliser le fenêtrage avec des classes "PCollection" limitées

Vous pouvez utiliser le fenêtrage avec des ensembles de données de taille fixe dans des PCollections limitées. Sachez toutefois que le fenêtrage ne considère que les horodatages implicites associés à chaque élément d'une classe PCollection, et que les sources de données qui créent des ensembles de données fixes (tels que TextIO et BigQueryIO) attribuent le même horodatage à chaque élément. Cela signifie que, par défaut, tous les éléments font partie d'une même fenêtre globale. L'attribution de tous les éléments à la même fenêtre entraîne l'exécution d'un pipeline dans le style de traitement par lot "MapReduce" classique.

Pour utiliser le fenêtrage avec des ensembles de données fixes, vous pouvez attribuer vos propres horodatages à chaque élément. Pour attribuer des horodatages aux éléments, vous utilisez une transformation ParDo avec un argument DoFn, qui va générer chaque élément avec un nouvel horodatage.

L'utilisation du fenêtrage avec une classe PCollection limitée peut affecter la manière dont le pipeline traite les données. Par exemple, considérons le pipeline suivant :

Pipeline appliquant une fonction "GroupByKey" suivie d'une opération "ParDo" sur une collection limitée
Figure 2 : Fonctions "GroupByKey" et "ParDo" appliquées sans fenêtrage sur une collection limitée

Dans le pipeline ci-dessus, nous créons une classe PCollection limitée en lisant un ensemble de paires clé/valeur à l'aide de TextIO. Nous regroupons ensuite la collection à l'aide de la fonction GroupByKey et appliquons une transformation ParDo à la collection PCollection groupée. Dans cet exemple, la fonction GroupByKey crée une collection de clés uniques, puis l'opération ParDo est appliquée une seule fois par clé.

Considérons maintenant le même pipeline qui utilise une fonction de fenêtrage :

Pipeline appliquant le fenêtrage, puis une fonction "GroupByKey" suivie d'une opération "ParDo" sur une collection limitée
Figure 3 : Fonctions "GroupByKey" et "ParDo" appliquées avec fenêtrage sur une collection limitée

Comme précédemment, le pipeline crée une classe PCollection limitée de paires clé/valeur. Nous définissons ensuite une fonction de fenêtrage pour cette classe PCollection. La transformation GroupByKey regroupe maintenant les éléments de la classe PCollection par clé et par fenêtre. La transformation ParDo est ensuite appliquée plusieurs fois par clé, une fois pour chaque fenêtre.

Fonctions de fenêtrage

Les SDK Dataflow vous permettent de définir différents types de fenêtres pour diviser les éléments de la classe PCollection. Le SDK fournit plusieurs fonctions de fenêtrage, y compris les fonctions suivantes :

  • Fenêtres à durée fixe
  • Fenêtres à durée flexible
  • Fenêtres par session
  • Fenêtre globale unique

Sachez que chaque élément peut appartenir logiquement à plusieurs fenêtres, selon la fonction de fenêtrage utilisée. Par exemple, le fenêtrage à durée flexible permet de créer des fenêtres qui se chevauchent dans lesquelles un même élément peut être attribué à plusieurs fenêtres.

Fenêtres à durée fixe

La forme la plus simple de fenêtrage consiste en une fenêtre à durée fixe : avec une classe PCollection horodatée qui peut être continuellement mise à jour, chaque fenêtre peut capturer (par exemple) cinq minutes d'éléments.

Une fenêtre à durée fixe représente l'intervalle de temps dans le flux de données qui définit un ensemble de données à traiter. Prenons une fenêtre qui fonctionne avec des intervalles de cinq minutes : tous les éléments de la classe PCollection illimitée dont les valeurs d'horodatage sont comprises entre 0:00:00 et 0:04:59 appartiennent à la première fenêtre ; les éléments dont les valeurs d'horodatage sont comprises entre 0:05:00 et 0:09:59 appartiennent à la deuxième fenêtre, et ainsi de suite.

Diagramme représentant le fenêtrage à durée fixe
Figure 4 : Fenêtres à durée fixe (de 30 secondes)

Fenêtres à durée flexible

Une fenêtre à durée flexible utilise également des intervalles de temps dans le flux de données pour définir des ensembles de données ; toutefois, avec le fenêtrage à durée flexible, les fenêtres se chevauchent. Chaque fenêtre peut capturer cinq minutes de données, mais une nouvelle fenêtre s'ouvre toutes les 10 secondes. La fréquence d'ouverture des fenêtres à durée flexible s'appelle la période. Par conséquent, dans notre exemple, la durée de la fenêtre correspond à cinq minutes et la période à 10 secondes.

En raison du chevauchement des fenêtres, la plupart des éléments d'un ensemble de données appartiennent à plusieurs fenêtres. Ce type de fenêtrage est utile pour calculer des moyennes de données. En utilisant un fenêtrage à durée flexible, vous pouvez calculer une moyenne mobile sur les données des cinq dernières minutes, mises à jour toutes les 10 secondes, dans notre exemple.

Diagramme représentant le fenêtrage à durée flexible
Figure 5 : Fenêtres à durée flexible (présentant une durée d'une minute et une période de 30 secondes)

Fenêtres de session

Une fonction de fenêtre de session définit des fenêtres autour des zones de concentration dans les données. Le fenêtrage de session est utile pour les données distribuées de manière irrégulière dans le temps. Par exemple, un flux de données représentant l'activité de la souris de l'utilisateur peut avoir de longues périodes d'inactivité entrecoupées de fortes concentrations de clics. Le fenêtrage de session regroupe les concentrations élevées de données dans des fenêtres séparées et filtre les sections inactives du flux de données.

Sachez que le fenêtrage de session s'applique par clé. En d'autres termes, le regroupement en sessions ne prend en compte que les données ayant la même clé. Chaque clé de la collection de données est donc regroupée dans des fenêtres disjointes de durées différentes.

Le type de fenêtrage de session le plus simple spécifie une durée d'intervalle minimale. Toutes les données survenant sous une temporisation minimale sont regroupées dans la même fenêtre. Si les données surviennent après la durée d'intervalle minimale spécifiée, cela déclenche le début d'une nouvelle fenêtre.

Diagramme représentant le fenêtrage de session
Figure 5 : Fenêtres de session avec une durée d'intervalle minimale. Sachez que chaque clé de données possède des fenêtres différentes en fonction de la distribution des données.

Fenêtre globale unique

Par défaut, toutes les données d'une classe PCollection sont attribuées à une fenêtre globale unique. Si l'ensemble de données a une taille fixe, vous pouvez utiliser la fenêtre globale par défaut pour la classe PCollection. Si les éléments de la classe PCollection appartiennent tous à une fenêtre globale unique, le pipeline s'exécute comme une tâche de traitement par lot (comme dans le traitement basé sur "MapReduce").

Autres fonctions de fenêtrage

En plus des fenêtres à durée fixe, à durée flexible, de session et globales, les SDK Dataflow fournissent d'autres fonctions de fenêtrage, telles que les fenêtres basées sur un calendrier.

Java

Consultez la documentation sur le package com.google.cloud.dataflow.sdk.transforms.windowing pour obtenir la liste complète des fonctions de fenêtrage disponibles dans le SDK Dataflow pour Java.

Définir la fonction de fenêtrage associée à la classe "PCollection"

Vous pouvez définir la fonction de fenêtrage associée à une classe PCollection en appliquant la transformation Window. Lorsque vous appliquez la transformation Window, vous devez fournir un élément WindowFn. WindowFn détermine la fonction de fenêtrage utilisée par la classe PCollection pour les transformations de regroupement ultérieures, telles qu'une fenêtre à durée fixe ou flexible.

Les SDK Dataflow fournissent des éléments WindownFn prédéfinis pour les fonctions de fenêtrage de base. Vous pouvez également définir votre propre élément WindowFn dans des cas avancés.

Techniquement, comme toutes les transformations, Window extrait une classe PCollection d'entrée et génère une nouvelle classe PCollection avec chaque élément attribué à une ou plusieurs fenêtres logiques finies.

Définir des fenêtres à durée fixe

L'exemple de code suivant montre comment appliquer la transformation Window pour diviser une classe PCollection en fenêtres fixes d'une durée d'une minute chacune :

Java

  PCollection<String> items = ...;
  PCollection<String> fixed_windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

Définir des fenêtres à durée flexible

L'exemple de code suivant montre comment appliquer la transformation Window pour diviser une classe PCollection en fenêtres à durée flexible. Chaque fenêtre dure 30 minutes et une nouvelle fenêtre commence toutes les cinq secondes :

Java

  PCollection<String> items = ...;
  PCollection<String> sliding_windowed_items = items.apply(
    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

Définir des fenêtres de session

L'exemple de code suivant montre comment appliquer la transformation Window pour diviser une classe PCollection en fenêtres de session, chaque session devant être séparée par un intervalle de temps d'au moins 10 minutes :

Java

  PCollection<String> items = ...;
  PCollection<String> session_windowed_items = items.apply(
    Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));

Sachez que les sessions sont définies par clé. Chaque clé de la collection possède ses propres groupes de session en fonction de la distribution des données.

Définir une fenêtre globale unique

Si la classe PCollection est limitée (durée fixe), vous pouvez attribuer tous les éléments à une fenêtre globale unique. L'exemple de code suivant montre comment définir une fenêtre globale unique pour une classe PCollection.

Pour définir une fenêtre globale unique pour la classe PCollection, transmettez l'élément new GlobalWindows() en tant que WindowFn lorsque vous appliquez la transformation Window. L'exemple de code suivant montre comment appliquer la transformation Window pour attribuer une classe PCollection à une fenêtre globale unique :

Java

  PCollection<String> items = ...;
  PCollection<String> batch_items = items.apply(
    Window.<String>into(new GlobalWindows()));

Décalage temporel, décalage des données et données tardives

Dans tout système de traitement de données, il existe un certain décalage entre le moment où un événement de données se produit (l'heure d'événement, déterminée par l'horodatage sur l'élément de données lui-même) et le moment où l'élément de données réel est traité dans une étape du pipeline (l'heure de traitement, déterminée par l'horloge du système traitant l'élément).

Dans un système parfait, l'heure d'événement pour chaque élément de données et l'heure de traitement sont identiques, ou du moins elles présentent un écart cohérent. Cependant, dans tout système informatique réel, la génération et la livraison des données sont soumises à un certain nombre de limitations temporelles. Dans les systèmes de grande taille ou distribués, tels qu'une collection distribuée de frontaux Web générant des commandes client ou des fichiers journaux, rien ne garantit que les événements de données apparaîtront dans le pipeline dans le même ordre que celui dans lequel ils ont été générés à différents endroits du Web.

Par exemple, imaginons que nous disposons d'une classe PCollection qui utilise un fenêtrage à durée fixe, avec des fenêtres d'une durée de cinq minutes. Pour chaque fenêtre, Dataflow doit collecter toutes les données qui correspondent à un horodatage de l'heure d'événement figurant dans la plage de fenêtre donnée (par exemple, entre 0:00 et 4:59 pour la première fenêtre). Les données dont l'horodatage se situe en dehors de cette plage (à partir de 5:00) appartiennent à une autre fenêtre.

Cependant, il n'est pas toujours garanti que les données arrivent dans un pipeline dans l'ordre chronologique correct ou qu'elles arrivent toujours à des intervalles prévisibles. Dataflow suit un filigrane, la notion du système qui permet de prévoir que toutes les données d'une fenêtre donnée sont arrivées dans le pipeline. Les données qui arrivent avec un horodatage au-delà du filigrane sont considérées comme des données tardives.

Dans notre exemple, supposons que nous ayons un simple filigrane qui définit environ 30 secondes de décalage entre les horodatages de données (heure d'événement) et le moment où les données apparaissent dans le pipeline (heure de traitement). Dataflow ferme ensuite la première fenêtre à 5:30. Si un enregistrement de données arrive à 5:34, mais avec un horodatage qui le place dans la fenêtre 0:00-4:59 (par exemple, 3:38), cet enregistrement est considéré comme une donnée tardive.

Remarque : Pour des raisons de simplicité, nous utilisons un filigrane très simple qui estime le décalage temporel. En pratique, la source de données de la classe PCollection détermine le filigrane, lequel peut être plus précis ou complexe.

Gérer le décalage temporel et les données tardives

Vous pouvez autoriser des données tardives en appelant l'opération .withAllowedLateness lorsque vous définissez la stratégie de fenêtrage de la classe PCollection. L'exemple de code suivant illustre une stratégie de fenêtrage permettant l'arrivée de données tardives jusqu'à deux jours après la fin d'une fenêtre.

Java

  PCollection<String> items = ...;
  PCollection<String> fixed_windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
          .withAllowedLateness(Duration.standardDays(2)));

Lorsque vous définissez l'opération .withAllowedLateness sur une classe PCollection, le retard autorisé est propagé à toute classe PCollection ultérieure dérivée de la première classe PCollection à laquelle vous avez appliqué le retard. Si vous souhaitez modifier ultérieurement le retard autorisé dans le pipeline, vous devez le faire explicitement en appliquant à nouveau l'opération Window.withAllowedLateness().

Vous pouvez également utiliser l'API Triggers de Dataflow pour vous aider à affiner la stratégie de fenêtrage pour une classe PCollection. Vous pouvez utiliser des déclencheurs pour déterminer avec exactitude le moment où chaque fenêtre regroupe et rapporte ses résultats, y compris la manière dont la fenêtre émet des éléments tardifs.

Remarque : Les stratégies de fenêtrage et de déclenchement par défaut de Dataflow rejettent les données tardives. Si vous souhaitez vous assurer que le pipeline gère les instances de données tardives, vous devez définir explicitement l'opération .withAllowedLateness lorsque vous définissez la stratégie de fenêtrage de la classe PCollection, et définir des déclencheurs pour les classes PCollection en conséquence.

Ajouter des horodatages aux éléments d'une classe PCollection

Vous pouvez attribuer de nouveaux horodatages aux éléments d'une classe PCollection en appliquant une transformation ParDo qui génère de nouveaux éléments avec les horodatages que vous avez définis. L'attribution d'horodatages peut être utile si vous souhaitez utiliser les fonctions de fenêtrage de Dataflow, mais que l'ensemble de données provient d'une source sans horodatage implicite (par exemple un fichier de TextIO).

Il s'agit d'un bon modèle à suivre lorsque l'ensemble de données inclut des données d'horodatage, mais que les horodatages ne sont pas générés par la source de données Dataflow. Par exemple, si le pipeline lit les enregistrements de journal à partir d'un fichier d'entrée et si chaque enregistrement de journal comprend un champ d'horodatage, la source du fichier n'attribue pas les horodatages automatiquement. Vous pouvez analyser le champ d'horodatage de chaque enregistrement et utiliser une transformation ParDo pour associer les horodatages à chaque élément de la classe PCollection.

Java

Pour attribuer des horodatages, la transformation ParDo doit utiliser un argument DoFn qui génère des éléments à l'aide de la fonction ProcessContext.outputWithTimestamp (plutôt que la fonction habituelle ProcessContext.output, utilisée pour émettre des éléments dans la collection de sortie principale). L'exemple de code suivant montre une opération ParDo avec un argument DoFn qui génère des éléments avec de nouveaux horodatages :

  PCollection<LogEntry> unstampedLogs = ...;
  PCollection<LogEntry> stampedLogs =
      unstampedLogs.apply(ParDo.of(new DoFn<LogEntry, LogEntry>() {
        public void processElement(ProcessContext c) {
          // Extract the timestamp from log entry we're currently processing.
          Instant logTimeStamp = extractTimeStampFromLogEntry(c.element());
          // Use outputWithTimestamp to emit the log entry with timestamp attached.
          c.outputWithTimestamp(c.element(), logTimeStamp);
        }
      }));
Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.