PCollection

Les SDK Dataflow utilisent une classe spécialisée appelée PCollection pour représenter les données dans un pipeline. Une PCollection représente un ensemble de données multi-éléments.

Vous pouvez considérer une PCollection comme des données de "pipeline". Les transformations de Dataflow utilisent des classes PCollection comme entrées et sorties. Ainsi, si vous souhaitez utiliser les données de votre pipeline, elles doivent se présenter sous forme de PCollection. Chaque PCollection appartient à un objet Pipeline spécifique et seul cet objet Pipeline peut l'utiliser.

IMPORTANT : Le présent document contient des informations sur les PCollection illimitées et sur le fenêtrage. Ces concepts font référence au SDK Dataflow pour Java uniquement et ne sont pas encore disponibles dans le SDK Dataflow pour Python.

Caractéristiques de PCollection

Une PCollection représente un "sac" d'éléments potentiellement volumineux et immuable. Il n'y a pas de limite supérieure au nombre d'éléments qu'une PCollection peut contenir. Toute PCollection peut respecter la taille de mémoire ou représenter un très grand ensemble de données sauvegardées par un datastore persistant.

Java

Les éléments d'une PCollection peuvent être de n'importe quel type, mais doivent être tous du même type. Toutefois, Dataflow doit pouvoir encoder chaque élément individuel sous forme de chaîne d'octets afin de prendre en charge le traitement distribué. Les SDK Dataflow fournissent un mécanisme d'encodage de données qui inclut des encodages intégrés pour les types couramment utilisés et une compatibilité avec la spécification d'encodages personnalisés en fonction des besoins. Créer un encodage valide pour un type quelconque peut être difficile, mais vous pouvez construire un encodage personnalisé pour des types structurés simples.

La paire clé/valeur est un type de données important pour le traitement de données à grande échelle. Les SDK Dataflow utilisent la classe KV<K, V> pour représenter des paires clé/valeur.

Python

La paire clé/valeur est un type de données important pour le traitement de données à grande échelle. Le SDK Dataflow pour Python utilise 2-tuples pour représenter les paires clé/valeur.

Limitations de PCollection

Une PCollection diffère d'une classe de collection classique en différents aspects clés :

  • Une PCollection est immuable. Une fois créée, vous ne pouvez pas ajouter, supprimer ni modifier des éléments individuels.
  • Une PCollection ne permet pas un accès aléatoire à des éléments individuels.
  • Une PCollection appartient au pipeline dans lequel elle est créée. Vous ne pouvez pas partager une PCollection entre des objets Pipeline.

Une PCollection peut être physiquement sauvegardée par des données dans le stockage existant ou peut représenter des données qui n'ont pas encore été calculées. En tant que tel, les données dans une PCollection sont immuables. Vous pouvez utiliser une PCollection dans des calculs qui génèrent de nouvelles données de pipeline (en tant que nouvelle PCollection). Cependant, vous ne pouvez pas modifier les éléments d'une PCollection existante une fois qu'elle a été créée.

Une PCollection ne stocke pas de données en soi. N'oubliez pas qu'une PCollection peut contenir trop d'éléments pour tenir dans la mémoire locale où votre programme Dataflow est exécuté. Lorsque vous créez ou transformez une PCollection, les données ne sont ni copiées ni déplacées en mémoire, comme avec certaines classes de conteneur classiques. Au lieu de cela, une PCollection représente un ensemble de données potentiellement très volumineux dans le cloud.

PCollections limitées et illimitées

La taille d'une PCollection peut être limitée ou illimitée. Cette nature est déterminée lorsque vous créez la PCollection. Certaines transformations racines créent des PCollections limitées alors que d'autres en créent des illimitées. Tout dépend de la source de vos données d'entrée.

PCollections limitées

Votre PCollection est limitée si elle représente un ensemble de données fixe, dont la taille connue est inchangée. Un exemple d'ensemble de données fixe pourrait être "fichiers journaux de serveur du mois d'octobre" ou "toutes les commandes traitées la semaine dernière". Les transformations racines TextIO et BigQueryIO créent des PCollections limitées.

Les sources de données qui créent des PCollections limitées incluent :

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Sources de données liées personnalisées créées à l'aide de l'API Custom Source

Python

  • TextIO
  • BigQueryIO
  • Sources de données liées personnalisées créées à l'aide de l'API Custom Source

Les récepteurs de données acceptant les PCollections limitées incluent :

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Récepteurs de données limitées personnalisées créés à l'aide de l'API Custom Sink

