PCollection

Los SDK de Dataflow usan una clase especializada llamada PCollection para representar datos en una canalización. Una PCollection representa un conjunto de datos de elementos múltiples.

Puedes pensar en PCollection como datos de “canalización”. Las transformaciones de Dataflow usan PCollection como entradas y resultados. Como tales, si trabajas con datos en tu canalización, debe ser en el formato de una PCollection. Cada PCollection es propiedad de un objeto de Pipeline específico y solo puede usarla ese objeto Pipeline.

IMPORTANTE: Este documento contiene información sobre PCollection no delimitadas y sistemas de ventanas. Estos conceptos se refieren solo al SDK de Dataflow para Java y aún no se encuentran disponibles en el SDK de Dataflow para Python.

Características de PCollection

Una PCollection representa una “bolsa” de elementos inmutable y de un potencial gran tamaño. No existe un tope a la cantidad de elementos que puede contener una PCollection. Cualquier PCollection puede entrar en la memoria, o puede representar un conjunto de datos muy grande respaldado por un almacén de datos persistentes.

Java

Los elementos de una PCollection pueden ser de cualquier tipo, pero deben ser todos del mismo. Sin embargo, Dataflow necesita poder codificar cada elemento individual como una string de byte a fin de ser compatible con el procesamiento distribuido. Los SDK de Dataflow proporcionan un mecanismo de codificación de datos que incluye codificaciones incorporadas para tipos usados con frecuencia y compatibilidad a fin de especificar codificaciones personalizadas según sea necesario. Crear una codificación válida para un tipo arbitrario puede ser desafiante, pero puedes construir codificaciones personalizadas para tipos con estructuras simples.

Un tipo de datos importante para el procesamiento de datos a gran escala es el par clave-valor. Los SDK de Dataflow usan la clase KV<K, V> para representar pares clave-valor.

Python

Un tipo de datos importante para el procesamiento de datos a gran escala es el par clave-valor. El SDK de Dataflow para Python usa 2 tuplas a fin de representar los pares clave-valor.

Limitaciones de PCollection

Una PCollection tiene muchos aspectos claves en los que difiere de una clase de colección regular:

  • Una PCollection es inmutable. Una vez creada, no puedes agregar, quitar o cambiar elementos individuales.
  • Una PCollection no es compatible con el acceso aleatorio a elementos individuales.
  • Una PCollection pertenece a la canalización en la que se creó. No puedes compartir una PCollection entre objetos de Pipeline.

Una PCollection puede estar respaldada de manera física por datos en un almacén existente, o puede representar datos que aún no se procesaron. Como tales, los datos en una PCollection son inmutables. Puedes usar una PCollection en computaciones que generan datos de canalización nuevos (como una PCollection nueva). Sin embargo, no puedes cambiar los elementos de una PCollection existente una vez que se creó.

Una PCollection no almacena datos en sí. Recuerda que una PCollection puede tener demasiados elementos como para entrar en una memoria local en la que se ejecuta tu programa de Dataflow. Cuando creas o transformas una PCollection, los datos no se copian o se mueven en la memoria como con algunas clases de contenedores regulares. En cambio, una PCollection representa un conjunto de datos muy grandes en la nube.

PCollections delimitadas y no delimitadas

El tamaño de una PCollection puede ser delimitado o no delimitado y la delimitación (o no delimitación) se determina cuando creas la PCollection. Algunas transformaciones de raíz crean PCollections delimitadas mientras que otras crean unas no delimitadas. Esto depende de la fuente de tus datos de entrada.

PCollections delimitadas

Tu PCollection está delimitada si representa un conjunto de datos fijos, que tiene un tamaño esperado que no cambia. Un ejemplo de un conjunto de datos fijos puede ser los “registros del servidor del mes de octubre”, o “todos los pedidos procesados la semana pasada”. Las transformaciones de raíz TextIO y BigQueryIO crean PCollection delimitadas.

A continuación, se muestra una lista de las fuentes de datos que crean PCollection delimitadas:

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Fuentes de datos delimitadas personalizadas que creas con la API de Custom Source

Python

A continuación, se muestra una lista de los receptores de datos que aceptan PCollection delimitadas:

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Receptores de datos delimitados personalizados que creas con la API de Custom Sink

Python

  • TextIO
  • BigQueryIO
  • Receptores de datos delimitados personalizados que creas con la API de Custom Sink

