Procesamiento exacto en Dataflow

Dataflow admite el procesamiento de registros una sola vez. En esta página se explica cómo implementa Dataflow el procesamiento "una sola vez" y, al mismo tiempo, garantiza una latencia baja.

Información general

Los flujos de procesamiento por lotes siempre usan el procesamiento exacto. Los flujos de procesamiento usan el procesamiento exacto de forma predeterminada, pero también pueden usar el procesamiento al menos una vez.

El procesamiento exacto proporciona garantías sobre los resultados del procesamiento de registros, incluidos los resultados de cada fase de la canalización. En concreto, por cada registro que llega a la canalización desde una fuente o a una fase desde una fase anterior, Dataflow asegura lo siguiente:

  • El registro se procesa y no se pierde.
  • Los resultados del procesamiento que se mantengan en la canalización se reflejarán como máximo una vez.

Es decir, los registros se procesan al menos una vez y los resultados se confirman exactamente una vez.

El procesamiento exacto garantiza que los resultados sean precisos y que no haya registros duplicados en la salida. Dataflow está optimizado para minimizar la latencia y, al mismo tiempo, mantener la semántica de tipo "exactamente una vez". Sin embargo, el procesamiento exacto sigue teniendo un coste para realizar la deduplicación. En los casos prácticos que pueden tolerar registros duplicados, a menudo puedes reducir los costes y mejorar la latencia habilitando el modo "al menos una vez". Para obtener más información sobre cómo elegir entre el envío exactamente una vez y el envío al menos una vez, consulta el artículo Definir el modo de streaming de la canalización.

Datos retrasados

El procesamiento "una sola vez" garantiza la precisión de la canalización: si la canalización procesa un registro, Dataflow se asegura de que se refleje en la salida y de que no se duplique.

Sin embargo, en una canalización de streaming, el procesamiento exactamente una vez no puede garantizar que los resultados sean completos, ya que los registros pueden llegar tarde. Por ejemplo, supongamos que tu canalización realiza una agregación en un periodo, como Count. Con el procesamiento exactamente una vez, el resultado es preciso para los registros que llegan a tiempo dentro de la ventana, pero es posible que se descarten los registros tardíos.

Por lo general, no hay forma de garantizar la integridad en una canalización de streaming, ya que, en teoría, los registros pueden llegar con un retraso arbitrario. En el caso límite, tendrías que esperar indefinidamente para obtener un resultado. De forma más práctica, Apache Beam te permite configurar el umbral para descartar datos tardíos y cuándo emitir resultados agregados. Para obtener más información, consulta Marcas de agua y datos tardíos en la documentación de Apache Beam.

Efectos secundarios

No se garantiza que los efectos secundarios tengan una semántica de exactamente una vez. Es importante tener en cuenta que esto incluye escribir la salida en un almacén externo, a menos que el receptor también implemente la semántica de exactamente una vez.

En concreto, Dataflow no garantiza que cada registro pase por cada transformación exactamente una vez. Debido a los reintentos o a los fallos de los trabajadores, Dataflow puede enviar un registro a través de una transformación varias veces o incluso simultáneamente en varios trabajadores.

Como parte del procesamiento "una sola vez", Dataflow elimina los duplicados de las salidas. Sin embargo, si el código de una transformación tiene efectos secundarios, estos podrían producirse varias veces. Por ejemplo, si una transformación hace una llamada a un servicio remoto, esa llamada se puede hacer varias veces para el mismo registro. Los efectos secundarios pueden incluso provocar la pérdida de datos en algunas situaciones. Por ejemplo, supongamos que una transformación lee un archivo para generar una salida y, a continuación, elimina el archivo inmediatamente sin esperar a que se confirme la salida. Si se produce un error al confirmar el resultado, Dataflow vuelve a intentar la transformación, pero ahora la transformación no puede leer el archivo eliminado.

Almacenamiento de registros

El registro de salida del procesamiento indica que se ha producido el procesamiento, pero no si los datos se han confirmado. Por lo tanto, los archivos de registro pueden indicar que los datos se han procesado varias veces, aunque los resultados de los datos procesados se confirmen en el almacenamiento persistente solo una vez. Además, los registros no siempre reflejan los datos procesados y confirmados. Es posible que se descarten registros debido a la limitación o que se pierdan debido a otros problemas del servicio de registro.

