Como solucionar problemas de DAGs

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Nesta página, você encontra informações sobre a solução de problemas comuns do fluxo de trabalho e suas etapas.

Muitos problemas de execução de DAG são causados por desempenho de ambiente não otimizado. Para otimizar seu ambiente, siga o guia Otimizar o desempenho e os custos do ambiente.

Alguns problemas de execução de DAG podem ser causados pelo agendador do Airflow que não funciona corretamente ou de maneira ideal. Siga as instruções de solução de problemas do Programador para resolver esses problemas.

Como resolver problemas do fluxo de trabalho

Para começar a solução de problemas, siga estes passos:

  1. Verifique os registros do Airflow.

    É possível aumentar o nível de geração de registros do Airflow substituindo a seguinte opção de configuração do Airflow.

    Seção Chave Valor
    logging logging_level O valor padrão é INFO. Defina como DEBUG para ter mais detalhes nas mensagens de registro.
  2. Verifique o Painel de monitoramento.

  3. Revise o Cloud Monitoring.

  4. No console do Google Cloud, verifique se há erros nas páginas dos componentes do seu ambiente.

  5. Na interface da Web do Airflow, verifique na Visualização do gráfico do DAG se há instâncias de tarefa com falha.

    Seção Chave Valor
    webserver dag_orientation LR, TB, RL ou BT

Como depurar falhas do operador

Para depurar uma falha do operador, siga estes passos:

  1. Verifique se há erros específicos da tarefa.
  2. Verifique os registros do Airflow.
  3. Revise o Cloud Monitoring.
  4. Verifique os registros específicos do operador.
  5. Corrija os erros.
  6. Faça upload do DAG na pasta /dags.
  7. Na interface da Web do Airflow, limpe os estados anteriores do DAG.
  8. Execute o DAG ou retome esse processo.

Solução de problemas na execução de tarefas

O Airflow é um sistema distribuído com muitas entidades, como programador, executor e workers, que se comunicam entre si por meio de uma fila de tarefas e do banco de dados do Airflow e enviam sinais (como SIGTERM). O diagrama a seguir mostra uma visão geral das interconexões entre os componentes do Airflow.

Interação entre componentes do Airflow
Figura 1. Interação entre componentes do Airflow (clique para ampliar)

Em um sistema distribuído como o Airflow, pode haver alguns problemas de conectividade de rede, ou a infraestrutura subjacente pode apresentar problemas intermitentes. Isso pode levar a situações em que as tarefas podem falhar e ser reprogramadas para execução ou não serem concluídas (por exemplo, tarefas zumbis ou tarefas que ficaram presas na execução). O Airflow tem mecanismos para lidar com essas situações e retomar automaticamente o funcionamento normal. As seções a seguir explicam problemas comuns que ocorrem durante a execução de tarefas pelo Airflow: tarefas zumbis, pílulas venenosas e sinais SIGTERM.

Solução de problemas de tarefas zumbi

O Airflow detecta dois tipos de incompatibilidade entre uma tarefa e um processo que a executa:

  • Tarefas zumbi são tarefas que deveriam estar em execução, mas não estão. Isso pode acontecer se o processo da tarefa foi encerrado ou não está respondendo, se o worker do Airflow não informou o status da tarefa a tempo porque está sobrecarregado ou se a VM em que a tarefa é executada foi encerrada. O Airflow encontra essas tarefas periodicamente e falha ou tenta novamente, dependendo das configurações da tarefa.

    Descobrir tarefas zumbi

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Tarefas inativas são tarefas que não deveriam estar em execução. O Airflow encontra essas tarefas periodicamente e as encerra.

As seções a seguir descrevem as soluções e os motivos mais comuns para tarefas zumbis.

O worker do Airflow ficou sem memória

Cada worker do Airflow pode executar até [celery]worker_concurrency instâncias de tarefa simultaneamente. Se o consumo de memória cumulativo dessas instâncias de tarefa ultrapassar o limite de memória de um worker do Airflow, um processo aleatório será encerrado para liberar recursos.

Descobrir eventos de falta de memória do worker do Airflow

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