PCollections no delimitadas

Tu PCollection no está delimitada si representa un conjunto de datos de actualización continua o datos de transmisión. Un ejemplo de un conjunto de datos de actualización continua puede ser “registros del servidor a medida que se generan” o “todos los pedidos nuevos a medida que se procesan”. Las transformaciones de raíz PubsubIO crean PCollection no delimitadas.

Algunas fuentes, en especial las que crean PCollection no delimitadas (como PubsubIO), anexan de manera automática una marca de tiempo a cada elemento de la colección.

A continuación, se muestra una lista de fuentes de datos que crean PCollection no delimitadas:

A continuación, se muestra una lista de los receptores de datos que aceptan PCollection no delimitadas:

  • PubsubIO
  • BigQueryIO

Características de procesamiento

La naturaleza delimitada (o no delimitada) de tu PCollection afecta la forma en la que Dataflow procesa tus datos. Las PCollection delimitadas pueden procesarse con trabajos por lote, que pueden leer el conjunto de datos entero una vez y realizar el procesamiento en un trabajo finito. Las PCollection no delimitadas deben procesarse con trabajos de transmisión, ya que la colección entera nunca puede estar disponible para procesarse al mismo tiempo.

Cuando se agrupan PCollection no delimitadas, Dataflow requiere un concepto llamado sistema de ventanas para dividir un conjunto de datos de actualización continua en ventanas lógicas de tamaño finito. Dataflow procesa cada ventana como un paquete y el procesamiento continúa mientras se genera el conjunto de datos. Consulta la siguiente sección sobre Marcas de tiempo y el sistema de ventanas a fin de obtener más información.

Marcas de tiempo de elementos de PCollection

Cada elemento en una PCollection tiene asociada una marca de tiempo. Las marcas de tiempo son útiles en PCollection que contienen elementos con una noción del tiempo inherente. Por ejemplo, una PCollection de pedidos para procesar puede usar el horario en que se creó un pedido como la marca de tiempo del elemento.

En primer lugar, la fuente que crea la PCollection asigna la marca de tiempo para cada elemento. Las fuentes que crean PCollection no delimitadas asignan con frecuencia una marca de tiempo a cada elemento nuevo según cuándo se agregó a la PCollection no delimitada.

Java

Las fuentes de datos que producen conjuntos de datos fijos, como BigQueryIO o TextIO, también asignan marcas de tiempo a cada elemento. Sin embargo, estas fuentes de datos, por lo general, asignan la misma marca de tiempo (Long.MIN_VALUE) a cada elemento.

Puedes asignar marcas de tiempo a elementos de una PCollection de forma manual. Por lo general, esto se hace cuando los elementos tienen una marca de tiempo inherente, pero esa marca de tiempo debe ser calculada, por ejemplo, mediante su análisis fuera de la estructura del elemento. Para asignar una marca de tiempo de forma manual, usa una transformación ParDo. Dentro de la transformación ParDo, tu DoFn puede producir elementos de resultado con marcas de tiempo. Consulta Asigna marcas de tiempo para obtener más información.

Python

Puedes asignar marcas de tiempo a elementos de una PCollection de forma manual. Por lo general, esto se hace cuando los elementos tienen una marca de tiempo inherente, pero esa marca de tiempo debe ser calculada, por ejemplo, mediante su análisis fuera de la estructura del elemento. Para asignar una marca de tiempo de forma manual, usa una transformación ParDo. Dentro de la transformación ParDo, tu DoFn puede producir elementos de resultado con marcas de tiempo.

Sistema de ventanas

Las marcas de tiempo asociadas con cada elemento en una PCollection se usan para un concepto llamado sistema de ventanas. El sistema de ventanas divide los elementos de una PCollection según sus marcas de tiempo. El sistema de ventanas puede usarse en todas las PCollection, pero es necesario para algunas computaciones de PCollection no delimitadas a fin de dividir la transmisión continua de datos en partes finitas para el procesamiento.

Consulta la sección sobre el sistema de ventanas a fin de obtener más información sobre cómo usar los conceptos del sistema de ventanas de Dataflow en tu canalización.

Crea una PCollection

