“Exactamente una vez” en Dataflow

Dataflow admite el procesamiento de registros del tipo “exactamente una vez”. En esta página, se explica cómo Dataflow implementa el procesamiento “exactamente una vez” y, al mismo tiempo, garantiza una latencia baja.

Descripción general

Las canalizaciones por lotes siempre usan el procesamiento del tipo “exactamente una vez”. Las canalizaciones de transmisión usan el procesamiento de tipo “exactamente una vez” de forma predeterminada, pero también pueden usar el procesamiento al menos una vez.

El procesamiento “exactamente una vez” proporciona garantías sobre los resultados del procesamiento de registros, incluidos los resultados de cada etapa de canalización. En particular, para cada registro que llega a la canalización desde una fuente o llega a una etapa de una etapa anterior, Dataflow garantiza lo siguiente:

  • El registro se procesa y no se pierde.
  • Cualquier resultado del procesamiento que permanezca dentro de la canalización se refleja como máximo una vez.

En otras palabras, los registros se procesan al menos una vez y los resultados se confirman exactamente una vez.

El procesamiento “exactamente una vez” garantiza que los resultados sean precisos, sin registros duplicados en el resultado. Dataflow está optimizado para minimizar la latencia y, a la vez, mantener una semántica de “exactamente una vez”. Sin embargo, el procesamiento “exactamente una vez” genera costos por realizar la anulación de duplicación. Para los casos de uso que pueden tolerar registros duplicados, a menudo puedes reducir los costos y mejorar la latencia si habilitas el modo al menos una vez. Para obtener más información sobre cómo elegir entre la transmisión “exactamente una vez” o al menos una vez, consulta Configura el modo de transmisión de la canalización.

Datos retrasados

El procesamiento “exactamente una vez” garantiza la exactitud de la canalización: si la canalización procesa un registro, Dataflow garantiza que el registro se refleje en el resultado y que el registro no se duplique.

Sin embargo, en una canalización de transmisión, el procesamiento de tipo “exactamente una vez” no puede garantizar que los resultados estén completos, ya que los registros pueden llegar tarde. Por ejemplo, supongamos que tu canalización realiza una agregación durante un período, como Count. Con el procesamiento “exactamente una vez”, el resultado es preciso para los registros que llegan dentro de la ventana de manera oportuna, pero los registros tardíos pueden descartarse.

En general, no hay forma de garantizar la integridad en una canalización de transmisión, ya que, en teoría, los registros pueden llegar de manera arbitraria tarde. En el caso límite, deberás esperar por siempre para producir un resultado. De manera más práctica, Apache Beam te permite configurar el umbral para descartar datos tardíos y cuándo emitir resultados agregados. Para obtener más información, consulta Marcas de agua y datos retrasados en la documentación de Apache Beam.

Efectos secundarios

No se garantiza que los efectos secundarios tengan una semántica de exactamente una vez. Es importante destacar que esto incluye escribir el resultado en un almacén externo, a menos que el receptor también implemente una semántica de “exactamente una vez”.

Específicamente, Dataflow no garantiza que cada registro pase por cada transformación exactamente una vez. Debido a los reintentos o las fallas de los trabajadores, Dataflow puede enviar un registro a través de una transformación varias veces, o incluso de forma simultánea en varios trabajadores.

Como parte del procesamiento del tipo “exactamente una vez”, Dataflow anula la duplicación de los resultados. Sin embargo, si el código en una transformación tiene efectos secundarios, estos pueden ocurrir varias veces. Por ejemplo, si una transformación realiza una llamada de servicio remoto, esa llamada puede realizarse varias veces para el mismo registro. Los efectos secundarios incluso pueden provocar la pérdida de datos en algunas situaciones. Por ejemplo, supongamos que una transformación lee un archivo para generar un resultado y, luego, lo borra de inmediato sin esperar a que se confirme el resultado. Si se produce un error cuando se confirma el resultado, Dataflow vuelve a intentar la transformación, pero ahora la transformación no puede leer el archivo borrado.

Logging