Às vezes, a falta de memória em um worker do Airflow pode fazer com que pacotes com formato incorreto sejam enviados durante uma sessão do SQL Alchemy para o banco de dados, para um servidor DNS ou para qualquer outro serviço chamado por um DAG. Nesse caso, a outra extremidade da conexão pode rejeitar ou descartar conexões do worker do Airflow. Exemplo:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Soluções:

O worker do Airflow foi removido

As remoções de pods são parte normal da execução de cargas de trabalho no Kubernetes. O GKE remove pods se eles ficarem sem armazenamento ou para liberar recursos para cargas de trabalho com prioridade mais alta.

Descobrir despejo de workers do Airflow

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Soluções:

  • Se uma expulsão for causada pela falta de armazenamento, reduza o uso de armazenamento ou remova arquivos temporários assim que eles não forem mais necessários. Como alternativa, é possível aumentar o armazenamento disponível ou executar cargas de trabalho em um pod dedicado com KubernetesPodOperator.

O worker do Airflow foi encerrado

Os workers do Airflow podem ser removidos externamente. Se as tarefas em execução não forem concluídas durante um período de encerramento suave, elas serão interrompidas e poderão ser detectadas como zumbis.

Descobrir as terminações de pods de workers do Airflow

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Possíveis cenários e soluções:

  • Os workers do Airflow são reiniciados durante modificações no ambiente, como upgrades ou instalação de pacotes:

    Descobrir modificações no ambiente do Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    É possível realizar essas operações quando nenhuma tarefa crítica está em execução ou ativar as tentativas de repetição de tarefas.

  • Vários componentes podem ficar temporariamente indisponíveis durante as operações de manutenção.

    Descobrir as operações de manutenção do GKE

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    É possível especificar janelas de manutenção para minimizar

    se sobrepõe à execução de tarefas críticas.

  • Nas versões do Cloud Composer 2 anteriores à 2.4.5, um worker do Airflow em encerramento pode ignorar o sinal SIGTERM e continuar a executar tarefas:

    Descubra o escalonamento para baixo pelo escalonamento automático do Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-worker-set")
    textPayload:"Workers deleted"

    Você pode fazer upgrade para uma versão mais recente do Cloud Composer em que esse problema foi corrigido.

O worker do Airflow estava sob carga pesada

A quantidade de recursos de CPU e memória disponíveis para um worker do Airflow é limitada pela configuração do ambiente. Se a utilização de recursos se aproximar dos limites, isso poderá causar uma disputa de recursos e atrasos desnecessários durante a execução da tarefa. Em situações extremas, quando os recursos estão em falta por períodos mais longos, isso pode causar tarefas zumbis.

Soluções:

O banco de dados do Airflow estava sob carga pesada

Um banco de dados é usado por vários componentes do Airflow para se comunicar entre si e, em particular, para armazenar batimentos cardíacos de instâncias de tarefas. A falta de recursos no banco de dados leva a tempos de consulta mais longos e pode afetar a execução de tarefas.

Às vezes, os seguintes erros estão presentes nos registros de um worker do Airflow:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Soluções:

O banco de dados do Airflow ficou temporariamente indisponível

Um worker do Airflow pode levar algum tempo para detectar e processar erros intermitentes, como problemas temporários de conectividade. Ele pode exceder o limite padrão de detecção de zumbis.

Descobrir os tempos limite de batimentos do Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Soluções:

  • Aumente o tempo limite para tarefas zumbis e substitua o valor da opção de configuração [scheduler]scheduler_zombie_task_threshold do Airflow:

    Seção Chave Valor Observações
    scheduler scheduler_zombie_task_threshold Novo tempo limite (em segundos) O valor padrão é 300.

Solução de problemas de pílulas venenosas

A pílula do veneno é um mecanismo usado pelo Airflow para encerrar tarefas.

O Airflow usa a pílula de veneno nestas situações:

  • Quando um programador encerra uma tarefa que não foi concluída no prazo.
  • Quando uma tarefa expira ou é executada por muito tempo.

