PCollection

Los SDK de Dataflow usan una clase especializada llamada PCollection para representar datos en una canalización. A PCollection representa un conjunto de datos de varios elementos.

Puedes pensar en un PCollection como datos de "canalización". Las transformaciones de Dataflow usan PCollection s como entradas y salidas. Por lo tanto, si desea trabajar con datos en su canalización, debe tener la forma de un PCollection. Cada PCollection es propiedad de un objeto Pipeline específico, y solo ese objeto Pipeline puede usarlo.

IMPORTANTE: Este documento contiene información acerca de unbound PCollections y Windowing. Estos conceptos se refieren solo al SDK de Dataflow para Java y aún no se encuentran disponibles en el SDK de Dataflow para Python.

Características de PCollection

Una PCollection representa una "bolsa" de elementos potencialmente grande e inmutable. No hay un límite superior de cuántos elementos puede contener PCollection. cualquier PCollection dado puede caber en la memoria, o puede representar un conjunto de datos muy grande respaldado por un almacén de datos persistente.

Java

Los elementos de un PCollection pueden ser de cualquier tipo, pero todos deben ser del mismo tipo. Sin embargo, Dataflow necesita poder codificar cada elemento individual como una string de byte a fin de ser compatible con el procesamiento distribuido. Los SDK de Dataflow proporcionan un mecanismo de codificación de datos que incluye codificaciones incorporadas para tipos usados con frecuencia y compatibilidad a fin de especificar codificaciones personalizadas según sea necesario. Crear una codificación válida para un tipo arbitrario puede ser desafiante, pero puedes construir codificaciones personalizadas para tipos con estructuras simples.

Un tipo de datos importante para el procesamiento de datos a gran escala es el par clave-valor. Los SDK de Dataflow usan la clase KV<K, V> para representar los pares clave-valor.

Python

Un tipo de datos importante para el procesamiento de datos a gran escala es el par clave-valor. El SDK de Dataflow para Python usa 2 tuplas a fin de representar los pares clave-valor.

Limitaciones de PCollection

Un PCollection tiene varios aspectos clave en los que difiere de una clase de colección normal:

  • Un PCollection es inmutable. Una vez creada, no puedes agregar, quitar o cambiar elementos individuales.
  • Un PCollection no admite el acceso aleatorio a elementos individuales.
  • Un PCollection pertenece a la canalización en la que se creó. No puedes compartir un PCollection entre Pipeline objetos.

Un PCollection puede estar físicamente respaldado por datos en el almacenamiento existente o puede representar datos que aún no se han calculado. Por lo tanto, los datos en un PCollection son inmutables. Puede usar un PCollection en los cálculos que generan datos de canalización nuevos (como un nuevo PCollection); sin embargo, no puede cambiar los elementos de un PCollection existente una vez que se haya creado.

A PCollection no almacena datos; recuerda que un PCollection puede tener demasiados elementos para ajustarse a la memoria local en la que se ejecuta tu programa de Dataflow. Cuando creas o transformas un PCollection, los datos no se copian ni se mueven en la memoria, como ocurre con algunas clases de contenedores normales. En cambio, un PCollection representa un conjunto de datos potencialmente muy grande en la nube.

PCollections delimitadas y no delimitadas

El tamaño de una PCollection puede ser acotado o ilimitado, y la acotación (o desvinculación) se determina cuando creas la PCollection. Algunas transformaciones raíz crean delimitadas PCollections, mientras que otras crean relaciones ilimitadas. depende de la fuente de sus datos de entrada.

PCollections delimitadas

Tu PCollection está limitado si representa un conjunto de datos fijo, que tiene un tamaño conocido que no cambia. Un ejemplo de un conjunto de datos fijo podría ser "registros del servidor del mes de octubre" o "todos los pedidos procesados la semana pasada". Las transformaciones raíz TextIO y BigQueryIO crean PCollection s delimitadas.

Las fuentes de datos que crean PCollection s delimitadas incluyen:

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Fuentes de datos delimitadas personalizadas que creas con la API de Custom Source

Python

Los receptores de datos que aceptan PCollection s delimitados incluyen:

Java

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • Receptores de datos delimitados personalizados que creas con la API de Custom Sink

Python

  • TextIO
  • BigQueryIO
  • Receptores de datos delimitados personalizados que creas con la API de Custom Sink

PCollections no delimitadas

