Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página fornece passos de resolução de problemas e informações para problemas comuns com os programadores e os processadores de DAGs do Airflow.
Identifique a origem do problema
Para iniciar a resolução de problemas, identifique se o problema ocorre:
- No momento da análise do DAG, enquanto o DAG é analisado por um processador de DAGs do Airflow
- No momento da execução, enquanto o DAG é processado por um programador do Airflow
Para mais informações acerca do tempo de análise e do tempo de execução, leia o artigo Diferença entre o tempo de análise do DAG e o tempo de execução do DAG.
Inspecione problemas de processamento de DAG
Monitorizar tarefas em execução e em fila
Para verificar se tem tarefas bloqueadas numa fila, siga estes passos.
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Monitorização.
No separador Monitorização, reveja o gráfico Tarefas do Airflow na secção Execuções de DAGs e identifique possíveis problemas. As tarefas do Airflow são tarefas que se encontram num estado de fila no Airflow e podem ser encaminhadas para a fila do agente do Celery ou do Kubernetes Executor. As tarefas em fila do Celery são instâncias de tarefas que são colocadas na fila do agente do Celery.
Resolução de problemas no momento da análise do DAG
As secções seguintes descrevem os sintomas e as potenciais correções para alguns problemas comuns no momento da análise do DAG.
Distribuição do número e do tempo das tarefas
O Airflow pode ter problemas ao agendar um grande número de DAGs ou tarefas ao mesmo tempo. Para evitar problemas com o agendamento, pode:
- Ajuste os seus DAGs para usar um número menor de tarefas mais consolidadas.
- Ajuste os intervalos de programação dos seus DAGs para distribuir as execuções de DAGs de forma mais uniforme ao longo do tempo.
Dimensionar a configuração do Airflow
O Airflow oferece opções de configuração do Airflow que controlam o número de tarefas e DAGs que o Airflow pode executar em simultâneo. Para definir estas opções de configuração, substitua os respetivos valores para o seu ambiente. Também pode definir alguns destes valores ao nível do DAG ou da tarefa.
Simultaneidade de trabalhadores
O parâmetro
[celery]worker_concurrency
controla o número máximo de tarefas que um trabalhador do Airflow pode executar em simultâneo. Se multiplicar o valor deste parâmetro pelo número de trabalhadores do Airflow no seu ambiente do Cloud Composer, obtém o número máximo de tarefas que podem ser executadas num determinado momento no seu ambiente. Este número é limitado pela opção de configuração do[core]parallelism
Airflow, que é descrita mais detalhadamente.Em ambientes do Cloud Composer 3, o valor predefinido de
[celery]worker_concurrency
é calculado automaticamente com base no número de instâncias de tarefas simultâneas simples que um worker pode acomodar. Isto significa que o respetivo valor depende dos limites de recursos dos trabalhadores. O valor de concorrência de trabalhadores não depende do número de trabalhadores no seu ambiente.Execuções de DAG ativas máximas
A opção de configuração do
[core]max_active_runs_per_dag
Airflow controla o número máximo de execuções de DAG ativas por DAG. O agendador não cria mais execuções de DAG se atingir este limite.Se este parâmetro estiver definido incorretamente, pode deparar-se com um problema em que o agendador limita a execução do DAG porque não consegue criar mais instâncias de execução do DAG num determinado momento.
Também pode definir este valor ao nível do DAG com o parâmetro
max_active_runs
.Máximo de tarefas ativas por DAG
A opção de configuração do
[core]max_active_tasks_per_dag
Airflow controla o número máximo de instâncias de tarefas que podem ser executadas em simultâneo em cada DAG.Se este parâmetro estiver definido incorretamente, pode encontrar um problema em que a execução de uma única instância de DAG é lenta porque existe apenas um número limitado de tarefas de DAG que podem ser executadas num determinado momento. Neste caso, pode aumentar o valor desta opção de configuração.
Também pode definir este valor ao nível do DAG com o parâmetro
max_active_tasks
.Pode usar os parâmetros
max_active_tis_per_dag
emax_active_tis_per_dagrun
ao nível da tarefa para controlar quantas instâncias com um ID de tarefa específico podem ser executadas por DAG e por execução de DAG.Paralelismo e tamanho do conjunto
A opção de configuração
[core]parallelism
Airflow controla quantas tarefas o agendador do Airflow pode colocar em fila na fila do Executor depois de todos os requisitos destas tarefas serem cumpridos.Este é um parâmetro global para toda a configuração do Airflow.
As tarefas são colocadas em fila e executadas num conjunto. Os ambientes do Cloud Composer usam apenas um pool. O tamanho deste conjunto controla o número de tarefas que o programador pode colocar em fila para execução num determinado momento. Se o tamanho do conjunto for demasiado pequeno, o programador não pode colocar tarefas em fila para execução, mesmo que os limites, que são definidos pela opção de configuração
[core]parallelism
e pela opção de configuração[celery]worker_concurrency
multiplicada pelo número de trabalhadores do Airflow, ainda não tenham sido atingidos.Pode configurar o tamanho do conjunto na IU do Airflow (Menu > Admin > Pools). Ajuste o tamanho do conjunto ao nível de paralelismo que espera no seu ambiente.
Normalmente,
[core]parallelism
é definido como um produto do número máximo de trabalhadores e[celery]worker_concurrency
.
Resolução de problemas com tarefas em execução e em fila
As secções seguintes descrevem os sintomas e as potenciais correções para alguns problemas comuns com a execução e as tarefas em fila.
As execuções de DAG não são executadas
Sintoma:
Quando uma data de agendamento para um DAG é definida dinamicamente, isto pode originar vários efeitos secundários inesperados. Por exemplo:
Uma execução de DAG está sempre no futuro e o DAG nunca é executado.
As execuções de DAG anteriores estão marcadas como executadas e bem-sucedidas, apesar de não terem sido executadas.
Estão disponíveis mais informações na documentação do Apache Airflow.
Possíveis soluções:
Siga as recomendações na documentação do Apache Airflow.
Defina o
start_date
estático para DAGs. Em alternativa, pode usarcatchup=False
para desativar a execução do DAG para datas anteriores.Evite usar
datetime.now()
oudays_ago(<number of days>)
, a menos que tenha conhecimento dos efeitos secundários desta abordagem.
Usar a funcionalidade TimeTable do programador do Airflow
As tabelas de tempo estão disponíveis a partir do Airflow 2.2.
Pode definir uma tabela de horários para um DAG com um dos seguintes métodos:
Também pode usar os horários incorporados.
Evite o agendamento de tarefas durante as janelas de manutenção
Pode definir períodos de manutenção para o seu ambiente, para que a manutenção desse ambiente ocorra fora dos horários em que executa os seus DAGs. Pode continuar a executar os seus DAGs durante os períodos de manutenção, desde que seja aceitável que algumas tarefas possam ser interrompidas e repetidas. Para mais informações sobre como os períodos de manutenção afetam o seu ambiente, consulte o artigo Especifique períodos de manutenção.
Utilização de "wait_for_downstream" nos seus DAGs
Se definir o parâmetro wait_for_downstream
como True
nos DAGs, para que uma tarefa seja bem-sucedida, todas as tarefas imediatamente a jusante desta tarefa também têm de ser bem-sucedidas. Significa que a execução de tarefas pertencentes a uma determinada execução de DAG pode ser abrandada pela execução de tarefas da execução de DAG anterior. Leia mais sobre esta funcionalidade na documentação do Airflow.
As tarefas em fila durante demasiado tempo são canceladas e reagendadas
Se uma tarefa do Airflow for mantida na fila durante demasiado tempo, o agendador
vai reagendá-la para execução após o período definido na
[scheduler]task_queued_timeout
opção de configuração do Airflow. O valor predefinido é 2400
.
Nas versões do Airflow anteriores à 2.3.1, a tarefa também é marcada como falhada e
repetida se for elegível para uma repetição.
Uma forma de observar os sintomas desta situação é analisar o gráfico com o número de tarefas em fila ("Monitorização" no IU do Cloud Composer) e, se os picos neste gráfico não diminuírem no prazo de cerca de duas horas, as tarefas vão ser reagendadas (sem registos), seguidas de entradas de registo "As tarefas adotadas ainda estavam pendentes…" nos registos do agendador. Nesses casos, pode ver a mensagem "Log file is not found..." (Ficheiro de registo não encontrado...) nos registos de tarefas do Airflow porque a tarefa não foi executada.
Em geral, este comportamento é esperado e a próxima instância da tarefa agendada destina-se a ser executada de acordo com o agendamento. Se observar muitos casos deste tipo nos seus ambientes do Cloud Composer, pode significar que não existem trabalhadores do Airflow suficientes no seu ambiente para processar todas as tarefas agendadas.
Resolução: para resolver este problema, tem de se certificar de que existe sempre capacidade nos trabalhadores do Airflow para executar tarefas em fila. Por exemplo, pode aumentar o número de trabalhadores ou a worker_concurrency. Também pode ajustar o paralelismo ou os conjuntos para evitar colocar tarefas em fila mais do que a capacidade que tem.
As tarefas que estão bloqueadas na fila podem impedir a execução de um DAG específico
Para resolver este problema, atualize o seu ambiente para a versão 2.1.12 ou posterior do Cloud Composer.
Em casos normais, o agendador do Airflow deve conseguir lidar com situações em que existem tarefas na fila e, por algum motivo, não é possível executá-las corretamente (como quando um DAG ao qual estas tarefas pertencem foi eliminado).
Se estas tarefas não forem eliminadas pelo agendador, pode ter de as eliminar manualmente. Pode fazê-lo, por exemplo, na IU do Airflow (Menu > Navegador > Instâncias de tarefas), encontrar tarefas em fila e eliminá-las.
Abordagem do Cloud Composer ao parâmetro min_file_process_interval
O Cloud Composer altera a forma como o
[scheduler]min_file_process_interval
é usado pelo programador do Airflow.
Nas versões do Cloud Composer anteriores à 2.0.26, o parâmetro [scheduler]min_file_process_interval
é ignorado.
Nas versões do Cloud Composer posteriores à 2.0.26:
O agendador do Airflow é reiniciado após um determinado número de vezes em que todos os DAGs são agendados e o parâmetro [scheduler]num_runs
controla quantas vezes o agendador o faz. Quando o agendador atinge [scheduler]num_runs
ciclos de agendamento, é reiniciado. O agendador é um componente sem estado e, por isso, o reinício é um mecanismo de autocorreção para quaisquer problemas que o agendador possa ter. O valor predefinido de
[scheduler]num_runs
é 5000.
[scheduler]min_file_process_interval
pode ser usado para configurar a frequência com que a análise DAG ocorre, mas este parâmetro não pode ser superior ao tempo necessário para um agendador executar ciclos [scheduler]num_runs
ao agendar os seus DAGs.
Marcar tarefas como falhadas após atingir o tempo limite dagrun_timeout
O agendador marca as tarefas que não estão concluídas (em execução, agendadas e em fila)
como falhadas se uma execução de DAG não terminar no prazo de
dagrun_timeout
(um parâmetro de DAG).
Solução:
Prolongue
dagrun_timeout
para cumprir o limite de tempo.Aumente o número de trabalhadores ou aumente os parâmetros de desempenho dos trabalhadores, para que o DAG seja executado mais rapidamente.
Sintomas de a base de dados do Airflow estar sob carga elevada
Por vezes, nas entradas de registo do agendador do Airflow, pode ver a seguinte entrada de registo de aviso:
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
Também podem ser observados sintomas semelhantes nos registos do trabalhador do Airflow:
Para o MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
Para o PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
Estes erros ou avisos podem ser um sintoma de a base de dados do Airflow estar sobrecarregada pelo número de ligações abertas ou pelo número de consultas executadas ao mesmo tempo, seja por programadores ou por outros componentes do Airflow, como trabalhadores, acionadores e servidores Web.
Possíveis soluções:
Aumente a escala da base de dados do Airflow ajustando o tamanho do seu ambiente.
Reduza o número de programadores. Na maioria dos casos, um ou dois programadores são suficientes para analisar e programar tarefas do Airflow. Não é recomendado configurar mais de dois programadores, a menos que haja um motivo justificado.
Evite usar variáveis globais em DAGs do Airflow. Em alternativa, use variáveis de ambiente e variáveis do Airflow.
Defina
[scheduler]scheduler_heartbeat_sec
para um valor mais elevado, por exemplo, 15 segundos ou mais.Defina
[scheduler]job_heartbeat_sec
para um valor superior, por exemplo, 30 segundos ou mais.Defina
[scheduler]scheduler_health_check_threshold
para um valor igual a[scheduler]job_heartbeat_sec
multiplicado por4
.
O servidor Web mostra o aviso "O agendador não parece estar em execução"
O programador comunica o seu batimento cardíaco regularmente à base de dados do Airflow. Com base nestas informações, o servidor Web do Airflow determina se o agendador está ativo.
Por vezes, se o programador estiver sobrecarregado, pode não conseguir comunicar o seu heartbeat a cada [scheduler]scheduler_heartbeat_sec
.
Nessa situação, o servidor Web do Airflow pode apresentar o seguinte aviso:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Possíveis soluções:
Aumente os recursos de CPU e memória para o programador.
Otimize os seus DAGs para que a respetiva análise e agendamento sejam mais rápidos e não consumam demasiados recursos do programador.
Evite usar variáveis globais em DAGs do Airflow. Em alternativa, use variáveis de ambiente e variáveis do Airflow.
Aumente o valor da opção de configuração do
[scheduler]scheduler_health_check_threshold
Airflow para que o servidor Web aguarde mais tempo antes de comunicar a indisponibilidade do programador.
Soluções alternativas para problemas encontrados durante o preenchimento de DAGs
Por vezes, pode querer executar novamente DAGs que já foram executados. Pode fazê-lo com um comando da CLI do Airflow da seguinte forma:
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Para voltar a executar apenas as tarefas com falhas de um DAG específico, também pode usar o argumento --rerun-failed-tasks
.
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.START_DATE
com um valor para o parâmetrostart_date
DAG, no formatoYYYY-MM-DD
.END_DATE
com um valor para o parâmetroend_date
DAG, no formatoYYYY-MM-DD
.DAG_NAME
com o nome do DAG.
Por vezes, a operação de preenchimento pode gerar uma situação de impasse em que não é possível fazer um preenchimento porque existe um bloqueio numa tarefa. Por exemplo:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
Em alguns casos, pode usar as seguintes soluções alternativas para superar os bloqueios:
Desative o mini-agendador substituindo o
[core]schedule_after_task_execution
paraFalse
.Execute preenchimentos para intervalos de datas mais restritos. Por exemplo, defina
START_DATE
eEND_DATE
para especificar um período de apenas 1 dia.