Los SDK de Dataflow usan una clase especializada llamada PCollection
para representar datos en una canalización. Una PCollection
representa un conjunto de datos de varios elementos.
Puedes pensar en una PCollection
como datos de “canalización”. Las Transformaciones de Dataflow usan PCollection
s como entradas y salidas, por lo tanto, si deseas trabajar con datos en tu canalización, deben tener la forma de una PCollection
. Cada PCollection
es propiedad de un objeto Pipeline
específico, y solo ese objeto Pipeline
puede usarla.
IMPORTANTE: Este documento contiene información sobre PCollection
no delimitadas y sistemas de ventanas. Estos conceptos se refieren solo al SDK de Dataflow para Java y aún no se encuentran disponibles en el SDK de Dataflow para Python.
Características de PCollection
Una PCollection
representa una “bolsa” de elementos potencialmente inmutable y grande.
No hay un límite máximo para la cantidad de elementos que puede contener una PCollection
; cualquier PCollection
podría caber en la memoria o representar un conjunto muy grande de datos respaldado por un almacén de datos persistentes.
Java
Los elementos de una PCollection
pueden ser de cualquier tipo, pero deben ser todos del mismo tipo.
Sin embargo, Dataflow necesita poder codificar cada elemento individual como una string de bytes para ser compatible con el procesamiento distribuido. Los SDK de Dataflow proporcionan un mecanismo de Codificación de datos que incluye codificaciones integradas para tipos que se usan con frecuencia y compatibilidad con el fin de especificar codificaciones personalizadas según sea necesario.
Crear una codificación válida para un tipo arbitrario puede ser desafiante, pero puedes construir codificaciones personalizadas para tipos con estructuras simples.
Un tipo de dato importante para el procesamiento de datos a gran escala es el par clave-valor. Los SDK de Dataflow usan la clase KV<K, V>
para representar pares clave-valor.
Python
Un tipo de datos importante para el procesamiento de datos a gran escala es el par clave-valor. El SDK de Dataflow para Python usa 2 tuplas a fin de representar los pares clave-valor.
Limitaciones de PCollection
Una PCollection
tiene varios aspectos clave que la diferencian de una clase de colección normal:
- Una
PCollection
es inmutable. Una vez creada, no puedes agregar, quitar ni cambiar elementos individuales. - Una
PCollection
no admite el acceso aleatorio a elementos individuales. - Una
PCollection
pertenece a la canalización en la que se crea. No puedes compartir unaPCollection
entre objetosPipeline
.
Una PCollection
puede estar físicamente respaldada por datos en el almacenamiento existente o puede representar datos que aún no se procesaron. Por lo tanto, los datos en una PCollection
son inmutables. Puedes usar una PCollection
en los cálculos que generan datos de canalización nuevos (como una PCollection
nueva), sin embargo, no puedes cambiar los elementos de una PCollection
existente una vez que se crea.
Una PCollection
no almacena datos. Recuerda que una PCollection
puede tener demasiados elementos para caber en la memoria local donde se ejecuta tu programa de Dataflow. Cuando creas o transformas una PCollection
, los datos no se copian ni se mueven en la memoria, como ocurre con algunas clases de contenedores regulares. En cambio, una PCollection
representa un conjunto de datos potencialmente muy grande en la nube.
PCollections delimitadas y no delimitadas
El tamaño de una PCollection
puede ser delimitado o no delimitado, y la delimitación (o no delimitación) se determina cuando creas la PCollection
. Algunas transformaciones raíz crean PCollections
delimitadas y otras, no delimitadas; todo depende de la fuente de tus datos de entrada.
PCollections delimitadas
Tu PCollection
está delimitada si representa un conjunto de datos fijo con un tamaño conocido que no cambia. Un ejemplo de un conjunto de datos fijo podría ser “registros del servidor del mes de octubre” o “todos los pedidos procesados la semana pasada”. Las transformaciones raíz TextIO
y BigQueryIO
crean PCollection
s delimitadas.
Entre las fuentes de datos que crean PCollection
s delimitadas, se incluyen las siguientes:
Java
TextIO
BigQueryIO
DatastoreIO
- Fuentes de datos delimitadas personalizadas que creas con la API de Custom Source
Python
TextIO
BigQueryIO
- Fuentes de datos delimitadas personalizadas que creas con la API de Custom Source
Entre los receptores de datos que aceptan PCollection
s delimitadas, se incluyen los siguientes:
Java
TextIO
BigQueryIO
DatastoreIO
- Receptores de datos delimitados personalizados que creas con la API de Custom Sink
Python
TextIO
BigQueryIO
- Receptores de datos delimitados personalizados que creas con la API de Custom Sink
PCollections no delimitadas
Tu PCollection
no está delimitada si representa un conjunto de datos que se actualiza constantemente o datos de transmisión. Un ejemplo de un conjunto de datos que se actualiza constantemente podría ser “registros del servidor a medida que se generan” o “todos los pedidos nuevos a medida que se procesan". Las transformaciones raíz PubsubIO
crean PCollection
s no delimitadas.
Algunas fuentes, en particular las que crean PCollection
s no delimitadas (como PubsubIO
), agregan automáticamente una marca de tiempo a cada elemento de la colección.
Entre las fuentes de datos que crean PCollection
s no delimitadas, se incluyen las siguientes:
PubsubIO
- Fuentes de datos no delimitadas personalizadas que creas con la API de Custom Source
Entre los receptores de datos que aceptan PCollection
s no delimitadas, se incluyen los siguientes:
PubsubIO
BigQueryIO
Características de procesamiento
La naturaleza delimitada (o no delimitada) de tu PCollection
afecta la forma en que Dataflow procesa tus datos. Las PCollection
s delimitadas pueden procesarse mediante trabajos por lotes, que pueden leer todo el conjunto de datos una sola vez y realizar el procesamiento en un trabajo limitado. Las PCollection
s no delimitadas deben procesarse mediante trabajos de transmisión, ya que la colección completa nunca estará disponible para su procesamiento en ningún momento.
Cuando se agrupan PCollection
s no delimitadas, Dataflow requiere un concepto llamado sistema de ventanas para dividir un conjunto de datos que se actualiza continuamente en ventanas lógicas de tamaño limitado. Dataflow procesa cada ventana como un paquete, y el procesamiento continúa mientras se genera el conjunto de datos. Consulta la siguiente sección sobre marcas de tiempo y el sistema de ventanas a fin de obtener más información.
Marcas de tiempo de elementos de PCollection
Cada elemento en una PCollection
tiene una marca de tiempo asociada.
Las marcas de tiempo son útiles para las PCollection
s que contienen elementos con una noción de tiempo inherente. Por ejemplo, una PCollection
de pedidos para procesar puede usar la hora en que se creó un pedido como marca de tiempo del elemento.
La fuente que crea la PCollection
asigna inicialmente la marca de tiempo de cada elemento. Las fuentes que crean una PCollection
no delimitada a menudo le asignan una marca de tiempo a cada elemento nuevo según el momento en que se agregó ese elemento a la PCollection
no delimitada.
Java
Las fuentes de datos que producen conjuntos de datos fijos, como BigQueryIO
o TextIO
, también asignan marcas de tiempo a cada elemento. Sin embargo, estas fuentes de datos suelen asignar la misma marca de tiempo (Long.MIN_VALUE
) a cada elemento.
Puedes asignar manualmente marcas de tiempo a los elementos de una PCollection
. Por lo general, esto se hace cuando los elementos tienen una marca de tiempo inherente, pero esa marca de tiempo debe calcularse, por ejemplo, mediante su análisis fuera de la estructura del elemento. Para asignar una marca de tiempo manualmente, usa una transformación ParDo. Dentro de la transformación ParDo
, tu DoFn
puede producir elementos de salida con marcas de tiempo. Consulta la sección sobre cómo asignar marcas de tiempo para obtener más información.
Python
Puedes asignar manualmente marcas de tiempo a los elementos de una PCollection
. Por lo general, esto se hace cuando los elementos tienen una marca de tiempo inherente, pero esa marca de tiempo debe calcularse, por ejemplo, mediante su análisis fuera de la estructura del elemento. Para asignar una marca de tiempo manualmente, usa una transformación ParDo. Dentro de la transformación ParDo
, tu DoFn
puede producir elementos de salida con marcas de tiempo.
Sistema de ventanas
Las marcas de tiempo asociadas con cada elemento en una PCollection
se usan para un concepto llamado sistema de ventanas. El sistema de ventanas divide los elementos de una PCollection
según sus marcas de tiempo. El sistema de ventanas se puede usar en todas las PCollection
s, pero es necesario para realizar algunos cálculos en PCollection
s no delimitadas a fin de dividir el flujo de datos continuo en fragmentos delimitados para el procesamiento.
Consulta la sección sobre sistema de ventanas a fin de obtener más información sobre cómo usar los conceptos del sistema de ventanas de Dataflow en tu canalización.
Crea una PCollection
A fin de trabajar con un conjunto de datos en una canalización de Cloud Dataflow, deberás crear una PCollection
para representar los datos, dondequiera que estén almacenados. Los SDK de Dataflow proporcionan dos formas principales de crear una PCollection
inicial:
- Puedes leer los datos desde una fuente de datos externa, como un archivo.
- Puedes crear una
PCollection
de datos almacenados en una clase de colección en la memoria.
Lee datos externos
Consulta E/S de canalización a fin de obtener más información sobre cómo leer datos desde una fuente de datos externa.
Crea una PCollection a partir de datos en la memoria local
Puedes crear una PCollection
a partir de datos en la memoria local para que puedas usarlos en las transformaciones de tu canalización. Por lo general, usas datos de la memoria local para poner a prueba tu canalización con conjuntos de datos más pequeños y reducir la dependencia de tu canalización en la E/S externa durante las pruebas.
Java
Para crear una PCollection
a partir de una Collection
Java en la memoria, debes apply
la transformación Create
. Create
es una PTransform
raíz proporcionada por el SDK de Dataflow para Java. Create
acepta una Collection
de Java y un objeto Coder
, que especifica cómo se deben codificar los elementos de la Collection
.
En la siguiente muestra de código, se crea una PCollection
de String
, que representa líneas de texto individuales a partir de una List
Java:
// Create a Java Collection, in this case a List of Strings. static final List<String> LINES = Arrays.asList( "To be, or not to be: that is the question: ", "Whether 'tis nobler in the mind to suffer ", "The slings and arrows of outrageous fortune, ", "Or to take arms against a sea of troubles, "); PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options); p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of()) // create the PCollection
El código anterior usa Create.of
, que produce una PCollection
que contiene los elementos especificados. Ten en cuenta que si tu canalización usa el sistema de ventanas, debes usar Create.timestamped en su lugar. Create.timestamped
produce una PCollection
que contiene los elementos especificados con marcas de tiempo especificadas.
Python
Para crear una PCollection
, aplica la transformación Create
.
Create
es una transformación estándar que proporciona el SDK de Dataflow para Python.
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # argv = None # if None, uses sys.argv pipeline_options = PipelineOptions(argv) with beam.Pipeline(options=pipeline_options) as pipeline: lines = ( pipeline | beam.Create([ 'To be, or not to be: that is the question: ', "Whether 'tis nobler in the mind to suffer ", 'The slings and arrows of outrageous fortune, ', 'Or to take arms against a sea of troubles, ', ]))
Usa PCollection con tipos de datos personalizados
Puedes crear una PCollection
en la que el tipo de elemento sea un tipo de dato personalizado que proporciones. Esto puede resultar útil si necesitas crear una colección de tu clase o estructura propias con campos específicos, como una clase de Java que contenga el nombre, la dirección y el número de teléfono de un cliente.
Cuando creas una PCollection
de un tipo personalizado, debes proporcionar un Coder
para ese tipo. El Coder
le indica al servicio de Dataflow cómo serializar y deserializar los elementos de tu PCollection
mientras tu conjunto de datos se paraleliza y particiona en varias instancias de trabajador de canalización. Consulta Codificación de datos para obtener más información.
Dataflow intentará inferir un Coder
para cualquier PCollection
para la que no hayas establecido un Coder
explícitamente. El Coder
predeterminado para un tipo personalizado es SerializableCoder
, que usa la serialización de Java. Sin embargo, Dataflow recomienda usar AvroCoder
como Coder
cuando sea posible.
Puedes registrar AvroCoder
como el codificador predeterminado para tu tipo de dato usando el CoderRegistry de tu objeto Pipeline
. Anota tu clase de la siguiente manera:
Java
@DefaultCoder(AvroCoder.class) public class MyClass { ... }
Para asegurarte de que tu clase personalizada sea compatible con AvroCoder
, es posible que debas agregar algunas anotaciones adicionales, por ejemplo, debes anotar campos nulos en tu tipo de dato con org.apache.avro.reflect.Nullable
. Consulta la documentación de referencia de la API para Java de AvroCoder
y la documentación del paquete de org.apache.avro.reflect
a fin de obtener más información.
La canalización de ejemplo de TrafficRoutes de Dataflow crea una PCollection
cuyo tipo de elemento es una clase personalizada llamada StationSpeed
. StationSpeed
registra AvroCoder
como su codificador predeterminado de la siguiente manera:
Java
/** * This class holds information about a station reading's average speed. */ @DefaultCoder(AvroCoder.class) static class StationSpeed { @Nullable String stationId; @Nullable Double avgSpeed; public StationSpeed() {} public StationSpeed(String stationId, Double avgSpeed) { this.stationId = stationId; this.avgSpeed = avgSpeed; } public String getStationId() { return this.stationId; } public Double getAvgSpeed() { return this.avgSpeed; } }