Ciclo de vida de una canalización

En esta página se ofrece una descripción general del ciclo de vida de una canalización, desde el código de la canalización hasta un trabajo de Dataflow.

En esta página se explican los siguientes conceptos:

  • Qué es un gráfico de ejecución y cómo se convierte un flujo de procesamiento de Apache Beam en una tarea de Dataflow
  • Cómo gestiona Dataflow los errores
  • Cómo Dataflow paraleliza y distribuye automáticamente la lógica de procesamiento de tu canalización entre los trabajadores que realizan tu tarea
  • Optimizaciones de tareas que puede hacer Dataflow

Gráfico de ejecución

Cuando ejecutas tu flujo de procesamiento de Dataflow, este crea un gráfico de ejecución a partir del código que crea tu objeto Pipeline, incluidas todas las transformaciones y sus funciones de procesamiento asociadas, como los objetos DoFn. Este es el gráfico de ejecución de la canalización y la fase se denomina tiempo de construcción del gráfico.

Durante la construcción del gráfico, Apache Beam ejecuta localmente el código del punto de entrada principal del código de la canalización, se detiene en las llamadas a un paso de origen, receptor o transformación y convierte estas llamadas en nodos del gráfico. Por lo tanto, un fragmento de código en el punto de entrada de una canalización (método de Java y Go main o nivel superior de una secuencia de comandos de Python) se ejecuta localmente en la máquina que ejecuta la canalización. El mismo código declarado en un método de un objeto DoFn se ejecuta en los trabajadores de Dataflow.

Por ejemplo, el ejemplo WordCount incluido en los SDKs de Apache Beam contiene una serie de transformaciones para leer, extraer, contar, dar formato y escribir las palabras individuales de una colección de texto, junto con un recuento de ocurrencias de cada palabra. En el siguiente diagrama se muestra cómo se expanden las transformaciones de la canalización WordCount en un gráfico de ejecución:

Las transformaciones del programa de ejemplo WordCount se ampliaron en un gráfico de ejecución
de pasos que debe ejecutar el servicio Dataflow.

Figura 1: Gráfico de ejecución de ejemplo de WordCount

El gráfico de ejecución suele ser diferente del orden en el que especificaste las transformaciones al crear la canalización. Esta diferencia se debe a que el servicio Dataflow realiza varias optimizaciones y fusiones en el gráfico de ejecución antes de que se ejecute en recursos de nube gestionados. El servicio Dataflow respeta las dependencias de datos al ejecutar tu flujo de procesamiento. Sin embargo, los pasos que no tengan dependencias de datos entre ellos se pueden ejecutar en cualquier orden.

Para ver el gráfico de ejecución no optimizado que Dataflow ha generado para tu flujo de trabajo, selecciona tu tarea en la interfaz de monitorización de Dataflow. Para obtener más información sobre cómo ver trabajos, consulta Usar la interfaz de monitorización de Dataflow.

Durante la construcción del grafo, Apache Beam valida que los recursos a los que hace referencia la canalización, como los cubos de Cloud Storage, las tablas de BigQuery y los temas o las suscripciones de Pub/Sub, existan y sean accesibles. La validación se realiza mediante llamadas a las APIs estándar de los servicios correspondientes, por lo que es fundamental que la cuenta de usuario utilizada para ejecutar una canalización tenga la conectividad adecuada con los servicios necesarios y esté autorizada para llamar a las APIs de los servicios. Antes de enviar la canalización al servicio Dataflow, Apache Beam también comprueba si hay otros errores y se asegura de que el gráfico de la canalización no contenga ninguna operación no permitida.

A continuación, el gráfico de ejecución se traduce al formato JSON y se transmite al endpoint del servicio Dataflow.

A continuación, el servicio Dataflow valida el gráfico de ejecución JSON. Cuando se valida el gráfico, se convierte en una tarea en el servicio Dataflow. Puedes ver tu trabajo, su gráfico de ejecución, su estado y la información de registro mediante la interfaz de monitorización de Dataflow.

Java