Quando o Airflow usa a pílula envenenada, é possível ver as seguintes entradas de registro nos registros de um worker do Airflow que executou a tarefa:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Possíveis soluções:

  • Verifique se há erros no código da tarefa que podem fazer com que ela seja executada por muito tempo.

  • Aumente a CPU e a memória dos workers do Airflow para que as tarefas sejam executadas mais rapidamente.

  • Aumente o valor da opção de configuração [celery_broker_transport_options]visibility-timeout do Airflow.

    Como resultado, o agendador espera mais para que uma tarefa seja concluída antes de considerá-la como uma tarefa zumbi. Essa opção é especialmente útil para tarefas que levam muito tempo. Se o valor for muito baixo (por exemplo, 3 horas), o programador considerará tarefas executadas por 5 ou 6 horas como "travadas" (tarefas zumbi).

  • Aumente o valor da opção de configuração [core]killed_task_cleanup_time do Airflow.

    Um valor maior dá mais tempo aos workers do Airflow para concluir as tarefas corretamente. Se o valor for muito baixo, as tarefas do Airflow poderão ser interrompidas de forma abrupta, sem tempo suficiente para concluir o trabalho.

Solução de problemas de sinais SIGTERM

Os sinais SIGTERM são usados pelo Linux, Kubernetes, programador do Airflow e Celery para encerrar processos responsáveis por executar workers ou tarefas do Airflow.

Há vários motivos para que os sinais SIGTERM sejam enviados em um ambiente:

  • Uma tarefa se tornou uma tarefa zumbi e precisa ser interrompida.

  • O programador detectou uma tarefa duplicada e enviou sinais Poison Pill e SIGTERM para a tarefa para interromper.

  • No Escalonamento automático horizontal de pods, o plano de controle do GKE envia sinais SIGTERM para remover pods que não são mais necessários.

  • O programador pode enviar sinais SIGTERM para o processo DagFileProcessorManager. Esses sinais SIGTERM são usados pelo Programador para gerenciar o ciclo de vida do processo DagFileProcessorManager e podem ser ignorados com segurança.

    Exemplo:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Condição de corrida entre o callback de verificação de funcionamento e os callbacks de saída no local_task_job, que monitora a execução da tarefa. Se o heartbeat detectar que uma tarefa foi marcada como concluída, ele não poderá distinguir se a tarefa foi concluída ou se o Airflow foi instruído a considerar a tarefa como concluída. No entanto, ele encerrará um executor de tarefas sem esperar que ele saia.

    Esses sinais SIGTERM podem ser ignorados com segurança. A tarefa já está no estado de sucesso, e a execução da execução do DAG como um todo não será afetada.

    A entrada de registro Received SIGTERM. é a única diferença entre a saída regular e o encerramento da tarefa no estado de sucesso.

    Condição de corrida entre os callbacks de saída e de sinal de funcionamento
    Figura 2. Condição de corrida entre os callbacks de saída e de batimento cardíaco (clique para ampliar)
  • Um componente do Airflow usa mais recursos (CPU, memória) do que o permitido pelo nó do cluster.

  • O serviço do GKE realiza operações de manutenção e envia sinais SIGTERM para pods executados em um nó que está prestes a ser atualizado.

    Quando uma instância de tarefa é encerrada com SIGTERM, é possível ver as seguintes entradas de registro nos registros de um worker do Airflow que executou a tarefa:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

Possíveis soluções:

Esse problema ocorre quando uma VM que executa a tarefa fica sem memória. Isso não está relacionado às configurações do Airflow, mas à quantidade de memória disponível para a VM.

  • No Cloud Composer 2, é possível atribuir mais recursos de CPU e memória a workers do Airflow.

  • É possível diminuir o valor da opção de configuração do Airflow para concorrência [celery]worker_concurrency. Essa opção determina quantas tarefas são executadas simultaneamente por um determinado worker do Airflow.

Para mais informações sobre como otimizar seu ambiente, consulte Otimizar o desempenho e os custos do ambiente.

Consultas do Cloud Logging para descobrir os motivos das reinicializações ou remoções de pods

Os ambientes do Cloud Composer usam clusters do GKE como camada de infraestrutura de computação. Nesta seção, você encontra consultas úteis que podem ajudar a encontrar os motivos das reinicializações ou remoções de workers ou programadores do Airflow.

As consultas apresentadas podem ser ajustadas da seguinte maneira:

  • É possível especificar a linha do tempo necessária no Cloud Logging. Por exemplo, as últimas 6 horas, 3 dias ou você pode definir seu intervalo de tempo personalizado.

  • Especifique o nome do cluster do ambiente em CLUSTER_NAME.

  • É possível limitar a pesquisa a um pod específico adicionando o POD_NAME.

