PCollection

Los SDK de Dataflow usan una clase especializada llamada PCollection para representar datos en una canalización. Una PCollection representa un conjunto de datos de varios elementos.

Puedes pensar en una PCollection como datos de “canalización”. Las Transformaciones de Dataflow usan PCollections como entradas y salidas, por lo tanto, si deseas trabajar con datos en tu canalización, deben tener la forma de una PCollection. Cada PCollection es propiedad de un objeto Pipeline específico, y solo ese objeto Pipeline puede usarla.

IMPORTANTE: Este documento contiene información sobre PCollection no delimitadas y sistemas de ventanas. Estos conceptos se refieren solo al SDK de Dataflow para Java y aún no se encuentran disponibles en el SDK de Dataflow para Python.

Características de PCollection

Una PCollection representa una “bolsa” de elementos potencialmente inmutable y grande. No hay un límite máximo para la cantidad de elementos que puede contener una PCollection; cualquier PCollection podría caber en la memoria o representar un conjunto muy grande de datos respaldado por un almacén de datos persistentes.

Java

Los elementos de una PCollection pueden ser de cualquier tipo, pero deben ser todos del mismo tipo. Sin embargo, Dataflow necesita poder codificar cada elemento individual como una string de bytes para ser compatible con el procesamiento distribuido. Los SDK de Dataflow proporcionan un mecanismo de Codificación de datos que incluye codificaciones integradas para tipos que se usan con frecuencia y compatibilidad con el fin de especificar codificaciones personalizadas según sea necesario. Crear una codificación válida para un tipo arbitrario puede ser desafiante, pero puedes construir codificaciones personalizadas para tipos con estructuras simples.

Un tipo de dato importante para el procesamiento de datos a gran escala es el par clave-valor. Los SDK de Dataflow usan la clase KV<K, V> para representar pares clave-valor.

Python

Un tipo de datos importante para el procesamiento de datos a gran escala es el par clave-valor. El SDK de Dataflow para Python usa 2 tuplas a fin de representar los pares clave-valor.

Limitaciones de PCollection

Una PCollection tiene varios aspectos clave que la diferencian de una clase de colección normal:

  • Una PCollection es inmutable. Una vez creada, no puedes agregar, quitar ni cambiar elementos individuales.
  • Una PCollection no admite el acceso aleatorio a elementos individuales.
  • Una PCollection pertenece a la canalización en la que se crea. No puedes compartir una PCollection entre objetos Pipeline.

Una PCollection puede estar físicamente respaldada por datos en el almacenamiento existente o puede representar datos que aún no se procesaron. Por lo tanto, los datos en una PCollection son inmutables. Puedes usar una PCollection en los cálculos que generan datos de canalización nuevos (como una PCollection nueva), sin embargo, no puedes cambiar los elementos de una PCollection existente una vez que se crea.

Una PCollection no almacena datos. Recuerda que una PCollection puede tener demasiados elementos para caber en la memoria local donde se ejecuta tu programa de Dataflow. Cuando creas o transformas una PCollection, los datos no se copian ni se mueven en la memoria, como ocurre con algunas clases de contenedores regulares. En cambio, una PCollection representa un conjunto de datos potencialmente muy grande en la nube.

PCollections delimitadas y no delimitadas

El tamaño de una PCollection puede ser delimitado o no delimitado, y la delimitación (o no delimitación) se determina cuando creas la PCollection. Algunas transformaciones raíz crean PCollections delimitadas y otras, no delimitadas; todo depende de la fuente de tus datos de entrada.

PCollections delimitadas

Tu PCollection está delimitada si representa un conjunto de datos fijo con un tamaño conocido que no cambia. Un ejemplo de un conjunto de datos fijo podría ser “registros del servidor del mes de octubre” o “todos los pedidos procesados la semana pasada”. Las transformaciones raíz TextIO y BigQueryIO crean PCollections delimitadas.

Entre las fuentes de datos que crean PCollections delimitadas, se incluyen las siguientes:

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Fuentes de datos delimitadas personalizadas que creas con la API de Custom Source

Python

Entre los receptores de datos que aceptan PCollections delimitadas, se incluyen los siguientes:

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Receptores de datos delimitados personalizados que creas con la API de Custom Sink

Python

  • TextIO
  • BigQueryIO
  • Receptores de datos delimitados personalizados que creas con la API de Custom Sink

PCollections no delimitadas