Streaming exacto

En esta sección se explica cómo implementa Dataflow el procesamiento exactamente una vez en los trabajos de streaming, incluido cómo gestiona Dataflow complejidades como el procesamiento no determinista, los datos tardíos y el código personalizado.

Shuffle de streaming de Dataflow

Los trabajos de Dataflow de streaming se ejecutan en muchos trabajadores diferentes en paralelo asignando intervalos de trabajo a cada trabajador. Aunque las asignaciones pueden cambiar con el tiempo en respuesta a errores de los trabajadores, al escalado automático u otros eventos, después de cada GroupByKeytransformación, todos los registros con la misma clave se procesan en el mismo trabajador. La transformación GroupByKey se suele usar en transformaciones compuestas, como Count, FileIO, etc. Para asegurarse de que los registros de una clave determinada acaben en el mismo trabajador, los trabajadores de Dataflow barajan los datos entre sí mediante llamadas a procedimiento remoto (RPCs).

Para garantizar que no se pierdan registros durante la aleatorización, Dataflow usa copias de seguridad de la fase anterior. Con la copia de seguridad de la fuente, el trabajador que envía los registros vuelve a intentar las llamadas a procedimiento remoto hasta que recibe una confirmación positiva de que se ha recibido el registro. Los efectos secundarios del procesamiento del registro se confirman en el almacenamiento persistente más adelante. Si el trabajador que envía los registros deja de estar disponible, Dataflow sigue reintentando las llamadas RPC, lo que asegura que cada registro se entregue al menos una vez.

Como estos reintentos pueden crear duplicados, cada mensaje se etiqueta con un ID único. Cada receptor almacena un catálogo de todos los IDs que ya se han visto y procesado. Cuando se recibe un registro, Dataflow busca su ID en el catálogo. Si se encuentra el ID, significa que el registro ya se ha recibido y confirmado, por lo que se descarta como duplicado. Para asegurarse de que los IDs de registro sean estables, cada salida de un paso a otro se guarda en el almacenamiento. Por lo tanto, si el mismo mensaje se envía varias veces debido a llamadas RPC repetidas, solo se confirma en el almacenamiento una vez.

Asegurar una latencia baja

Para que el procesamiento exactamente una vez sea viable, se debe reducir la E/S, en particular, evitando la E/S en cada registro. Para conseguir este objetivo, Dataflow usa filtros de Bloom y la recogida de elementos no utilizados.

Filtros Bloom

Los filtros Bloom son estructuras de datos compactas que permiten comprobar rápidamente si un elemento pertenece a un conjunto. En Dataflow, cada trabajador mantiene un filtro de Bloom de cada ID que ve. Cuando llega un nuevo ID de registro, el trabajador busca el ID en el filtro. Si el filtro devuelve el valor false, el registro no es un duplicado y el trabajador no busca el ID en el almacenamiento estable.

Dataflow mantiene un conjunto de filtros Bloom continuos agrupados por tiempo. Cuando llega un registro, Dataflow elige el filtro adecuado para comprobarlo en función de la marca de tiempo del sistema. Este paso evita que los filtros Bloom se saturen a medida que se recogen los elementos no utilizados y también limita la cantidad de datos que se deben analizar al inicio.

Recolección de memoria residual

Para evitar que el almacenamiento se llene con IDs de registros, Dataflow usa la recogida de elementos no utilizados para eliminar los registros antiguos. Dataflow usa la marca de tiempo del sistema para calcular una marca de agua de recolección de elementos no utilizados.

Esta marca de agua se basa en el tiempo físico que se ha dedicado a esperar en una fase determinada. Por lo tanto, también proporciona información sobre qué partes de la canalización son lentas. Estos metadatos son la base de la métrica de latencia del sistema que se muestra en la interfaz de monitorización de Dataflow.

Si llega un registro con una marca de tiempo anterior a la marca de agua y ya se han recogido los IDs de ese momento, el registro se ignora. Como la marca de agua inferior que activa la recogida de elementos no utilizados no avanza hasta que se confirman las entregas de registros, estos registros que llegan tarde son duplicados.