Python

  • TextIO
  • BigQueryIO
  • Récepteurs de données limitées personnalisées créés à l'aide de l'API Custom Sink

PCollections illimitées

Votre PCollection est illimitée si elle représente un ensemble de données mis à jour continuellement ou des données en streaming. Un exemple d'ensemble de données mis à jour continuellement pourrait être "fichiers journaux de serveur générés" ou "toutes les commandes en cours de traitement". Les transformations racines PubsubIO créent des PCollections illimitées.

Certaines sources, en particulier celles qui créent des PCollections illimitées, telles que PubsubIO, ajoutent automatiquement un horodatage à chaque élément de la collection.

Les sources de données qui créent des PCollections illimitées incluent :

  • PubsubIO
  • Sources de données illimitées personnalisées que vous créez à l'aide de l'API Custom Source

Les récepteurs de données qui acceptent les PCollections illimitées incluent :

  • PubsubIO
  • BigQueryIO

Caractéristiques de traitement

La nature limitée (ou illimitée) de votre PCollection affecte la manière dont Dataflow traite vos données. Les PCollections limitées peuvent être traitées à l'aide de tâches par lots, qui peuvent lire l'ensemble de données en entier une fois et effectuer le traitement dans une tâche terminée. Les PCollections illimitées doivent être traitées à l'aide de tâches de streaming car toute la collection ne peut jamais être disponible pour le traitement en une seule fois.

Lors du regroupement de PCollection illimitées, Dataflow a recours à un concept appelé fenêtrage pour diviser un ensemble de données mis à jour en continu en fenêtres logiques de taille fixe. Dataflow traite chaque fenêtre comme un lot et le traitement se poursuit à mesure que l'ensemble de données est généré. Consultez la section suivante sur les horodatages et le fenêtrage pour plus d'informations.

Horodatage d'éléments de PCollection

Chaque élément d'une PCollection comporte un horodatage associé. Les horodatages sont utiles pour les PCollections qui contiennent des éléments avec une notion inhérente de temps. Par exemple, une PCollection de commandes à traiter peut utiliser l'heure à laquelle une commande a été créée en tant qu'horodatage d'élément.

L'horodatage de chaque élément est assigné par la source qui crée la PCollection. Les sources qui créent des PCollections illimitées attribuent souvent à chaque nouvel élément un horodatage en fonction du moment où il a été ajouté à la PCollection illimitée.

Java

Les sources de données qui produisent des ensembles de données fixes, tels que BigQueryIO ou TextIO, attribuent également des horodatages à chaque élément. Toutefois, ces sources de données attribuent généralement le même horodatage (Long.MIN_VALUE) à chaque élément.

Vous pouvez attribuer manuellement des horodatages aux éléments d'une PCollection. Cela se fait généralement lorsque les éléments ont un horodatage inhérent, mais que cet horodatage doit être calculé, par exemple, en recherchant dans la structure de l'élément. Pour attribuer manuellement un horodatage, utilisez une transformation ParDo. Dans la transformation ParDo, votre DoFn peut générer des éléments de sortie avec des horodatages. Voir Affecter des horodatages pour plus d'informations.

Python

Vous pouvez attribuer manuellement des horodatages aux éléments d'une PCollection. Cela se fait généralement lorsque les éléments ont un horodatage inhérent, mais que cet horodatage doit être calculé, par exemple, en recherchant dans la structure de l'élément. Pour attribuer manuellement un horodatage, utilisez une transformation ParDo. Dans la transformation ParDo, votre DoFn peut générer des éléments de sortie avec des horodatages.

Fenêtrage

Les horodatages associés à chaque élément d'une PCollection sont utilisés pour un concept appelé fenêtrage. Le fenêtrage divise les éléments d'une PCollection en fonction de leur horodatage. Vous pouvez utiliser du fenêtrage sur toutes les PCollections, mais il est nécessaire pour certains calculs sur des PCollections illimitées afin de diviser le flux de données continu en fragments finis pour le traitement.

Reportez-vous à la section Fenêtrage pour plus d'informations sur l'utilisation des concepts de fenêtrage de Dataflow dans votre pipeline.

Créer une PCollection

Pour utiliser un ensemble de données dans un pipeline Cloud Dataflow, vous devez créer une PCollection afin de représenter les données, où qu'elles soient stockées. Les SDK Dataflow fournissent deux méthodes principales pour créer une PCollection initiale :

  • Vous pouvez lire les données à partir d'une source de données externe, telle qu'un fichier.
  • Vous pouvez créer une PCollection de données stockée dans une classe de collection en mémoire.