Descobrir contêineres reiniciados

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Consulta alternativa para limitar os resultados a um pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Descobrir o encerramento de contêineres como resultado de um evento fora da memória

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar os resultados a um pod específico:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Descobrir contêineres que pararam de ser executados

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar os resultados a um pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Impacto das operações de atualização ou upgrade nas execuções de tarefas do Airflow

As operações de atualização ou upgrade interrompem as tarefas do Airflow que estão sendo executadas, a menos que uma tarefa seja executada no modo diferível.

Recomendamos a realização dessas operações quando você espera um impacto mínimo nas execuções de tarefas do Airflow e configurar mecanismos de repetição adequados nas DAGs e tarefas.

Problemas comuns

Nas seções a seguir, você encontra descrições dos sintomas e possíveis correções de alguns problemas comuns do DAG.

A tarefa do Airflow foi interrompida por Negsignal.SIGKILL

Às vezes, a tarefa pode estar usando mais memória do que o worker do Airflow alocou. Nessa situação, ele pode ser interrompido por Negsignal.SIGKILL. O sistema envia esse sinal para evitar mais consumo de memória, o que pode afetar a execução de outras tarefas do Airflow. No registro do worker do Airflow, você pode encontrar a seguinte entrada de registro:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL também pode aparecer como código -9.

Possíveis soluções:

  • worker_concurrency mais baixo dos workers do Airflow.

  • Aumente a quantidade de memória disponível para os workers do Airflow.

  • Gerencie tarefas com uso intensivo de recursos no Cloud Composer usando KubernetesPodOperator ou GKEStartPodOperator para isolamento de tarefas e alocação de recursos personalizada.

  • Otimize suas tarefas para usar menos memória.

A tarefa falha sem emitir registros devido a erros de análise de DAG

Às vezes, pode haver erros sutis de DAG que levam a uma situação em que o programador do Airflow pode programar tarefas para execução, o processador de DAG pode analisar o arquivo DAG, mas o worker do Airflow não consegue executar tarefas do DAG porque há erros de programação no arquivo DAG. Isso pode levar a uma situação em que uma tarefa do Airflow é marcada como Failed e não há registro da execução dela.

Soluções:

  • Verifique nos registros do worker do Airflow se não há erros gerados pelo worker do Airflow relacionados a um DAG ausente ou erros de análise de DAG.

  • Aumento de parâmetros relacionados à análise de DAGs:

  • Consulte também Como inspecionar os registros do processador DAG.

A tarefa falha sem emitir registros devido à pressão de recursos

Sintoma: durante a execução de uma tarefa, o subprocesso do worker do Airflow responsável pela execução da tarefa é interrompido abruptamente. O erro visível no registro do worker do Airflow pode ser semelhante ao abaixo:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solução:

A tarefa falha sem emitir registros devido à remoção de pods

Os pods do Google Kubernetes Engine estão sujeitos ao ciclo de vida de pods do Kubernetes e à remoção deles. Os picos de tarefas são a causa mais comum de remoção de pods no Cloud Composer.

A remoção de pods pode ocorrer quando um determinado pod usa em excesso os recursos de um nó, em relação às expectativas de consumo de recursos configuradas para o nó. Por exemplo, a remoção pode acontecer quando várias tarefas com muita memória são executadas em um pod, e a carga combinada faz com que o nó em que o pod é executado exceda o limite de consumo de memória.

Se um pod de worker do Airflow for removido, todas as instâncias de tarefas em execução nele serão interrompidas e, posteriormente, marcadas como com falha pelo Airflow.

Os registros são armazenados em buffer. Se um pod de worker for removido antes da limpeza do buffer, os registros não serão emitidos. Quando uma tarefa falha sem emitir registros, isso indica que os workers do Airflow serão reiniciados devido à falta de memória (OOM, na sigla em inglês). Alguns registros podem estar presentes no Cloud Logging mesmo que os registros do Airflow não tenham sido emitidos.

Para ver os registros:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Registros.

  4. Acesse os registros de workers individuais do Airflow em Todos os registros > Registros do Airflow > Workers.

