Solucionar problemas de DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se proporcionan información y pasos para solucionar problemas de flujos de trabajo comunes problemas.

Muchos problemas de ejecución del DAG se deben a un rendimiento del entorno no óptimo. Para optimizar tu entorno de Cloud Composer 2, sigue los pasos que se indican en la página de optimización y los costos del entorno.

El programador de Airflow puede causar algunos problemas de ejecución del DAG no funcione de forma óptima ni correcta. Sigue Instrucciones para solucionar problemas del programador para resolver estos problemas.

Flujo de trabajo para la solución de problemas

Para comenzar con la solución de problemas, siga estos pasos:

  1. Consulta los registros de Airflow.

    Puedes aumentar el nivel de registro de Airflow si anulas siguiente opción de configuración de Airflow.

    Airflow 2

    Sección Clave Valor
    logging logging_level El valor predeterminado es INFO. Configúralo en DEBUG para obtener más verbosidad en los mensajes de registro.

    Airflow 1

    Sección Clave Valor
    core logging_level El valor predeterminado es INFO. Configúralo en DEBUG para obtener más verbosidad en los mensajes de registro.
  2. Verifica el Panel de Monitoring.

  3. Revisa Cloud Monitoring.

  4. En la consola de Google Cloud, comprueba si hay errores en las páginas de los componentes de tu entorno.

  5. En la interfaz web de Airflow, consulta la Vista de gráfico del DAG para de tareas con errores.

    Sección Clave Valor
    webserver dag_orientation LR, TB, RL, oBT

Depura fallas del operador

Para depurar una falla del operador, sigue estos pasos:

  1. Verifica si hay errores específicos de la tarea.
  2. Consulta los registros de Airflow.
  3. Revisa Cloud Monitoring.
  4. Verifica los registros específicos del operador.
  5. Corrige los errores.
  6. Sube el DAG a la carpeta dags/.
  7. En la interfaz web de Airflow, borrar los estados pasados para el DAG.
  8. Reanuda o ejecuta el DAG.

Soluciona problemas de ejecución de tareas

Airflow es un sistema distribuido con muchas entidades, como programador, ejecutor, de trabajadores que se comunican entre sí a través de una lista de tareas en cola y Airflow y enviar señales (como SIGTERM). En el siguiente diagrama, se muestra Descripción general de las interconexiones entre los componentes de Airflow.

Interacción entre los componentes de Airflow
Figura 1. Interacción entre los componentes de Airflow (haz clic para ampliar)

En un sistema distribuido como Airflow, puede haber cierta conectividad de red problemas o la infraestructura subyacente puede experimentar problemas intermitentes. Esto puede dar lugar a situaciones en las que las tareas pueden fallar y reprogramarse ejecución o que las tareas no se completen con éxito (por ejemplo, tareas o tareas que quedaron atascadas en la ejecución). Airflow dispone de mecanismos para manejar en estas situaciones y reanudar automáticamente el funcionamiento normal. Siguiendo En estas secciones, se explican los problemas comunes que ocurren durante la ejecución de tareas de Airflow: Tareas zombi, píldoras venenosas y señales SIGTERM.

Solución de problemas de tareas zombi

Airflow detecta dos tipos de desigualdad entre una tarea y un proceso que se ejecuta la tarea:

  • Las tareas zombi son tareas que deberían ejecutarse, pero que en ejecución. Esto puede ocurrir si el proceso de la tarea se cerró o no está responder, si el trabajador de Airflow no informó el estado de una tarea a tiempo porque está sobrecargada o si se cerró la VM en la que se ejecuta la tarea. Airflow encuentra esas tareas periódicamente y falla o reintenta la tarea. según la configuración de la tarea.

    Descubre tareas zombi

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Las tareas de zombis son tareas que no deberían ejecutarse. Hallazgos de Airflow estas tareas periódicamente y las finaliza.

A continuación, se indican los motivos y las soluciones más comunes para las tareas zombie.

El trabajador de Airflow se quedó sin memoria

Cada trabajador de Airflow puede ejecutar hasta [celery]worker_concurrency instancias de tareas. al mismo tiempo. Si un consumo de memoria acumulativo de esas instancias de tareas de memoria para un trabajador de Airflow, se generará finalizar para liberar recursos.

Descubre eventos de memoria insuficiente del trabajador de Airflow

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

Soluciones:

Se expulsó el trabajador de Airflow

Las expulsiones de Pods son una parte normal de la ejecución de cargas de trabajo en Kubernetes. GKE expulsa los Pods si se quedaban sin almacenamiento o los libera los recursos para las cargas de trabajo con una prioridad más alta.

Descubre expulsiones de trabajadores de Airflow

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Soluciones:

Se cerró el trabajador de Airflow

