Como resolver problemas de DAGs

Cloud Composer 1 | Cloud Composer 2

Nesta página, você verá as etapas de solução de problemas e informações sobre problemas comuns do fluxo de trabalho.

Muitos problemas de execução do DAG são causados pelo desempenho não ideal do ambiente. Para otimizar o ambiente do Cloud Composer 2, siga o guia Otimizar o desempenho e os custos do ambiente.

Alguns problemas de execução do DAG podem ser causados pelo programador do Airflow não funcionar corretamente ou de maneira ideal. Siga as instruções de solução de problemas do programador para resolver esses problemas.

Como resolver problemas do fluxo de trabalho

Para começar a solução de problemas, siga estes passos:

  1. Verifique os registros do Airflow.
  2. Verifique o Painel do Monitoring.
  3. Analise o Cloud Monitoring.
  4. No Console do Cloud, verifique se há erros nas páginas dos componentes do ambiente.
  5. Na interface da Web do Airflow, verifique na Visualização do gráfico (em inglês) do DAG se há instâncias de tarefa com falha.

    Dica: para navegar por um DAG grande para procurar instâncias de tarefas com falha, mude a orientação da visualização do gráfico de LR para RL modificando a seguinte opção de configuração do Airflow:

    Seção Chave Valor
    webserver dag_orientation LR, TB, RL ou BT

Como depurar falhas do operador

Para depurar uma falha do operador, siga estes passos:

  1. Verifique se há erros específicos da tarefa.
  2. Verifique os registros do Airflow.
  3. Analise o Cloud Monitoring.
  4. Verificar registros específicos do operador.
  5. Corrija os erros.
  6. Faça upload do DAG para a pasta dags/.
  7. Na interface da Web do Airflow, limpe os estados anteriores do DAG.
  8. Retome ou execute o DAG.

Problemas comuns

Nas seções a seguir, você encontra descrições dos sintomas e possíveis correções de alguns problemas comuns do DAG.

A tarefa falha sem emitir registros devido a erros de análise do DAG.

Às vezes, podem ocorrer erros sutis de DAG que levam a uma situação em que um programador do Airflow e um processador de DAG conseguem programar tarefas para execução e analisar um arquivo DAG, respectivamente, mas o worker do Airflow não executa tarefas de um DAG, já que há erros de programação em um arquivo DAG do Python. Isso pode levar a uma situação em que uma tarefa do Airflow é marcada como Failed e não há registro da execução.

Solução: verifique nos registros do worker do Airflow se não há erros gerados pelo worker do Airflow relacionado a erros de análise de DAG ou DAG ausentes.

A tarefa falha sem emitir registros devido à pressão do recurso

Sintoma: durante a execução de uma tarefa, o subprocesso do worker do Airflow responsável pela execução da tarefa do Airflow é interrompido abruptamente. O erro visível no registro do worker do Airflow pode ser semelhante ao seguinte:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solução:

A tarefa falha sem emitir registros devido à remoção do pod

Os pods do Google Kubernetes Engine estão sujeitos ao ciclo de vida do pod do Kubernetes e à remoção de pods. Picos de tarefas e coprogramação de workers são duas causas mais comuns de remoção de pods no Cloud Composer.

A remoção de pods pode ocorrer quando um determinado pod usa em excesso os recursos de um nó, em relação às expectativas de consumo de recursos configuradas para o nó. Por exemplo, a remoção pode acontecer quando várias tarefas com uso intenso de memória são executadas em um pod e a carga combinada faz com que o nó em que esse pod é executado exceda o limite de consumo de memória.

Se um pod de worker do Airflow for removido, todas as instâncias de tarefas em execução nele serão interrompidas e, posteriormente, marcadas como com falha pelo Airflow.

Os registros são armazenados em buffer. Se um pod de worker for removido antes da limpeza do buffer, os registros não serão emitidos. Quando uma tarefa falha sem emitir registros, isso indica que os workers do Airflow serão reiniciados devido à falta de memória (OOM, na sigla em inglês). Alguns registros podem estar presentes no Cloud Logging mesmo que os registros do Airflow não tenham sido emitidos.

Para ver os registros:

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente será aberta.

  3. Acesse a guia Registros.

  4. Veja registros de workers individuais em Todos os registros -> Registros do Airflow -> Workers -> (worker individuais).

