Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Nesta página, você encontra etapas de solução de problemas e informações sobre problemas comuns com programadores do Airflow e processadores de DAGs.
Identificar a origem do problema
Para começar a solução de problemas, identifique se o problema acontece:
- No momento da análise do DAG, enquanto ele é analisado por um processador de DAG do Airflow
- No momento da execução, enquanto o DAG é processado por um programador do Airflow
Para mais informações sobre o tempo de análise e o tempo de execução do DAG, leia Diferença entre o tempo de análise do DAG e o tempo de execução do DAG.
Inspecionar problemas de processamento de DAGs
Como monitorar tarefas em execução ou na fila
Para verificar se há tarefas travadas em uma fila, siga estas etapas.
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Monitoramento.
Na guia Monitoramento, consulte o gráfico Tarefas do Airflow na seção Execuções do DAG e identifique possíveis problemas. As tarefas do Airflow são tarefas que estão em estado de fila no Airflow. Elas podem ir para a fila de agentes do executor do Celery ou do Kubernetes. As tarefas em fila do Celery são instâncias de tarefas colocadas na fila de agentes do Celery.
Solução de problemas no momento da análise do DAG
As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns no tempo de análise do DAG.
Análise e programação de DAGs no Cloud Composer 1 e no Airflow 1
A eficiência da análise de DAGs foi significativamente melhorada no Airflow 2. Se você tiver problemas de desempenho relacionados à análise e programação de DAGs, considere migrar para o Airflow 2.
No Cloud Composer 1, o programador é executado em nós de cluster com outros componentes do Cloud Composer. Por isso, a carga dos nós de cluster individuais pode ser maior ou menor em comparação com outros nós. O desempenho do programador (análise e programação de DAGs) pode variar dependendo do nó em que ele é executado. Além disso, um nó individual em que o programador é executado pode mudar como resultado de operações de upgrade ou manutenção. Essa limitação foi resolvida no Cloud Composer 2, em que é possível alocar recursos de CPU e memória ao programador. Além disso, o desempenho dele não depende da carga dos nós do cluster.
Número e distribuição de tempo das tarefas
O Airflow pode ter problemas ao programar um grande número de DAGs ou tarefas ao mesmo tempo. Para evitar problemas com o agendamento, você pode:
- Ajuste seus DAGs para usar um número menor de tarefas mais consolidadas.
- Ajuste os intervalos de programação dos seus DAGs para distribuir as execuções de DAGs de maneira mais uniforme ao longo do tempo.
Como escalonar a configuração do Airflow
O Airflow oferece opções de configuração que controlam quantas tarefas e DAGs o Airflow pode executar ao mesmo tempo. Para definir essas opções de configuração, modifique os valores para o ambiente. Também é possível definir alguns desses valores no nível da DAG ou da tarefa.
-
O parâmetro
[celery]worker_concurrency
controla o número máximo de tarefas que um worker do Airflow pode executar ao mesmo tempo. Se você multiplicar o valor desse parâmetro pelo número de workers do Airflow no ambiente do Cloud Composer, você receberá o número máximo de tarefas que podem ser executadas em um determinado momento no ambiente. Esse número é limitado pela opção de configuração[core]parallelism
do Airflow, que é descrita em mais detalhes. Máximo de execuções de DAGs ativas
A opção de configuração
[core]max_active_runs_per_dag
do Airflow controla o número máximo de execuções ativas de DAGs por DAG. O programador não criará mais execuções de DAGs se atingir esse limite.Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que o programador restringe a execução do DAG, porque não é possível criar mais instâncias de execução do DAG em um determinado momento.
Também é possível definir esse valor no nível do DAG com o parâmetro
max_active_runs
.Máximo de tarefas ativas por DAG
A opção de configuração
[core]max_active_tasks_per_dag
do Airflow controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG.Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que a execução de uma única instância do DAG é lenta porque há apenas um número limitado de tarefas do DAG que podem ser executadas em um determinado momento Nesse caso, é possível aumentar o valor dessa opção de configuração.
Também é possível definir esse valor no nível do DAG com o parâmetro
max_active_tasks
.É possível usar os parâmetros
max_active_tis_per_dag
emax_active_tis_per_dagrun
no nível da tarefa para controlar quantas instâncias com um ID de tarefa específico podem ser executadas por DAG e por execução de DAG.Paralelismo e tamanho do pool
A opção de configuração
[core]parallelism
do Airflow controla quantas tarefas o programador do Airflow pode enfileirar na fila do executor após todas as dependências dessas tarefas serem atendidas.Este é um parâmetro global para toda a configuração do Airflow.
As tarefas são enfileiradas e executadas em um pool. Os ambientes do Cloud Composer usam apenas um pool. O tamanho desse pool controla quantas tarefas podem ser enfileiradas pelo programador para execução em um determinado momento. Se o tamanho do pool for muito pequeno, o programador não poderá enfileirar tarefas para execução, mesmo que os limites, definidos pela opção de configuração
[core]parallelism
e pela opção de configuração[celery]worker_concurrency
multiplicada pelo número de workers do Airflow, ainda não tenham sido atingidos.É possível configurar o tamanho do pool na interface do Airflow (Menu > Administrador > Pools). Ajuste o tamanho do pool com o nível de paralelismo esperado no ambiente.
Normalmente,
[core]parallelism
é definido como um produto do número máximo de workers e[celery]worker_concurrency
.
Solução de problemas com tarefas em execução e na fila
As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns com tarefas em execução e na fila.
As execuções de DAG não são realizadas
Sintoma:
Quando uma data de programação para um DAG é definida dinamicamente, isso pode causar vários efeitos colaterais inesperados. Exemplo:
Uma execução de DAG está sempre no futuro, e o DAG nunca é executado.
As execuções anteriores de DAG são marcadas como executadas e bem-sucedidas, mesmo que não tenham sido executadas.
Mais informações estão disponíveis na documentação do Apache Airflow.
Possíveis soluções:
Siga as recomendações na documentação do Apache Airflow.
Defina
start_date
estático para DAGs. Como opção, usecatchup=False
para desativar a execução do DAG em datas anteriores.Evite usar
datetime.now()
oudays_ago(<number of days>)
, a menos que você conheça os efeitos colaterais dessa abordagem.
Usar o recurso TimeTable do programador do Airflow
As tabelas de tempo estão disponíveis a partir do Airflow 2.2.
É possível definir uma tabela de horários para um DAG com um dos seguintes métodos:
- Com uma função Python
- (Não disponível no Cloud Composer 1) Com um plug-in personalizado
Você também pode usar os horários integrados.
Recursos de cluster limitados
É possível que ocorram problemas de desempenho se o cluster do GKE do ambiente for muito pequeno para processar todos os DAGs e tarefas. Nesse caso, tente uma destas soluções:
- Crie um novo ambiente com um tipo de máquina que ofereça mais desempenho e migre seus DAGs para ele.
- Criar mais ambientes do Cloud Composer e dividir os DAGs entre eles
- Altere o tipo de máquina dos nós do GKE, conforme descrito em Como fazer upgrade do tipo de máquina para os nós do GKE. Como esse procedimento é propenso a erros, é a opção menos recomendada.
- Faça upgrade do tipo de máquina da instância do Cloud SQL que executa o banco de dados do Airflow no ambiente, por exemplo, usando os comandos
gcloud composer environments update
. O baixo desempenho do banco de dados do Airflow pode ser o motivo da lentidão do programador.
Evite programar tarefas durante janelas de manutenção
É possível definir janelas de manutenção para seu ambiente. Assim, a manutenção dele acontece fora dos horários em que você executa os DAGs. Você ainda pode executar seus DAGs durante as janelas de manutenção, desde que seja aceitável que algumas tarefas possam ser interrompidas e repetidas. Para mais informações sobre como as janelas de manutenção afetam seu ambiente, consulte Especificar janelas de manutenção.
Uso de "wait_for_downstream" nos DAGs
Se você definir o parâmetro wait_for_downstream
como True
nos DAGs, para que uma tarefa seja bem-sucedida, todas as tarefas que estiverem imediatamente downstream também
serão bem-sucedidas. de dados. Isso significa que a execução de tarefas pertencentes a uma determinada execução do DAG pode ser reduzida pela execução de tarefas da execução anterior do DAG. Leia mais sobre isso na
documentação do Airflow.
As tarefas em fila por muito tempo serão canceladas e reprogramadas
Se uma tarefa do Airflow ficar na fila por muito tempo, o programador
vai reprogramá-la para execução depois que o período definido na
opção de configuração [scheduler]task_queued_timeout
do Airflow passar. O valor padrão é 2400
.
Em versões do Airflow anteriores à 2.3.1, a tarefa também é marcada como falha e
repetida se estiver qualificada para uma nova tentativa.
Uma maneira de observar os sintomas dessa situação é analisar o gráfico com o número de tarefas enfileiradas (guia "Monitoramento" na interface do Cloud Composer). Se os picos nesse gráfico não diminuírem em cerca de duas horas, as tarefas provavelmente serão reagendadas (sem registros), seguidas por entradas de registro "As tarefas adotadas ainda estavam pendentes..." nos registros do programador. Nesses casos, talvez você veja a mensagem "O arquivo de registro não foi encontrado..." nos registros de tarefas do Airflow porque a tarefa não foi executada.
Em geral, esse comportamento é esperado, e a próxima instância da tarefa programada será executada de acordo com a programação. Se você observar muitos casos desse tipo nos seus ambientes do Cloud Composer, isso pode significar que não há workers do Airflow suficientes para processar todas as tarefas programadas.
Resolução: para resolver esse problema, verifique se sempre há capacidade nos workers do Airflow para executar tarefas enfileiradas. Por exemplo, você pode aumentar o número de workers ou worker_concurrency. Você também pode ajustar o paralelismo ou os pools para evitar enfileirar mais tarefas do que a capacidade disponível.
Abordagem do Cloud Composer para o parâmetro min_file_process_interval
O Cloud Composer muda a maneira como o
[scheduler]min_file_process_interval
é usado pelo programador do Airflow.
Airflow 1
No caso do Cloud Composer usando o Airflow 1, os usuários podem definir o valor de [scheduler]min_file_process_interval
entre 0 e 600 segundos. Valores
maiores que 600 segundos trazem os mesmos resultados que se
[scheduler]min_file_process_interval
estiver definido como 600 segundos.
Airflow 2
Em versões do Cloud Composer anteriores à 1.19.9, [scheduler]min_file_process_interval
é ignorado.
Versões do Cloud Composer posteriores à 1.19.9:
O programador do Airflow é reiniciado depois de um certo número de vezes que todos os DAGs são programados, e o parâmetro [scheduler]num_runs
controla quantas vezes isso é feito pelo programador. Quando o programador
atinge [scheduler]num_runs
loops de programação, ele é reiniciado. O programador é um componente sem estado, e essa reinicialização é um mecanismo de recuperação automática para problemas que ele possa ter. O valor padrão de
[scheduler]num_runs
é 5000.
[scheduler]min_file_process_interval
pode ser usado para configurar a frequência com que a análise do DAG acontece, mas esse parâmetro não pode ser maior do que o tempo necessário para um programador realizar loops [scheduler]num_runs
ao programar seus DAGs.
Marcando tarefas como falhas após atingir dagrun_timeout
O programador marca como falhas as tarefas que não são concluídas (em execução, programadas e enfileiradas) se uma execução de DAG não for concluída em dagrun_timeout
(um parâmetro de DAG).
Solução:
Estenda
dagrun_timeout
para atender ao tempo limite.
Sintomas de carga pesada no banco de dados do Airflow
Às vezes, nos registros do programador do Airflow, você pode encontrar a seguinte entrada de registro de aviso:
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
Sintomas semelhantes também podem ser observados nos registros do worker do Airflow:
Para MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
Para PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
Esses erros ou avisos podem ser um sintoma de que o banco de dados do Airflow está sobrecarregado pelo número de conexões abertas ou de consultas executadas ao mesmo tempo, seja por programadores ou por outros componentes do Airflow como workers, acionadores e servidores da Web.
Possíveis soluções:
Aumente a escala do banco de dados do Airflow mudando o tipo de máquina da instância do Cloud SQL que armazena o banco de dados do Airflow do seu ambiente.
Evite usar variáveis globais em DAGs do Airflow. Em vez disso, use variáveis de ambiente e variáveis do Airflow.
Defina
[scheduler]scheduler_heartbeat_sec
como um valor mais alto, por exemplo, 15 segundos ou mais.Defina
[scheduler]job_heartbeat_sec
como um valor maior, por exemplo, 30 segundos ou mais.Defina
[scheduler]scheduler_health_check_threshold
como um valor igual a[scheduler]job_heartbeat_sec
multiplicado por4
.
O servidor da Web mostra o aviso "O programador não parece estar em execução"
O programador informa regularmente o sinal de funcionamento ao banco de dados do Airflow. Com base nessas informações, o servidor da Web do Airflow determina se o agendador está ativo.
Às vezes, se o programador estiver sobrecarregado, talvez não seja possível
informar o heartbeat a cada
[scheduler]scheduler_heartbeat_sec
.
Nessa situação, o servidor da Web do Airflow pode mostrar o seguinte aviso:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Possíveis soluções:
Aumente os recursos de CPU e memória para o programador.
Otimize seus DAGs para que a análise e o agendamento sejam mais rápidos e não consumam muitos recursos do programador.
Evite usar variáveis globais em DAGs do Airflow. Em vez disso, use variáveis de ambiente e variáveis do Airflow.
Aumente o valor da opção de configuração do Airflow
[scheduler]scheduler_health_check_threshold
para que o servidor da Web espere mais tempo antes de informar a indisponibilidade do programador.
Soluções alternativas para problemas encontrados durante o backfill de DAGs
Às vezes, você pode querer executar novamente DAGs que já foram executados. É possível fazer isso com um comando da CLI do Airflow da seguinte maneira:
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Para executar novamente apenas as tarefas com falha de um DAG específico, use também o argumento
--rerun-failed-tasks
.
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Para executar novamente apenas as tarefas com falha de um DAG específico, use também o argumento
--rerun_failed_tasks
.
Substitua:
ENVIRONMENT_NAME
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;START_DATE
com um valor para o parâmetrostart_date
do DAG, no formatoYYYY-MM-DD
.END_DATE
com um valor para o parâmetroend_date
do DAG, no formatoYYYY-MM-DD
.DAG_NAME
pelo nome do DAG.
Às vezes, a operação de backfill pode gerar um deadlock em que um backfill não é possível porque há um bloqueio em uma tarefa. Exemplo:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
Em alguns casos, é possível usar as seguintes soluções alternativas para evitar deadlocks:
Para desativar o miniagendador, substitua o
[core]schedule_after_task_execution
porFalse
.Execute preenchimentos para períodos mais curtos. Por exemplo, defina
START_DATE
eEND_DATE
para especificar um período de apenas um dia.