Es posible que los trabajadores de Airflow se quiten de forma externa. Si actualmente las tareas en ejecución no que aún no terminan durante un período de rescisión ordenada, estas se interrumpirán y podrían terminan siendo detectados como zombis.

Descubre las terminaciones de los Pods de los trabajadores de Airflow

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Situaciones y soluciones posibles:

  • Los trabajadores de Airflow se reinician durante las modificaciones del entorno, como actualizaciones o instalación de paquetes:

    Descubre las modificaciones del entorno de Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Puede realizar esas operaciones cuando no hay tareas críticas en ejecución o habilitar reintentos de tarea.

  • Es posible que varios componentes no estén disponibles temporalmente durante el mantenimiento. operaciones:

    Descubre las operaciones de mantenimiento de GKE

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    Puedes especificar períodos de mantenimiento para minimizar se superpone con la ejecución de tareas críticas.

  • En las versiones de Cloud Composer 2 anteriores a 2.4.5, se aplica una solicitud es posible que el trabajador ignore la señal SIGTERM y continúe ejecutando tareas:

    Descubre cómo reducir la escala verticalmente con el ajuste de escala automático de Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-worker-set")
    textPayload:"Workers deleted"

    Puedes actualizar a una versión posterior de Cloud Composer en la que ya se solucionó el problema.

El trabajador de Airflow estaba bajo una gran carga

La cantidad de recursos de CPU y memoria disponibles para un trabajador de Airflow es limitada por la configuración del entorno. Si un uso se acerca a los límites, causaría una contención de recursos y retrasos innecesarios durante la tarea ejecución. En situaciones extremas, cuando los recursos carecen durante períodos más largos tiempo, esto podría provocar tareas zombis.

Soluciones:

La base de datos de Airflow estaba muy cargada

Varios componentes de Airflow usan una base de datos para comunicarse entre sí. en particular, para almacenar las pulsaciones de las instancias de tareas. Escasez de recursos en la de la base de datos generará tiempos de consulta más largos y podría afectar la ejecución de una tarea.

Soluciones:

La base de datos de Airflow no estaba disponible en este momento

Es posible que un trabajador de Airflow tarde en detectar y manejar como problemas temporales de conectividad. Es posible que supere el valor predeterminado de detección de zombis.

Descubre los tiempos de espera de la señal de monitoreo de funcionamiento de Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Soluciones:

  • Aumentar el tiempo de espera para tareas zombi y anular el valor de [scheduler]scheduler_zombie_task_threshold Opción de configuración de Airflow:

    Sección Clave Valor Notas
    scheduler scheduler_zombie_task_threshold Nuevo tiempo de espera agotado (en segundos) Predeterminado el valor es 300

Solución de problemas de la píldora venenosa

Poison Pill es un mecanismo que usa Airflow para cerrar sus tareas.

Airflow usa la píldora venenosa en las siguientes situaciones:

  • Cuando un programador finaliza una tarea que no se completó a tiempo.
  • Cuando se agota el tiempo de espera de una tarea o se ejecuta por mucho tiempo.

Cuando Airflow usa Poison Pill, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ejecutó la tarea:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Soluciones posibles:

  • Revisa el código de la tarea para ver si hay errores que puedan provocar que se ejecute durante demasiado tiempo.
  • (Cloud Composer 2) Aumenta la CPU y la memoria para Airflow trabajadores, para que las tareas se ejecuten más rápido.
  • Aumentar el valor de la [celery_broker_transport_options]visibility-timeout Configuración de Airflow de 12 a 1 con la nueva opción de compresión.

    Como resultado, el programador espera más tiempo hasta que se complete una tarea antes de considerar que la tarea es zombi. Esta opción es especialmente útil para tareas que consumen mucho tiempo y duran muchas horas. Si Si el valor es demasiado bajo (por ejemplo, 3 horas), el programador considera tareas que se ejecutan durante 5 o 6 horas como “suspendidas” (Tareas zombi).

  • Aumenta el valor del Airflow de [core]killed_task_cleanup_time opción de configuración.

    Un valor más largo proporciona más tiempo a los trabajadores de Airflow para que finalicen sus tareas con elegancia. Si el valor es demasiado bajo, es posible que se interrumpan las tareas de Airflow de forma abrupta, sin tiempo suficiente para terminar el trabajo con elegancia.

Solución de problemas de las señales SIGTERM

Linux usa las señales SIGTERM, Kubernetes, el programador de Airflow y Celery para finalizar los procesos responsables de en la ejecución de trabajadores o tareas de Airflow.