El servicio Dataflow envía una respuesta a la máquina en la que ejecutas tu programa de Dataflow. Esta respuesta se encapsula en el objeto DataflowPipelineJob, que contiene el jobId de tu trabajo de Dataflow. Usa jobId para monitorizar, hacer un seguimiento y solucionar problemas de tu trabajo mediante la interfaz de monitorización de Dataflow y la interfaz de línea de comandos de Dataflow. Para obtener más información, consulta la referencia de la API de DataflowPipelineJob.

Python

El servicio Dataflow envía una respuesta a la máquina en la que ejecutas tu programa de Dataflow. Esta respuesta se encapsula en el objeto DataflowPipelineResult, que contiene el job_id de tu trabajo de Dataflow. Usa la job_id para monitorizar, hacer un seguimiento y solucionar problemas de tu trabajo con la interfaz de monitorización de Dataflow y la interfaz de línea de comandos de Dataflow.

Go

El servicio Dataflow envía una respuesta a la máquina en la que ejecutas tu programa de Dataflow. Esta respuesta se encapsula en el objeto dataflowPipelineResult, que contiene el jobID de tu trabajo de Dataflow. Usa la jobID para monitorizar, hacer un seguimiento y solucionar problemas de tu trabajo con la interfaz de monitorización de Dataflow y la interfaz de línea de comandos de Dataflow.

La creación del gráfico también se produce cuando ejecutas tu canalización localmente, pero el gráfico no se traduce a JSON ni se transmite al servicio. En su lugar, el gráfico se ejecuta de forma local en la misma máquina en la que has iniciado el programa de Dataflow. Para obtener más información, consulta Configurar PipelineOptions para la ejecución local.

Gestión de errores y excepciones

Es posible que tu flujo de trabajo genere excepciones al procesar datos. Algunos de estos errores son transitorios, como la dificultad temporal para acceder a un servicio externo. Otros errores son permanentes, como los que se producen por datos de entrada dañados o no analizables, o por punteros nulos durante el cálculo.

Dataflow procesa los elementos en paquetes arbitrarios y vuelve a intentar procesar el paquete completo cuando se produce un error en algún elemento de ese paquete. Cuando se ejecuta en modo por lotes, los paquetes que incluyen un elemento con errores se vuelven a intentar cuatro veces. El flujo de procesamiento falla por completo cuando un paquete falla cuatro veces. Cuando se ejecuta en modo de streaming, un paquete que incluye un elemento fallido se vuelve a intentar indefinidamente, lo que puede provocar que tu canalización se detenga permanentemente.

Cuando se procesa en modo por lotes, es posible que se produzcan muchos errores individuales antes de que falle por completo una tarea de canalización, lo que ocurre cuando falla un paquete determinado después de cuatro intentos. Por ejemplo, si tu flujo de procesamiento intenta procesar 100 paquetes, Dataflow podría generar varios cientos de errores individuales hasta que un solo paquete alcance la condición de cuatro errores para salir.

Los errores de los trabajadores de inicio, como los fallos al instalar paquetes en los trabajadores, son transitorios. En este caso, se producirán reintentos indefinidos y es posible que tu pipeline se detenga de forma permanente.

Paralelización y distribución

El servicio Dataflow paraleliza y distribuye automáticamente la lógica de procesamiento de tu flujo de trabajo entre los trabajadores que asignes para realizar tu tarea. Dataflow usa las abstracciones del modelo de programación para representar funciones de procesamiento en paralelo. Por ejemplo, las transformaciones ParDo de una canalización hacen que Dataflow distribuya automáticamente el código de procesamiento, representado por objetos DoFn, a varios trabajadores para que se ejecuten en paralelo.