El resultado del registro del procesamiento indica que se produjo el procesamiento, pero no indica si los datos se confirmaron. Por lo tanto, los archivos de registro pueden indicar que los datos se procesaron varias veces, aunque los resultados de los datos procesados se confirmen en el almacenamiento persistente solo una vez. Además, los registros no siempre reflejan los datos procesados y confirmados. Es posible que los registros se descarten debido a una limitación o una pérdida debido a otros problemas del servicio de registro.

Transmisión “exactamente una vez”

En esta sección, se explica cómo Dataflow implementa el procesamiento “exactamente una vez” para los trabajos de transmisión, incluido cómo Dataflow administra complejidades como el procesamiento no determinista, los datos retrasados y el código personalizado.

Shuffle de transmisión de Dataflow

Los trabajos de transmisión de Dataflow se ejecutan en muchos trabajadores diferentes en paralelo mediante la asignación de rangos de trabajo a cada trabajador. Aunque las asignaciones pueden cambiar con el tiempo en respuesta a las fallas de los trabajadores, el ajuste de escala automático o cualquier otro evento, después de cada transformación GroupByKey, todos los registros con la misma clave se procesan en el mismo trabajador. Las transformaciones compuestas, como Count, FileIO, etc., suelen usar la transformación GroupByKey. Para garantizar que los registros de una clave determinada terminen en el mismo trabajador, los trabajadores de Dataflow redistribuyen los datos entre sí mediante llamadas de procedimiento remoto (RPC).

Para garantizar que no se pierdan los registros durante la redistribución, Dataflow usa la copia de seguridad ascendente. Con la copia de seguridad ascendente, el trabajador que envía los registros reintenta las RPC hasta que recibe una confirmación positiva de que se recibió el registro. Los efectos secundarios de procesar el registro se confirman en el almacenamiento continuo en sentido descendente. Si el trabajador que envía los registros no está disponible, Dataflow continúa reintentando las RPC, lo que garantiza que cada registro se entregue al menos una vez.

Dado que estos reintentos pueden crear duplicados, cada mensaje se etiqueta con un ID único. Cada receptor almacena un catálogo de todos los ID que ya se vieron y procesaron. Cuando se recibe un registro, Dataflow busca su ID en el catálogo. Si se encuentra el ID, el registro ya se recibió y confirmó, y se descarta como un duplicado. Para garantizar que los ID de registro sean estables, cada resultado de cada paso se controla en el almacenamiento. Como resultado, si el mismo mensaje se envía varias veces debido a llamadas RPC repetidas, el mensaje solo se confirma en el almacenamiento una vez.

Garantiza una latencia baja

Para que el procesamiento “exactamente una vez” sea viable, se debe reducir la E/S, en particular, mediante la prevención de E/S en cada registro. Para lograr este objetivo, Dataflow usa filtros de Bloom y recolección de elementos no utilizados.

Filtros de Bloom

Los filtros de Bloom son estructuras de datos compactas que permiten verificaciones rápidas de membresías de conjuntos. En Dataflow, cada trabajador mantiene un filtro de Bloom de cada ID que ve. Cuando llega un ID de registro nuevo, el trabajador busca el ID en el filtro. Si el filtro muestra falso, este registro no es un duplicado y el trabajador no busca el ID en un almacenamiento estable.

Dataflow mantiene un conjunto de filtros progresivos de Bloom agrupados por tiempo. Cuando llega un registro, Dataflow elige el filtro adecuado para verificar en función de la marca de tiempo del sistema. En este paso, se evita que los filtros de Bloom se saturen a medida que los filtros se recolectan, y también se limita la cantidad de datos que se deben analizar en el inicio.

Recolección de elementos no utilizados

Para evitar que se llene el almacenamiento con ID de registros, Dataflow usa la recolección de elementos no utilizados a fin de quitar los registros antiguos. Dataflow usa la marca de tiempo del sistema para calcular una marca de agua de recolección de elementos no utilizados.

Esta marca de agua se basa en la cantidad de tiempo físico dedicado a esperar una etapa determinada. Por lo tanto, también proporciona información sobre qué partes de la canalización son lentas. Estos metadatos son la base de la métrica de retraso del sistema que se muestra en la interfaz de supervisión de Dataflow.

Si un registro llega a una marca de tiempo anterior a la marca de agua y si los ID de este momento ya se recolectaron, el registro se ignora. Debido a que la marca de agua baja que activa la recolección de elementos no utilizados no avanza hasta que se confirma la recepción de las entregas de registros, estos registros retrasados se duplican.