A execução do DAG é limitada pela memória. Todas as tarefas são iniciadas com dois processos do Airflow: execução de tarefa e monitoramento. Cada nó aceita até seis tarefas simultâneas, com aproximadamente 12 processos carregados com módulos do Airflow. É possível que mais memória seja consumida dependendo da natureza do DAG.

Sintoma:

  1. No Console do Google Cloud, acesse a página Cargas de trabalho.

    Acesse "Cargas de trabalho"

  2. Se houver pods airflow-worker que mostrem Evicted, clique em cada pod removido e procure a mensagem The node was low on resource: memory na parte superior da janela.

Corrigir:

A importação da carga do DAG atingiu o tempo limite

Sintoma:

  • Na interface da Web do Airflow, na parte superior da página da lista de DAGs, uma caixa de alerta vermelha mostra Broken DAG: [/path/to/dagfile] Timeout.
  • No Cloud Monitoring: os registros airflow-scheduler contêm entradas semelhantes a estas:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Corrigir:

Modifique a opção de configuração dag_file_processor_timeout do Airflow e permita mais tempo para a análise do DAG:

Seção Chave Valor
core dag_file_processor_timeout Novo valor de tempo limite

Aumento do tráfego de rede de entrada e saída do banco de dados do Airflow

A quantidade de rede de tráfego entre o cluster do GKE do ambiente e o banco de dados do Airflow depende do número de DAGs, do número de tarefas nos DAGs e da maneira como os DAGs acessam os dados no banco de dados. Os fatores a seguir podem influenciar o uso da rede:

  • Consultas no banco de dados do Airflow. Se os DAGs fazem muitas consultas, eles geram grandes quantidades de tráfego. Exemplos: verificação do status de tarefas antes de prosseguir com outras tarefas, consultar a tabela XCom, despejar conteúdo do banco de dados do Airflow.

  • Um grande número de tarefas. Quanto mais tarefas houver para programar, mais tráfego de rede será gerado. Essa consideração se aplica ao número total de tarefas nos DAGs e à frequência de programação. Quando o programador do Airflow programa as execuções de DAG, ele faz consultas no banco de dados do Airflow e gera tráfego.

  • A interface da Web do Airflow gera tráfego de rede porque faz consultas ao banco de dados. O uso intenso de páginas com gráficos, tarefas e diagramas pode gerar grandes volumes de tráfego de rede.

O DAG falha no servidor da Web do Airflow ou faz com que ele retorne um erro 502 gateway timeout

As falhas do servidor da Web podem ocorrer por diversos motivos. Verifique os registros airflow-webserver no Cloud Logging para determinar a causa do erro 502 gateway timeout.

Computação pesada

Esta seção se aplica apenas ao Cloud Composer 1.

Evite executar computação pesada durante a análise do DAG.

Ao contrário dos nós de worker e de programador, que têm tipos de máquina que podem ser personalizados para aumentar a capacidade de CPU e memória, o servidor da Web usa um tipo de máquina fixo. Isso poderá gerar falhas na análise do DAG se a computação em tempo de análise for muito pesada.

O servidor da Web tem duas vCPUs e 2 GB de memória. O valor padrão para core-dagbag_import_timeout é de 30 segundos. Esse valor de tempo limite define o limite máximo de quanto tempo o Airflow gasta carregando um módulo do Python na pasta dags/.

Permissões incorretas

Esta seção se aplica apenas ao Cloud Composer 1.

O servidor da Web não é executado na mesma conta de serviço que os workers e o programador. Assim, eles podem acessar recursos gerenciados pelo usuário a que o servidor não tem acesso.

Recomendamos que você evite acessar recursos que não sejam públicos durante a análise do DAG. Às vezes, isso é inevitável, e você precisará conceder permissões à conta de serviço do servidor da Web. O nome da conta de serviço é derivado do domínio do servidor da Web. Por exemplo, se o domínio for example-tp.appspot.com, a conta de serviço será example-tp@appspot.gserviceaccount.com.

Erros do DAG

Esta seção se aplica apenas ao Cloud Composer 1.