Sintoma:

  1. No console do Google Cloud, acesse a página Cargas de trabalho.

    Acesse "Cargas de trabalho"

  2. Se houver pods airflow-worker que mostrem Evicted, clique em cada pod removido e procure a mensagem The node was low on resource: memory na parte de cima da janela.

Solução:

  • Aumente os limites de memória para workers do Airflow.

  • Verifique os registros dos pods airflow-worker para possíveis causas de remoção. Para mais informações sobre como buscar registros de pods individuais, consulte Solução de problemas com cargas de trabalho implantadas.

  • Garanta que as tarefas no DAG sejam idempotentes e que possam ser repetidas.

  • Evite fazer o download de arquivos desnecessários para o sistema de arquivos local dos workers do Airflow.

    Os workers do Airflow têm capacidade limitada do sistema de arquivos local. Um worker do Airflow pode ter de 1 GB a 10 GB de armazenamento. Quando o espaço de armazenamento acaba, o pod de worker do Airflow é removido pelo plano de controle do GKE. Isso faz com que todas as tarefas que o worker excluído estava executando falhem.

    Exemplos de operações problemáticas:

    • Fazer o download de arquivos ou objetos e armazená-los localmente em um worker do Airflow. Em vez disso, armazene esses objetos diretamente em um serviço adequado, como um bucket do Cloud Storage.
    • Acesso a objetos grandes na pasta /data de um worker do Airflow. O worker do Airflow faz o download do objeto no sistema de arquivos local. Em vez disso, implemente seus DAGs para que arquivos grandes sejam processados fora do pod de worker do Airflow.

A importação da carga do DAG atingiu o tempo limite

Sintoma:

  • Na interface da Web do Airflow, na parte de cima da página da lista de DAGs, uma caixa de alerta vermelha mostra Broken DAG: [/path/to/dagfile] Timeout.
  • No Cloud Monitoring: os registros airflow-scheduler contêm entradas semelhantes a:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Corrigir:

Modifique a opção de configuração do Airflow dag_file_processor_timeout e conceda mais tempo para a análise do DAG:

Seção Chave Valor
core dag_file_processor_timeout Novo valor de tempo limite

A execução da DAG não termina no tempo esperado

Sintoma:

Às vezes, uma execução de DAG não termina porque as tarefas do Airflow ficam bloqueadas e a execução de DAG dura mais do que o esperado. Em condições normais, as tarefas do Airflow não ficam indefinidamente no estado em fila ou em execução, porque o Airflow tem procedimentos de limpeza e tempo limite que ajudam a evitar essa situação.

Corrigir:

  • Use o parâmetro dagrun_timeout para os DAGs. Por exemplo: dagrun_timeout=timedelta(minutes=120). Como resultado, cada execução de DAG precisa ser concluída dentro do tempo limite de execução de DAG. As tarefas não concluídas são marcadas como Failed ou Upstream Failed. Para mais informações sobre os estados de tarefa do Airflow, consulte a documentação do Apache Airflow.

  • Use o parâmetro tempo limite de execução da tarefa para definir um tempo limite padrão para tarefas executadas com base nos operadores do Apache Airflow.

As execuções de DAG não são executadas

Sintoma:

Quando uma data de programação de um DAG é definida dinamicamente, isso pode levar a vários efeitos colaterais inesperados. Exemplo:

  • Uma execução de DAG é sempre no futuro, e o DAG nunca é executado.

  • As execuções de DAG anteriores são marcadas como executadas e bem-sucedidas, mesmo que não sejam executadas.

Mais informações estão disponíveis na documentação do Apache Airflow (em inglês).

Possíveis soluções:

  • Siga as recomendações na documentação do Apache Airflow.

  • Defina start_date estático para DAGs. Como opção, use catchup=False para desativar a execução da DAG para datas anteriores.

  • Evite usar datetime.now() ou days_ago(<number of days>), a menos que você esteja ciente dos efeitos colaterais dessa abordagem.

Aumento do tráfego de rede de entrada e saída do banco de dados do Airflow

