Activadores

Cuando los datos se recopilan en una PCollection y se agrupan en ventanas limitadas, Dataflow necesita determinar de alguna manera cuándo emitir los resultados agregados de cada ventana. Dado el retraso de los datos, Dataflow usa un mecanismo llamado activadores para determinar en qué momento se recopilaron "suficientes" datos en una ventana, después de lo cual emite los valores agregados de esa ventana, cada uno conocido como panel.

El sistema de activadores de Dataflow proporciona diferentes maneras de determinar en qué momento emitir los resultados agregados de una ventana determinada, según las necesidades de procesamiento de datos de tu sistema. Por ejemplo, un sistema que requiere actualizaciones puntuales o urgentes podría usar un activador estricto que emita una ventana cada N segundos, lo cual valora la puntualidad por sobre la integridad de los datos. Un sistema que valora la integridad de los datos más que el tiempo exacto de los resultados podría usar un activador basado en los datos que espera que se acumule una determinada cantidad de registros de datos antes de cerrar una ventana.

Los activadores resultan particularmente útiles para controlar dos tipos de condiciones en tu canalización:

  • Los activadores pueden ayudarte a controlar las instancias de los datos tardíos.
  • Los activadores pueden ayudarte a emitir los resultados iniciales antes de que todos los datos lleguen a una ventana determinada.

Nota: Puedes configurar un activador de una PCollection no delimitada que usa una ventana global única para la función de sistema de ventanas. Esto puede resultar útil si deseas que tu canalización proporcione actualizaciones periódicas de un conjunto de datos no delimitado, por ejemplo, un promedio de ejecución de todos los datos proporcionados hasta el momento, actualizados cada N segundos o cada N elementos.

Tipos de activadores

Dataflow proporciona una cantidad de activadores prediseñados que puedes configurar para tus PCollection. Existen tres tipos de activadores principales:

  • Activadores basados en el tiempo: Estos activadores operan en una referencia de tiempo, ya sea el tiempo del evento (según lo indica la marca de tiempo en cada elemento de datos) o el tiempo de procesamiento (momento en que el elemento de datos se procesa en una etapa dada de la canalización).
  • Activadores basados en los datos: Estos activadores operan mediante el análisis de los datos a medida que llegan a cada ventana y la activación cuando una condición de datos que especificas se cumple. Por ejemplo, puedes configurar un activador para emitir los resultados de una ventana una vez que esa ventana reciba una determinada cantidad de elementos de datos.
  • Activadores compuestos: Estos activadores combinan varios activadores basados en el tiempo o en los datos de alguna manera lógica. Puedes configurar un activador compuesto para que se active cuando se cumplan todos los activadores (operación AND lógica), cuando se cumpla cualquier activador (operación OR lógica) y así sucesivamente.

Activadores basados en el tiempo

Los activadores basados en el tiempo de Dataflow incluyen AfterWatermark y AfterProcessingTime. Estos activadores toman una referencia de tiempo, ya sea el tiempo del evento o el tiempo de procesamiento, y configuran un temporizador en función de esa referencia de tiempo.

AfterWatermark

El activador de AfterWatermark opera en el tiempo del evento. El activador de AfterWatermark emitirá el contenido de una ventana después de que la marca de agua pase el final de la ventana, según las marcas de tiempo asociadas a los elementos de datos. La marca de agua es una métrica de progreso global, concepto que tiene Dataflow de una entrada completa dentro de la canalización en un momento dado.

El activador de AfterWatermark solo se activa cuando la marca de agua pasa el final de la ventana; es el activador principal que usa Dataflow para emitir los resultados cuando el sistema estima que tiene todos los datos en una determinada ventana basada en el tiempo.

AfterProcessingTime

El activador de AfterProcessingTime opera en el tiempo de procesamiento. El activador de AfterProcessingTime emite una ventana una vez transcurrido un determinado tiempo de procesamiento desde la referencia de tiempo, como el inicio de una ventana. El tiempo de procesamiento está determinado por el reloj del sistema más que por la marca de tiempo del elemento de datos.

El activador de AfterProcessingTime sirve para activar los resultados iniciales de una ventana, en especial, una ventana con un período largo, como una ventana global única.

Activadores basados en datos

Actualmente, Dataflow proporciona solo un activador basado en los datos, AfterPane.elementCountAtLeast. Este activador opera en un recuento directo de elementos; se activa una vez que el panel actual recopila al menos N elementos.