Fuentes no deterministas

Dataflow usa el SDK de Apache Beam para leer datos en las canalizaciones. Si el procesamiento falla, Dataflow puede volver a intentar leer datos de una fuente. En esa situación, Dataflow debe asegurarse de que cada registro único generado por una fuente se registre exactamente una vez. En el caso de las fuentes deterministas, como Pub/Sub Lite o Kafka, los registros se leen en función de un desplazamiento registrado, lo que reduce la necesidad de este paso.

Como Dataflow no puede asignar automáticamente IDs de registro, las fuentes no deterministas deben indicar a Dataflow cuáles son los IDs de registro para evitar duplicados. Cuando una fuente proporciona IDs únicos para cada registro, el conector usa una aleatorización en la canalización para eliminar los duplicados. Los registros con el mismo ID se filtran. Para ver un ejemplo de cómo implementa Dataflow el procesamiento una vez solo cuando se usa Pub/Sub como fuente, consulta la sección Procesamiento una vez solo de la página Streaming con Pub/Sub.

Cuando ejecutas DoFns personalizados como parte de tu flujo de procesamiento, Dataflow no garantiza que este código se ejecute solo una vez por registro. Para garantizar que el procesamiento se realiza al menos una vez en caso de que se produzcan errores en los trabajadores, Dataflow puede ejecutar un registro determinado a través de una transformación varias veces o puede ejecutar el mismo registro simultáneamente en varios trabajadores. Si incluyes código en tu flujo de procesamiento que haga cosas como ponerse en contacto con un servicio externo, es posible que las acciones se ejecuten más de una vez para un registro determinado.

Para que el procesamiento no determinista sea determinista, usa puntos de control. Cuando usas el punto de control, cada salida de una transformación se registra en un almacenamiento estable con su ID único antes de enviarse a la siguiente fase. Los reintentos en la distribución aleatoria de Dataflow reenvían la salida que se ha guardado en un punto de control. Aunque tu código se ejecute varias veces, Dataflow se asegura de que solo se almacene el resultado de una de esas ejecuciones. Dataflow usa un almacén coherente que evita que se escriban duplicados en el almacenamiento estable.

Entrega de resultados exactamente una vez

El SDK de Apache Beam incluye sinks integrados diseñados para asegurarse de que no producen duplicados. Siempre que sea posible, usa uno de estos sinks integrados.

Si necesitas escribir tu propio receptor, lo mejor es que el objeto de tu función sea idempotente para que se pueda volver a intentar tantas veces como sea necesario sin provocar efectos secundarios no deseados. Sin embargo, a menudo, algún componente de la transformación que implementa la funcionalidad del receptor no es determinista y puede cambiar si se vuelve a intentar.

Por ejemplo, en una agregación de ventana, el conjunto de registros de la ventana puede no ser determinista. En concreto, la ventana puede intentar activarse con los elementos e0, e1 y e2. Es posible que el trabajador falle antes de confirmar el procesamiento de la ventana, pero no antes de que se envíen esos elementos como efecto secundario. Cuando se reinicia el trabajador, la ventana se activa de nuevo y llega un elemento tardío e3. Como este elemento llega antes de que se confirme la ventana, no se considera datos tardíos, por lo que se vuelve a llamar a DoFn con los elementos e0, e1, e2 y e3. Estos elementos se envían a la operación de efecto secundario. La idempotencia no ayuda en este caso, ya que cada vez se envían conjuntos de registros lógicos diferentes.

Para abordar la indeterminación en Dataflow, usa la transformación Reshuffle integrada. Cuando Dataflow baraja los datos, los escribe de forma duradera para que los elementos generados de forma no determinista sean estables si se reintentan las operaciones después de que se produzca el barajado. Al usar la transformación Reshuffle, se garantiza que solo una versión de la salida de DoFn puede superar un límite de aleatorización. El siguiente patrón asegura que la operación de efecto secundario siempre reciba un registro determinista para la salida:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Para asegurarte de que el runner de Dataflow sabe que los elementos deben ser estables antes de ejecutar un DoFn, añade la anotación RequiresStableInput a DoFn.

Más información