A quantidade de rede de tráfego entre o cluster do GKE do ambiente e o banco de dados do Airflow depende do número de DAGs, do número de tarefas nos DAGs e da maneira como os DAGs acessam os dados no banco de dados. Os fatores a seguir podem influenciar o uso da rede:

  • Consultas no banco de dados do Airflow. Se os DAGs fazem muitas consultas, eles geram grandes quantidades de tráfego. Exemplos: verificação do status de tarefas antes de prosseguir com outras tarefas, consultar a tabela XCom, despejar conteúdo do banco de dados do Airflow.

  • Um grande número de tarefas. Quanto mais tarefas houver para programar, mais tráfego de rede será gerado. Essa consideração se aplica ao número total de tarefas nos DAGs e à frequência de programação. Quando o programador do Airflow programa as execuções de DAG, ele faz consultas no banco de dados do Airflow e gera tráfego.

  • A interface da Web do Airflow gera tráfego de rede porque faz consultas ao banco de dados. O uso intenso de páginas com gráficos, tarefas e diagramas pode gerar grandes volumes de tráfego de rede.

O DAG trava o servidor da Web do Airflow ou faz com que ele retorne um erro de "tempo limite de gateway 502"

As falhas do servidor da Web podem ocorrer por diversos motivos. Verifique os registros airflow-webserver no Cloud Logging para determinar a causa do erro 502 gateway timeout.

Processar um grande número de DAGs e plug-ins em pastas de DAGs e plug-ins

O conteúdo das pastas /dags e /plugins é sincronizado do bucket do ambiente com os sistemas de arquivos locais dos workers e programadores do Airflow.

Quanto mais dados forem armazenados nessas pastas, mais tempo levará para realizar a sincronização. Para resolver essas situações:

  • Limite o número de arquivos nas pastas /dags e /plugins. Armazene apenas o mínimo de arquivos necessários.

  • Aumente o espaço em disco disponível para os programadores e workers do Airflow.

  • Aumente a CPU e a memória dos programadores e workers do Airflow para que a operação de sincronização seja realizada mais rapidamente.

  • No caso de um número muito grande de DAGs, divida os DAGs em lotes, compacte-os em arquivos ZIP e implante esses arquivos na pasta /dags. Essa abordagem acelera o processo de sincronização das DAGs. Os componentes do Airflow descompactam os arquivos ZIP antes de processar as DAGs.

  • A geração de DAGs de forma programática também pode ser um método para limitar o número de arquivos DAG armazenados na pasta /dags. Consulte a seção sobre DAGs programáticos para evitar problemas com a programação e execução de DAGs gerados programaticamente.

Não programe DAGs gerados de maneira programática ao mesmo tempo

Gerar objetos DAG de forma programática em um arquivo DAG é um método eficiente para criar muitos DAGs semelhantes que têm apenas pequenas diferenças.

É importante não programar todos esses DAGs para execução imediatamente. Há uma grande chance de os workers do Airflow não terem recursos de CPU e memória suficientes para executar todas as tarefas programadas ao mesmo tempo.

Para evitar problemas com a programação de DAGs programáticos:

  • Aumente a simultaneidade do worker e escalone o ambiente para que ele possa executar mais tarefas simultaneamente.
  • Gere DAGs de forma a distribuir as programações de maneira uniforme ao longo do tempo, para evitar a programação de centenas de tarefas ao mesmo tempo, para que os workers do Airflow tenham tempo para executar todas as tarefas programadas.

Erro 504 ao acessar o servidor da Web do Airflow

Consulte Erro 504 ao acessar a interface do Airflow.

A conexão perdida com o servidor Postgres durante a exceção de consulta é gerada durante a execução da tarefa ou logo depois dela

As exceções Lost connection to Postgres server during query geralmente acontecem quando as seguintes condições são atendidas:

  • O DAG usa PythonOperator ou um operador personalizado.
  • O DAG faz consultas no banco de dados do Airflow.

Se várias consultas forem feitas a partir de uma função chamável, os tracebacks poderão apontar incorretamente para a linha self.refresh_from_db(lock_for_update=True) no código do Airflow. é a primeira consulta do banco de dados após a execução da tarefa. A causa real da exceção acontece antes disso, quando uma sessão do SQLAlchemy não é fechada corretamente.