Puede haber varios motivos por los cuales las señales SIGTERM se envían en un entorno:

  • Una tarea se convirtió en una tarea zombi y debe detenerse.

  • El programador descubrió un duplicado de una tarea y envía una píldora venenosa. SIGTERM señala la tarea para detenerla.

  • En el Ajuste de escala automático horizontal de Pods, el clúster de GKE El plano de control envía señales SIGTERM para quitar Pods que ya no estén según tus necesidades.

  • El programador puede enviar señales SIGTERM al proceso DagFileProcessorManager. El programador utiliza esas señales SIGTERM para administrar y puede ignorarse de forma segura.

    Ejemplo:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Condición de carrera entre la devolución de llamada de monitoreo de funcionamiento y las devoluciones de llamada de salida en el local_task_job, que supervisa la ejecución de la tarea. Si el corazón late detecta que una tarea se marcó como exitosa, no puede distinguir si la tarea en sí se realizó correctamente o que se le pidió a Airflow que considerara la tarea y exitoso. No obstante, finalizará un ejecutor de tareas, sin esperar para que se cierre.

    Estas señales SIGTERM se pueden ignorar de forma segura. La tarea ya se encuentra en estado exitoso y la ejecución de la ejecución de DAG en su totalidad no se afectado.

    La entrada de registro Received SIGTERM. es la única diferencia entre la entrada normal y la finalización de la tarea en el estado correcto.

    Condición de carrera entre la señal de monitoreo de funcionamiento y las devoluciones de llamada de salida .
    Figura 2: Condición de carrera entre la señal de monitoreo de funcionamiento y las devoluciones de llamada de salida (haz clic para ampliar)
  • Un componente de Airflow usa más recursos (CPU, memoria) de los que permite la clúster.

  • El servicio de GKE realiza operaciones de mantenimiento y envía señales SIGTERM a los Pods que se ejecutan en un nodo que está a punto de actualizarse. Cuando una instancia de tarea termina con SIGTERM, puedes ver el siguiente registro entradas en los registros de un trabajador de Airflow que ejecutó la tarea:

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

Soluciones posibles:

Este problema ocurre cuando una VM que ejecuta la tarea se queda sin memoria. No es relacionados con los parámetros de configuración de Airflow, sino con la cantidad de memoria disponible para el y la VM.

El aumento de la memoria depende de la versión de Cloud Composer que usas. Por ejemplo:

  • En Cloud Composer 2, puedes asignar más recursos de CPU y memoria a Airflow trabajadores.

  • En el caso de Cloud Composer 1, puedes volver a crear tu entorno usando un de tipo de máquina con más rendimiento.

  • En ambas versiones de Cloud Composer, puedes reducir el valor de La opción de configuración de simultaneidad [celery]worker_concurrency de Airflow Esta opción determina cuántas tareas ejecuta un determinado Trabajador de Airflow.

Para obtener más información sobre cómo optimizar tu entorno de Cloud Composer 2, consulta Optimiza el rendimiento y los costos del entorno

Consultas de Cloud Logging para descubrir los motivos de los reinicios o las expulsiones de un Pod

Los entornos de Cloud Composer usan clústeres de GKE como infraestructura de procesamiento. por la capa de prealimentación. En esta sección, encontrarás consultas útiles que te ayudarán a encontrar los motivos de los reinicios o las expulsiones del trabajador de Airflow o del programador de Airflow

Las consultas que se presentan a continuación podrían ajustarse de la siguiente manera:

  • Puedes especificar un cronograma que te interese en Cloud Logging. por ejemplo, las últimas 6 horas o 3 días, o bien puedes definir un intervalo de tiempo personalizado

  • debes especificar la API de Cloud Composer CLUSTER_NAME

  • también puedes limitar la búsqueda a un Pod específico agregando el POD_NAME

Descubre contenedores reiniciados

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Consulta alternativa para limitar los resultados a un Pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Descubre el cierre de contenedores debido a un evento de memoria insuficiente

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar los resultados a un Pod específico:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Descubre contenedores que dejaron de ejecutarse

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar los resultados a un Pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Impacto de las operaciones de actualización en las ejecuciones de tareas de Airflow

Las operaciones de actualización interrumpen las tareas de Airflow que se ejecutan actualmente. a menos que una tarea se ejecute en el modo diferible.

Recomendamos realizar estas operaciones cuando se espere que el impacto sea mínimo en las ejecuciones de tareas de Airflow y configura mecanismos de reintento adecuados en tu DAG y tareas.

Soluciona problemas de tareas de KubernetesExecutor

CeleryKubernetesExecutor es un tipo de ejecutor de Cloud Composer 3 que puede usar CeleryExecutor y KubernetesExecutor al mismo tiempo.

Consulta la página Cómo usar CeleryKubernetesExecutor para obtener más información. información sobre la solución de problemas de tareas ejecutadas con KubernetesExecutor.

Problemas comunes

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes del DAG.

Negsignal.SIGKILL interrumpió la tarea de Airflow