O servidor da Web é executado no App Engine e fica separado do cluster do GKE do ambiente. O servidor da Web analisa os arquivos de definição do DAG, e um 502 gateway timeout pode ocorrer se houver erros no DAG. O Airflow funciona normalmente sem um servidor da Web funcional se o DAG problemático não estiver interrompendo os processos em execução no GKE. Nesse caso, use gcloud composer environments run para recuperar detalhes do ambiente e como solução alternativa se o servidor da Web ficar indisponível.

Em outros casos, é possível executar a análise do DAG no GKE, além de pesquisar os DAGs que causam exceções fatais do Python ou esse tempo limite (padrão de 30 segundos). Para resolver os problemas, conecte-se a um shell remoto em um contêiner de worker do Airflow e teste os erros de sintaxe. Para mais informações, consulte Como testar DAGs.

Erro 504 ao acessar o servidor da Web do Airflow

Consulte Erro 504 ao acessar a IU do Airflow.

A exceção Lost connection to MySQL server during query é gerada durante a execução da tarefa ou logo depois dela

As exceções Lost connection to MySQL / PostgreSQL server during query geralmente ocorrem quando as seguintes condições são atendidas:

  • O DAG usa PythonOperator ou um operador personalizado.
  • O DAG faz consultas no banco de dados do Airflow.

Se várias consultas forem feitas a partir de uma função chamável, os tracebacks poderão apontar incorretamente para a linha self.refresh_from_db(lock_for_update=True) no código do Airflow. é a primeira consulta do banco de dados após a execução da tarefa. A causa real da exceção acontece antes disso, quando uma sessão do SQLAlchemy não é fechada corretamente.

O escopo das sessões do SQLAlchemy é uma linha de execução e é criado em uma sessão de função chamável que pode ser continuada dentro do código do Airflow. Se houver atrasos significativos entre as consultas em uma sessão, a conexão já poderá ser fechada pelo servidor MySQL ou PostgreSQL. O tempo limite de conexão nos ambientes do Cloud Composer é definido como aproximadamente 10 minutos.

Corrigir:

  • Use o decorador airflow.utils.db.provide_session. Esse decorador fornece uma sessão válida para o banco de dados do Airflow no parâmetro session e fecha corretamente a sessão no final da função.
  • Não use uma única função de longa duração. Em vez disso, mova todas as consultas do banco de dados para funções separadas, de modo que haja várias funções com o decorador airflow.utils.db.provide_session. Nesse caso, as sessões são fechadas automaticamente depois de recuperar os resultados da consulta.

Como controlar o tempo de execução de DAGs, tarefas e execuções paralelas do mesmo DAG

Se você quiser controlar a duração de uma única execução de DAG para um determinado DAG, use o parâmetro dagrun_timeout do DAG. Por exemplo, se você esperar que uma única execução de DAG não termine (por padrão, com sucesso ou falha), não poderá durar mais de uma hora, defina esse parâmetro como 3.600 segundos.

Também é possível controlar por quanto tempo uma tarefa do Airflow durará. Para fazer isso, é possível usar execution_timeout.

Se quiser controlar quantas execuções de DAG ativas você quer ter para um DAG específico, use a opção de configuração do Airflow [core]max-active-runs-per-dag para isso.

Se você quiser que apenas uma instância de um DAG seja executada em um momento específico, defina o parâmetro max-active-runs-per-dag como 1.

Problemas que afetam DAGs e plug-ins e sincronizam com programadores, workers e servidores da Web

O Cloud Composer sincroniza o conteúdo das pastas /dags e /plugins com os programadores e os workers. Alguns objetos em pastas /dags e /plugins podem impedir que essa sincronização funcione corretamente ou pelo menos desacelerar.

  • A pasta /dags está sincronizada com os programadores e os workers. Essa pasta não é sincronizada com servidores da Web no Cloud Composer 2 ou se você ativar DAG Serialization no Cloud Composer 1.

  • A pasta /plugins está sincronizada com os programadores, os workers e os servidores da Web.

