Cloud Composer 1 | Cloud Composer 2
Nesta página, você verá etapas de solução de problemas e informações para problemas comuns do fluxo de trabalho.
Muitos problemas na execução do DAG são causados por um desempenho de ambiente não ideal. É possível otimizar o ambiente do Cloud Composer 2 seguindo o guia Otimizar o desempenho e os custos do ambiente.
Alguns problemas de execuções do DAG podem ser causados porque o programador do Airflow não está funcionando de maneira correta ou ideal. Siga as instruções de solução de problemas do programador para resolver esses problemas.
Como resolver problemas do fluxo de trabalho
Para começar a solução de problemas, siga estes passos:
Verifique os registros do Airflow.
É possível aumentar o nível de geração de registros do Airflow substituindo a opção de configuração do Airflow a seguir.
Airflow 2
Seção Chave Valor logging
logging_level
O valor padrão é INFO
. Defina comoDEBUG
para receber mais detalhes nas mensagens de registro.Airflow 1
Seção Chave Valor core
logging_level
O valor padrão é INFO
. Defina comoDEBUG
para receber mais detalhes nas mensagens de registro.Verifique o Painel de monitoramento.
Revise o Cloud Monitoring.
No console do Google Cloud, verifique se há erros nas páginas dos componentes do seu ambiente.
Na interface da Web do Airflow, verifique na Visualização do gráfico (em inglês) do DAG se há instâncias de tarefas com falha.
Seção Chave Valor webserver
dag_orientation
LR
,TB
,RL
ouBT
Como depurar falhas do operador
Para depurar uma falha do operador, siga estes passos:
- Verifique se há erros específicos da tarefa.
- Verifique os registros do Airflow.
- Revise o Cloud Monitoring.
- Verifique os registros específicos do operador.
- Corrija os erros.
- Faça upload do DAG para a pasta
dags/
. - Na interface da Web do Airflow, limpe os estados anteriores do DAG.
- Execute o DAG ou retome esse processo.
Solução de problemas na execução de tarefas
O Airflow é um sistema distribuído com muitas entidades, como programador, executor e workers, que se comunicam por uma fila de tarefas e pelo banco de dados do Airflow e enviam sinais (como SIGTERM). No diagrama a seguir, mostramos uma visão geral das interconexões entre os componentes do Airflow.
Em um sistema distribuído como o Airflow, pode haver alguns problemas de conectividade de rede ou a infraestrutura pode apresentar problemas intermitentes. Isso pode levar a situações em que as tarefas podem falhar e ser reprogramadas para execução, ou as tarefas podem não ser concluídas com êxito (por exemplo, tarefas zumbis ou tarefas que ficaram presas na execução). O Airflow tem mecanismos para lidar com essas situações e retomar automaticamente o funcionamento normal. As seções a seguir explicam problemas comuns que ocorrem durante a execução de tarefas pelo Airflow: tarefas zumbis, Poison Pills e sinais SIGTERM.
Solução de problemas de tarefas zumbis
O Airflow detecta dois tipos de incompatibilidade entre uma tarefa e um processo que a executa:
Tarefas zumbis são tarefas que precisam estar em execução, mas não estão em execução. Isso poderá acontecer se o processo da tarefa for encerrado ou não estiver respondendo, se o worker do Airflow não informou o status da tarefa a tempo porque está sobrecarregado ou se a VM em que a tarefa foi executada foi encerrada. O Airflow encontra essas tarefas periodicamente e, dependendo das configurações, falha ou repete a tarefa.
Tarefas não ativas são tarefas que não podem estar em execução. O Airflow encontra essas tarefas periodicamente e as encerra.
O motivo mais comum para as tarefas zumbi é a escassez de recursos de CPU e memória no cluster do ambiente. Como resultado, um worker do Airflow pode não ser capaz de relatar o status de uma tarefa ou um sensor pode ser interrompido abruptamente. Nesse caso, o programador marca uma determinada tarefa como uma tarefa zumbi. Para evitar tarefas zumbis, atribua mais recursos ao ambiente.
Para mais informações sobre como escalonar o ambiente do Cloud Composer, consulte o Guia de escalonamento de ambientes. Uma solução possível é aumentar o tempo limite para tarefas zumbis. Como resultado, o programador espera mais tempo antes de considerar uma tarefa como um zumbi. Dessa forma, um worker do Airflow tem mais tempo para informar o status da tarefa.
Para aumentar o tempo limite das tarefas zumbi, substitua o valor da opção de configuração [scheduler]scheduler_zombie_task_threshold
do Airflow:
Seção | Chave | Valor | Observações |
---|---|---|---|
scheduler |
scheduler_zombie_task_threshold |
Novo tempo limite (em segundos) | O valor padrão é 300 |
Solução de problemas com pílula contra venenos
O Poison Pill é um mecanismo usado pelo Airflow para encerrar tarefas do Airflow.
O Airflow usa o Poison Pill nestas situações:
- Quando um programador encerra uma tarefa que não foi concluída no prazo.
- Quando uma tarefa expira ou é executada por muito tempo.
Quando o Airflow usa o Poison Pill, é possível ver as seguintes entradas nos registros de um worker do Airflow que executou a tarefa:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Taking the poison pill.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Soluções possíveis:
- Verifique o código da tarefa em busca de erros que possam fazer com que ela seja executada por muito tempo.
- (Cloud Composer 2) Aumente a CPU e a memória dos workers do Airflow para que as tarefas sejam executadas mais rapidamente.
Aumente o valor da opção de configuração
[celery_broker_transport_options]visibility-timeout
do Airflow.Como resultado, o programador espera mais tempo para que uma tarefa seja concluída, antes de considerá-la como uma tarefa zumbi. Essa opção é especialmente útil para tarefas demoradas que duram muitas horas. Se o valor for muito baixo (por exemplo, 3 horas), o programador vai considerar as tarefas executadas por 5 ou 6 horas como "travadas" (tarefas zumbis).
Aumente o valor da opção de configuração
[core]killed_task_cleanup_time
do Airflow.Um valor mais longo oferece mais tempo para que os workers do Airflow concluam as tarefas corretamente. Se o valor for muito baixo, as tarefas do Airflow poderão ser interrompidas abruptamente, sem tempo suficiente para terminar o trabalho normalmente.
Solução de problemas de sinais SIGTERM
Os sinais SIGTERM são usados pelo Linux, Kubernetes, programador do Airflow e Celery para encerrar processos responsáveis pela execução de workers ou tarefas do Airflow.
Há vários motivos para os sinais SIGTERM serem enviados em um ambiente:
Uma tarefa se tornou uma tarefa zumbi e precisa ser interrompida.
O programador descobriu uma cópia de uma tarefa e envia sinais Poison Pill e SIGTERM à tarefa para interrompê-la.
No Escalonamento automático horizontal de pods, o plano de controle do GKE envia sinais SIGTERM para remover pods que não são mais necessários.
O programador pode enviar sinais SIGTERM para o processo DagFileProcessorManager. Esses sinais SIGTERM são usados pelo programador para gerenciar o ciclo de vida do processo DagFileProcessorManager e podem ser ignorados com segurança.
Exemplo:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Disputa entre o callback do sinal de funcionamento e os callbacks de saída no local_task_job, que monitora a execução da tarefa. Se o sinal de funcionamento detectar que uma tarefa foi marcada como bem-sucedida, ele não poderá distinguir se a tarefa foi bem-sucedida ou se o Airflow foi instruído a considerar a tarefa bem-sucedida. No entanto, ele encerra o executor de tarefas sem esperar a saída dele.
Esses sinais SIGTERM podem ser ignorados com segurança. A tarefa já está no estado bem-sucedido, e a execução da execução do DAG como um todo não será afetada.
A entrada de registro
Received SIGTERM.
é a única diferença entre a saída normal e o encerramento da tarefa no estado bem-sucedido.Um componente do Airflow usa mais recursos (CPU, memória) do que o permitido pelo nó do cluster.
O serviço do GKE realiza operações de manutenção e envia sinais SIGTERM aos pods executados em um nó que está prestes a ser atualizado. Quando uma instância de tarefa é encerrada com SIGTERM, é possível ver as seguintes entradas de registro nos registros de um worker do Airflow que executou a tarefa:
{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception
Soluções possíveis:
Esse problema acontece quando a VM que executa a tarefa está sem memória. Isso não está relacionado às configurações do Airflow, mas à quantidade de memória disponível para a VM.
O aumento da memória depende da versão do Cloud Composer usada. Exemplo:
No Cloud Composer 2, é possível atribuir mais recursos de CPU e memória aos workers do Airflow.
No caso do Cloud Composer 1, é possível recriar o ambiente usando um tipo de máquina com melhor desempenho.
Nas duas versões do Cloud Composer, é possível reduzir o valor da opção de configuração de simultaneidade do Airflow
[celery]worker_concurrency
. Essa opção determina quantas tarefas são executadas simultaneamente por um determinado worker do Airflow.
Para mais informações sobre como otimizar o ambiente do Cloud Composer 2, consulte Otimizar desempenho e custos do ambiente.
Consultas do Cloud Logging para descobrir motivos para reinicializações ou remoções de pods
Os ambientes do Cloud Composer usam clusters do GKE como camada de infraestrutura de computação. Nesta seção, você encontra consultas úteis que ajudam a encontrar motivos para reinicializações ou remoções do worker ou do programador do Airflow.
As consultas apresentadas abaixo podem ser ajustadas da seguinte maneira:
especifique o cronograma do seu interesse no Cloud Logging. Por exemplo, as últimas 6 horas ou 3 dias ou um período
especifique o CLUSTER_NAME do Cloud Composer
também é possível limitar a pesquisa a um pod específico adicionando o POD_NAME
Descobrir contêineres reiniciados
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar os resultados a um pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Desativação dos contêineres de descoberta como resultado de um evento de falta de memória
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar os resultados a um pod específico:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descobrir contêineres que pararam de executar
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar os resultados a um pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Impacto das operações de atualização ou upgrade nas execuções de tarefas do Airflow
As operações de atualização ou upgrade interrompem a execução de tarefas do Airflow, a menos que uma tarefa seja executada no modo adiável.
Recomendamos realizar essas operações quando você espera um impacto mínimo nas execuções de tarefas do Airflow e configurar mecanismos de repetição adequados nos DAGs e nas tarefas.
Problemas comuns
Nas seções a seguir, você encontra descrições dos sintomas e possíveis correções de alguns problemas comuns do DAG.
A tarefa do Airflow foi interrompida por Negsignal.SIGKILL
Às vezes, a tarefa pode estar usando mais memória do que o worker do Airflow está alocado.
Nessa situação, ele pode ser interrompido por Negsignal.SIGKILL
. O sistema envia esse sinal para evitar mais consumo de memória, o que pode afetar a execução de outras tarefas do Airflow. No registro do worker do Airflow, você verá a seguinte entrada de registro:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
também pode aparecer como o código -9
.
Soluções possíveis:
Diminuir
worker_concurrency
de workers do Airflow.No caso do Cloud Composer 2, aumente a memória dos workers do Airflow.
No caso do Cloud Composer 1, faça upgrade para um tipo de máquina maior usado no cluster do Cloud Composer.
Otimize suas tarefas para usar menos memória.
Gerencie tarefas com uso intensivo de recursos no Cloud Composer usando o KubernetesPodOperator ou o GKEStartPodOperator para isolamento de tarefas e alocação de recursos personalizada.
A tarefa falha sem emitir registros devido a erros de análise do DAG
Às vezes, podem ocorrer erros sutis no DAG que levam a uma situação em que
um programador do Airflow e um processador de DAG são capazes de agendar tarefas para execução
e analisar um arquivo DAG (respectivamente), mas o worker do Airflow não executa tarefas
de um DAG porque há erros de programação em arquivos DAG em Python. Isso pode
levar a uma situação em que uma tarefa do Airflow é marcada como Failed
e não há registro da execução.
Soluções:
Verifique nos registros de worker do Airflow se não há erros gerados pelo worker relacionado a erros de análise do DAG ou DAG ausente.
Aumente os parâmetros relacionados à análise do DAG:
Aumente dagbag-import-timeout para pelo menos 120 segundos (ou mais, se necessário).
Aumente dag-file-processor-timeout para pelo menos 180 segundos (ou mais, se necessário). Esse valor precisa ser maior que
dagbag-import-timeout
.
Consulte também Como inspecionar registros do processador de DAG.
A tarefa falha sem emitir registros devido à pressão dos recursos
Sintoma: durante a execução de uma tarefa, o subprocesso do worker do Airflow responsável pela execução delas é interrompido abruptamente. O erro visível no registro do worker do Airflow pode ser semelhante ao seguinte:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Solução:
- No Cloud Composer 1, crie um novo ambiente com um tipo de máquina maior do que o atual. Considere adicionar mais nós ao ambiente e reduzir o
[celery]worker_concurrency
para os workers. - No Cloud Composer 2, aumente os limites de memória para workers do Airflow.
- Se o ambiente também gera tarefas zumbi, consulte Como solucionar problemas de tarefas zumbi.
- Para ver um tutorial sobre como depurar problemas de falta de memória, consulte Como depurar problemas de falta de memória e de armazenamento no DAG.
A tarefa falha sem emitir registros devido à remoção do pod
Os pods do Google Kubernetes Engine estão sujeitos ao ciclo de vida do pod do Kubernetes e à remoção de pods. Picos de tarefas e coprogramação de workers são as duas causas mais comuns da remoção de pods no Cloud Composer.
A remoção de pods pode ocorrer quando um determinado pod usa em excesso os recursos de um nó, em relação às expectativas de consumo de recursos configuradas para o nó. Por exemplo, a remoção pode acontecer quando várias tarefas com uso intenso de memória são executadas em um pod e a carga combinada dela faz com que o nó em que o pod é executado exceda o limite de consumo de memória.
Se um pod de worker do Airflow for removido, todas as instâncias de tarefas em execução nele serão interrompidas e, posteriormente, marcadas como com falha pelo Airflow.
Os registros são armazenados em buffer. Se um pod de worker for removido antes da limpeza do buffer, os registros não serão emitidos. Quando uma tarefa falha sem emitir registros, isso indica que os workers do Airflow serão reiniciados devido à falta de memória (OOM, na sigla em inglês). Alguns registros podem estar presentes no Cloud Logging mesmo que os registros do Airflow não tenham sido emitidos.
Para conferir os registros:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros.
Confira os registros de workers individuais em Todos os registros -> Registros do Airflow -> Workers -> (worker individual).
A execução do DAG é limitada por memória. Todas as tarefas são iniciadas com dois processos do Airflow: execução de tarefa e monitoramento. Cada nó aceita até seis tarefas simultâneas, com aproximadamente 12 processos carregados com módulos do Airflow. É possível que mais memória seja consumida dependendo da natureza do DAG.
Sintoma:
No console do Google Cloud, acesse a página Cargas de trabalho.
Se houver pods
airflow-worker
que mostremEvicted
, clique em cada pod removido e procure a mensagemThe node was low on resource: memory
na parte superior da janela.
Corrigir:
- No Cloud Composer 1, crie um novo ambiente do Cloud Composer com um tipo de máquina maior do que o atual.
- No Cloud Composer 2, aumente os limites de memória para workers do Airflow.
- Verifique os registros de
airflow-worker
pods para possíveis causas de remoção. Para mais informações sobre como buscar registros de pods individuais, consulte Solução de problemas com cargas de trabalho implantadas. - Verifique se as tarefas no DAG são idempotentes e podem ser repetidas.
Evite fazer o download de arquivos desnecessários no sistema de arquivos local dos workers do Airflow.
Os workers do Airflow têm capacidade limitada do sistema de arquivos local. Por exemplo, no Cloud Composer 2, um worker pode ter de 1 GB a 10 GB de armazenamento. Quando o espaço de armazenamento fica sem espaço, o pod de worker do Airflow é removido pelo plano de controle do GKE. Isso falhará em todas as tarefas que o worker removido estava executando.
Exemplos de operações problemáticas:
- fazer o download de arquivos ou objetos e armazená-los localmente em um worker do Airflow. Em vez disso, armazene esses objetos diretamente em um serviço adequado, como um bucket do Cloud Storage.
- Acessar objetos grandes na pasta
/data
de um worker do Airflow. O worker do Airflow faz o download do objeto no sistema de arquivos local. Em vez disso, implemente os DAGs para que os arquivos grandes sejam processados fora do pod de workers do Airflow.
A importação da carga do DAG atingiu o tempo limite
Sintoma:
- Na interface da Web do Airflow, na parte superior da página da lista de DAGs, uma caixa alerta vermelho mostra
Broken DAG: [/path/to/dagfile] Timeout
. No Cloud Monitoring: os registros
airflow-scheduler
contêm entradas semelhantes a:ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
Corrigir:
Substitua a opção de configuração dag_file_processor_timeout
do Airflow
e aguarde mais tempo para a análise do DAG:
Seção | Chave | Valor |
---|---|---|
core |
dag_file_processor_timeout |
Novo valor de tempo limite |
A execução do DAG não termina no tempo esperado
Sintoma:
Às vezes, uma execução de DAG não termina porque as tarefas do Airflow ficam paralisadas e a execução do DAG dura mais do que o esperado. Em condições normais, as tarefas do Airflow não permanecem indefinidamente no estado na fila ou em execução, porque o Airflow tem procedimentos de tempo limite e limpeza que ajudam a evitar essa situação.
Corrigir:
Use o parâmetro
dagrun_timeout
para os DAGs. Por exemplo:dagrun_timeout=timedelta(minutes=120)
. Como resultado, cada execução do DAG precisa ser concluída dentro do tempo limite, e as tarefas não concluídas são marcadas comoFailed
ouUpstream Failed
. Para mais informações sobre os estados de tarefas do Airflow, consulte a documentação do Apache Airflow.Use o parâmetro tempo limite de execução da tarefa para definir um tempo limite padrão para tarefas executadas com base nos operadores do Apache Airflow.
Execuções de DAG não executadas
Sintoma:
Quando uma data de programação para um DAG é definida dinamicamente, isso pode levar a vários efeitos colaterais inesperados. Exemplo:
Uma execução de DAG está sempre no futuro, e o DAG nunca é executado.
As execuções anteriores do DAG são marcadas como executadas e bem-sucedidas, apesar de não terem sido executadas.
Veja mais informações na documentação do Apache Airflow (em inglês).
Corrigir:
Siga as recomendações na documentação do Apache Airflow.
Defina o
start_date
estático para os DAGs. Uma opção é usarcatchup=False
para desativar a execução do DAG para datas anteriores.Evite usar
datetime.now()
oudays_ago(<number of days>)
, a menos que você esteja ciente dos efeitos colaterais dessa abordagem.
Aumento do tráfego de rede de entrada e saída do banco de dados do Airflow
A quantidade de rede de tráfego entre o cluster do GKE do ambiente e o banco de dados do Airflow depende do número de DAGs, do número de tarefas nos DAGs e da maneira como os DAGs acessam os dados no banco de dados. Os fatores a seguir podem influenciar o uso da rede:
Consultas no banco de dados do Airflow. Se os DAGs fazem muitas consultas, eles geram grandes quantidades de tráfego. Exemplos: verificação do status de tarefas antes de prosseguir com outras tarefas, consultar a tabela XCom, despejar conteúdo do banco de dados do Airflow.
Um grande número de tarefas. Quanto mais tarefas houver para programar, mais tráfego de rede será gerado. Essa consideração se aplica ao número total de tarefas nos DAGs e à frequência de programação. Quando o programador do Airflow programa as execuções de DAG, ele faz consultas no banco de dados do Airflow e gera tráfego.
A interface da Web do Airflow gera tráfego de rede porque faz consultas ao banco de dados. O uso intenso de páginas com gráficos, tarefas e diagramas pode gerar grandes volumes de tráfego de rede.
O DAG falha no servidor da Web do Airflow ou faz com que ele retorne um erro 502 gateway timeout
As falhas do servidor da Web podem ocorrer por diversos motivos. Verifique
os registros do airflow-webserver no
Cloud Logging para determinar a causa do
erro 502 gateway timeout
.
Computação pesada
Esta seção se aplica apenas ao Cloud Composer 1.
Evite executar computação pesada durante a análise do DAG.
Ao contrário dos nós de trabalho e programador, com tipos de máquina que podem ser personalizados para ter maior capacidade de CPU e memória, o servidor da Web usa um tipo de máquina fixo, o que pode causar falhas na análise do DAG se a computação do tempo de análise for muito pesada.
O servidor da Web tem duas vCPUs e 2 GB de memória.
O valor padrão para core-dagbag_import_timeout
é de 30 segundos. Esse valor de tempo limite define o limite máximo de quanto tempo o Airflow gasta carregando um módulo do Python na pasta dags/
.
Permissões incorretas
Esta seção se aplica apenas ao Cloud Composer 1.
O servidor da Web não é executado na mesma conta de serviço que os workers e o programador. Assim, eles podem acessar recursos gerenciados pelo usuário a que o servidor não tem acesso.
Recomendamos que você evite acessar recursos que não sejam públicos durante a análise do DAG. Às vezes, isso é inevitável, e você precisará conceder permissões à conta de serviço do servidor da Web. O nome da conta de serviço é derivado do domínio do servidor da Web. Por exemplo, se o domínio for example-tp.appspot.com
, a conta de serviço será example-tp@appspot.gserviceaccount.com
.
Erros do DAG
Esta seção se aplica apenas ao Cloud Composer 1.
O servidor da Web é executado no App Engine e fica separado do cluster do GKE do ambiente. O servidor da Web analisa os arquivos de definição do DAG, e um 502 gateway timeout
pode ocorrer se houver erros no DAG. O Airflow funciona normalmente sem um servidor da Web funcional se o DAG problemático não interromper nenhum processo em execução no GKE.
Nesse caso, é possível usar gcloud composer environments run
para recuperar detalhes do ambiente e como solução alternativa se o servidor da Web ficar indisponível.
Em outros casos, é possível executar a análise do DAG no GKE, além de pesquisar os DAGs que causam exceções fatais do Python ou esse tempo limite (padrão de 30 segundos). Para resolver os problemas, conecte-se a um shell remoto em um contêiner de worker do Airflow e teste os erros de sintaxe. Para mais informações, consulte Como testar DAGs.
Como lidar com um grande número de DAGs e plug-ins em pastas de DAGs e plug-ins
O conteúdo das pastas /dags
e /plugins
é sincronizado do bucket do ambiente com os sistemas de arquivos locais dos programadores e workers do Airflow.
Quanto mais dados forem armazenados nessas pastas, mais tempo levará para executar a sincronização. Para lidar com essas situações:
Limite o número de arquivos nas pastas
/dags
e/plugins
. Armazene apenas o mínimo de arquivos necessários.Se possível, aumente o espaço em disco disponível para programadores e workers do Airflow.
Se possível, aumente a CPU e a memória dos programadores e workers do Airflow para que a operação de sincronização seja realizada mais rapidamente.
No caso de um número muito grande de DAGs, divida-os em lotes, compacte-os em arquivos ZIP e implante-os na pasta
/dags
. Essa abordagem acelera o processo de sincronização dos DAGs. Os componentes do Airflow descompactam arquivos ZIP antes de processar DAGs.A geração de DAGs em uma programática também pode ser um método para limitar o número de arquivos DAG armazenados na pasta
/dags
. Consulte a seção sobre DAGs programáticos para evitar problemas com a programação e a execução de DAGs gerados programaticamente.
Não programar DAGs gerados programaticamente ao mesmo tempo
Gerar objetos DAG de maneira programática usando um arquivo DAG é um método eficiente para criar muitos DAGs semelhantes que têm apenas pequenas diferenças.
É importante não programar todos esses DAGs para execução imediatamente. Há uma grande chance de que os workers do Airflow não tenham recursos suficientes de CPU e memória para executar todas as tarefas programadas ao mesmo tempo.
Para evitar problemas com a programação de DAGs programáticos:
- Aumente a simultaneidade de workers e escalonar verticalmente seu ambiente para que ele possa executar mais tarefas simultaneamente.
- Gerar DAGs de uma maneira de distribuir as programações uniformemente ao longo do tempo para evitar a programação de centenas de tarefas ao mesmo tempo, para que os workers do Airflow tenham tempo para executar todas as tarefas programadas.
Erro 504 ao acessar o servidor da Web do Airflow
Consulte o Erro 504 ao acessar a IU do Airflow.
A exceção Lost connection to Postgres / MySQL server during query
é gerada durante a execução da tarefa ou logo depois dela
As exceções Lost connection to Postgres / MySQL server during query
costumam acontecer quando as seguintes condições são atendidas:
- O DAG usa
PythonOperator
ou um operador personalizado. - O DAG faz consultas no banco de dados do Airflow.
Se várias consultas forem feitas a partir de uma função chamável, os tracebacks poderão apontar incorretamente para a linha self.refresh_from_db(lock_for_update=True)
no código do Airflow. é a primeira consulta do banco de dados após a execução da tarefa. A causa real da exceção acontece antes disso, quando uma sessão do SQLAlchemy não é fechada corretamente.
O escopo das sessões do SQLAlchemy é uma linha de execução e é criado em uma sessão de função chamável que pode ser continuada dentro do código do Airflow. Se houver atrasos significativos entre as consultas em uma sessão, é possível que a conexão já tenha sido fechada pelo servidor Postgres ou MySQL. O tempo limite de conexão nos ambientes do Cloud Composer é definido como aproximadamente 10 minutos.
Corrigir:
- Use o decorador
airflow.utils.db.provide_session
. Esse decorador fornece uma sessão válida para o banco de dados do Airflow no parâmetrosession
e fecha corretamente a sessão no final da função. - Não use uma única função de longa duração. Em vez disso, mova todas as consultas
do banco de dados para funções separadas, de modo que haja várias funções com
o decorador
airflow.utils.db.provide_session
. Nesse caso, as sessões são fechadas automaticamente depois de recuperar os resultados da consulta.
Como controlar o tempo de execução de DAGs, tarefas e execuções paralelas do mesmo DAG
Para controlar a duração de uma única execução de DAG para um determinado DAG, use o parâmetro dagrun_timeout
. Por exemplo, se você espera que a execução de um único DAG (independentemente de a execução terminar com sucesso ou falha) não dure mais do que 1 hora, defina esse parâmetro como 3.600 segundos.
Também é possível controlar a duração de uma única tarefa do Airflow. Para isso, use execution_timeout
.
Se quiser controlar quantas execuções de DAG ativas você quer ter para um DAG específico, use a [core]max-active-runs-per-dag
opção de configuração do Airflow para isso.
Se você quiser que uma única instância de um DAG seja executada em um determinado momento, defina o parâmetro max-active-runs-per-dag
como 1
.
Problemas que afetam a sincronização de DAGs e plug-ins com programadores, workers e servidores da Web
O Cloud Composer sincroniza o conteúdo das pastas /dags
e /plugins
com os programadores e os workers. Certos objetos nas pastas /dags
e /plugins
podem impedir que essa sincronização funcione corretamente ou, pelo menos, desacelerá-la.
A pasta
/dags
está sincronizada com programadores e workers. Essa pasta não é sincronizada com os servidores da Web no Cloud Composer 2 ou se você ativarDAG Serialization
no Cloud Composer 1.A pasta
/plugins
está sincronizada com programadores, workers e servidores da Web.
Você pode encontrar os seguintes problemas:
Você fez upload de arquivos compactados gzip que usam transcodificação de compactação para as pastas
/dags
e/plugins
. Isso geralmente acontece quando você usa o comandogsutil cp -Z
para fazer o upload de dados para o bucket.Solução: exclua o objeto que usou a transcodificação por compactação e faça o upload dele novamente para o bucket.
Um dos objetos é chamado de ".". Esse objeto não é sincronizado com programadores e workers e pode parar de ser sincronizado.
Solução: renomeie o objeto problemático.
Uma pasta e um arquivo DAG do Python têm os mesmos nomes, por exemplo,
a.py
. Nesse caso, o arquivo DAG não está sincronizado corretamente com os componentes do Airflow.Solução: remova a pasta que tem o mesmo nome de um arquivo DAG Python.
Um dos objetos nas pastas
/dags
ou/plugins
contém um símbolo/
no final do nome do objeto. Esses objetos podem enganar o processo de sincronização, porque o símbolo/
significa que um objeto é uma pasta, não um arquivo.Solução: remova o símbolo
/
do nome do objeto problemático.Não armazene arquivos desnecessários nas pastas
/dags
e/plugins
.Às vezes, os DAGs e plug-ins implementados são acompanhados de outros arquivos, como arquivos que armazenam testes para esses componentes. Esses arquivos são sincronizados com workers e programadores e afetam o tempo necessário para copiar esses arquivos para programadores, workers e servidores da Web.
Solução: não armazene arquivos extras e desnecessários nas pastas
/dags
e/plugins
.
Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'
erro é gerado por programadores e workers
Esse problema acontece porque os objetos podem ter namespace sobreposto no Cloud Storage, enquanto os programadores e workers usam sistemas de arquivos tradicionais. Por exemplo, é possível adicionar uma pasta e um objeto com o mesmo nome ao bucket de um ambiente. Quando o bucket é sincronizado com os programadores e workers do ambiente, esse erro é gerado, o que pode levar a falhas de tarefas.
Para corrigir esse problema, verifique se não há namespaces sobrepostos no
bucket do ambiente. Por exemplo, se /dags/misc
(um arquivo) e /dags/misc/example_file.txt
(outro arquivo) estiverem em um bucket, um erro será gerado pelo programador.
Interrupções temporárias ao se conectar ao banco de dados de metadados do Airflow
O Cloud Composer é executado na infraestrutura em nuvem distribuída. Isso significa que, de tempos em tempos, alguns problemas transitórios podem aparecer e interromper a execução das tarefas do Airflow.
Nessas situações, você verá as seguintes mensagens de erro nos registros dos workers do Airflow:
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
ou
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Esses problemas intermitentes também podem ser causados por operações de manutenção realizadas nos ambientes do Cloud Composer.
Normalmente, esses erros são intermitentes e, se as tarefas do Airflow forem idempotentes e você tiver configurado novas tentativas, estará imune a elas. Também é possível considerar a definição de janelas de manutenção.
Outro motivo para esses erros pode ser a falta de recursos no cluster do ambiente. Nesses casos, é possível escalonar verticalmente ou otimizar o ambiente conforme descrito nas instruções sobre Como escalonar ambientes ou Como otimizar o ambiente.
Uma execução do DAG é marcada como bem-sucedida, mas não tem tarefas executadas
Se uma execução de DAG execution_date
for anterior a start_date
do DAG, talvez você veja execuções de DAG que não têm nenhuma execução de tarefa, mas ainda estão marcadas como bem-sucedidas.
Causa
Isso pode acontecer em um dos seguintes casos:
Uma incompatibilidade é causada pela diferença de fuso horário entre o
execution_date
e ostart_date
do DAG. Isso pode acontecer, por exemplo, ao usarpendulum.parse(...)
para definirstart_date
.O
start_date
do DAG está definido como um valor dinâmico, por exemplo,airflow.utils.dates.days_ago(1)
.
Solução
Verifique se
execution_date
estart_date
estão usando o mesmo fuso horário.Especifique um
start_date
estático e combine-o comcatchup=False
para evitar a execução de DAGs com datas de início passadas.
Um DAG não está visível na interface do Airflow ou na interface do DAG, e o programador não o programa
O processador de DAG analisa cada DAG antes de ele ser programado pelo programador e antes de o DAG ficar visível na interface do Airflow ou do DAG.
As seguintes opções de configuração do Airflow definem tempos limite para analisar DAGs:
[core]dagrun_import_timeout
define quanto tempo o processador de DAG tem para analisar um único DAG.[core]dag_file_processor_timeout
define o tempo total que o processador de DAG pode gastar na análise de todos os DAGs.
Se um DAG não estiver visível na interface do Airflow ou do DAG:
- Verifique os registros do processador de DAG se ele consegue processar o DAG corretamente. Em caso de problemas, você verá as seguintes entradas de registro nos registros do processador ou do programador de DAG:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
- Verifique os registros do agendador para ver se ele funciona corretamente. Em caso de problemas, você pode ver as seguintes entradas nos registros do programador:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496
Soluções:
Corrija todos os erros de análise do DAG. O processador DAG analisa vários DAGs e, em casos raros, a análise de erros em um DAG pode afetar negativamente a análise de outros.
Se a análise do DAG demorar mais do que o tempo definido em segundos em
[core]dagrun_import_timeout
, aumente esse tempo limite.Se a análise de todos os DAGs demorar mais do que a quantidade de segundos definida em
[core]dag_file_processor_timeout
, aumente esse tempo limite.Se a análise do DAG demorar muito, isso pode significar que ele não foi implementado da maneira ideal. Por exemplo, se ele ler muitas variáveis de ambiente ou realizar chamadas para serviços externos ou o banco de dados do Airflow. Na medida do possível, evite realizar essas operações em seções globais de DAGs.
Aumentar os recursos de CPU e memória do Scheduler para que ele funcione mais rapidamente.
Aumente o número de processos do processador de DAG para que a análise possa ser feita mais rapidamente. Para fazer isso, aumente o valor de
[scheduler]parsing_process
.
Sintomas de sobrecarga no banco de dados do Airflow
Para mais informações, consulte Sintomas do banco de dados do Airflow que está sob pressão de carga.
A seguir
- Solução de problemas de instalação do pacote PyPI
- Como resolver problemas de atualizações e upgrades do ambiente