Sistema de ventanas

Los SDK de Dataflow usan un concepto llamado sistema de ventanas para subdividir una PCollection de acuerdo con las marcas de tiempo de sus elementos individuales. Las transformaciones de Dataflow que agregan varios elementos, como GroupByKey y Combine, funcionan de forma implícita por ventana, es decir, procesan cada PCollection como una sucesión de muchas ventanas finitas, aunque toda la colección en sí puede ser de tamaño ilimitado o infinito.

Los SDK de Dataflow usan un concepto relacionado llamado activadores a fin de determinar cuándo “cerrar” cada ventana finita a medida que llegan datos no delimitados. El uso de un activador puede ayudar a definir mejor la estrategia de ventanas para tu PCollection a fin de manejar los datos tardíos o proporcionar resultados tempranos. Para obtener más información, consulta Activadores.

Conceptos básicos del sistema de ventanas

El sistema de ventanas es más útil con una PCollection no delimitada, que representa un conjunto de datos en constante actualización y tamaño desconocido o ilimitado (p. ej., datos de transmisión). Algunas transformaciones de Dataflow, como GroupByKey y Combine, agrupan varios elementos con una clave común. En general, esa operación de agrupación junta todos los elementos que tienen la misma clave en todo el conjunto de datos. Con un conjunto de datos no delimitados, es imposible recopilar todos los elementos, ya que se agregan nuevos elementos de forma constante.

En el modelo de Dataflow, cualquier PCollection se puede subdividir en ventanas lógicas. Cada elemento de una PCollection se asigna a una o más ventanas según la función analítica de PCollection, y cada ventana individual contiene un número finito de elementos. Luego, las transformaciones de agrupación consideran los elementos de cada PCollection de acuerdo a la ventana. GroupByKey, por ejemplo, agrupa de forma implícita los elementos de una PCollection por clave y ventana. Dataflow solo agrupa datos dentro de la misma ventana y no agrupa datos de otras ventanas.

Restricciones del sistema de ventanas

Si ya configuraste la función analítica para una PCollection, se usarán las ventanas de los elementos en esa PCollection la próxima vez que apliques una transformación de agrupación. Dataflow realiza la agrupación de ventanas real según sea necesario; si configuras una función analítica mediante la transformación Window, cada elemento se asigna a una ventana, pero las ventanas no se consideran hasta que agrupas la PCollection con GroupByKey o Combine. Esto puede tener diferentes efectos en tu canalización.

Considera la canalización de ejemplo en la Figura 1 a continuación:

Una canalización que aplica un sistema de ventanas, un ParDo y un GroupByKey en orden
Figura 1: Canalización con la aplicación de un sistema de ventanas

En la canalización anterior, creamos un PCollection no delimitado mediante la lectura de un conjunto de pares clave-valor mediante PubsubIO y, luego, aplicamos una función analítica a esa colección mediante la transformación Window. Luego, aplicamos un ParDo a la colección y agrupamos el resultado de ese ParDo mediante GroupByKey. La función analítica no tiene efecto en la transformación ParDo, ya que las ventanas no se usan hasta que se necesitan de nuevo para el GroupByKey.

Sin embargo, las transformaciones posteriores se aplican al resultado de GroupByKey, es decir, los datos agrupados por clave y ventana.

Usa el sistema de ventanas con PCollections delimitadas

Puedes usar ventanas con conjuntos de datos de tamaño fijo en PCollection delimitadas. Sin embargo, ten en cuenta que el sistema de ventanas solo considera las marcas de tiempo implícitas conectadas a cada elemento de una PCollection, y las fuentes de datos que crean conjuntos de datos fijos (como TextIO y BigQueryIO) asignan la misma marca de tiempo a cada elemento. Esto significa que todos los elementos son, de forma predeterminada, parte de una única ventana global. Tener todos los elementos asignados a la misma ventana hará que una canalización se ejecute en el clásico estilo de lotes de MapReduce.

Para usar un sistema de ventanas con conjuntos de datos fijos, puedes asignar tus propias marcas de tiempo a cada elemento. Para asignar marcas de tiempo a elementos, usa una transformación ParDo con un DoFn que genera cada elemento con una marca de tiempo nueva.

El uso de ventanas con una PCollection delimitada puede afectar la forma en que tu canalización procesa los datos. Por ejemplo, considera la siguiente canalización:

Una canalización que aplica un GroupByKey seguido de un ParDo en una colección delimitada.
Figura 2: GroupByKey y ParDo sin sistema de ventanas, en una colección delimitada.