Tu PCollection no está limitado si representa un conjunto de datos que se actualiza constantemente o datos de transmisión. Un ejemplo de un conjunto de datos que se actualiza continuamente puede ser "registros del servidor a medida que se generan" o "todos los pedidos nuevos a medida que se procesan". PubsubIO las transformaciones raíz crean PCollection s sin límites.

Algunas fuentes, particularmente las que crean PCollection s ilimitadas (como PubsubIO), agregan automáticamente una marca de tiempo a cada elemento de la colección.

Las fuentes de datos que crean PCollection s ilimitadas incluyen:

Los receptores de datos que aceptan PCollection s sin límites incluyen:

  • PubsubIO
  • BigQueryIO

Características de procesamiento

La naturaleza limitada (o ilimitada) de su PCollection afecta la forma en que Dataflow procesa sus datos. Los PCollection s delimitados se pueden procesar mediante trabajos por lotes, que pueden leer todo el conjunto de datos una sola vez y realizar el procesamiento en un trabajo finito. Los PCollection s sin límites se deben procesar mediante trabajos de transmisión, ya que la colección completa no puede estar disponible para su procesamiento en un momento dado.

Cuando se agrupan PCollection s ilimitadas, Dataflow requiere un concepto llamado Windowing para dividir un conjunto de datos que se actualiza continuamente en ventanas lógicas de tamaño limitado. Dataflow procesa cada ventana como un paquete y el procesamiento continúa mientras se genera el conjunto de datos. Consulta la siguiente sección sobre Marcas de tiempo y el sistema de ventanas a fin de obtener más información.

Marcas de tiempo de elementos de PCollection

Cada elemento en un PCollection tiene una marca de tiempo asociada. Las marcas de tiempo son útiles para PCollection s que contienen elementos con una noción inherente de tiempo. Por ejemplo, un PCollection de los pedidos para procesar puede usar la hora en que se creó un pedido como la marca de tiempo del elemento.

La marca de tiempo de cada elemento la asigna inicialmente la fuente que crea el PCollection. Fuentes que crean contenido ilimitado PCollection a menudo le asignan a cada elemento nuevo una marca de tiempo según el momento en que se agregó al elemento ilimitado. PCollection .

Java

Las fuentes de datos que producen conjuntos de datos fijos, como BigQueryIO o TextIO, también asignan marcas de tiempo a cada elemento. sin embargo, estas fuentes de datos suelen asignar la misma marca de tiempo (Long.MIN_VALUE) a cada elemento.

Puedes asignar manualmente marcas de tiempo a los elementos de un PCollection. Por lo general, esto se hace cuando los elementos tienen una marca de tiempo inherente, pero esa marca de tiempo debe ser calculada, por ejemplo, mediante su análisis fuera de la estructura del elemento. Para asignar manualmente una marca de tiempo, usa una transformación ParDo. dentro de la transformación ParDo, tu DoFn puede producir elementos de salida con marcas de tiempo. Consulta Asigna marcas de tiempo para obtener más información.

Python

Puedes asignar manualmente marcas de tiempo a los elementos de un PCollection. Por lo general, esto se hace cuando los elementos tienen una marca de tiempo inherente, pero esa marca de tiempo debe ser calculada, por ejemplo, mediante su análisis fuera de la estructura del elemento. Para asignar manualmente una marca de tiempo, usa una transformación ParDo. dentro de la transformación ParDo, tu DoFn puede producir elementos de salida con marcas de tiempo.

Ventanas

Las marcas de tiempo asociadas con cada elemento en un PCollection se usan para un concepto llamado Windowing. La ventana divide los elementos de un PCollection según sus marcas de tiempo. El uso de ventanas se puede usar en todos los PCollection s, pero es obligatorio para algunos cómputos sobre PCollection s no delimitados para dividir la transmisión continua de datos en fragmentos finitos.

Consulta la sección sobre el sistema de ventanas a fin de obtener más información sobre cómo usar los conceptos del sistema de ventanas de Dataflow en tu canalización.

Crea una PCollection

Para trabajar con un conjunto de datos en una canalización de Cloud Dataflow, deberá crear un PCollection para representar los datos, dondequiera que estén almacenados. Los SDK de Dataflow proporcionan dos formas principales de crear una PCollection inicial:

  • Puedes leer los datos desde una fuente de datos externa, como un archivo.
  • Puedes crear un PCollection de datos que se almacenen en una clase de colección en la memoria.

Lee datos externos

Consulta Canalización de E/S a fin de obtener más información sobre cómo leer datos desde una fuente de datos externa.