Lecture de données externes

Consultez la page E/S de pipeline pour plus d'informations sur la lecture de données à partir d'une source de données externe.

Création d'une PCollection à partir de données en mémoire locale

Vous pouvez créer une PCollection à partir de données en mémoire locale afin de pouvoir utiliser ces données dans vos transformations de pipeline. En règle générale, vous utilisez les données de la mémoire locale pour tester votre pipeline avec des ensembles de données plus petits et pour réduire la dépendance de votre pipeline à l'égard des E/S externes lors des tests.

Java

Pour créer une PCollection à partir d'une Collection Java en mémoire, vous appliquez (méthode apply) la transformation Create. Create est une PTransform racine fournie par le SDK Dataflow pour Java. Create est compatible avec une Collection Java et un objet Coder qui spécifient comment les éléments de la Collection doivent être encodés.

L'exemple de code suivant crée une PCollection de String, représentant des lignes de texte individuelles, à partir d'une List Java :

  // Create a Java Collection, in this case a List of Strings.
  static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())   // create the PCollection

Le code ci-dessus utilise Create.of qui produit une PCollection contenant les éléments spécifiés. Notez que si votre pipeline a recours au fenêtrage, vous devez plutôt utiliser Create.timestamped. Create.timestamped génère une PCollection contenant les éléments indiqués avec les horodatages spécifiés.

Python

Pour créer une PCollection, appliquez la transformation Create. Create est une transformation standard fournie par le SDK Dataflow pour Python.

with beam.Pipeline(options=pipeline_options) as p:

  lines = (p
           | beam.Create([
               'To be, or not to be: that is the question: ',
               'Whether \'tis nobler in the mind to suffer ',
               'The slings and arrows of outrageous fortune, ',
               'Or to take arms against a sea of troubles, ']))

Utiliser une PCollection avec des types de données personnalisés

Vous pouvez créer une PCollection où le type d'élément est un type de données personnalisé que vous fournissez. Cela peut être utile si vous devez créer une collection de votre classe ou de votre structure avec des champs spécifiques, comme une classe Java contenant le nom, l'adresse et le numéro de téléphone d'un client.

Lorsque vous créez une PCollection d'un type personnalisé, vous devez fournir un Coder pour ce type personnalisé. Le Coder indique au service Dataflow comment sérialiser et désérialiser les éléments de votre PCollection lorsque votre ensemble de données est chargé en parallèle et partitionné sur plusieurs instances de nœuds de calcul de pipeline. Pour en savoir plus, consultez la page Codage des données.

Dataflow tentera d'inférer un Coder pour toute PCollection pour laquelle vous ne définissez pas explicitement de Coder. Le Coder par défaut pour un type personnalisé est SerializableCoder, qui utilise la sérialisation Java. Cependant, Dataflow recommande d'utiliser AvroCoder en tant que Coder lorsque cela est possible.

Vous pouvez enregistrer AvroCoder en tant qu'encodeur par défaut pour votre type de données en utilisant l'élément de votre objet Pipeline CoderRegistry. Annotez votre classe comme suit :

Java

  @DefaultCoder(AvroCoder.class)
  public class MyClass {
    ...
 }

Pour vous assurer que votre classe personnalisée est compatible avec AvroCoder, vous devrez peut-être ajouter des annotations supplémentaires. Par exemple, vous devez annoter les champs null de votre type de données avec org.apache.avro.reflect.Nullable. Pour en savoir plus, consultez la documentation de référence de l'API pour Java relative à AvroCoder et la documentation du package pour org.apache.avro.reflect.

L'exemple de pipeline TrafficRoutes de Dataflow crée une PCollection dont le type d'élément est une classe personnalisée appelée StationSpeed. StationSpeed enregistre AvroCoder en tant qu'encodeur par défaut comme suit :

Java

  /**
   * This class holds information about a station reading's average speed.
   */
  @DefaultCoder(AvroCoder.class)
  static class StationSpeed {
    @Nullable String stationId;
    @Nullable Double avgSpeed;

    public StationSpeed() {}

    public StationSpeed(String stationId, Double avgSpeed) {
      this.stationId = stationId;
      this.avgSpeed = avgSpeed;
    }

    public String getStationId() {
      return this.stationId;
    }
    public Double getAvgSpeed() {
      return this.avgSpeed;
    }
  }
Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.