El activador de AfterPane.elementCountAtLeast() es una excelente manera de hacer que la ventana emita los resultados iniciales antes de que se acumulen todos los datos, especialmente en el caso de una ventana global única.

Activador predeterminado

El activador predeterminado para una PCollection se basa en el tiempo del evento y emite los resultados de la ventana cuando la marca de agua del sistema (concepto que tiene Dataflow de cuándo "debería" tener todos los datos) pasa el final de la ventana. La configuración de activación predeterminada se emite solo una vez y los datos tardíos se descartan. Esto se debe a que la configuración predeterminada del sistema de ventanas y del activador tiene un valor de retraso permitido de 0. Consulta Cómo controlar los datos tardíos para obtener información acerca de cómo modificar este comportamiento.

La marca de agua depende de la fuente de datos; en algunos casos, es una estimación. En otros casos, como Pub/Sub con marcas de tiempo asignadas por el sistema, la marca de agua puede proporcionar un límite exacto de los datos que procesó la canalización.

Cómo controlar los datos tardíos

Si tu canalización debe controlar los datos tardíos (los datos que llegan después de que la marca de agua pasa el final de la ventana), puedes aplicar un retraso permitido cuando configuras el sistema de ventanas y el activador. Esto permitirá que el activador reaccione a los datos tardíos. En la configuración de activación predeterminada, emitirá los resultados nuevos de inmediato cuando lleguen los datos tardíos.

Configura el retraso permitido con .withAllowedLateness() cuando configures la ventana y el activador de la siguiente manera:

  PCollection<String> pc = ...;
  pc.apply(Window<String>.into(FixedWindows.of(1, TimeUnit.MINUTES))
                               .triggering(AfterProcessingTime.pastFirstElementInPane()
                                                              .plusDelayOf(Duration.standardMinutes(1)))
                               .withAllowedLateness(Duration.standardMinutes(30));

Este retraso permitido se propaga a todas las PCollection derivadas como resultado de la aplicación de transformaciones a la PCollection original. Si deseas cambiar el retraso permitido más adelante en tu canalización, puedes volver a aplicar Window.withAllowedLateness() de manera explícita.

Cómo configurar un activador

Cuando configuras una función de sistema de ventanas para una PCollection con la transformación Window, también puedes especificar un activador.

Invoca el método .triggering() en el resultado de la transformación Window.into() para configurar el activador de PCollection de la siguiente manera:

Java

  PCollection<String> pc = ...;
  pc.apply(Window<String>.into(FixedWindows.of(1, TimeUnit.MINUTES))
                               .triggering(AfterProcessingTime.pastFirstElementInPane()
                                                              .plusDelayOf(Duration.standardMinutes(1)))
                               .discardingFiredPanes());

En el ejemplo del código anterior, se configura un activador para una PCollection. El activador se basa en el tiempo y emite cada ventana un minuto después de procesarse el primer elemento de esa ventana. La última línea en el ejemplo del código, .discardingFiredPanes(), es el modo de acumulación de la ventana.

Modos de acumulación de la ventana

Cuando especificas un activador, también debes configurar el modo de acumulación de la ventana. Cuando un activador se activa, emite el contenido actual de la ventana como un panel. Dado que un activador se puede activar varias veces, el modo de acumulación determina si el sistema acumula los paneles de la ventana cuando el activador se activa o los descarta.

Si deseas configurar una ventana para acumular los paneles que se producen cuando el activador se activa, invoca .accumulatingFiredPanes() cuando configures el activador. Si deseas configurar una ventana para descartar los paneles activados, invoca .discardingFiredPanes().

Observemos un ejemplo que usa una PCollection con un sistema de ventanas de tiempo fijo y un activador basado en los datos. (Esto es algo que deberías hacer si, por ejemplo, cada ventana representa un promedio de ejecución de diez minutos, pero deseas ver el valor actual del promedio en una IU con una frecuencia mayor que cada diez minutos). Vamos a suponer las siguientes condiciones:

  • La PCollection usa ventanas de tiempo fijo de 10 minutos.
  • La PCollection tiene un activador recurrente que se activa cada vez que llegan 3 elementos.

En el siguiente diagrama, se muestran los eventos de datos a medida que llegan a la PCollection y se asignan a las ventanas*:

Un diagrama de datos, por clave, que llega a una PCollection con un sistema de ventanas de tiempo fijo.
Figura 1: Eventos de datos de una PCollection con un sistema de ventanas de tiempo fijo.

Nota: Para que el diagrama sea un poco más sencillo, supondremos que todos los eventos llegan a la canalización en orden.

Para simplificar, consideremos solo los valores asociados con la clave X.

Modo de acumulación

Si nuestro activador se configura como .accumulatingFiredPanes, emite los siguientes valores cada vez que se activa (recuerda que el activador se activa cada vez que llegan tres elementos):

  Key X:
    First trigger firing:  [5, 8, 3]
    Second trigger firing: [5, 8, 3, 15, 19, 23]
    Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]