Tu PCollection no está delimitada si representa un conjunto de datos que se actualiza constantemente o datos de transmisión. Un ejemplo de un conjunto de datos que se actualiza constantemente podría ser “registros del servidor a medida que se generan” o “todos los pedidos nuevos a medida que se procesan". Las transformaciones raíz PubsubIO crean PCollections no delimitadas.

Algunas fuentes, en particular las que crean PCollections no delimitadas (como PubsubIO), agregan automáticamente una marca de tiempo a cada elemento de la colección.

Entre las fuentes de datos que crean PCollections no delimitadas, se incluyen las siguientes:

Entre los receptores de datos que aceptan PCollections no delimitadas, se incluyen los siguientes:

  • PubsubIO
  • BigQueryIO

Características de procesamiento

La naturaleza delimitada (o no delimitada) de tu PCollection afecta la forma en que Dataflow procesa tus datos. Las PCollections delimitadas pueden procesarse mediante trabajos por lotes, que pueden leer todo el conjunto de datos una sola vez y realizar el procesamiento en un trabajo limitado. Las PCollections no delimitadas deben procesarse mediante trabajos de transmisión, ya que la colección completa nunca estará disponible para su procesamiento en ningún momento.

Cuando se agrupan PCollections no delimitadas, Dataflow requiere un concepto llamado sistema de ventanas para dividir un conjunto de datos que se actualiza continuamente en ventanas lógicas de tamaño limitado. Dataflow procesa cada ventana como un paquete, y el procesamiento continúa mientras se genera el conjunto de datos. Consulta la siguiente sección sobre marcas de tiempo y el sistema de ventanas a fin de obtener más información.

Marcas de tiempo de elementos de PCollection

Cada elemento en una PCollection tiene una marca de tiempo asociada. Las marcas de tiempo son útiles para las PCollections que contienen elementos con una noción de tiempo inherente. Por ejemplo, una PCollection de pedidos para procesar puede usar la hora en que se creó un pedido como marca de tiempo del elemento.

La fuente que crea la PCollection asigna inicialmente la marca de tiempo de cada elemento. Las fuentes que crean una PCollection no delimitada a menudo le asignan una marca de tiempo a cada elemento nuevo según el momento en que se agregó ese elemento a la PCollection no delimitada.

Java

Las fuentes de datos que producen conjuntos de datos fijos, como BigQueryIO o TextIO, también asignan marcas de tiempo a cada elemento. Sin embargo, estas fuentes de datos suelen asignar la misma marca de tiempo (Long.MIN_VALUE) a cada elemento.

Puedes asignar manualmente marcas de tiempo a los elementos de una PCollection. Por lo general, esto se hace cuando los elementos tienen una marca de tiempo inherente, pero esa marca de tiempo debe calcularse, por ejemplo, mediante su análisis fuera de la estructura del elemento. Para asignar una marca de tiempo manualmente, usa una transformación ParDo. Dentro de la transformación ParDo, tu DoFn puede producir elementos de salida con marcas de tiempo. Consulta la sección sobre cómo asignar marcas de tiempo para obtener más información.

Python

Puedes asignar manualmente marcas de tiempo a los elementos de una PCollection. Por lo general, esto se hace cuando los elementos tienen una marca de tiempo inherente, pero esa marca de tiempo debe calcularse, por ejemplo, mediante su análisis fuera de la estructura del elemento. Para asignar una marca de tiempo manualmente, usa una transformación ParDo. Dentro de la transformación ParDo, tu DoFn puede producir elementos de salida con marcas de tiempo.

Sistema de ventanas

Las marcas de tiempo asociadas con cada elemento en una PCollection se usan para un concepto llamado sistema de ventanas. El sistema de ventanas divide los elementos de una PCollection según sus marcas de tiempo. El sistema de ventanas se puede usar en todas las PCollections, pero es necesario para realizar algunos cálculos en PCollections no delimitadas a fin de dividir el flujo de datos continuo en fragmentos delimitados para el procesamiento.

Consulta la sección sobre sistema de ventanas a fin de obtener más información sobre cómo usar los conceptos del sistema de ventanas de Dataflow en tu canalización.

Crea una PCollection

A fin de trabajar con un conjunto de datos en una canalización de Cloud Dataflow, deberás crear una PCollection para representar los datos, dondequiera que estén almacenados. Los SDK de Dataflow proporcionan dos formas principales de crear una PCollection inicial:

  • Puedes leer los datos desde una fuente de datos externa, como un archivo.
  • Puedes crear una PCollection de datos almacenados en una clase de colección en la memoria.

