Resolução de problemas do agendador do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página fornece passos de resolução de problemas e informações para problemas comuns com os programadores e os processadores de DAGs do Airflow.

Identifique a origem do problema

Para iniciar a resolução de problemas, identifique se o problema ocorre:

  • No momento da análise do DAG, enquanto o DAG é analisado por um processador de DAGs do Airflow
  • No momento da execução, enquanto o DAG é processado por um programador do Airflow

Para mais informações acerca do tempo de análise e do tempo de execução, leia o artigo Diferença entre o tempo de análise do DAG e o tempo de execução do DAG.

Inspecione problemas de processamento de DAG

  1. Inspecione os registos do processador DAG.
  2. Verifique os tempos de análise de DAGs.

Monitorizar tarefas em execução e em fila

Para verificar se tem tarefas bloqueadas numa fila, siga estes passos.

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Monitorização.

  4. No separador Monitorização, reveja o gráfico Tarefas do Airflow na secção Execuções de DAGs e identifique possíveis problemas. As tarefas do Airflow são tarefas que se encontram num estado de fila no Airflow e podem ser encaminhadas para a fila do agente do Celery ou do Kubernetes Executor. As tarefas em fila do Celery são instâncias de tarefas que são colocadas na fila do agente do Celery.

Resolução de problemas no momento da análise do DAG

As secções seguintes descrevem os sintomas e as potenciais correções para alguns problemas comuns no momento da análise do DAG.

Distribuição do número e do tempo das tarefas

O Airflow pode ter problemas ao agendar um grande número de DAGs ou tarefas ao mesmo tempo. Para evitar problemas com o agendamento, pode:

  • Ajuste os seus DAGs para usar um número menor de tarefas mais consolidadas.
  • Ajuste os intervalos de programação dos seus DAGs para distribuir as execuções de DAGs de forma mais uniforme ao longo do tempo.

Dimensionar a configuração do Airflow

O Airflow oferece opções de configuração do Airflow que controlam o número de tarefas e DAGs que o Airflow pode executar em simultâneo. Para definir estas opções de configuração, substitua os respetivos valores para o seu ambiente. Também pode definir alguns destes valores ao nível do DAG ou da tarefa.

  • Simultaneidade de trabalhadores

    O parâmetro [celery]worker_concurrency controla o número máximo de tarefas que um trabalhador do Airflow pode executar em simultâneo. Se multiplicar o valor deste parâmetro pelo número de trabalhadores do Airflow no seu ambiente do Cloud Composer, obtém o número máximo de tarefas que podem ser executadas num determinado momento no seu ambiente. Este número é limitado pela opção de configuração do [core]parallelism Airflow, que é descrita mais detalhadamente.

    Em ambientes do Cloud Composer 3, o valor predefinido de [celery]worker_concurrency é calculado automaticamente com base no número de instâncias de tarefas simultâneas simples que um worker pode acomodar. Isto significa que o respetivo valor depende dos limites de recursos dos trabalhadores. O valor de concorrência de trabalhadores não depende do número de trabalhadores no seu ambiente.

  • Execuções de DAG ativas máximas

    A opção de configuração do [core]max_active_runs_per_dagAirflow controla o número máximo de execuções de DAG ativas por DAG. O agendador não cria mais execuções de DAG se atingir este limite.

    Se este parâmetro estiver definido incorretamente, pode deparar-se com um problema em que o agendador limita a execução do DAG porque não consegue criar mais instâncias de execução do DAG num determinado momento.

    Também pode definir este valor ao nível do DAG com o parâmetro max_active_runs.

  • Máximo de tarefas ativas por DAG

    A opção de configuração do [core]max_active_tasks_per_dagAirflow controla o número máximo de instâncias de tarefas que podem ser executadas em simultâneo em cada DAG.

    Se este parâmetro estiver definido incorretamente, pode encontrar um problema em que a execução de uma única instância de DAG é lenta porque existe apenas um número limitado de tarefas de DAG que podem ser executadas num determinado momento. Neste caso, pode aumentar o valor desta opção de configuração.

    Também pode definir este valor ao nível do DAG com o parâmetro max_active_tasks.

    Pode usar os parâmetros max_active_tis_per_dag e max_active_tis_per_dagrun ao nível da tarefa para controlar quantas instâncias com um ID de tarefa específico podem ser executadas por DAG e por execução de DAG.

  • Paralelismo e tamanho do conjunto

    A opção de configuração [core]parallelismAirflow controla quantas tarefas o agendador do Airflow pode colocar em fila na fila do Executor depois de todos os requisitos destas tarefas serem cumpridos.

    Este é um parâmetro global para toda a configuração do Airflow.

    As tarefas são colocadas em fila e executadas num conjunto. Os ambientes do Cloud Composer usam apenas um pool. O tamanho deste conjunto controla o número de tarefas que o programador pode colocar em fila para execução num determinado momento. Se o tamanho do conjunto for demasiado pequeno, o programador não pode colocar tarefas em fila para execução, mesmo que os limites, que são definidos pela opção de configuração [core]parallelism e pela opção de configuração [celery]worker_concurrency multiplicada pelo número de trabalhadores do Airflow, ainda não tenham sido atingidos.

    Pode configurar o tamanho do conjunto na IU do Airflow (Menu > Admin > Pools). Ajuste o tamanho do conjunto ao nível de paralelismo que espera no seu ambiente.

    Normalmente, [core]parallelism é definido como um produto do número máximo de trabalhadores e [celery]worker_concurrency.

