PCollection

Les SDK Dataflow utilisent une classe spécialisée appelée PCollection pour représenter les données d'un pipeline. Une PCollection représente un ensemble de données multi-éléments.

Vous pouvez considérer une PCollection comme des données de "pipeline". Les transformations de Dataflow utilisent les PCollection comme entrées et sorties. Ainsi, si vous souhaitez utiliser les données de votre pipeline, elles doivent se présenter sous forme de PCollection. Chaque PCollection appartient à un objet Pipeline spécifique, et seul cet objet Pipeline peut l'utiliser.

IMPORTANT : Ce document contient des informations sur les PCollection illimitées et sur le fenêtrage. Ces concepts font référence au SDK Dataflow pour Java uniquement et ne sont pas encore disponibles dans le SDK Dataflow pour Python.

Caractéristiques de PCollection

Une PCollection représente un ensemble d'éléments potentiellement volumineux et immuable. Il n'y a pas de limite supérieure au nombre d'éléments qu'une PCollection peut contenir. N'importe quelle PCollection peut tenir dans la mémoire, ou peut représenter un très grand ensemble de données sauvegardé par un datastore persistant.

Java

Les éléments d'une PCollection peuvent être de n'importe quel type, mais ils doivent tous être du même type. Toutefois, Dataflow doit pouvoir encoder chaque élément individuel sous forme de chaîne d'octets afin de prendre en charge le traitement distribué. Les SDK Dataflow fournissent un mécanisme d'encodage de données qui inclut des encodages intégrés pour les types couramment utilisés et permet la spécification d'encodages personnalisés en fonction des besoins. Créer un encodage valide pour un type quelconque peut être difficile, mais vous pouvez construire un encodage personnalisé pour des types structurés simples.

La paire clé/valeur est un type de données important pour le traitement de données à grande échelle. Les SDK Dataflow utilisent la classe KV<K, V> pour représenter les paires clé/valeur.

Python

La paire clé/valeur est un type de données important pour le traitement de données à grande échelle. Le SDK Dataflow pour Python utilise 2-tuples pour représenter les paires clé/valeur.

Limitations de PCollection

Une PCollection diffère d'une classe de collection standard en différents aspects clés :

  • Une PCollection est immuable. Une fois créée, vous ne pouvez pas ajouter, supprimer ni modifier des éléments individuels.
  • Une PCollection ne permet pas un accès aléatoire à des éléments individuels.
  • Une PCollection appartient au pipeline dans lequel elle est créée. Vous ne pouvez pas partager une PCollection entre des objets Pipeline.