Modo de descarte

Si nuestro activador se configura como .discardingFiredPanes, emite los siguientes valores en cada activación:

  Key X:
    First trigger firing:  [5, 8, 3]
    Second trigger firing: [15, 19, 23]
    Third trigger firing:  [9, 13, 10]

Efectos de acumulación y descarte

Ahora, agreguemos un cálculo por clave a nuestra canalización. Cada vez que el activador se activa, la canalización aplica una Combine.perKey que calcula un promedio de todos los valores asociados con cada clave de la ventana.

Una vez más, consideremos solo la clave X:

Con el activador configurado como .accumulatingFiredPanes:

  Key X:
    First trigger firing:  [5, 8, 3]
      Average after first trigger firing: 5.3
    Second trigger firing: [5, 8, 3, 15, 19, 23]
      Average after second trigger firing: 12.167
    Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
      Average after third trigger firing: 11.667

Con el activador configurado como .discardingFiredPanes:

  Key X:
    First trigger firing:  [5, 8, 3]
      Average after first trigger firing: 5.3
    Second trigger firing: [15, 19, 23]
      Average after second trigger firing: 19
    Third trigger firing:  [9, 13, 10]
      Average after third trigger firing: 10.667

Ten en cuenta que la Combine.perKey que calcula el promedio genera resultados distintos en cada caso.

Por lo general, un activador configurado como .accumulatingFiredPanes siempre muestra todos los datos en una ventana determinada, incluidos los elementos previamente activados. Un activador configurado como .discardingFiredPanes muestra los cambios incrementales desde la última vez que el activador se activó. El modo de acumulación es más adecuado que una operación de agrupación que combina o actualiza los elementos; de lo contrario, usa el método de descarte.

Continuación del activador

Cuando aplicas una transformación de agregación como GroupByKey o Combine.perKey a una PCollection para la cual especificaste un activador, recuerda que GroupByKey o Combine.perKey genera una PCollection de salida nueva. El activador que configuras para la recopilación de entrada no se propaga a la recopilación de salida nueva.

En cambio, el SDK de Dataflow crea un activador similar para la PCollection de salida en función del activador que especificaste con respecto a la recopilación de entrada. El activador nuevo intenta emitir los elementos lo más rápido posible, aproximadamente a la velocidad especificada por el activador original en la PCollection de entrada. Dataflow determina las propiedades del activador nuevo en función de los parámetros que proporcionaste para el activador de entrada:

  • De manera predeterminada, el activador de continuación de AfterWatermark es idéntico al activador original. Si el activador de AfterWatermark tiene activaciones tempranas o tardías específicas, las activaciones de la continuación serán la continuación de las activaciones tempranas o tardías del activador original.
  • El activador de continuación predeterminado de AfterProcessingTime se activa después del tiempo de procesamiento sincronizado de los elementos asociados y no propaga ningún retraso adicional. Por ejemplo, considera un activador como AfterProcessingTime.pastFirstElementInPane().alignedTo(15 min).plusDelayOf(1 hour). Después de una GroupByKey, las tareas de Dataflow del activador para la recopilación de salida se sincronizarán al mismo tiempo para cada clave, pero no mantendrán el retraso de 1 hora.
  • El activador de continuación predeterminado de AfterCount se activa en cada elemento. Por ejemplo, AfterCount(n) en la recopilación de entrada es AfterCount(1) en la recopilación de salida.

Si crees que el activador que Dataflow genera para una PCollection de salida a partir de GroupByKey o Combine.perKey no es suficiente, debes configurar un activador nuevo de manera explícita.

Cómo combinar los activadores