Resolução de problemas com tarefas em execução e em fila

As secções seguintes descrevem os sintomas e as potenciais correções para alguns problemas comuns com a execução e as tarefas em fila.

As execuções de DAG não são executadas

Sintoma:

Quando uma data de agendamento para um DAG é definida dinamicamente, isto pode originar vários efeitos secundários inesperados. Por exemplo:

  • Uma execução de DAG está sempre no futuro e o DAG nunca é executado.

  • As execuções de DAG anteriores estão marcadas como executadas e bem-sucedidas, apesar de não terem sido executadas.

Estão disponíveis mais informações na documentação do Apache Airflow.

Possíveis soluções:

  • Siga as recomendações na documentação do Apache Airflow.

  • Defina o start_date estático para DAGs. Em alternativa, pode usar catchup=False para desativar a execução do DAG para datas anteriores.

  • Evite usar datetime.now() ou days_ago(<number of days>), a menos que tenha conhecimento dos efeitos secundários desta abordagem.

Usar a funcionalidade TimeTable do programador do Airflow

As tabelas de tempo estão disponíveis a partir do Airflow 2.2.

Pode definir uma tabela de horários para um DAG com um dos seguintes métodos:

Também pode usar os horários incorporados.

Evite o agendamento de tarefas durante as janelas de manutenção

Pode definir períodos de manutenção para o seu ambiente, para que a manutenção desse ambiente ocorra fora dos horários em que executa os seus DAGs. Pode continuar a executar os seus DAGs durante os períodos de manutenção, desde que seja aceitável que algumas tarefas possam ser interrompidas e repetidas. Para mais informações sobre como os períodos de manutenção afetam o seu ambiente, consulte o artigo Especifique períodos de manutenção.

Utilização de "wait_for_downstream" nos seus DAGs

Se definir o parâmetro wait_for_downstream como True nos DAGs, para que uma tarefa seja bem-sucedida, todas as tarefas imediatamente a jusante desta tarefa também têm de ser bem-sucedidas. Significa que a execução de tarefas pertencentes a uma determinada execução de DAG pode ser abrandada pela execução de tarefas da execução de DAG anterior. Leia mais sobre esta funcionalidade na documentação do Airflow.

As tarefas em fila durante demasiado tempo são canceladas e reagendadas

Se uma tarefa do Airflow for mantida na fila durante demasiado tempo, o agendador vai reagendá-la para execução após o período definido na [scheduler]task_queued_timeoutopção de configuração do Airflow. O valor predefinido é 2400. Nas versões do Airflow anteriores à 2.3.1, a tarefa também é marcada como falhada e repetida se for elegível para uma repetição.

Uma forma de observar os sintomas desta situação é analisar o gráfico com o número de tarefas em fila ("Monitorização" no IU do Cloud Composer) e, se os picos neste gráfico não diminuírem no prazo de cerca de duas horas, as tarefas vão ser reagendadas (sem registos), seguidas de entradas de registo "As tarefas adotadas ainda estavam pendentes…" nos registos do agendador. Nesses casos, pode ver a mensagem "Log file is not found..." (Ficheiro de registo não encontrado...) nos registos de tarefas do Airflow porque a tarefa não foi executada.

Em geral, este comportamento é esperado e a próxima instância da tarefa agendada destina-se a ser executada de acordo com o agendamento. Se observar muitos casos deste tipo nos seus ambientes do Cloud Composer, pode significar que não existem trabalhadores do Airflow suficientes no seu ambiente para processar todas as tarefas agendadas.

Resolução: para resolver este problema, tem de se certificar de que existe sempre capacidade nos trabalhadores do Airflow para executar tarefas em fila. Por exemplo, pode aumentar o número de trabalhadores ou a worker_concurrency. Também pode ajustar o paralelismo ou os conjuntos para evitar colocar tarefas em fila mais do que a capacidade que tem.

As tarefas que estão bloqueadas na fila podem impedir a execução de um DAG específico

Para resolver este problema, atualize o seu ambiente para a versão 2.1.12 ou posterior do Cloud Composer.