En la canalización anterior, creamos una PCollection delimitada mediante la lectura de un conjunto de pares clave-valor mediante TextIO. Luego, agrupamos la colección mediante GroupByKey y aplicamos una transformación ParDo a la PCollection agrupada. En este ejemplo, el GroupByKey crea una colección de claves únicas y, luego, se aplica ParDo exactamente una vez por clave.

Ahora, considera la misma canalización, pero mediante una función analítica:

Una canalización que aplica un sistema de ventanas, luego un GroupByKey seguido de un ParDo en una colección delimitada.
Figura 3: GroupByKey y ParDo con un sistema de ventanas, en una colección delimitada.

Al igual que antes, la canalización crea una PCollection delimitada de pares clave-valor. Luego, configuramos una función analítica para esa PCollection. La transformación GroupByKey ahora agrupa los elementos de PCollection por clave y ventana. La transformación ParDo siguiente se aplica varias veces por clave y una vez por cada ventana.

Funciones analíticas

Los SDK de Dataflow te permiten definir diferentes tipos de ventanas para dividir los elementos de tu PCollection. El SDK proporciona, entre otras, las siguientes funciones analíticas:

  • Ventanas de tiempo fijo
  • Ventanas de tiempo variable
  • Ventanas por sesión
  • Ventana global única

Ten en cuenta que cada elemento puede pertenecer lógicamente a más de una ventana, según la función analítica que uses. El sistema de ventanas de tiempo variable, por ejemplo, crea ventanas superpuestas en las que se puede asignar un solo elemento a varias ventanas.

Ventanas de tiempo fijo

La forma más simple de un sistema de ventanas es una ventana de tiempo fijo: dada una PCollection con marca de tiempo, que podría actualizarse de forma continua, cada ventana podría capturar (por ejemplo) cinco minutos de elementos.

Una ventana de tiempo fijo representa el intervalo de tiempo en la transmisión de datos que define un paquete de datos para su procesamiento. Considera una ventana que opera en intervalos de cinco minutos: todos los elementos en tu PCollection no delimitada con valores de marca de tiempo entre 0:00:00 y 0:04:59 pertenecen a la primera ventana, los elementos con valores de marca de tiempo entre 0:05:00 y 0:09:59 pertenecen a la segunda ventana, y así sucesivamente.

Diagrama que muestra un sistema de ventanas de tiempo fijo.
Figura 4: Ventanas de tiempo fijo con un tamaño de 30 segundos.

Ventanas de tiempo variable

Una ventana de tiempo variable también usa intervalos de tiempo en la transmisión de datos para definir paquetes de datos; sin embargo, en sistemas con este tipo de ventanas, las ventanas se superponen. Cada ventana puede capturar cinco minutos de datos, pero cada diez segundos comienza una nueva ventana. La frecuencia con la que comienzan las ventanas variables se denomina período. Por lo tanto, nuestro ejemplo tendría un tamaño de ventana de cinco minutos y un período de diez segundos.

Debido a que varias ventanas se superponen, la mayoría de los elementos en un conjunto de datos se incluirán en más de una ventana. Este tipo de sistema de ventanas es útil para tomar promedios móviles de datos. Usando nuestro ejemplo, cuando usas períodos de tiempo variables, puedes calcular un promedio móvil de los datos de los últimos cinco minutos, actualizados cada diez segundos.

Diagrama que muestra un sistema de ventanas de tiempo variable.
Figura 5: Ventanas de tiempo variable, con un tamaño de ventana de 1 minuto y períodos de ventana de 30 segundos.

Ventanas de sesión

Una función de ventana de sesión define ventanas alrededor de áreas de concentración en los datos. El sistema de ventanas de sesión es útil para los datos que no se distribuyen de forma regular con respecto al tiempo; por ejemplo, una transmisión de datos que representa la actividad del mouse del usuario puede tener largos períodos de tiempo de inactividad intercalados con altas concentraciones de clics. Los sistemas de ventanas de sesión agrupan las concentraciones altas de datos en ventanas separadas y dejan fuera las secciones inactivas de la transmisión de datos.

Ten en cuenta que los sistemas de ventanas de sesión se aplican por clave; es decir, cuando se agrupa en sesiones solo se toman en cuenta los datos que tienen la misma clave. Por lo tanto, cada clave en tu recopilación de datos se agrupará en ventanas separadas de diferentes tamaños.

El tipo de sistema de ventanas de sesión más simple especifica una duración mínima para el intervalo. Todos los datos que llegan por debajo de un umbral mínimo de retraso de tiempo se agrupan en la misma ventana. Si los datos llegan después de la duración mínima especificada para el intervalo, se inicia una nueva ventana.

Diagrama que muestra un sistema de ventanas de sesión.
Figura 5: Ventanas de sesión con una duración mínima para el intervalo. Observa cómo cada clave de datos tiene diferentes ventanas, según su distribución de datos.