En Dataflow, puedes combinar varios activadores para crear activadores compuestos. Puedes usar el sistema de activadores compuestos de Dataflow para combinar varios activadores de manera lógica. También puedes especificar un activador para emitir los resultados de manera reiterada, como máximo una vez, o en otras condiciones personalizadas.

Tipos de activadores compuestos

Dataflow incluye los siguientes activadores compuestos:

  • Puedes agregar activaciones tempranas o tardías adicionales a AfterWatermark.pastEndOfWindow.
  • Repeatedly.forever especifica un activador que se ejecuta indefinidamente. Cuando se cumplen las condiciones del activador, una ventana emite los resultados; luego, el activador se reinicia y se vuelve a iniciar. Puede resultar útil combinar Repeatedly.forever con .orFinally para especificar una condición que provoque la detención del activador recurrente.
  • AfterEach.inOrder combina varios activadores para que se activen en una secuencia específica. Cada vez que un activador en la secuencia emite una ventana, la secuencia avanza al siguiente activador.
  • AfterFirst toma varios activadores y realiza una emisión la primera vez que cualquiera de sus activadores de argumento se cumple. Esto equivale a una operación OR lógica para varios activadores.
  • AfterAll toma varios activadores y realiza una emisión en el momento en el que todos sus activadores de argumento se cumplen. Esto equivale a una operación AND lógica para varios activadores.
  • orFinally puede funcionar como una condición final para que cualquier activador se active por última vez y no se vuelva a activar nunca más.

Composición con AfterWatermark.pastEndOfWindow

Algunos de los activadores compuestos más útiles se activan una sola vez cuando el sistema estima que llegaron todos los datos (es decir, cuando la marca de agua pasa el final de la ventana) y se combinaron con una o ambas de las siguientes condiciones:

  • Activaciones especulativas que preceden el paso de la marca de agua al final de la ventana para permitir un procesamiento más rápido de los resultados parciales.
  • Activaciones tardías que ocurren después de que la marca de agua pasa el final de la ventana para permitir el control de los datos tardíos.

Puedes expresar este patrón con AfterWatermark.pastEndOfWindow. Por ejemplo, el código del activador del siguiente ejemplo se activa en las siguientes condiciones:

  • En función de la estimación del sistema de que todos los datos llegaron (la marca de agua pasa el final de la ventana)
  • Cada vez que llegan los datos tardíos, después de un retraso de diez minutos
  • Después de dos días, suponemos que no llegarán más datos de interés y el activador detiene la ejecución

Java

  .apply(Window
      .triggering(AfterWatermark
           .pastEndOfWindow()
           .withLateFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))))
      .withAllowedLateness(Duration.standardDays(2)));

Otros activadores compuestos

También puedes compilar otros tipos de activadores compuestos. En el siguiente código de ejemplo, se muestra un activador compuesto simple que se activa cuando el panel tiene al menos 100 elementos o después de un minuto.

Java

Repeatedly.forever(AfterFirst.of(
    AfterPane.elementCountAtLeast(100),
    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))

Gramática del activador

En la siguiente gramática, se describen varias maneras de combinar los activadores en activadores compuestos:

TRIGGER ::=
   ONCE_TRIGGER
   Repeatedly.forever(TRIGGER)
   TRIGGER.orFinally(ONCE_TRIGGER)
   AfterEach.inOrder(TRIGGER, TRIGGER, ...)

ONCE_TRIGGER ::=
  TIME_TRIGGER
  WATERMARK_TRIGGER
  AfterPane.elementCountAtLeast(Integer)
  AfterFirst.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)
  AfterAll.of(ONCE_TRIGGER, ONCE_TRIGGER, ...)

TIME_TRIGGER ::=
  AfterProcessingTime.pastFirstElementInPane()
  TIME_TRIGGER.alignedTo(Duration)
  TIME_TRIGGER.alignedTo(Duration, Instant)
  TIME_TRIGGER.plusDelayOf(Duration)
  TIME_TRIGGER.mappedBy(Instant -> Instant)

WATERMARK_TRIGGER ::=
  AfterWatermark.pastEndOfWindow()
  WATERMARK_TRIGGER.withEarlyFirings(ONCE_TRIGGER)
  WATERMARK_TRIGGER.withLateFirings(ONCE_TRIGGER)

Default = Repeatedly.forever(AfterWatermark.pastEndOfWindow())
¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.