A veces, tu tarea puede usar más memoria de la que se asigna al trabajador de Airflow. En ese caso, es posible que Negsignal.SIGKILL la interrumpa. El sistema envía esta señal para evitar el consumo de memoria adicional, lo que podría afectar la ejecución de otras tareas de Airflow. En el registro del trabajador de Airflow, es posible que veas la siguiente entrada de registro:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL también puede aparecer como código -9.

Soluciones posibles:

  • Un worker_concurrency inferior de trabajadores de Airflow.

  • En el caso de Cloud Composer 2, aumenta la memoria de los trabajadores de Airflow.

  • En el caso de Cloud Composer 1, realiza la actualización al tipo de máquina más grande que se usa en Clúster de Cloud Composer.

  • Optimiza tus tareas para usar menos memoria.

  • Administra tareas con uso intensivo de recursos en Cloud Composer con KubernetesPodOperator o GKEStartPodOperator para aislamiento de tareas y asignación personalizada de recursos.

La tarea falla sin emitir registros debido a errores de análisis del DAG

A veces, puede haber errores sutiles del DAG que generen una situación en la que un programador y un procesador de DAG de Airflow pueden programar tareas para su ejecución y analizar un archivo DAG (respectivamente), pero el trabajador de Airflow no puede ejecutar tareas desde este DAG, ya que hay errores de programación en el archivo DAG de Python. Esto podría puede generar una situación en la que una tarea de Airflow se marca como Failed y no hay ningún registro de su ejecución.

Soluciones:

  • Verifica en los registros del trabajador de Airflow que no haya errores generados por Trabajador de Airflow relacionado con errores de análisis faltantes del DAG o el DAG.

  • Aumenta los parámetros relacionados con el análisis del DAG:

  • Consulta también Inspecciona los registros del procesador de DAG.

La tarea falla sin emitir registros debido a la presión de recursos

Síntoma: Durante la ejecución de una tarea, el subproceso del trabajador de Airflow es responsable para la ejecución de tareas de Airflow se interrumpe de forma abrupta. El error visible en el registro del trabajador de Airflow puede ser similar al siguiente:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solución:

La tarea falla sin emitir registros debido a la expulsión del Pod

Los Pods de Google Kubernetes Engine están sujetos a la Ciclo de vida del Pod de Kubernetes y expulsión del Pod. Tarea Los aumentos repentinos y la programación conjunta de trabajadores son las dos causas más comunes de expulsión de Pods. en Cloud Composer.

La expulsión de pods puede ocurrir cuando un pod en particular usa recursos de un nodo, en relación con las expectativas de consumo de recursos configuradas para el nodo. Para ejemplo, una expulsión puede ocurrir cuando varias tareas que consumen mucha memoria se ejecutan en un Pod, y su carga combinada hace que el nodo en el que se ejecuta este Pod supere el límite de consumo de memoria.

Si se expulsa un pod de trabajador de Airflow, todas las instancias de tareas que se ejecutan en ese pod se interrumpen y, luego, se marcan como con errores en Airflow.

Los registros están almacenados en búfer. Si se expulsa un pod trabajador antes de que se vacíe el búfer, no se emiten registros. La falla de la tarea sin registros indica que los trabajadores de Airflow se reiniciaron debido a la falta de memoria (OOM). Algunos registros pueden estar presentes en Cloud Logging, aunque los registros de Airflow no se hayan emitido.

Para ver los registros, haz lo siguiente:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Registros.

  4. Visualiza los registros de trabajadores individuales en Todos los registros -> Registros de Airflow -> Trabajadores -> (trabajador individual).

La ejecución del DAG tiene capacidad de memoria limitada. Cada ejecución de la tarea comienza con dos procesos de Airflow: ejecución de la tarea y supervisión. Actualmente, cada nodo puede realizar hasta 6 tareas simultáneas (aproximadamente 12 procesos cargados con módulos de Airflow). Se puede consumir más memoria, según el tamaño del DAG.

Síntoma:

  1. En la consola de Google Cloud, ve a la página Cargas de trabajo.

    Ir a Cargas de trabajo

  2. Si hay pods airflow-worker que muestran Evicted, haz clic en cada pod expulsado y busca el mensaje The node was low on resource: memory en la parte superior de la ventana.