Une PCollection peut être physiquement sauvegardée par des données dans un espace de stockage existant, ou peut représenter des données qui n'ont pas encore été calculées. Par conséquent, les données d'une PCollection sont immuables. Vous pouvez utiliser une PCollection dans les calculs qui génèrent de nouvelles données de pipeline (sous la forme d'une nouvelle PCollection). Cependant, vous ne pouvez pas modifier les éléments d'une PCollection existante une fois qu'elle a été créée.

Une PCollection ne stocke pas de données en soi. N'oubliez pas qu'une PCollection peut contenir trop d'éléments pour tenir dans la mémoire locale où votre programme Dataflow est exécuté. Lorsque vous créez ou transformez une PCollection, les données ne sont ni copiées, ni déplacées en mémoire, comme c'est le cas avec certaines classes de conteneurs standards. À la place, une PCollection représente un ensemble de données potentiellement très volumineux dans le cloud.

PCollections limitées et illimitées

La taille d'une PCollection peut être limitée ou illimitée. Cette caractéristique est déterminée lorsque vous créez la PCollection. Certaines transformations racines créent des PCollections limitées alors que d'autres en créent des illimitées. Tout dépend de la source de vos données d'entrée.

PCollections limitées

Votre PCollection est limitée si elle représente un ensemble de données fixe, dont la taille connue ne change pas. Un exemple d'ensemble de données fixe peut être des "journaux de serveur du mois d'octobre" ou "toutes les commandes traitées la semaine dernière". Les transformations racines TextIO et BigQueryIO créent des PCollection limitées.

Les sources de données qui créent des PCollection limitées incluent :

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Sources de données liées personnalisées créées à l'aide de l'API Custom Source

Python

  • TextIO
  • BigQueryIO
  • Sources de données liées personnalisées créées à l'aide de l'API Custom Source

Les récepteurs de données qui acceptent les PCollection limitées incluent :

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Récepteurs de données limitées personnalisées créés à l'aide de l'API Custom Sink

Python

  • TextIO
  • BigQueryIO
  • Récepteurs de données limitées personnalisées créés à l'aide de l'API Custom Sink

PCollections illimitées

Votre PCollection est illimitée si elle représente un ensemble de données mis à jour continuellement, ou des données traitées par flux. Un exemple de données mis à jour continuellement peut être des "journaux de serveur au fur et à mesure de leur génération" ou "toutes les nouvelles commandes au fur et à mesure de leur traitement". Les transformations racines PubsubIO créent des PCollection illimitées.

Certaines sources, en particulier celles qui créent des PCollection illimitées (telles que PubsubIO), ajoutent automatiquement un horodatage à chaque élément de la collection.

Les sources de données qui créent des PCollection illimitées incluent :

  • PubsubIO
  • Sources de données illimitées personnalisées que vous créez à l'aide de l'API Custom Source

Les récepteurs de données qui acceptent les PCollection illimitées incluent :

  • PubsubIO
  • BigQueryIO

Caractéristiques de traitement

Le caractère limité (ou non limité) de votre PCollection affecte la façon dont Dataflow traite vos données. Les PCollection limitées peuvent être traitées à l'aide de tâches par lot, qui peuvent lire l'intégralité de l'ensemble de données une fois et effectuer un traitement dans une tâche finie. Les PCollection illimitées doivent être traitées à l'aide de tâches de traitement par flux, car l'ensemble de la collection ne peut jamais être disponible pour être traité en une seule fois.

Lors du regroupement de PCollection illimitées, Dataflow requiert un concept appelé fenêtrage pour diviser un ensemble de données mis à jour continuellement en fenêtres logiques de taille finie. Dataflow traite chaque fenêtre comme un lot et le traitement se poursuit à mesure que l'ensemble de données est généré. Pour en savoir plus, consultez la section suivante sur les horodatages et le fenêtrage.

Horodatage d'éléments de PCollection

Chaque élément d'une PCollection est associé à un horodatage. Les horodatages sont utiles pour les PCollection qui contiennent des éléments avec une notion inhérente de temps. Par exemple, une PCollection de commandes à traiter peut utiliser l'heure à laquelle une commande a été créée en tant qu'horodatage d'élément.

L'horodatage de chaque élément est initialement attribué par la source qui crée la PCollection. Les sources qui créent une PCollection illimitée attribuent souvent un horodatage à chaque nouvel élément en fonction de l'heure à laquelle il a été ajouté à la PCollection illimitée.

Java

Les sources de données qui produisent des ensembles de données fixes, telles que BigQueryIO ou TextIO, attribuent également des horodatages à chaque élément. Toutefois, ces sources de données attribuent généralement le même horodatage (Long.MIN_VALUE) à chaque élément.

Vous pouvez attribuer manuellement des horodatages aux éléments d'une PCollection. Cela se fait généralement lorsque les éléments ont un horodatage inhérent, mais que cet horodatage doit être calculé, par exemple, en recherchant dans la structure de l'élément. Pour attribuer manuellement un horodatage, utilisez une transformation ParDo. Dans la transformation ParDo, votre DoFn peut produire des éléments de sortie avec des horodatages. Pour en savoir plus, consultez la section Attribuer des horodatages.

Python

Vous pouvez attribuer manuellement des horodatages aux éléments d'une PCollection. Cela se fait généralement lorsque les éléments ont un horodatage inhérent, mais que cet horodatage doit être calculé, par exemple, en recherchant dans la structure de l'élément. Pour attribuer manuellement un horodatage, utilisez une transformation ParDo. Dans la transformation ParDo, votre DoFn peut produire des éléments de sortie avec des horodatages.

Fenêtrage

Les horodatages associés à chaque élément d'une PCollection sont utilisés pour un concept appelé fenêtrage. Le fenêtrage divise les éléments d'une PCollection en fonction de leur horodatage. Le fenêtrage peut être utilisé sur toutes les PCollection, mais il est nécessaire pour certains calculs sur des PCollection illimitées afin de diviser le flux de données continu en fragments finis en vue du traitement.

Reportez-vous à la section Fenêtrage pour plus d'informations sur l'utilisation des concepts de fenêtrage de Dataflow dans votre pipeline.

Créer une PCollection

Pour utiliser un ensemble de données dans un pipeline Cloud Dataflow, vous devez créer une PCollection afin de représenter les données, où qu'elles soient stockées. Les SDK Dataflow proposent deux méthodes principales pour créer une PCollection initiale :

  • Vous pouvez lire les données à partir d'une source de données externe, telle qu'un fichier.
  • Vous pouvez créer une PCollection de données stockées dans une classe de collection en mémoire.

Lecture de données externes

Consultez la page E/S de pipeline pour plus d'informations sur la lecture de données à partir d'une source de données externe.

Création d'une PCollection à partir de données en mémoire locale

Vous pouvez créer une PCollection à partir de données dans la mémoire locale afin de pouvoir utiliser ces données dans les transformations de votre pipeline. En règle générale, vous utilisez les données de la mémoire locale pour tester votre pipeline avec des ensembles de données plus petits et pour réduire la dépendance de votre pipeline à l'égard des E/S externes lors des tests.

Java

Pour créer une PCollection à partir d'une Collection Java en mémoire, vous devez appliquer (apply) la transformation Create. Create est une transformation PTransform racine fournie par le SDK Dataflow pour Java. Create accepte une Collection Java et un objet Coder, qui spécifient la façon dont les éléments de la Collection doivent être encodés.

L'exemple de code suivant crée une PCollection de String, représentant des lignes de texte individuelles, à partir d'une List Java :

  // Create a Java Collection, in this case a List of Strings.
  static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())   // create the PCollection

