Cloud Composer 1 | Cloud Composer 2
Nesta página, você verá etapas de solução de problemas e informações para problemas comuns com os programadores do Airflow.
Identificar a origem do problema
Para começar a solução de problemas, identifique se o problema acontece no tempo de análise do DAG ou durante o processamento de tarefas no momento da execução. Para mais informações sobre o tempo de análise e o tempo de execução do DAG, leia Diferença entre o tempo de análise do DAG e o tempo de execução do DAG.
Como inspecionar registros do processador de DAG
Se você tiver DAGs complexos, o processador de DAG, que é executado pelo programador, pode não analisar todos os DAGs. Isso pode levar a muitos problemas com os sintomas a seguir.
Sintomas:
Se o processador de DAG encontrar problemas ao analisar seus DAGs, isso pode levar a uma combinação dos problemas listados abaixo. Se os DAGs forem gerados dinamicamente, esses problemas podem ser mais impactantes em comparação com os DAGs estáticos.
Os DAGs não são visíveis na IU do Airflow e na IU do DAG.
Os DAGs não estão programados para execução.
Há erros nos registros do processador de DAG, por exemplo:
dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
ou
dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py exited with return code 1.
Os programadores do Airflow enfrentam problemas que levam a reinicializações do programador.
As tarefas do Airflow programadas para execução são canceladas, e as execuções de DAGs que não foram analisadas podem ser marcadas como
failed
. Exemplo:airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1 manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag 'dag-example'. Marking it as removed.
Solução:
Aumente os parâmetros relacionados à análise do DAG:
Aumente dagbag-import-timeout para pelo menos 120 segundos (ou mais, se necessário).
Aumente dag-file-processor-timeout para pelo menos 180 segundos (ou mais, se necessário). Esse valor precisa ser maior que
dagbag-import-timeout
.
Corrija ou remova os DAGs que causam problemas ao processador de DAG.
Como inspecionar tempos de análise do DAG
Para verificar se o problema acontece no momento da análise do DAG, siga estas etapas.
Console
No console do Google Cloud, use a página Monitoramento e a guia Registros para inspecionar os tempos de análise do DAG.
Inspecione os tempos de análise do DAG com a página de monitoramento do Cloud Composer:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Monitoramento é aberta.
Na guia Monitoramento, consulte o gráfico Tempo total de análise de todos os arquivos DAG na seção Execuções do DAG e identifique possíveis problemas.
Inspecione os tempos de análise do DAG com a guia Registros do Cloud Composer:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Monitoramento é aberta.
Acesse a guia Registros e, na árvore de navegação Todos os registros, selecione a seção gerenciador do processador DAG.
Analise os registros
dag-processor-manager
e identifique possíveis problemas.
gcloud: Airflow 1
Use o comando list_dags
com a sinalização -r
para ver o tempo de análise
de todos os seus DAGs.
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
list_dags -- -r
Substitua:
ENVIRONMENT_NAME
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;
A saída do comando será semelhante a esta:
-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py | 0.6477 | 1 | 2 | ['dag_1']
/dag_2.py | 0.018652 | 1 | 2 | ['dag_2']
/dag_3.py | 0.004024 | 1 | 6 | ['dag_3']
/dag_4.py | 0.003476 | 1 | 2 | ['dag_4']
/dag_5.py | 0.002666 | 1 | 1 | ['dag_5']
-----------+----------+---------+----------+-----------------------
Procure o valor de Tempo de análise do DagBag. Um valor grande pode indicar que um dos seus DAGs não está implementado da maneira ideal. Na tabela de respostas, é possível identificar quais DAGs têm um longo tempo de análise.
gcloud: Airflow 2
Use o comando dags report
para conferir o tempo de análise de todos os DAGs.
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags report
Substitua:
ENVIRONMENT_NAME
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;
A saída do comando será semelhante a esta:
Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file | duration | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py | 0:00:00.038334 | 2 | 10 | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1 | 1 | airflow_monitoring
Procure o valor de duration para cada um dos DAGs listados na tabela. Um valor grande pode indicar que um dos DAGs não foi implementado da maneira ideal. Na tabela de saída, é possível identificar quais DAGs têm um longo tempo de análise.
Como monitorar tarefas em execução ou na fila
Para verificar se há tarefas travadas em uma fila, siga estas etapas.
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Monitoramento.
Na guia Monitoramento, revise o gráfico de tarefas do Airflow na seção Execuções do DAG e identifique possíveis problemas. As tarefas do Airflow são tarefas que estão em um estado na fila no Airflow. Elas podem acessar a fila do agente do Celery ou do Kubernetes Executor. As tarefas em fila do Celery são instâncias de tarefas colocadas na fila de agentes do Celery.
Solução de problemas no momento da análise do DAG
As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns no tempo de análise do DAG.
Análise e programação de DAG no Cloud Composer 1 e no Airflow 1
A eficiência da análise do DAG foi melhorada significativamente no Airflow 2. Se você tiver problemas de desempenho relacionados à análise e programação do DAG, considere migrar para o Airflow 2.
No Cloud Composer 1, o programador é executado nos nós do cluster com outros componentes do Cloud Composer. Por isso, a carga de nós de cluster individuais pode ser maior ou menor em comparação com outros nós. O desempenho do programador (análise e programação do DAG) pode variar dependendo do nó em que o programador é executado. Além disso, um nó individual em que o programador é executado pode mudar como resultado de operações de upgrade ou manutenção. Essa limitação foi resolvida no Cloud Composer 2, em que é possível alocar recursos de CPU e memória para o programador, e o desempenho do programador não depende da carga dos nós do cluster.
Número limitado de linhas de execução
Permitir que o gerenciador do processador DAG, a parte do programador que processa arquivos DAG, use apenas um número limitado de linhas de execução pode afetar o tempo de análise do DAG.
Para resolver o problema, substitua as seguintes opções de configuração do Airflow:
Para o Airflow 1.10.12 e versões anteriores, modifique o parâmetro
max_threads
:Seção Chave Valor Observações scheduler
max_threads
NUMBER_OF_CORES_IN_MACHINE - 1
Substitua NUMBER_OF_CORES_IN_MACHINE
pelo número de núcleos
nas máquinas de nós de trabalho.No Airflow 1.10.14 e versões posteriores, modifique o parâmetro
parsing_processes
:Seção Chave Valor Observações scheduler
parsing_processes
NUMBER_OF_CORES_IN_MACHINE - 1
Substitua NUMBER_OF_CORES_IN_MACHINE
pelo número de núcleos
nas máquinas de nós de trabalho.
Número e distribuição de tempo das tarefas
O Airflow é conhecido por ter problemas com a programação de um grande número de tarefas pequenas. Nesses casos, opte por um número menor de tarefas mais consolidadas.
Programar um grande número de DAGs ou tarefas ao mesmo tempo também pode ser uma fonte de problemas. Para evitar esse problema, distribua suas tarefas de maneira mais uniforme com o tempo.
Solução de problemas com tarefas em execução e na fila
As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns com tarefas em execução e na fila.
Filas de tarefas são muito longas
Em alguns casos, uma fila de tarefas pode ser muito longa para o programador. Para informações sobre como otimizar parâmetros de worker e acelerado, leia sobre como escalonar o ambiente do Cloud Composer junto com a empresa.
Como usar o recurso TimeTable do programador do Airflow
A partir do Airflow 2.2, é possível definir uma tabela de horários para um DAG usando um novo atributo chamado TimeTable.
É possível definir uma tabela de horários usando um dos seguintes métodos:
Recursos de cluster limitados
Esta seção se aplica apenas ao Cloud Composer 1.
É possível que ocorram problemas de desempenho se o cluster do GKE do ambiente for muito pequeno para processar todos os DAGs e tarefas. Nesse caso, tente uma destas soluções:
- Crie um novo ambiente com um tipo de máquina que ofereça mais desempenho e migre seus DAGs para ele.
- Criar mais ambientes do Cloud Composer e dividir os DAGs entre eles
- Altere o tipo de máquina dos nós do GKE, conforme descrito em Como fazer upgrade do tipo de máquina para os nós do GKE. Como esse procedimento é propenso a erros, é a opção menos recomendada.
- Faça upgrade do tipo de máquina da instância do Cloud SQL que executa o banco de dados do Airflow no ambiente, por exemplo, usando os comandos
gcloud composer environments update
. O baixo desempenho do banco de dados do Airflow pode ser a razão pela qual o programador está lento.
Evite programar tarefas durante janelas de manutenção
É possível definir janelas de manutenção específicas para seu ambiente. Durante esses períodos, ocorrem eventos de manutenção para o Cloud SQL e o GKE.
Fazer o programador do Airflow ignorar arquivos desnecessários
Melhore o desempenho do programador do Airflow ignorando arquivos desnecessários na pasta de DAGs. O programador do Airflow ignora arquivos e pastas especificados no arquivo .airflowignore
.
Para fazer com que o programador do Airflow ignore arquivos desnecessários:
- Crie um arquivo
.airflowignore
. - Nesse arquivo, liste as pastas e os arquivos que serão ignorados.
- Faça upload desse arquivo para a pasta
/dags
no bucket do ambiente.
Para mais informações sobre o formato de arquivo .airflowignore
, consulte a
documentação do Airflow.
O programador do Airflow processa DAGs pausados
Os usuários do Airflow pausam os DAGs para evitar a execução. Isso economiza ciclos de processamento dos workers do Airflow.
O programador do Airflow continuará analisando DAGs pausados. Se você realmente quiser melhorar o desempenho do programador do Airflow, use .airflowignore
ou exclua os DAGs pausados da pasta de DAGs.
Uso de "wait_for_downstream" nos DAGs
Se você definir o parâmetro wait_for_downstream
como True
nos DAGs, para que uma tarefa seja bem-sucedida, todas as tarefas que estiverem imediatamente downstream também
serão bem-sucedidas. de dados. Isso significa que a execução de tarefas pertencentes a uma determinada execução do DAG pode ser reduzida pela execução de tarefas da execução anterior do DAG. Leia mais sobre isso na documentação do Airflow.
As tarefas na fila por muito tempo serão canceladas e reprogramadas
Se uma tarefa do Airflow for mantida na fila por muito tempo, o programador vai reprogramá-la novamente para execução. Nas versões do Airflow anteriores à 2.3.1, a tarefa também será marcada como com falha e será repetida se for qualificada para uma nova tentativa.
Uma maneira de observar os sintomas dessa situação é analisar o gráfico com o número de tarefas na fila (guia "Monitoramento" na interface do Cloud Composer) e, se os picos nesse gráfico não caírem em cerca de duas horas, as tarefas provavelmente serão reprogramadas (sem registros) seguidas por entradas de registro "As tarefas adotadas ainda estavam pendentes ..." nos registros do programador. Nesses casos, a mensagem "O arquivo de registros não foi encontrado..." pode aparecer nos registros de tarefas do Airflow porque a tarefa não foi executada.
Em geral, esse comportamento é esperado, e a próxima instância da tarefa programada deve ser executada de acordo com a programação. Se você observar muitos desses casos nos seus ambientes do Cloud Composer, talvez isso signifique que não há workers suficientes do Airflow no seu ambiente para processar todas as tarefas programadas.
Resolução: para resolver esse problema, é preciso garantir que sempre haja capacidade nos workers do Airflow para executar tarefas na fila. Por exemplo, é possível aumentar o número de workers ou worker_concurrency. Também é possível ajustar o paralelismo ou os pools para evitar que as tarefas de enfileiramento sejam maiores do que a capacidade que você tem.
Esporadicamente, tarefas desatualizadas podem bloquear a execução de um DAG específico
Em casos normais, o programador do Airflow precisa ser capaz de lidar com situações em que há tarefas desatualizadas na fila e, por algum motivo, não ser possível executá-las corretamente (por exemplo, um DAG ao qual as tarefas desatualizadas pertencem foi excluído).
Se essas tarefas desatualizadas não forem limpas pelo programador, talvez seja necessário excluí-las manualmente. Por exemplo, é possível fazer isso na IU do Airflow. Acesse Menu > Navegador > Instâncias de tarefas, encontre e exclua tarefas na fila que pertencem a um DAG desatualizado.
Para resolver esse problema, faça upgrade do ambiente para o Cloud Composer versão 2.1.12 ou posterior.
Abordagem do Cloud Composer para o parâmetro [scheduler]min_file_process_interval
O Cloud Composer muda a forma como o [scheduler]min_file_process_interval
é usado pelo programador do Airflow.
Airflow 1
No caso do Cloud Composer usar o Airflow 1, os usuários podem definir o valor de [scheduler]min_file_process_interval
entre 0 e 600 segundos. Valores superiores a 600 segundos trazem os mesmos resultados, como se [scheduler]min_file_process_interval
estivesse definido como 600 segundos.
Airflow 2
No Airflow 2, [scheduler]min_file_process_interval
só pode ser usado com as versões 1.19.9 e 2.0.26 ou mais recentes
Versões do Cloud Composer anteriores às 1.19.9 e 2.0.26
Nessas versões,
[scheduler]min_file_process_interval
é ignorado.Cloud Composer nas versões 1.19.9 ou 2.0.26 ou mais recentes
O programador do Airflow é reiniciado depois de um determinado número de vezes que todos os DAGs são programados e o parâmetro
[scheduler]num_runs
controla quantas vezes é feito pelo programador. Quando o programador atinge[scheduler]num_runs
repetições de programação, ele é reiniciado. O Programador é um componente sem estado e essa reinicialização é um mecanismo de recuperação automática para qualquer problema com o programador. Quando não é especificado, o valor padrão de[scheduler]num_runs
é aplicado, que é 5.000.[scheduler]min_file_process_interval
pode ser usado para configurar a frequência de análise do DAG, mas esse parâmetro não pode ser maior do que o tempo necessário para que um programador execute loops[scheduler]num_runs
ao programar seus DAGs.
Como escalonar a configuração do Airflow
O Airflow oferece opções de configuração que controlam quantas tarefas e DAGs o Airflow pode executar ao mesmo tempo. Para definir essas opções de configuração, substitua os valores delas para seu ambiente.
-
O parâmetro
[celery]worker_concurrency
controla o número máximo de tarefas que um worker do Airflow pode executar ao mesmo tempo. Se você multiplicar o valor desse parâmetro pelo número de workers do Airflow no ambiente do Cloud Composer, você receberá o número máximo de tarefas que podem ser executadas em um determinado momento no ambiente. Esse número é limitado pela opção de configuração[core]parallelism
do Airflow, descrita em mais detalhes.Nos ambientes do Cloud Composer 2, o valor padrão de
[celery]worker_concurrency
é calculado automaticamentePara as versões 2.3.3 e posteriores do Airflow,
[celery]worker_concurrency
está definido como um valor mínimo de 32, 12 * worker_CPU e 8 * worker_memory.Para as versões do Airflow: 2.2.5 ou anteriores,
[celery]worker_concurrency
está definido como 12 * número de CPUs dos workers.
Máximo de execuções de DAGs ativas
A opção de configuração
[core]max_active_runs_per_dag
do Airflow controla o número máximo de execuções ativas de DAGs por DAG. O programador não criará mais execuções de DAGs se atingir esse limite.Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que o programador restringe a execução do DAG, porque não é possível criar mais instâncias de execução do DAG em um determinado momento.
Máximo de tarefas ativas por DAG
A opção de configuração
[core]max_active_tasks_per_dag
do Airflow controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG. É um parâmetro no nível do DAG.Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que a execução de uma única instância do DAG é lenta porque há apenas um número limitado de tarefas do DAG que podem ser executadas em um determinado momento
Solução: aumentar
[core]max_active_tasks_per_dag
.Paralelismo e tamanho do pool
A opção de configuração
[core]parallelism
do Airflow controla quantas tarefas o programador do Airflow pode enfileirar na fila do executor após todas as dependências dessas tarefas serem atendidas.Este é um parâmetro global para toda a configuração do Airflow.
As tarefas são enfileiradas e executadas em um pool. Os ambientes do Cloud Composer usam apenas um pool. O tamanho desse pool controla quantas tarefas podem ser enfileiradas pelo programador para execução em um determinado momento. Se o tamanho do pool for muito pequeno, o programador não poderá enfileirar tarefas para execução, mesmo que os limites sejam definidos pela opção de configuração
[core]parallelism
e pelo[celery]worker_concurrency
. opção de configuração multiplicada pelo número de workers do Airflow ainda não foi atendida.É possível configurar o tamanho do pool na IU do Airflow (Menu > Administrador > Pools). Ajuste o tamanho do pool com o nível de paralelismo esperado no ambiente.
Normalmente,
[core]parallelism
é definido como um produto do número máximo de workers e de [celery]worker_concurrency.
Os DAGs não são programados pelo programador devido ao tempo limite do processador de DAG
Para mais informações sobre esse problema, consulte Como solucionar problemas de DAGs.
Marcando tarefas como falhas após chegar a dagrun_timeout
O programador marca as tarefas que não estão concluídas (em execução, programadas e enfileiradas) como falha se uma execução de DAG não terminar dentro de dagrun_timeout
(um parâmetro de DAG).
Solução:
Estenda
dagrun_timeout
para que o tempo limite seja atingido.(Cloud Composer 2) Aumente o número de workers ou aumente os parâmetros de desempenho do worker para que o DAG seja executado mais rapidamente.
Sintomas de pressão de carga no banco de dados do Airflow
Às vezes, nos registros do programador do Airflow, você pode ver a seguinte entrada de registro de aviso:
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
Sintomas semelhantes também podem ser observados nos registros de worker do Airflow:
Para MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
Para PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
Esses erros ou avisos podem ser um sintoma do banco de dados do Airflow que está sobrecarregado com o número de conexões abertas ou o número de consultas executadas ao mesmo tempo, seja por programadores ou outros componentes do Airflow, como workers, acionadores e servidores da Web.
Soluções possíveis:
Escalone verticalmente o banco de dados do Airflow:
- (Cloud Composer 1) Altere o tipo de máquina da instância do Cloud SQL que armazena o banco de dados do Airflow do seu ambiente.
- (Cloud Composer 2) Ajuste o tamanho do ambiente.
Reduza o número de programadores. Na maioria dos casos, um ou dois programadores são suficientes para analisar e programar tarefas do Airflow. Não é recomendado configurar mais de dois programadores, a menos que haja um caso justificado.
Evite usar variáveis globais nos DAGs do Airflow: variáveis de ambiente do Cloud Composer e variáveis do Airflow.
Defina [scheduler]scheduler-heartbeat-sec como um valor maior, por exemplo, para 15 segundos ou mais.
Defina [scheduler]job-heartbeat-sec como um valor mais alto, por exemplo, 30 segundos ou mais.
Defina [scheduler]scheduler_health_check_threshold como um valor igual a
[scheduler]job-heartbeat-sec
multiplicado por4
.
O servidor da Web mostra o aviso "O programador não parece estar em execução"
O programador informa o sinal de funcionamento regularmente ao banco de dados do Airflow. Com base nessas informações, o servidor da Web do Airflow determina se o programador está ativo.
Às vezes, se o programador estiver sobrecarregado, talvez não seja possível informar o sinal de funcionamento a cada [scheduler]scheduler-heartbeat-sec.
Nessa situação, o servidor da Web do Airflow pode exibir o seguinte aviso:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Soluções possíveis:
Aumente a CPU e a memória dos recursos do programador.
Otimize seus DAGs para que a análise e a programação deles sejam mais rápidas e não consumam muito dos recursos do programador.
Evite usar variáveis globais nos DAGs do Airflow: variáveis de ambiente do Cloud Composer e variáveis do Airflow.
Aumente o valor de [scheduler]scheduler-health-check-threshold para que o servidor da Web aguarde mais tempo antes de relatar a indisponibilidade do programador.
Soluções alternativas para problemas encontrados durante o preenchimento de DAGs
Às vezes, pode ser necessário executar novamente DAGs que já foram executados. É possível fazer isso com a ferramenta de linha de comando do Airflow da seguinte maneira:
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Para executar novamente apenas tarefas com falha para um DAG específico, use também o argumento --rerun_failed_tasks
.
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Para executar novamente apenas tarefas com falha para um DAG específico, use também o argumento --rerun-failed-tasks
.
Substitua:
ENVIRONMENT_NAME
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;START_DATE
por um valor para o parâmetrostart_date
do DAG no formatoYYYY-MM-DD
.END_DATE
por um valor para o parâmetroend_date
do DAG no formatoYYYY-MM-DD
.DAG_NAME
pelo nome do DAG.
Às vezes, a operação de preenchimento pode gerar uma situação de impasse em que um preenchimento não é possível porque há um bloqueio em uma tarefa. Exemplo:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
Em alguns casos, você pode usar as seguintes soluções alternativas para superar os impasses:
Desative o miniprogramador substituindo o
[core]schedule-after-task-execution
porFalse
.Execute preenchimentos para períodos mais restritos. Por exemplo, defina
START_DATE
eEND_DATE
para especificar um período de apenas um dia.