Solución:

  • En Cloud Composer 1, crea un nuevo entorno de Cloud Composer con un tipo de máquina más grande que la máquina actual el tipo de letra.
  • En Cloud Composer 2, aumenta los límites de memoria para los trabajadores de Airflow.
  • Verifica los registros de airflow-worker Pods para detectar posibles causas de expulsión. Para ver más información sobre la recuperación de registros de Pods individuales, consulta Soluciona problemas con las cargas de trabajo implementadas.
  • Asegúrate de que las tareas del DAG sean idempotentes y se puedan reintentar.
  • Evita descargar archivos innecesarios en el sistema de archivos local de los trabajadores de Airflow.

    Los trabajadores de Airflow tienen una capacidad limitada del sistema de archivos local. Por ejemplo, en Cloud Composer 2, un trabajador puede tener de 1 GB a 10 GB de almacenamiento Cuando se queda sin espacio de almacenamiento, el Pod trabajador de Airflow es expulsado Plano de control de GKE. Esto falla en todas las tareas que se trabajador de ejecución.

    Ejemplos de operaciones problemáticas:

    • Descargar objetos o archivos y almacenarlos de forma local en un Airflow trabajador. En su lugar, almacena estos objetos directamente en un servicio adecuado, como un bucket de Cloud Storage.
    • Acceso a objetos grandes en la carpeta /data desde un trabajador de Airflow. El trabajador de Airflow descarga el objeto en su sistema de archivos local. En su lugar, implementa los DAG para que los archivos grandes se procesen fuera del Pod trabajador de Airflow.

Tiempo de espera de importación de carga de DAG

Síntoma:

  • En la interfaz web de Airflow, en la parte superior de la página de la lista de DAG, se muestra una alerta roja el cuadro muestra Broken DAG: [/path/to/dagfile] Timeout.
  • En Cloud Monitoring: Los registros airflow-scheduler contienen entradas similares a:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Solución:

Anula el Airflow dag_file_processor_timeout de configuración y permiten más tiempo para el análisis del DAG:

Sección Clave Valor
core dag_file_processor_timeout Nuevo valor del tiempo de espera

La ejecución del DAG no finaliza dentro del tiempo esperado

Síntoma:

A veces, una ejecución de DAG no finaliza porque las tareas de Airflow se detienen y se ejecutan el DAG dura más de lo esperado. En condiciones normales, las tareas de Airflow no permanecen de forma indefinida en el estado en cola o de ejecución, porque Airflow tiene los procedimientos de limpieza que ayudan a evitar esta situación.

Solución:

  • Usa el dagrun_timeout para los DAG. Por ejemplo: dagrun_timeout=timedelta(minutes=120). Como resultado, cada ejecución de DAG finalizaron dentro del tiempo de espera de ejecución del DAG y se marcarán las tareas no finalizadas como Failed o Upstream Failed. Para obtener más información sobre los estados de las tareas de Airflow, consulta Documentación de Apache Airflow.

  • Usa el tiempo de espera de ejecución de la tarea para definir un tiempo de espera predeterminado para las tareas que se ejecutan en función de Apache operadores de Airflow.

No se ejecutaron las ejecuciones de DAG

Síntoma:

Cuando se establece dinámicamente una fecha de programación de un DAG, efectos secundarios inesperados. Por ejemplo:

  • Una ejecución de DAG siempre es en el futuro, y el DAG nunca se ejecuta.

  • Las ejecuciones de DAG anteriores se marcaron como ejecutadas y correctas a pesar de no ser ejecutado.

Hay más información disponible en la documentación de Apache Airflow.

Solución:

  • Sigue las recomendaciones en la documentación de Apache Airflow.

  • Establece un start_date estático para los DAG. Como opción, puedes usar catchup=False para inhabilitar la ejecución del DAG en fechas pasadas.

  • Evita usar datetime.now() o days_ago(<number of days>), a menos que seas de los efectos secundarios de este enfoque.

Mayor tráfico de red desde y hacia la base de datos de Airflow

La cantidad de tráfico de red entre el clúster de GKE de tu entorno y la base de datos de Airflow depende de la cantidad de DAG, de tareas en DAG, y de cómo los DAG acceden a los datos en la base de datos de Airflow. Los siguientes factores pueden influir en el uso de la red:

  • Consultas a la base de datos de Airflow Si tus DAG realizan muchas consultas, generan grandes cantidades de tráfico. Por ejemplo, verificar el estado de las tareas antes de continuar con otras tareas, consultar la tabla XCom y volcar el contenido de la base de datos de Airflow.

  • Gran cantidad de tareas. Cuantas más tareas haya para programar, más tráfico de red se generará. Esta consideración se aplica a la cantidad total de tareas en tus DAG y a la frecuencia de programación. Cuando el programador de Airflow programa las ejecuciones de DAG, realiza consultas a la base de datos de Airflow y genera tráfico.

  • La interfaz web de Airflow genera tráfico de red, ya que realiza consultas a la base de datos de Airflow. El uso intensivo de páginas con grafos, tareas y diagramas puede generar grandes volúmenes de tráfico de red.

El DAG provoca una falla en el servidor web de Airflow o hace que muestre un error 502 gateway timeout

Las fallas del servidor web pueden ocurrir por varias razones diferentes. Cheque los registros de airflow-webserver en Cloud Logging para determinar la causa del Error de 502 gateway timeout.

Procesamiento pesado

Esta sección solo se aplica a Cloud Composer 1.

Evita ejecutar procesamientos pesados durante el tiempo de análisis del DAG.