Ventana global única

De forma predeterminada, todos los datos en una PCollection se asignan a una sola ventana global. Si tu conjunto de datos es de un tamaño fijo, puedes dejar la ventana global predeterminada para tu PCollection. Si todos los elementos de tu PCollection pertenecen a una sola ventana global, la canalización se ejecutará como un trabajo de procesamiento por lotes (como en el procesamiento basado en MapReduce).

Otras funciones analíticas

Los SDK de Dataflow proporcionan más usos para las funciones analíticas, aparte de ventanas fijas, variables, de sesión y globales, como ventanas basadas en el calendario.

Java

Consulta el paquete com.google.cloud.dataflow.sdk.transforms.windowing para ver una lista completa de los usos de las funciones analíticas en el SDK de Dataflow para Java.

Configura la función analítica de tu PCollection

Puedes configurar la función analítica para una PCollection si aplicas la transformación Window. Cuando aplicas la transformación Window, debes proporcionar una WindowFn. La WindowFn determina la función analítica que usará tu PCollection para las transformaciones de agrupación posteriores, como una ventana de tiempo fijo o variable.

Los SDK de Dataflow proporcionan WindownFn predefinidas para las funciones analíticas básicas. También puedes definir tus propias WindowFn en casos avanzados.

En términos estrictos, como todas las transformaciones, Window toma una PCollection de entrada y genera una PCollection nueva con cada elemento asignado a una o más ventanas lógicas y finitas.

Configura ventanas de tiempo fijo

En el siguiente código de ejemplo, se muestra cómo aplicar Window para dividir una PCollection en ventanas fijas, cada una de un minuto de duración:

Java

  PCollection<String> items = ...;
  PCollection<String> fixed_windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

Configura ventanas de tiempo variable

En el siguiente código de ejemplo, se muestra cómo aplicar Window para dividir una PCollection en ventanas de tiempo variable. Cada ventana tiene una duración de 30 minutos y cada cinco segundos comienza una nueva ventana:

Java

  PCollection<String> items = ...;
  PCollection<String> sliding_windowed_items = items.apply(
    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5))));

Configura ventanas de sesión

En el siguiente código de ejemplo, se muestra cómo aplicar Window para dividir una PCollection en ventanas de sesión, en las que cada sesión debe estar separada por un intervalo de tiempo de al menos 10 minutos:

Java

  PCollection<String> items = ...;
  PCollection<String> session_windowed_items = items.apply(
    Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));

Ten en cuenta que las sesiones son por clave: cada clave en la colección tendrá sus propios grupos de sesiones según la distribución de los datos.

Configura una ventana global única

Si tu PCollection está delimitada (el tamaño es fijo), puedes asignar todos los elementos a una sola ventana global. En el siguiente código de ejemplo, se muestra cómo configurar una ventana global única para una PCollection:

A fin de establecer una ventana global única para tu PCollection, pasa new GlobalWindows() como la WindowFn cuando apliques la transformación Window. En el siguiente código de ejemplo, se muestra cómo aplicar Window para asignar una PCollection en una ventana global única:

Java

  PCollection<String> items = ...;
  PCollection<String> batch_items = items.apply(
    Window.<String>into(new GlobalWindows()));

Sesgo de tiempo, retraso de datos y datos tardíos

En cualquier sistema de procesamiento de datos, hay un cierto retraso entre el momento en que ocurre un evento de datos (la “hora del evento”, determinada por la marca de tiempo en el elemento de datos en sí) y el tiempo en que el elemento de datos real se procesa en cualquier etapa en tu canalización (la “hora de procesamiento”, determinada por el reloj en el sistema que procesa el elemento).

En un sistema perfecto, la hora del evento para cada elemento de datos y la hora de procesamiento serían iguales, o al menos tendrían un delta coherente. Sin embargo, en cualquier sistema de computación del mundo real, la generación y la entrega de datos tienen una gran cantidad de limitaciones temporales. En sistemas grandes o distribuidos, como una colección distribuida de frontend web que genera pedidos de clientes o archivos de registro, no hay garantías de que los eventos de datos aparezcan en tu canalización en el mismo orden en el que se generaron en varios lugares de la Web.

Por ejemplo, supongamos que tenemos una PCollection que usa ventanas de tiempo fijo, con ventanas que duran cinco minutos. Para cada ventana, Dataflow debe recopilar todos los datos con una marca de tiempo de la hora del evento en el rango determinado de la ventana (0:00 y 4:59 en la primera ventana, por ejemplo). Los datos con marcas de tiempo fuera de ese rango (datos desde los 5:00 o después) pertenecen a una ventana diferente.