Os seguintes problemas podem acontecer:

  • Você fez upload de arquivos compactados gzip que usam a transcodificação de compactação para pastas /dags e /plugins. Isso geralmente acontecerá se você usar o comando gsutil cp -Z para fazer upload de dados para o bucket.

    Solução: exclua o objeto que usou a transcodificação de compactação e faça novamente o upload para o bucket.

  • Um dos objetos tem o nome '.'. Esse objeto não é sincronizado com os programadores e os workers, e pode parar de ser sincronizado.

    Solução: renomear um objeto problemático

  • Um dos objetos nas pastas /dags ou /plugins contém um símbolo / no final do nome do objeto. Esses objetos podem enganar o processo de sincronização porque o símbolo / significa que um objeto é uma pasta, não um arquivo.

    Solução: remova o símbolo / do nome do objeto problemático.

  • Não armazene arquivos desnecessários em pastas /dags e /plugins.

    Às vezes, os DAGs e plug-ins implementados são acompanhados por arquivos adicionais, como arquivos que armazenam testes para esses componentes. Esses arquivos são sincronizados com workers e programadores e afetam o tempo necessário de cópia deles para programadores, workers e servidores da Web.

    Solução: não armazene nenhum arquivo adicional e desnecessário em pastas /dags e /plugins.

O erro Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' é gerado por programadores e workers

Esse problema acontece porque os objetos podem ter um namespace sobreposto no Cloud Storage. Ao mesmo tempo, os programadores e os workers usam sistemas de arquivos tradicionais. Por exemplo, é possível adicionar uma pasta e um objeto com o mesmo nome a um bucket do ambiente. Quando o bucket é sincronizado com os programadores e workers do ambiente, esse erro é gerado, o que pode levar a falhas de tarefas.

Para corrigir esse problema, verifique se não há namespaces sobrepostos no ambiente. Por exemplo, se /dags/misc (um arquivo) e /dags/misc/example_file.txt (outro arquivo) estiverem em um bucket, um erro será gerado pelo programador.

Interrupções temporárias ao se conectar ao banco de dados de metadados do Airflow

O Cloud Composer é executado na infraestrutura em nuvem distribuída. Isso significa que, de tempos em tempos, podem surgir alguns problemas transitórios e interromper a execução das tarefas do Airflow.

Nessas situações, as seguintes mensagens de erro podem ser exibidas nos registros dos workers do Airflow:

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

ou

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Esses problemas intermitentes também podem ser causados por operações de manutenção realizadas nos ambientes do Cloud Composer.

Normalmente, esses erros são intermitentes e, se as tarefas do Airflow forem idempotentes e as tentativas estiverem configuradas, você estará imune a elas. Considere também definir janelas de manutenção.

Outro motivo para esses erros pode ser a falta de recursos no cluster do ambiente. Nesses casos, é possível escalonar ou otimizar o ambiente conforme descrito nas instruções em Como escalonar ambientes ou Como otimizar o ambiente.

O DAG não fica visível na IU do Airflow ou na IU do DAG, e o programador não faz a programação dele

O processador DAG analisa cada DAG antes que ele possa ser programado pelo programador e antes que ele fique visível na IU do Airflow ou na IU do DAG. A opção de configuração do [core]dag_file_processor_timeout do Airflow define quanto tempo o processador do DAG precisa analisar um único DAG.

Se um DAG não estiver visível na IU do Airflow ou na IU, siga estas etapas:

  • Verifique os registros do processador de DAG se ele puder processar corretamente o DAG. Em caso de problemas, é possível ver as seguintes entradas de registro nos registros do processador DAG ou do programador:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.

Esse erro pode ser causado por um dos seguintes motivos:

  • O DAG não foi implementado corretamente e o processador não pode analisá-lo. Nesse caso, corrija o DAG.

  • A análise do DAG leva mais do que a quantidade de segundos definida em [core]dag_file_processor_timeout.

    Nesse caso, você pode aumentar esse tempo limite.

  • Se o DAG levar muito tempo para analisar, isso também poderá significar que ele não foi implementado da maneira ideal. Por exemplo, se ele lê muitas variáveis de ambiente ou executa chamadas para serviços externos. Se esse for o caso, otimize seu DAG para que o processador possa processá-lo rapidamente.

Sintomas de que o banco de dados do Airflow está sob carga pesada

Às vezes, nos registros de worker do Airflow, são exibidas as seguintes entradas de registro de aviso.

Para MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Para o PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Esse erro ou alerta pode indicar que o banco de dados do Airflow está sobrecarregado pelo número de consultas que ele precisa processar.

Soluções possíveis:

A seguir