Lee datos externos

Consulta E/S de canalización a fin de obtener más información sobre cómo leer datos desde una fuente de datos externa.

Crea una PCollection a partir de datos en la memoria local

Puedes crear una PCollection a partir de datos en la memoria local para que puedas usarlos en las transformaciones de tu canalización. Por lo general, usas datos de la memoria local para poner a prueba tu canalización con conjuntos de datos más pequeños y reducir la dependencia de tu canalización en la E/S externa durante las pruebas.

Java

Para crear una PCollection a partir de una Collection Java en la memoria, debes apply la transformación Create. Create es una PTransform raíz proporcionada por el SDK de Dataflow para Java. Create acepta una Collection de Java y un objeto Coder, que especifica cómo se deben codificar los elementos de la Collection.

En la siguiente muestra de código, se crea una PCollection de String, que representa líneas de texto individuales a partir de una List Java:

  // Create a Java Collection, in this case a List of Strings.
  static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())   // create the PCollection

El código anterior usa Create.of, que produce una PCollection que contiene los elementos especificados. Ten en cuenta que si tu canalización usa el sistema de ventanas, debes usar Create.timestamped en su lugar. Create.timestamped produce una PCollection que contiene los elementos especificados con marcas de tiempo especificadas.

Python

Para crear una PCollection, aplica la transformación Create. Create es una transformación estándar que proporciona el SDK de Dataflow para Python.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# argv = None  # if None, uses sys.argv
pipeline_options = PipelineOptions(argv)
with beam.Pipeline(options=pipeline_options) as pipeline:
  lines = (
      pipeline
      | beam.Create([
          'To be, or not to be: that is the question: ',
          "Whether 'tis nobler in the mind to suffer ",
          'The slings and arrows of outrageous fortune, ',
          'Or to take arms against a sea of troubles, ',
      ]))

Usa PCollection con tipos de datos personalizados

Puedes crear una PCollection en la que el tipo de elemento sea un tipo de dato personalizado que proporciones. Esto puede resultar útil si necesitas crear una colección de tu clase o estructura propias con campos específicos, como una clase de Java que contenga el nombre, la dirección y el número de teléfono de un cliente.

Cuando creas una PCollection de un tipo personalizado, debes proporcionar un Coder para ese tipo. El Coder le indica al servicio de Dataflow cómo serializar y deserializar los elementos de tu PCollection mientras tu conjunto de datos se paraleliza y particiona en varias instancias de trabajador de canalización. Consulta Codificación de datos para obtener más información.

Dataflow intentará inferir un Coder para cualquier PCollection para la que no hayas establecido un Coder explícitamente. El Coder predeterminado para un tipo personalizado es SerializableCoder, que usa la serialización de Java. Sin embargo, Dataflow recomienda usar AvroCoder como Coder cuando sea posible.

Puedes registrar AvroCoder como el codificador predeterminado para tu tipo de dato usando el CoderRegistry de tu objeto Pipeline. Anota tu clase de la siguiente manera:

Java

  @DefaultCoder(AvroCoder.class)
  public class MyClass {
    ...
 }

Para asegurarte de que tu clase personalizada sea compatible con AvroCoder, es posible que debas agregar algunas anotaciones adicionales, por ejemplo, debes anotar campos nulos en tu tipo de dato con org.apache.avro.reflect.Nullable. Consulta la documentación de referencia de la API para Java de AvroCoder y la documentación del paquete de org.apache.avro.reflect a fin de obtener más información.

La canalización de ejemplo de TrafficRoutes de Dataflow crea una PCollection cuyo tipo de elemento es una clase personalizada llamada StationSpeed. StationSpeed registra AvroCoder como su codificador predeterminado de la siguiente manera:

Java

  /**
   * This class holds information about a station reading's average speed.
   */
  @DefaultCoder(AvroCoder.class)
  static class StationSpeed {
    @Nullable String stationId;
    @Nullable Double avgSpeed;

    public StationSpeed() {}

    public StationSpeed(String stationId, Double avgSpeed) {
      this.stationId = stationId;
      this.avgSpeed = avgSpeed;
    }

    public String getStationId() {
      return this.stationId;
    }
    public Double getAvgSpeed() {
      return this.avgSpeed;
    }
  }