Hay dos tipos de paralelismo de trabajos:

  • El paralelismo horizontal se produce cuando los datos de la canalización se dividen y se procesan en varios trabajadores al mismo tiempo. El entorno de ejecución de Dataflow se basa en un grupo de trabajadores distribuidos. Una canalización tiene un paralelismo potencial mayor cuando el grupo contiene más trabajadores, pero esa configuración también tiene un coste mayor. En teoría, el paralelismo horizontal no tiene límite superior. Sin embargo, Dataflow limita el grupo de trabajadores a 4000 trabajadores para optimizar el uso de recursos en toda la flota.

  • El paralelismo vertical se produce cuando los datos de la canalización se dividen y se procesan en varios núcleos de CPU del mismo trabajador. Cada trabajador funciona con una máquina virtual de Compute Engine. Una VM puede ejecutar varios procesos para saturar todos sus núcleos de CPU. Una VM con más núcleos tiene un mayor paralelismo vertical potencial, pero esta configuración conlleva un coste más elevado. Un mayor número de núcleos suele provocar un aumento del uso de memoria, por lo que el número de núcleos suele escalarse junto con el tamaño de la memoria. Dado el límite físico de las arquitecturas de los ordenadores, el límite superior del paralelismo vertical es mucho menor que el límite superior del paralelismo horizontal.

Paralelismo gestionado

De forma predeterminada, Dataflow gestiona automáticamente el paralelismo de los trabajos. Dataflow monitoriza las estadísticas de tiempo de ejecución de la tarea, como el uso de CPU y memoria, para determinar cómo escalarla. En función de la configuración de tu trabajo, Dataflow puede escalar los trabajos horizontalmente (lo que se conoce como autoescalado horizontal) o verticalmente (lo que se conoce como escalado vertical). El escalado automático para el paralelismo optimiza el coste y el rendimiento de los trabajos.

Para mejorar el rendimiento de las tareas, Dataflow también optimiza las canalizaciones internamente. Las optimizaciones típicas son la optimización de fusión y la optimización de combinación. Al fusionar los pasos de la canalización, Dataflow elimina los costes innecesarios asociados a la coordinación de los pasos en un sistema distribuido y a la ejecución de cada paso por separado.

Factores que afectan al paralelismo

Los siguientes factores influyen en el rendimiento del paralelismo en los trabajos de Dataflow.

Fuente de entrada

Cuando una fuente de entrada no permite el paralelismo, el paso de ingesta de la fuente de entrada puede convertirse en un cuello de botella en una tarea de Dataflow. Por ejemplo, cuando ingieres datos de un solo archivo de texto comprimido, Dataflow no puede paralelizar los datos de entrada. Como la mayoría de los formatos de compresión no se pueden dividir arbitrariamente en fragmentos durante la ingestión, Dataflow necesita leer los datos de forma secuencial desde el principio del archivo. El rendimiento general de la canalización se ralentiza debido a la parte no paralela de la canalización. La solución a este problema es usar una fuente de entrada más escalable.

En algunos casos, la fusión de pasos también reduce el paralelismo. Cuando la fuente de entrada no permite el paralelismo, si Dataflow combina el paso de ingestión de datos con los pasos posteriores y asigna este paso combinado a un solo subproceso, toda la canalización puede ejecutarse más lentamente.

Para evitar esta situación, inserta un paso Redistribute después del paso de ingesta de la fuente de entrada. Para obtener más información, consulta la sección Evitar la fusión de este documento.

Fan-out y forma de los datos predeterminados

El fanout predeterminado de un solo paso de transformación puede convertirse en un cuello de botella y limitar el paralelismo. Por ejemplo, una transformación ParDo con un alto factor de ramificación puede provocar que la fusión limite la capacidad de Dataflow para optimizar el uso de los trabajadores. En una operación de este tipo, puede que tengas una colección de entrada con relativamente pocos elementos, pero ParDo produce una salida con cientos o miles de veces más elementos, seguida de otro ParDo. Si el servicio Dataflow combina estas operaciones ParDo, el paralelismo en este paso se limita a un máximo del número de elementos de la colección de entrada, aunque el PCollection intermedio contenga muchos más elementos.

Para ver posibles soluciones, consulta la sección Evitar la fusión de este documento.

Forma de los datos