Sin embargo, no se garantiza que los datos siempre lleguen a una canalización en el orden correcto de tiempo o que siempre lleguen a intervalos predecibles. Dataflow rastrea una marca de agua, que es la noción del sistema de cuándo se puede esperar que todos los datos en una ventana determinada hayan llegado a la canalización. Los datos que llegan con una marca de tiempo posterior a la marca de agua se consideran datos tardíos.

De nuestro ejemplo, supongamos que tenemos una marca de agua simple que supone unos 30 segundos de tiempo de retraso entre las marcas de tiempo de los datos (la hora del evento) y el tiempo en que los datos aparecen en la canalización (la hora de procesamiento), entonces el flujo de datos cerraría la primera ventana a los 5:30. Si un registro de datos llega a las 5:34, pero con una marca de tiempo que lo pondría en la ventana de 0:00 a 4:59 (por ejemplo, 3:38), ese registro se considera como datos tardíos.

Nota: Para simplificar, suponemos que usamos una marca de agua muy directa que calcula el tiempo de demora/sesgo de tiempo. En la práctica, la fuente de datos de tu PCollection determina la marca de agua, y las marcas de agua pueden ser más precisas o complejas.

Administra el sesgo de tiempo y los datos tardíos

Puedes permitir datos tardíos mediante la invocación de la operación .withAllowedLateness cuando estableces la estrategia de sistema de ventanas de tu PCollection. En el siguiente ejemplo de código muestra una estrategia de sistema de ventanas que permitirá datos tardíos hasta dos días después del final de una ventana.

Java

  PCollection<String> items = ...;
  PCollection<String> fixed_windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
          .withAllowedLateness(Duration.standardDays(2)));

Cuando configuras .withAllowedLateness en una PCollection, esa demora se propaga a cualquier PCollection posterior derivada de la primera PCollection a la que aplicaste la demora. Si deseas cambiar la demora permitida más adelante en tu canalización, debes hacerlo de manera explícita mediante la aplicación de Window.withAllowedLateness() otra vez.

También puedes usar la API de activadores de Dataflow con el fin de ayudarte a definir mejor la estrategia del sistema de ventanas para una PCollection. Puedes usar los activadores para determinar con exactitud cuándo se agrega cada ventana individual y cuándo informa sus resultados, lo que incluye la forma en la que la ventana emite los elementos tardíos.

Nota: Las estrategias predeterminadas de sistemas de ventanas y activadores de Dataflow descartan los datos tardíos. Si deseas asegurarte de que la canalización maneje instancias de datos tardíos, deberás configurar .withAllowedLateness de forma explícita cuando establezcas la estrategia del sistema de ventanas de tu PCollection y los activadores para tus PCollection según corresponda.

Agrega marcas de tiempo a los elementos de una PCollection

Puedes asignar marcas de tiempo nuevas a los elementos de una PCollection si aplicas una transformación ParDo que genere elementos nuevos con las marcas de tiempo que configuraste. Asignar marcas de tiempo puede ser útil si deseas usar las funciones del sistema de ventanas de Dataflow y tu conjunto de datos proviene de una fuente sin marcas de tiempo implícitas (como un archivo de TextIO).

Este es un buen patrón a seguir cuando tu conjunto de datos incluye marcas de tiempo, pero la fuente de datos de Dataflow no las genera. Un ejemplo podría ser un caso en el que tu canalización lea registros de un archivo de entrada y cada registro incluya un campo de marca de tiempo; dado que tu canalización lee los registros desde un archivo, la fuente del archivo no asigna marcas de tiempo de forma automática. Puedes analizar el campo de marca de tiempo de cada registro y usar una transformación ParDo para adjuntar las marcas de tiempo a cada elemento en tu PCollection.

Java

A fin de asignar marcas de tiempo, tu transformación ParDo debe usar un DoFn que genere elementos mediante ProcessContext.outputWithTimestamp (en lugar del ProcessContext.output habitual que se usa para emitir elementos a la colección de salida principal). En el siguiente código de ejemplo, se muestra un ParDo con un DoFn que genera elementos con marcas de tiempo nuevas:

  PCollection<LogEntry> unstampedLogs = ...;
  PCollection<LogEntry> stampedLogs =
      unstampedLogs.apply(ParDo.of(new DoFn<LogEntry, LogEntry>() {
        public void processElement(ProcessContext c) {
          // Extract the timestamp from log entry we're currently processing.
          Instant logTimeStamp = extractTimeStampFromLogEntry(c.element());
          // Use outputWithTimestamp to emit the log entry with timestamp attached.
          c.outputWithTimestamp(c.element(), logTimeStamp);
        }
      }));