A fin de trabajar con un conjunto de datos en una canalización de Cloud Dataflow, necesitarás crear una PCollection para representar los datos, en donde sea que están almacenados. Los SDK de Dataflow proporcionan dos formas principales de crear una PCollection inicial:

  • Puedes leer los datos desde una fuente de datos externa, como un archivo.
  • Puedes crear una PCollection de datos almacenados en una clase de colección en la memoria.

Lee datos externos

Consulta Canalización de E/S a fin de obtener más información sobre cómo leer datos desde una fuente de datos externa.

Crea una PCollection a partir de datos en la memoria local

Puedes crear una PCollection a partir de datos en la memoria local a fin de poder usar esos datos en las transformaciones de tu canalización. Por lo general, usas datos de la memoria local para poner a prueba tu canalización con conjuntos de datos más pequeños y reducir la dependencia de tu canalización de E/S externo durante la prueba.

Java

A fin de crear una PCollection a partir de una Collection de Java en la memoria, apply la transformación Create. Create es una raíz PTransform que proporciona el SDK de Dataflow para Java. Create acepta una Collection de Java y un objeto Coder, que especifica cómo deben codificarse los elementos en la Collection.

El siguiente código de muestra crea una PCollection de String, que representa líneas individuales de texto de una List de Java:

  // Create a Java Collection, in this case a List of Strings.
  static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())   // create the PCollection

El código de arriba usa Create.of, que produce una PCollection que contiene los elementos especificados. Ten en cuenta que si tu canalización usa el sistema de ventanas, debes usar Create.timestamped en su lugar. Create.timestamped produce una PCollection que contiene los elementos especificados con las marcas de tiempo especificadas.

Python

Para crear una PCollection, aplica la transformación Create. Create es una transformación estándar que proporciona el SDK de Dataflow para Python.

with beam.Pipeline(options=pipeline_options) as p:

  lines = (p
           | beam.Create([
               'To be, or not to be: that is the question: ',
               'Whether \'tis nobler in the mind to suffer ',
               'The slings and arrows of outrageous fortune, ',
               'Or to take arms against a sea of troubles, ']))

Usa PCollection con tipos de datos personalizados

Puedes crear una PCollection en la que el tipo de elemento es un tipo de datos personalizado que proporcionas. Esto puede resultar útil si necesitas crear una colección de tu clase o estructura propia con campos específicos, como una clase de Java que contiene el nombre, la dirección y el número de teléfono de un cliente.

Cuando crees una PCollection de un tipo personalizado, deberás proporcionar un Coder para este. El Coder le dice al servicio de Dataflow cómo serializar y deserializar los elementos de tu PCollection mientras tu conjunto de datos se paraleliza y se divide en particiones en múltiples instancias de trabajadores de canalización. Consulta codificación de datos para obtener más información.

Dataflow intentará inferir un Coder para cualquier PCollection sin un Coder especificado de forma explícita. El Coder predeterminado para un tipo personalizado es SerializableCoder, que usa la serialización de Java. Sin embargo, Dataflow recomienda usar AvroCoder como el Coder cuando sea posible.

Puedes registrar AvroCoder como el codificador predeterminado para tu tipo de datos si usas el CoderRegistry de tu objeto de Pipeline. Anota tu clase de la siguiente manera:

Java

  @DefaultCoder(AvroCoder.class)
  public class MyClass {
    ...
 }

A fin de asegurarte de que tu clase personalizada sea compatible con AvroCoder, quizás debas agregar algunas anotaciones adicionales. Por ejemplo, deberás anotar los campos nulos en tu tipo de datos con org.apache.avro.reflect.Nullable. Consulta la documentación de referencia de la API para Java de AvroCoder y la documentación del paquete de org.apache.avro.reflect a fin de obtener más información.

La canalización de ejemplo de TrafficRoutes de Dataflow crea una PCollection cuyo tipo de elemento es una clase personalizada llamada StationSpeed. StationSpeed registra AvroCoder como su codificador predeterminado de la siguiente manera:

Java

  /**
   * This class holds information about a station reading's average speed.
   */
  @DefaultCoder(AvroCoder.class)
  static class StationSpeed {
    @Nullable String stationId;
    @Nullable Double avgSpeed;

    public StationSpeed() {}

    public StationSpeed(String stationId, Double avgSpeed) {
      this.stationId = stationId;
      this.avgSpeed = avgSpeed;
    }

    public String getStationId() {
      return this.stationId;
    }
    public Double getAvgSpeed() {
      return this.avgSpeed;
    }
  }
¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.