La forma de los datos, ya sean datos de entrada o datos intermedios, puede limitar el paralelismo. Por ejemplo, cuando un paso GroupByKey en una clave natural, como una ciudad, va seguido de un paso map o Combine, Dataflow fusiona los dos pasos. Cuando el espacio de claves es pequeño (por ejemplo, cinco ciudades) y una clave es muy activa (por ejemplo, una ciudad grande), la mayoría de los elementos de la salida del GroupByKeypaso se distribuyen a un proceso. Este proceso se convierte en un cuello de botella y ralentiza el trabajo.

En este ejemplo, puedes redistribuir los resultados del paso GroupByKey en un espacio de claves artificial más grande en lugar de usar las claves naturales. Inserta un paso Redistribute entre el paso GroupByKey y el paso map o Combine. En el Redistribute paso, crea el espacio de claves artificiales, por ejemplo, usando una función hash, para superar el paralelismo limitado causado por la forma de los datos.

Para obtener más información, consulta la sección Evitar la fusión de este documento.

Receptor de salida

Un receptor es una transformación que escribe en un sistema de almacenamiento de datos externo, como un archivo o una base de datos. En la práctica, los receptores se modelan e implementan como objetos DoFn estándar y se usan para materializar un PCollection en sistemas externos. En este caso, PCollection contiene los resultados finales de la canalización. Los subprocesos que llaman a las APIs de sink pueden ejecutarse en paralelo para escribir datos en los sistemas externos. De forma predeterminada, no se produce ninguna coordinación entre los subprocesos. Si no hay una capa intermedia que almacene en búfer las solicitudes de escritura y controle el flujo, el sistema externo puede sobrecargarse y reducir el rendimiento de escritura. Si aumentas los recursos añadiendo más paralelismo, es posible que la canalización se ralentice aún más.

La solución a este problema es reducir el paralelismo en el paso de escritura. Puedes añadir un paso GroupByKey justo antes del paso de escritura. El paso GroupByKey agrupa los datos de salida en un conjunto más pequeño de lotes para reducir el número total de llamadas RPC y las conexiones a sistemas externos. Por ejemplo, usa un GroupByKey para crear un espacio de hash de 50 de entre 1 millón de puntos de datos.

El inconveniente de este enfoque es que introduce un límite codificado en el paralelismo. Otra opción es implementar un tiempo de espera exponencial en el receptor al escribir datos. Esta opción puede proporcionar una limitación mínima del cliente.

Monitorizar el paralelismo

Para monitorizar el paralelismo, puedes usar la Google Cloud consola para ver los elementos que se hayan quedado atrás. Para obtener más información, consulta los artículos Solucionar problemas de tareas por lotes que se retrasan y Solucionar problemas de tareas de streaming que se retrasan.

Optimización de la fusión

Una vez que se haya validado el formulario JSON de tu gráfico de ejecución de la canalización, el servicio Dataflow podrá modificar el gráfico para realizar optimizaciones. Las optimizaciones pueden incluir la fusión de varios pasos o transformaciones en el gráfico de ejecución de tu pipeline en pasos únicos. La fusión de pasos evita que el servicio Dataflow tenga que materializar cada PCollection intermedio de tu flujo de procesamiento, lo que puede resultar caro en términos de memoria y sobrecarga de procesamiento.

Aunque todas las transformaciones que especifiques en la construcción de tu canalización se ejecutan en el servicio, para asegurar la ejecución más eficiente de tu canalización, las transformaciones se pueden ejecutar en un orden diferente o como parte de una transformación fusionada más grande. El servicio Dataflow respeta las dependencias de datos entre los pasos del gráfico de ejecución, pero, por lo demás, los pasos se pueden ejecutar en cualquier orden.

Ejemplo de fusión

En el siguiente diagrama se muestra cómo el servicio Dataflow puede optimizar y combinar el gráfico de ejecución del ejemplo WordCount incluido en el SDK de Apache Beam para Java para que la ejecución sea eficiente:

Gráfico de ejecución del programa de ejemplo WordCount optimizado y con pasos combinados por el servicio Dataflow.