A diferencia de los nodos trabajadores y programadores, cuyos tipos de máquinas pueden personalizarse tienen mayor capacidad de CPU y memoria, el servidor web usa un tipo de máquina fijo lo que puede provocar fallas en el análisis del DAG si el cálculo del tiempo de análisis es demasiado y tienen mucho peso.

Ten en cuenta que el servidor web tiene 2 CPU virtuales y 2 GB de memoria. El valor predeterminado para core-dagbag_import_timeout es 30 segundos. Este valor de tiempo de espera define el límite superior de tiempo que Airflow pasa cargando un módulo de Python en la carpeta dags/.

Permisos incorrectos

Esta sección solo se aplica a Cloud Composer 1.

El servidor web no se ejecuta con la misma cuenta de servicio que los trabajadores y el programador. Por lo tanto, es posible que los trabajadores y el programador puedan acceder a los recursos administrados por el usuario a los que el servidor web no puede acceder.

Te recomendamos que evites acceder a recursos no públicos durante el análisis del DAG. A veces, esto es inevitable y deberás otorgar permisos a la cuenta de servicio del servidor web. El nombre de la cuenta de servicio se deriva del dominio de tu servidor web. Por ejemplo, si el dominio es example-tp.appspot.com, la cuenta de servicio está example-tp@appspot.gserviceaccount.com

Errores del DAG

Esta sección solo se aplica a Cloud Composer 1.

El servidor web se ejecuta en App Engine y es independiente del clúster de GKE de tu entorno. El servidor web analiza los archivos de definición del DAG, y puede mostrarse un mensaje 502 gateway timeout si hay errores en el DAG. Airflow funciona normalmente sin un servidor web funcional si un DAG problemático no interrumpe ningún proceso que se ejecute en GKE. En este caso, puedes usar gcloud composer environments run para recuperar detalles de tu entorno y como una solución alternativa si el servidor web se vuelve no disponible.

En otros casos, puedes ejecutar el análisis del DAG en GKE y buscar DAG que arrojen excepciones críticas de Python o para los que se haya agotado el tiempo de espera (30 segundos predeterminados). Para solucionar el problema, conéctate a un shell remoto en un contenedor de trabajador de Airflow y prueba los errores de sintaxis. Para obtener más información, consulta Probar DAG.

Control de una gran cantidad de DAG y complementos en carpetas de DAG y complementos

El contenido de las carpetas /dags y /plugins se sincroniza desde bucket de tu entorno a los sistemas de archivos locales de los trabajadores de Airflow y programadores.

Cuantos más datos se almacenen en estas carpetas, más tiempo llevará realizar la y la sincronización. Para abordar dichas situaciones:

  • Limita la cantidad de archivos en las carpetas /dags y /plugins. Almacena solo los la cantidad mínima de archivos necesarios.

  • Si es posible, aumenta el espacio en disco disponible para los programadores de Airflow trabajadores.

  • Si es posible, aumenta la CPU y la memoria de los programadores y trabajadores de Airflow que la operación de sincronización se realice más rápido.

  • Si hay una gran cantidad de DAG, divídelos en lotes, comprime en archivos ZIP y, luego, impleméntalos en la carpeta /dags. Este enfoque acelera el proceso de sincronización de los DAG. Componentes de Airflow descomprimir los archivos ZIP antes de procesar los DAG.

  • Generar DAG de forma programática también puede ser un método para limitar la cantidad de archivos DAG almacenados en la carpeta /dags Consulta la sección sobre los DAG programáticos que se deben evitar problemas con la programación y ejecución de DAG generados de manera programática.

No programes DAG generados de manera programática al mismo tiempo

Un método eficaz es generar objetos DAG de manera programática a partir de un archivo DAG para crear muchos DAG similares que solo tienen pequeñas diferencias.

Es importante no programar todos estos DAG para su ejecución de inmediato. Hay hay una alta probabilidad de que los trabajadores de Airflow no tengan suficiente CPU y memoria recursos para ejecutar todas las tareas programadas al mismo tiempo.

Para evitar problemas con la programación de DAG programáticos, sigue estos pasos:

  • Aumenta la simultaneidad de trabajadores y escala verticalmente tu entorno para que pueda ejecutar más tareas simultáneamente.
  • Genera DAG para distribuir sus programas de manera uniforme a lo largo del tiempo, para evitar programar cientos de tareas al mismo tiempo, para que los trabajadores de Airflow tienen tiempo para ejecutar todas las tareas programadas.

Error 504 cuando se accede al servidor web de Airflow

Consulta el error 504 cuando se accede a la IU de Airflow.

Se genera la excepción Lost connection to Postgres / MySQL server during query durante la ejecución de la tarea o justo después de ella