Fuentes no deterministas

Dataflow usa el SDK de Apache Beam para leer datos en canalizaciones. Si el procesamiento falla, Dataflow puede reintentar las lecturas de una fuente. En ese caso, Dataflow necesita asegurarse de que cada registro único producido por una fuente se registre exactamente una vez. Para las fuentes deterministas, como Pub/Sub Lite o Kafka, los registros se leen en función de un desplazamiento registrado, lo que mitiga la necesidad de este paso.

Debido a que Dataflow no puede asignar automáticamente ID de registro, las fuentes no deterministas deben indicarle a Dataflow cuáles son los ID de registro para evitar la duplicación. Cuando una fuente proporciona ID únicos para cada registro, el conector usa una redistribución en la canalización a fin de quitar los duplicados. Se filtran los registros con el mismo ID. Para ver un ejemplo de cómo Dataflow implementa el procesamiento “exactamente una vez” cuando se usa Pub/Sub como fuente, consulta la sección Anulación de duplicación eficiente en la página Transmisión con Pub/Sub.

Cuando ejecutas DoFn personalizadas como parte de tu canalización, Dataflow no garantiza que este código se ejecute solo una vez por registro. Para garantizar al menos una vez el procesamiento en caso de fallas de los trabajadores, Dataflow puede ejecutar un registro determinado a través de una transformación varias veces, o puede ejecutar el mismo registro en simultáneo en varios trabajadores. Si incluyes código en tu canalización que realiza acciones como comunicarse con un servicio externo, las acciones pueden ejecutarse más de una vez para un registro determinado.

Para que el procesamiento no determinista sea eficazmente determinista, usa el punto de control. Cuando usas puntos de control, cada resultado de una transformación se controla en un almacenamiento estable con su ID único antes de entregarlo a la siguiente etapa. Los reintentos en la entrega aleatoria de Dataflow retransmiten el resultado que se indicó. Aunque el código puede ejecutarse varias veces, Dataflow garantiza que se almacene un resultado de solo una de esas ejecuciones. Dataflow usa un almacén coherente que evita que se escriban duplicados en un almacenamiento estable.

Entrega de resultado “exactamente una vez”

El SDK de Apache Beam incluye receptores integrados que están diseñados para garantizar que no produzcan duplicados. Siempre que sea posible, usa uno de estos receptores integrados.

Si necesitas escribir tu propio receptor, el mejor enfoque es hacer que tu objeto de la función sea idempotente para que pueda reintentarse con la frecuencia que sea necesaria sin causar efectos secundarios no deseados. Sin embargo, a menudo, algún componente de la transformación que implementa la funcionalidad del receptor no es determinista y puede cambiar si se vuelve a intentar.

Por ejemplo, en una agregación con ventanas, el conjunto de registros en la ventana puede no ser determinista. Específicamente, la ventana podría intentar activarse con el elemento e0, e1, e2. El trabajador puede fallar antes de confirmar el procesamiento de la ventana, pero no antes de que esos elementos se envíen como un efecto secundario. Cuando el trabajador se reinicia, la ventana se vuelve a activar y llega un elemento e3 retrasado. Debido a que este elemento llega antes de que se confirme la ventana, no se cuenta como datos retrasados, por lo que se vuelve a llamar a DoFn con los elementos e0, e1, e2 y e3. Luego, se envían a la operación de efecto secundario. La idempotencia no ayuda en esta situación, ya que se envían diferentes conjuntos de registros lógicos cada vez.

Para abordar la no determinación en Dataflow, usa la transformación integrada Reshuffle. Cuando Dataflow redistribuye datos, los escribe de manera duradera para que los elementos generados de forma no determinista sean estables si se reintentan las operaciones después de que se produce la redistribución. El uso de la transformación Reshuffle garantiza que solo una versión del resultado de un DoFn pueda hacer que pase un límite de redistribución. Con el siguiente patrón, se garantiza que la operación de efecto secundario siempre reciba un registro determinista para generar lo siguiente:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Para asegurarte de que el ejecutor de Dataflow sepa que los elementos deben ser estables antes de ejecutar un DoFn, agrega la anotación RequiresStableInput al DoFn.

Más información