Figura 2: Gráfico de ejecución optimizado del ejemplo WordCount

Evitar la fusión

En algunos casos, es posible que Dataflow adivine incorrectamente la forma óptima de fusionar operaciones en la canalización, lo que puede limitar la capacidad de Dataflow para usar todos los trabajadores disponibles. En estos casos, puedes dar una pista a Dataflow para que redistribuya los datos mediante una transformación Redistribute.

Para añadir una transformación Redistribute, llama a uno de los siguientes métodos:

  • Redistribute.arbitrarily: indica que es probable que los datos estén desequilibrados. Dataflow elige el mejor algoritmo para redistribuir los datos.

  • Redistribute.byKey: indica que es probable que un PCollection de pares clave-valor esté desequilibrado y que deba redistribuirse en función de las claves. Normalmente, Dataflow coloca todos los elementos de una sola clave en el mismo subproceso de trabajador. Sin embargo, no se garantiza la colocación conjunta de las claves y los elementos se procesan de forma independiente.

Si tu canalización contiene una transformación Redistribute, Dataflow suele evitar la fusión de los pasos anteriores y posteriores a la transformación Redistribute y baraja los datos para que los pasos posteriores a la transformación Redistribute tengan un paralelismo más óptimo.

Monitor fusion

Puedes acceder a tu gráfico optimizado y a las fases fusionadas en la Google Cloud consola, mediante la CLI de gcloud o mediante la API.

Consola

Para ver las fases y los pasos combinados de tu gráfico en la consola, ve a la pestaña Detalles de la ejecución de tu tarea de Dataflow y abre la vista de gráfico Flujo de trabajo de la fase.

Para ver los pasos de los componentes que se han fusionado en una fase, haz clic en la fase fusionada del gráfico. En el panel Información de la fase, la fila Pasos del componente muestra las fases fusionadas. A veces, partes de una sola transformación compuesta se fusionan en varias fases.

gcloud

Para acceder a tu gráfico optimizado y a las fases fusionadas mediante la CLI de gcloud, ejecuta el siguiente comando gcloud:

  gcloud dataflow jobs describe --full JOB_ID --format json

Sustituye JOB_ID por el ID de tu trabajo de Dataflow.

Para extraer los bits relevantes, canaliza la salida del comando gcloud a jq:

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

Para ver la descripción de las fases fusionadas en el archivo de respuesta de salida, en la matriz ComponentTransform, consulta el objeto ExecutionStageSummary.

API

Para acceder a tu gráfico optimizado y a las fases fusionadas mediante la API, llama a project.locations.jobs.get.

Para ver la descripción de las fases fusionadas en el archivo de respuesta de salida, en la matriz ComponentTransform, consulta el objeto ExecutionStageSummary.

Combinar optimización

Las operaciones de agregación son un concepto importante en el procesamiento de datos a gran escala. La agregación reúne datos que están conceptualmente muy separados, lo que resulta extremadamente útil para establecer correlaciones. El modelo de programación de Dataflow representa las operaciones de agregación como las transformaciones GroupByKey, CoGroupByKey y Combine.

Las operaciones de agregación de Dataflow combinan datos de todo el conjunto de datos, incluidos los que pueden estar repartidos entre varios trabajadores. Durante estas operaciones de agregación, suele ser más eficiente combinar la mayor cantidad de datos posible de forma local antes de combinar los datos de las distintas instancias. Cuando aplicas una GroupByKey u otra transformación de agregación, el servicio Dataflow realiza automáticamente una combinación parcial de forma local antes de la operación de agrupación principal.

Cuando se realiza una combinación parcial o multinivel, el servicio Dataflow toma decisiones diferentes en función de si tu flujo de procesamiento trabaja con datos por lotes o de streaming. En el caso de los datos delimitados, el servicio prioriza la eficiencia y realiza tantas combinaciones locales como sea posible. En el caso de los datos ilimitados, el servicio prioriza una latencia más baja y es posible que no realice combinaciones parciales, ya que podría aumentar la latencia.