O escopo das sessões do SQLAlchemy é uma linha de execução e é criado em uma sessão de função chamável que pode ser continuada dentro do código do Airflow. Se houver atrasos significativos entre as consultas em uma sessão, a conexão já poderá ser fechada pelo servidor Postgres. O tempo limite de conexão nos ambientes do Cloud Composer é definido como aproximadamente 10 minutos.

Solução:

  • Use o decorador airflow.utils.db.provide_session. Esse decorador fornece uma sessão válida para o banco de dados do Airflow no parâmetro session e fecha corretamente a sessão no final da função.
  • Não use uma única função de longa duração. Em vez disso, mova todas as consultas do banco de dados para funções separadas, de modo que haja várias funções com o decorador airflow.utils.db.provide_session. Nesse caso, as sessões são fechadas automaticamente depois de recuperar os resultados da consulta.

Como controlar o tempo de execução de DAGs, tarefas e execuções paralelas do mesmo DAG

Se você quiser controlar a duração de uma única execução de DAG para um DAG específico, use o parâmetro DAG dagrun_timeout. Por exemplo, se você espera que uma única execução de DAG (independente de a execução terminar com sucesso ou falha) não dure mais de uma hora, defina esse parâmetro como 3600 segundos.

Também é possível controlar por quanto tempo uma única tarefa do Airflow pode durar. Para fazer isso, use execution_timeout.

Se você quiser controlar quantas execuções ativas de DAGs você quer ter para um DAG específico, use a opção de configuração do Airflow [core]max-active-runs-per-dag.

Se você quiser ter apenas uma instância de execução de DAG em um determinado momento, defina o parâmetro max-active-runs-per-dag como 1.

Problemas que afetam a sincronização de DAGs e plug-ins com programadores, workers e servidores da Web

O Cloud Composer sincroniza o conteúdo das pastas /dags e /plugins com programadores e workers. Alguns objetos nas pastas /dags e /plugins podem impedir que essa sincronização funcione corretamente ou diminuir a velocidade dela.

  • A pasta /dags é sincronizada com programadores e workers.

    Essa pasta não é sincronizada com o servidor da Web.

  • A pasta /plugins é sincronizada com programadores, workers e servidores da Web.

Você pode encontrar os seguintes problemas:

  • Você fez upload de arquivos compactados por gzip que usam transcodificação de compactação para as pastas /dags e /plugins. Isso geralmente acontece se você usar a flag --gzip-local-all em um comando gcloud storage cp para fazer upload de dados para o bucket.

    Solução: exclua o objeto que usou a transcodificação de compactação e faça o upload novamente no bucket.

  • Um dos objetos é chamado ".". Esse objeto não é sincronizado com programadores e workers e pode parar de sincronizar.

    Solução: renomeie o objeto.

  • Uma pasta e um arquivo Python de DAG têm o mesmo nome, por exemplo, a.py. Nesse caso, o arquivo DAG não é sincronizado corretamente com os componentes do Airflow.

    Solução: remova a pasta com o mesmo nome do arquivo Python do DAG.

  • Um dos objetos nas pastas /dags ou /plugins contém um símbolo / no final do nome do objeto. Esses objetos podem interferir no processo de sincronização porque o símbolo / significa que um objeto é uma pasta, não um arquivo.

    Solução: remova o símbolo / do nome do objeto problemático.

  • Não armazene arquivos desnecessários nas pastas /dags e /plugins.

    Às vezes, os DAGs e plug-ins implementados vêm com arquivos adicionais, como arquivos que armazenam testes para esses componentes. Esses arquivos são sincronizados com workers e programadores e afetam o tempo necessário para copiar esses arquivos para programadores, workers e servidores da Web.

    Solução: não armazene arquivos adicionais e desnecessários nas pastas /dags e /plugins.

Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' error is generated by schedulers and workers

Isso acontece porque os objetos podem ter um namespace sobreposto no Cloud Storage, enquanto os programadores e os workers usam sistemas de arquivos tradicionais. Por exemplo, é possível adicionar uma pasta e um objeto com o mesmo nome ao bucket de um ambiente. Quando o bucket é sincronizado com os programadores e workers do ambiente, esse erro é gerado, o que pode levar a falhas na tarefa.

Para corrigir esse problema, verifique se não há namespaces sobrepostos no bucket do ambiente. Por exemplo, se /dags/misc (um arquivo) e /dags/misc/example_file.txt (outro arquivo) estiverem em um bucket, um erro será gerado pelo programador.