Lost connection to Postgres / MySQL server during query excepciones generalmente ocurren cuando se cumplen las siguientes condiciones:

  • El DAG usa PythonOperator o un operador personalizado.
  • El DAG realiza consultas a la base de datos de Airflow.

Si se realizan varias consultas desde una función que admite llamadas, los objetos tracebacks pueden apuntar de forma incorrecta a la línea self.refresh_from_db(lock_for_update=True) en el código de Airflow. Es la primera consulta de la base de datos después de la ejecución de la tarea. La causa real de la excepción ocurre antes de esto, cuando una sesión de SQLAlchemy no se cierra de forma correcta.

Las sesiones de SQLAlchemy se limitan a un subproceso y se crean en una sesión de función que admite llamadas que pueden continuar más adelante dentro del código de Airflow. Si hay errores significativos o demoras entre consultas dentro de una sesión, es posible que la conexión ya esté cerrado por el servidor de Postgres o MySQL. El tiempo de espera de conexión en los entornos de Cloud Composer se establece en alrededor de 10 minutos.

Solución:

  • Usa el decorador airflow.utils.db.provide_session. Este decorador proporciona una sesión válida a la base de datos de Airflow en el parámetro session y cierra la sesión de forma correcta al final de la función.
  • No uses una sola función de larga duración. En cambio, mueve todas las consultas de base de datos a funciones separadas, de modo que haya varias funciones con el decorador airflow.utils.db.provide_session. En este caso, las sesiones se cierran de forma automática después de recuperar los resultados de la consulta.

Control del tiempo de ejecución de DAG, tareas y ejecuciones paralelas de un mismo DAG

Si deseas controlar durante cuánto tiempo se ejecuta un solo DAG para un DAG en particular dura, puedes usar el parámetro de DAG dagrun_timeout así que Por ejemplo, si esperas que se ejecute un solo DAG (independientemente de finaliza con éxito o falla) no debe durar más de 1 hora y, luego, establece este parámetro en 3,600 segundos.

También puedes controlar cuánto tiempo permites que dure una sola tarea de Airflow. Tareas pendientes por lo que puedes usar execution_timeout.

Si quieres controlar cuántas ejecuciones de DAG activas quieres tener en un un DAG específico, puedes usar el [core]max-active-runs-per-dag la opción de configuración de Airflow para hacerlo.

Si solo quieres ejecutar una instancia de un DAG en un momento dado, establece El parámetro max-active-runs-per-dag como 1.

Problemas que afectan la sincronización de DAG y complementos con programadores, trabajadores y servidores web

Cloud Composer sincroniza el contenido de las carpetas /dags y /plugins a los programadores y trabajadores. Ciertos objetos en las carpetas /dags y /plugins podría impedir que esta sincronización funcione correctamente o, al menos, ralentizarla.

  • La carpeta /dags está sincronizada con programadores y trabajadores. Esta carpeta no está sincronizada a servidores web en Cloud Composer 2 o si activas DAG Serialization en Cloud Composer 1.

  • La carpeta /plugins está sincronizada con programadores, trabajadores y servidores web.

Puedes encontrar los siguientes problemas:

  • Subiste archivos comprimidos en gzip que usan transcodificación de compresión a /dags y /plugins individuales. Suele ocurrir si usas el comando gsutil cp -Z para subir datos al bucket.

    Solución: Borrar el objeto que usó la transcodificación de compresión y volver a subirlo al bucket.

  • Uno de los objetos se llama "."; dicho objeto no se sincroniza con entre los programadores y los trabajadores, y podría dejar de sincronizarse.

    Solución: Cambia el nombre del objeto problemático.

  • Una carpeta y un archivo de DAG de Python tienen los mismos nombres, por ejemplo, a.py. En este caso, el archivo DAG no está sincronizado correctamente con los componentes de Airflow.

    Solución: Quita la carpeta que tiene el mismo nombre que un archivo DAG de Python.

  • Uno de los objetos de las carpetas /dags o /plugins contiene un símbolo / al final del nombre del objeto. Estos objetos pueden confundir el proceso de sincronización. porque el símbolo / indica que un objeto es una carpeta, no un archivo.

    Solución: Quita el símbolo / del nombre del objeto problemático.

  • No almacenes archivos innecesarios en las carpetas /dags y /plugins.

    A veces, los DAG y los complementos que implementas incluyen archivos adicionales, como archivos que almacenan pruebas para estos componentes. Estos los archivos se sincronizan con los trabajadores y los programadores, y esto afecta el tiempo necesario para copiar estos archivos a programadores, trabajadores y servidores web.

    Solución: No almacenes ningún archivo adicional e innecesario en /dags y /plugins carpetas.

Los programadores y trabajadores generan el error Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'