Le code ci-dessus utilise Create.of, qui produit une PCollection contenant les éléments spécifiés. Notez que si votre pipeline a recours au fenêtrage, vous devez plutôt utiliser Create.timestamped. Create.timestamped génère une PCollection contenant les éléments spécifiés avec des horodatages spécifiés.

Python

Pour créer une PCollection, appliquez la transformation Create. Create est une transformation standard fournie par le SDK Dataflow pour Python.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# argv = None  # if None, uses sys.argv
pipeline_options = PipelineOptions(argv)
with beam.Pipeline(options=pipeline_options) as pipeline:
  lines = (
      pipeline
      | beam.Create([
          'To be, or not to be: that is the question: ',
          "Whether 'tis nobler in the mind to suffer ",
          'The slings and arrows of outrageous fortune, ',
          'Or to take arms against a sea of troubles, ',
      ]))

Utiliser une PCollection avec des types de données personnalisés

Vous pouvez créer une PCollection où le type d'élément est un type de données personnalisé que vous fournissez. Cela peut être utile si vous devez créer une collection de votre classe ou de votre structure avec des champs spécifiques, comme une classe Java contenant le nom, l'adresse et le numéro de téléphone d'un client.

Lorsque vous créez une PCollection d'un type personnalisé, vous devez fournir un objet Coder pour ce type personnalisé. L'objet Coder indique au service Dataflow comment sérialiser et désérialiser les éléments de votre PCollection lorsque votre ensemble de données est chargé en parallèle et partitionné sur plusieurs instances de nœuds de calcul de pipeline. Pour en savoir plus, consultez la page Encodage des données.

Dataflow essaiera de déduire un objet Coder pour toute PCollection pour laquelle vous n'avez pas explicitement défini de Coder. L'objet Coder par défaut pour un type personnalisé est SerializableCoder, qui utilise la sérialisation Java. Cependant, Dataflow recommande d'utiliser AvroCoder comme objet Coder lorsque cela est possible.

Vous pouvez enregistrer AvroCoder en tant qu'encodeur par défaut pour votre type de données en utilisant le CoderRegistry de votre objet Pipeline. Annotez votre classe comme suit :

Java

  @DefaultCoder(AvroCoder.class)
  public class MyClass {
    ...
 }

Pour vous assurer que votre classe personnalisée est compatible avec AvroCoder, vous devrez peut-être ajouter des annotations supplémentaires. Par exemple, vous devez annoter les champs nuls dans votre type de données avec org.apache.avro.reflect.Nullable. Pour plus d'informations, consultez la documentation de référence de l'API pour Java pour AvroCoder et la documentation du package pour org.apache.avro.reflect.

L'exemple de pipeline TrafficRoutes de Dataflow crée une PCollection dont le type d'élément est une classe personnalisée appelée StationSpeed. StationSpeed enregistre AvroCoder en tant qu'encodeur par défaut comme suit :

Java

  /**
   * This class holds information about a station reading's average speed.
   */
  @DefaultCoder(AvroCoder.class)
  static class StationSpeed {
    @Nullable String stationId;
    @Nullable Double avgSpeed;

    public StationSpeed() {}

    public StationSpeed(String stationId, Double avgSpeed) {
      this.stationId = stationId;
      this.avgSpeed = avgSpeed;
    }

    public String getStationId() {
      return this.stationId;
    }
    public Double getAvgSpeed() {
      return this.avgSpeed;
    }
  }