Interrupções temporárias ao se conectar ao banco de dados de metadados do Airflow

O Cloud Composer é executado em uma infraestrutura distribuída. Isso significa que, de tempos em tempos, alguns problemas temporários podem aparecer e interromper a execução das suas tarefas do Airflow.

Nessas situações, as seguintes mensagens de erro podem aparecer nos registros dos workers do Airflow:

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

ou

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Esses problemas intermitentes também podem ser causados por operações de manutenção realizadas nos seus ambientes do Cloud Composer.

Normalmente, esses erros são intermitentes, e se as tarefas do Airflow forem idempotentes e você tiver novas tentativas configuradas, elas não vão afetar você. Também é possível definir janelas de manutenção.

Outro motivo para esses erros pode ser a falta de recursos no cluster do ambiente. Nesses casos, é possível aumentar ou otimizar o ambiente, conforme descrito nas instruções de Como aumentar o ambiente ou Como otimizar o ambiente.

Uma execução de DAG é marcada como concluída, mas não tem tarefas executadas

Se uma execução de DAG execution_date for anterior à start_date do DAG, é possível que haja execuções de DAG que não tenham execuções de tarefas, mas ainda estejam marcadas como concluídas.

Uma execução de DAG bem-sucedida sem tarefas executadas
Figura 3. Uma execução de DAG bem-sucedida sem tarefas executadas (clique para ampliar)

Causa

Essa situação pode acontecer em um dos seguintes casos:

  • Uma incompatibilidade é causada pela diferença de fuso horário entre o execution_date e o start_date do DAG. Isso pode acontecer, por exemplo, ao usar pendulum.parse(...) para definir start_date.

  • O start_date do DAG é definido como um valor dinâmico, por exemplo, airflow.utils.dates.days_ago(1)

Solução

  • Verifique se execution_date e start_date estão usando o mesmo fuso horário.

  • Especifique um start_date estático e combine-o com catchup=False para evitar a execução de DAGs com datas de início anteriores.

Um DAG não fica visível na interface do Airflow ou do DAG, e o programador não o agenda.

O processador DAG analisa cada DAG antes de ser programado pelo programador e antes que um DAG se torne visível na interface do Airflow ou na interface DAG.

As seguintes opções de configuração do Airflow definem os tempos limite para a análise de DAGs:

Se um DAG não estiver visível na interface do Airflow ou do DAG:

  • Verifique se o processador de DAG consegue processar corretamente seu DAG nos registros do processador de DAG. Em caso de problemas, talvez você veja as seguintes entradas de registro nos registros do programador ou do processador DAG:

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
    
  • Verifique os registros do programador para saber se ele funciona corretamente. Em caso de problemas, você poderá ver as seguintes entradas de registro nos registros do agendador:

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496
    

Soluções:

  • Corrija todos os erros de análise de DAG. O processador de DAG analisa vários DAGs e, em casos raros, os erros de análise de um DAG podem afetar negativamente a análise de outros DAGs.

  • Se a análise do DAG levar mais do que a quantidade de segundos definida em [core]dagrun_import_timeout, aumente esse tempo limite.

  • Se a análise de todos os DAGs levar mais do que a quantidade de segundos definida em [core]dag_file_processor_timeout, aumente esse tempo limite.

  • Se o DAG levar muito tempo para ser analisado, isso também pode significar que ele não está implementado da maneira ideal. Por exemplo, se ele ler muitas variáveis de ambiente ou realizar chamadas para serviços externos ou para o banco de dados do Airflow. Na medida do possível, evite realizar essas operações em seções globais de DAGs.

  • Aumente os recursos de CPU e memória do Programador para que ele funcione mais rápido.

  • Ajuste o número de programadores.

  • Aumente o número de processos do processador DAG para que a análise possa ser feita mais rapidamente. Para fazer isso, aumente o valor de [scheduler]parsing_process.

  • Diminuir a frequência da análise de DAG.

  • Reduza a carga no banco de dados do Airflow.

Sintomas de carga pesada no banco de dados do Airflow

Para mais informações, consulte Sintomas de sobrecarga do banco de dados do Airflow.

A seguir