Em casos normais, o agendador do Airflow deve conseguir lidar com situações em que existem tarefas na fila e, por algum motivo, não é possível executá-las corretamente (como quando um DAG ao qual estas tarefas pertencem foi eliminado).

Se estas tarefas não forem eliminadas pelo agendador, pode ter de as eliminar manualmente. Pode fazê-lo, por exemplo, na IU do Airflow (Menu > Navegador > Instâncias de tarefas), encontrar tarefas em fila e eliminá-las.

Abordagem do Cloud Composer ao parâmetro min_file_process_interval

O Cloud Composer altera a forma como o [scheduler]min_file_process_interval é usado pelo programador do Airflow.

Nas versões do Cloud Composer anteriores à 2.0.26, o parâmetro [scheduler]min_file_process_interval é ignorado.

Nas versões do Cloud Composer posteriores à 2.0.26:

O agendador do Airflow é reiniciado após um determinado número de vezes em que todos os DAGs são agendados e o parâmetro [scheduler]num_runs controla quantas vezes o agendador o faz. Quando o agendador atinge [scheduler]num_runs ciclos de agendamento, é reiniciado. O agendador é um componente sem estado e, por isso, o reinício é um mecanismo de autocorreção para quaisquer problemas que o agendador possa ter. O valor predefinido de [scheduler]num_runs é 5000.

[scheduler]min_file_process_interval pode ser usado para configurar a frequência com que a análise DAG ocorre, mas este parâmetro não pode ser superior ao tempo necessário para um agendador executar ciclos [scheduler]num_runs ao agendar os seus DAGs.

Marcar tarefas como falhadas após atingir o tempo limite dagrun_timeout

O agendador marca as tarefas que não estão concluídas (em execução, agendadas e em fila) como falhadas se uma execução de DAG não terminar no prazo de dagrun_timeout (um parâmetro de DAG).

Solução:

Sintomas de a base de dados do Airflow estar sob carga elevada

Por vezes, nas entradas de registo do agendador do Airflow, pode ver a seguinte entrada de registo de aviso:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Também podem ser observados sintomas semelhantes nos registos do trabalhador do Airflow:

Para o MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Para o PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Estes erros ou avisos podem ser um sintoma de a base de dados do Airflow estar sobrecarregada pelo número de ligações abertas ou pelo número de consultas executadas ao mesmo tempo, seja por programadores ou por outros componentes do Airflow, como trabalhadores, acionadores e servidores Web.

Possíveis soluções:

O servidor Web mostra o aviso "O agendador não parece estar em execução"

O programador comunica o seu batimento cardíaco regularmente à base de dados do Airflow. Com base nestas informações, o servidor Web do Airflow determina se o agendador está ativo.

Por vezes, se o programador estiver sobrecarregado, pode não conseguir comunicar o seu heartbeat a cada [scheduler]scheduler_heartbeat_sec.

Nessa situação, o servidor Web do Airflow pode apresentar o seguinte aviso:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Possíveis soluções:

  • Aumente os recursos de CPU e memória para o programador.

  • Otimize os seus DAGs para que a respetiva análise e agendamento sejam mais rápidos e não consumam demasiados recursos do programador.

  • Evite usar variáveis globais em DAGs do Airflow. Em alternativa, use variáveis de ambiente e variáveis do Airflow.

  • Aumente o valor da opção de configuração do [scheduler]scheduler_health_check_threshold Airflow para que o servidor Web aguarde mais tempo antes de comunicar a indisponibilidade do programador.

Soluções alternativas para problemas encontrados durante o preenchimento de DAGs

Por vezes, pode querer executar novamente DAGs que já foram executados. Pode fazê-lo com um comando da CLI do Airflow da seguinte forma:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Para voltar a executar apenas as tarefas com falhas de um DAG específico, também pode usar o argumento --rerun-failed-tasks.

Substituir:

  • ENVIRONMENT_NAME com o nome do ambiente.
  • LOCATION com a região onde o ambiente está localizado.
  • START_DATE com um valor para o parâmetro start_date DAG, no formato YYYY-MM-DD.
  • END_DATE com um valor para o parâmetro end_date DAG, no formato YYYY-MM-DD.
  • DAG_NAME com o nome do DAG.

Por vezes, a operação de preenchimento pode gerar uma situação de impasse em que não é possível fazer um preenchimento porque existe um bloqueio numa tarefa. Por exemplo:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

Em alguns casos, pode usar as seguintes soluções alternativas para superar os bloqueios:

  • Desative o mini-agendador substituindo o [core]schedule_after_task_execution para False.

  • Execute preenchimentos para intervalos de datas mais restritos. Por exemplo, defina START_DATE e END_DATE para especificar um período de apenas 1 dia.

O que se segue?