Este problema ocurre porque los objetos pueden tener espacios de nombres superpuestos en Cloud Storage y, al mismo tiempo, los programadores y los trabajadores usan sistemas de archivos tradicionales. Por ejemplo, es posible para agregar una carpeta y un objeto con el mismo nombre al archivo bucket. Cuando el bucket se sincroniza con los programadores y trabajadores del entorno se genera este error, lo que puede conducir a fallas en las tareas.

Para solucionar este problema, asegúrate de que no haya espacios de nombres superpuestos en el en el bucket de tu entorno. Por ejemplo, si tanto /dags/misc (un archivo) como Las /dags/misc/example_file.txt (otro archivo) están en un bucket, se produce un error generada por el programador.

Interrupciones transitorias durante la conexión a la base de datos de metadatos de Airflow

Cloud Composer se ejecuta en una infraestructura de nube distribuida. Significa que, de vez en cuando, pueden aparecer algunos problemas transitorios interrumpir la ejecución de tus tareas de Airflow.

En estas situaciones, es posible que veas los siguientes mensajes de error en las secciones de los trabajadores de Airflow registros:

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

o

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Las operaciones de mantenimiento también pueden causar este tipo de problemas intermitentes para tus entornos de Cloud Composer.

Por lo general, esos errores son intermitentes y si tus tareas de Airflow son idempotentes y tienes reintentos configurados, debes ser inmune a ellos. También puedes considera definir períodos de mantenimiento.

Una razón adicional para estos errores podría ser la falta de recursos en tu en el clúster del entorno. En esos casos, puede optimizar o escalar verticalmente entorno, como se describe en Entornos de escalamiento o Instrucciones para optimizar tu entorno.

Una ejecución de DAG se marca como exitosa, pero no tiene tareas ejecutadas

Si una ejecución de DAG execution_date es anterior a su start_date, Es posible que veas ejecuciones de DAG que no tienen ninguna ejecución de tareas, pero que aún están marcadas como exitosas.

Una ejecución exitosa del DAG sin tareas ejecutadas .
Figura 3: Una ejecución exitosa del DAG sin tareas ejecutadas (haz clic para ampliar)

Causa

Esto puede ocurrir en uno de los siguientes casos:

  • Una discrepancia se debe a la diferencia de zona horaria entre los execution_date y start_date. Puede ocurrir, por ejemplo, cuando usando pendulum.parse(...) para configurar start_date.

  • El start_date del DAG está configurado en un valor dinámico, por ejemplo airflow.utils.dates.days_ago(1)

Solución

  • Asegúrate de que execution_date y start_date usen la misma zona horaria.

  • Especifica un start_date estático y combínalo con catchup=False para evitar lo siguiente: DAG que ejecutan fechas de inicio anteriores a la actual.

Un DAG no es visible en la IU de Airflow o en la IU del DAG y el programador no lo programa

El procesador de DAG analiza cada DAG antes de que pueda ser programado por el programador. y antes de que un DAG se vuelva visible en la IU de Airflow o la IU de DAG.

Con las siguientes opciones de configuración de Airflow, se definen tiempos de espera para analizar los DAG:

Si un DAG no es visible en la IU de Airflow o en la IU del DAG, haz lo siguiente:

  • Verifica los registros del procesador del DAG si este puede procesar correctamente tu DAG. Si tienes problemas, es posible que veas las siguientes entradas de registro en los registros del procesador o programador de DAG:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • Verifica los registros del programador para ver si este funciona correctamente. En caso de Puedes ver las siguientes entradas de registro en los registros del programador:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

Soluciones:

  • Corrige todos los errores de análisis del DAG. El procesador de DAG analiza varios DAG y, en casos poco comunes que analizan errores de un DAG pueden afectar negativamente el análisis de otros DAG.

  • Si el análisis de tu DAG demora más que la cantidad de segundos definida en [core]dagrun_import_timeout, y luego aumentará este tiempo de espera.

  • Si el análisis de todos tus DAG demora más que la cantidad de segundos definidos en [core]dag_file_processor_timeout, y luego aumentará este tiempo de espera.

  • Si tu DAG tarda mucho tiempo en analizarse, también puede significar que no está implementar de manera óptima. Por ejemplo, si lee muchas variables de entorno o que realice llamadas a servicios externos o Airflow, en la base de datos. En la medida de lo posible, evita realizar dichas operaciones en secciones globales de los DAG.

  • Aumenta los recursos de CPU y memoria de Scheduler para que pueda funcionar más rápido.

  • Ajusta la cantidad de programadores.

  • Aumenta la cantidad de procesos del procesador de DAG para que se pueda realizar el análisis más rápido. Puedes hacerlo aumentando el valor de [scheduler]parsing_process.

  • Disminuye la frecuencia del análisis del DAG.

  • Disminuye la carga en la base de datos de Airflow.

Síntomas de que la base de datos de Airflow está sobrecargada

Para obtener más información, consulta Síntomas de que la base de datos de Airflow está bajo presión.

¿Qué sigue?