Crea una PCollection a partir de datos en la memoria local

Puedes crear un PCollection de datos en la memoria local para que puedas usar esos datos en las transformaciones de tu canalización. Por lo general, usas datos de la memoria local para poner a prueba tu canalización con conjuntos de datos más pequeños y reducir la dependencia de tu canalización de E/S externo durante la prueba.

Java

Para crear un PCollection desde una Java en la memoria Collection, apply la transformación Create. Create es una raíz PTransform proporcionada por el SDK de Dataflow para Java. Create acepta un objeto Collection de Java y un objeto Coder, que especifica cómo se deben codificar los elementos de Collection.

El siguiente ejemplo de código crea un PCollection de String, que representa líneas de texto individuales, desde un Java List:

      // Create a Java Collection, in this case a List of Strings.
      static final List<String> LINES = Arrays.asList(
          "To be, or not to be: that is the question: ",
          "Whether 'tis nobler in the mind to suffer ",
          "The slings and arrows of outrageous fortune, ",
          "Or to take arms against a sea of troubles, ");

      PipelineOptions options = PipelineOptionsFactory.create();
      Pipeline p = Pipeline.create(options);

      p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())   // create the PCollection
    

El código anterior usa Create.of, que produce un PCollection que contiene los elementos especificados. Ten en cuenta que si tu canalización usa el sistema de ventanas, debes usar Create.timestamped en su lugar. Create.timestamped produce un PCollection que contiene los elementos especificados con marcas de tiempo especificadas.

Python

Para crear un PCollection, aplica la transformación Create. Create es una transformación estándar que proporciona el SDK de Python de Dataflow.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # argv = None  # if None, uses sys.argv
    pipeline_options = PipelineOptions(argv)
    with beam.Pipeline(options=pipeline_options) as pipeline:
      lines = (
          pipeline
          | beam.Create([
              'To be, or not to be: that is the question: ',
              "Whether 'tis nobler in the mind to suffer ",
              'The slings and arrows of outrageous fortune, ',
              'Or to take arms against a sea of troubles, ',
          ]))

    

Usa PCollection con tipos de datos personalizados

Puede crear un PCollection en el que el tipo de elemento sea un tipo de datos personalizado que proporcione. Esto puede resultar útil si necesitas crear una colección de tu clase o estructura propia con campos específicos, como una clase de Java que contiene el nombre, la dirección y el número de teléfono de un cliente.

Cuando cree un PCollection de un tipo personalizado, deberá proporcionar un Coder para ese tipo personalizado. Coder le dice al servicio de Dataflow cómo serializar y deserializar los elementos de su PCollection ya que su conjunto de datos se paraleliza y particiona en varias instancias de pipeline worker; consulta la codificación de datos para obtener más información.

Dataflow intentará inferir un Coder para cualquier PCollection para el que no establezca explícitamente un Coder. El valor predeterminado Coder para un tipo personalizado es SerializableCoder, que usa la serialización de Java. Sin embargo, Dataflow recomienda usar AvroCoder como Coder cuando sea posible.

Puedes registrarte AvroCoder como el codificador predeterminado para tu tipo de datos mediante el uso de Pipeline objeto CoderRegistry. Anota tu clase de la siguiente manera:

Java

      @DefaultCoder(AvroCoder.class)
      public class MyClass {
        ...
     }
    

Para asegurarte de que tu clase personalizada sea compatible con AvroCoder, es posible que debas agregar algunas anotaciones adicionales; por ejemplo, debes anotar campos nulos en tu tipo de datos con org.apache.avro.reflect.Nullable. Consulta la documentación de referencia de la API para Java para AvroCoder y la documentación del paquete para org.apache.avro.reflect para obtener más información.

La canalización de ejemplo TrafficRoutes de Dataflow crea un PCollection cuyo tipo de elemento es una clase personalizada llamada StationSpeed. StationSpeed registra AvroCoder como su codificador predeterminado de la siguiente manera:

Java

      /**
       * This class holds information about a station reading's average speed.
       */
      @DefaultCoder(AvroCoder.class)
      static class StationSpeed {
        @Nullable String stationId;
        @Nullable Double avgSpeed;

        public StationSpeed() {}

        public StationSpeed(String stationId, Double avgSpeed) {
          this.stationId = stationId;
          this.avgSpeed = avgSpeed;
        }

        public String getStationId() {
          return this.stationId;
        }
        public Double getAvgSpeed() {
          return this.avgSpeed;
        }
      }