Como resolver problemas de DAGs (fluxos de trabalho)

Nesta página, você encontra informações sobre a solução de problemas comuns do fluxo de trabalho e suas etapas.

Como resolver problemas do fluxo de trabalho

Para começar a solução de problemas, siga estes passos:

  1. Verifique os registros do Airflow.
  2. Revise o conjunto de operações do Google Cloud.
  3. No Console do Cloud, verifique se há erros nas páginas dos componentes do Google Cloud em execução no ambiente.
  4. Na interface da Web do Airflow, verifique na Visualização do gráfico (em inglês) do DAG se há instâncias de tarefa com falha.

    Dica: para navegar por um DAG grande para procurar instâncias de tarefa com falha, mude a orientação da visualização do gráfico (em inglês) de LR para RL substituindo o dag_orientation padrão do servidor da Web.:

    Seção Chave Valor
    webserver dag_orientation LR, TB, RL ou BT

Como depurar falhas do operador

Para depurar uma falha do operador, siga estes passos:

  1. Verifique se há erros específicos da tarefa.
  2. Verifique os registros do Airflow.
  3. Revise o conjunto de operações do Google Cloud.
  4. Verifique os registros específicos do operador.
  5. Corrija os erros.
  6. Faça upload do DAG na pasta dags/.
  7. Na interface da Web do Airflow, limpe os estados anteriores (em inglês) do DAG.
  8. Execute o DAG ou retome esse processo.

Problemas comuns

As seções a seguir descrevem os sintomas e as possíveis correções de alguns problemas comuns do DAG.

A tarefa falha sem emitir registros

Os pods do Google Kubernetes Engine estão sujeitos ao ciclo de vida do pod do Kubernetes (em inglês) e à remoção de pods. Picos de tarefas e coprogramação de workers são as duas causas mais comuns de remoção de pods no Cloud Composer.

A remoção de pod pode ocorrer quando um determinado pod usa recursos em excesso de um nó em relação às expectativas de consumo de recursos configuradas para o nó. Por exemplo, a remoção pode acontecer quando várias tarefas com uso intenso de memória são executadas em um pod, e a carga combinada faz com que o nó em que esse pod seja executado exceda o limite de consumo de memória.

Se um pod de worker do Airflow for removido, todas as instâncias de tarefas em execução nele serão interrompidas e marcadas como com falha pelo Airflow.

Os registros são armazenados em buffer. Se um pod de worker for removido antes da limpeza do buffer, os registros não serão emitidos. Uma falha de tarefa sem registros é uma indicação de que os workers do Airflow são reiniciados devido à falta de memória (OOM, na sigla em inglês). É possível que alguns registros estejam presentes no Cloud Logging mesmo que os registros do Airflow não tenham sido emitidos. É possível ver os registros, por exemplo, porComo selecionar seu ambiente no Console do Google Cloud, navegando atéRegistros: e visualizar os registros de workers individuais emTodos os registros ->Registros do Airflow ->Trabalhadores ->(worker individual) ,

A execução do DAG é limitada pela RAM. Cada execução de tarefa começa com dois processos do Airflow: execução e monitoramento. Cada nó aceita até seis tarefas simultâneas, com aproximadamente 12 processos carregados com módulos do Airflow. É possível que mais memória seja consumida dependendo da natureza do DAG.

Sintoma

  1. No Console do Cloud, acesse o painel Kubernetes Engine -> Cargas de trabalho.

    Abrir o painel Cargas de trabalho

  2. Se houver pods airflow-worker que mostram Evicted, clique em cada pod removido e procure a mensagem The node was low on resource: memory na parte superior da janela.

Corrigir

A importação da carga do DAG atingiu o tempo limite

Sintoma:

  • IU da Web do Airflow: na parte superior da página de lista de DAGs, uma caixa de alerta vermelha mostra Broken DAG: [/path/to/dagfile] Timeout.
  • Pacote de operações do Google Cloud: os registros airflow-scheduler contêm entradas semelhantes a:
    • "ERROR - Process timed out"
    • "ERROR - Failed to import: /path/to/dagfile"
    • "AirflowTaskTimeout: Timeout"

Correção:

Modifique a opção de configuração dagbag_import_timeout do Airflow e permita mais tempo para a análise do DAG:

Seção Chave Valor
core dagbag_import_timeout Novo valor de tempo limite

Aumento do tráfego de rede para e do banco de dados do Airflow.

A quantidade de rede de tráfego entre o cluster do GKE do ambiente e o banco de dados do Airflow depende do número de DAGs, do número de tarefas nos DAGs e da maneira como os DAGs acessam os dados no banco de dados do Airflow. Os fatores a seguir podem influenciar o uso da rede:

  • Consultas ao banco de dados do Airflow. Se os DAGs fazem muitas consultas, eles geram grande quantidade de tráfego. Exemplos: verificação do status das tarefas antes de prosseguir com outras tarefas, consultar a tabela XCom e despejar conteúdo do banco de dados do Airflow.

  • Número grande de tarefas. Quanto mais tarefas houver para programação, mais tráfego de rede será gerado. Essa consideração se aplica ao número total de tarefas nos DAGs e à frequência de agendamento. Quando o programador do Airflow programa a execução do DAG, ele faz consultas ao banco de dados do Airflow e gera tráfego.

  • A interface da Web do Airflow gera tráfego de rede porque faz consultas ao banco de dados do Airflow. O uso intensivo de páginas com gráficos, tarefas e diagramas pode gerar grandes volumes de tráfego de rede.

O DAG falha no servidor da Web do Airflow ou faz com que ele retorne um erro 502 gateway timeout

As falhas no servidor da Web podem ocorrer por diversos motivos. Verifique os registros de airflow-webserver no Cloud Logging para determinar a causa do erro 502 gateway timeout.

Computação pesada

Evite executar computação pesada durante a análise do DAG. Ao contrário dos nós de worker e de programador, que têm tipos de máquina que podem ser personalizados para garantir maior capacidade de CPU e de memória, o servidor da Web usa um tipo de máquina fixo. Isso poderá gerar falhas na análise do DAG se a computação executada durante esse processo for muito pesada.

O servidor da Web tem duas vCPUs e 2 GB de memória. O valor padrão para core-dagbag_import_timeout é de 30 segundos. Esse valor de tempo limite define o limite máximo de quanto tempo o Airflow gasta carregando um módulo do Python na pasta dags/.

Permissões incorretas

O servidor da Web não é executado na mesma conta de serviço que os workers e o programador. Assim, eles podem acessar recursos gerenciados pelo usuário a que o servidor não tem acesso.

Recomendamos que você evite acessar recursos que não sejam públicos durante a análise do DAG. Às vezes, isso é inevitável, e você precisará conceder permissões à conta de serviço do servidor da Web. O nome da conta de serviço é derivado do domínio do servidor da Web. Por exemplo, se o domínio for foo-tp.appspot.com, a conta de serviço será foo-tp@appspot.gserviceaccount.com.

Erros do DAG

O servidor da Web é executado no App Engine e fica separado do cluster do GKE do ambiente. O servidor da Web analisa os arquivos de definição do DAG, e um 502 gateway timeout pode ocorrer se houver erros no DAG. O Airflow funciona normalmente sem um servidor da Web ativo, contanto que o DAG problemático não interrompa nenhum processo executado no GKE. Nesse caso, é possível usar gcloud composer environments run para recuperar detalhes do ambiente e como uma solução alternativa se o servidor da Web ficar indisponível.

Em outros casos, é possível executar a análise do DAG no GKE, além de pesquisar os DAGs que causam exceções fatais do Python ou esse tempo limite (padrão de 30 segundos). Para resolver os problemas, conecte-se a um shell remoto em um contêiner de worker do Airflow e teste os erros de sintaxe. Para mais informações, consulte Como testar DAGs.

A exceção Lost connection to MySQL server during query é gerada durante a execução da tarefa ou logo depois dela

As exceções Lost connection to MySQL server during query geralmente ocorrem quando as seguintes condições são atendidas:

  • Seu DAG usa PythonOperator ou um operador personalizado.
  • Seu DAG faz consultas no banco de dados do Airflow.

Se várias consultas forem feitas a partir de uma função chamável, os rastreamentos poderão apontar incorretamente para a linha self.refresh_from_db(lock_for_update=True) no código do Airflow. é a primeira consulta de banco de dados após a execução da tarefa. A causa real da exceção acontece antes disso, quando uma sessão do SQLAlchemy não é fechada corretamente.

As sessões SQLAlchemy têm como escopo uma linha de execução e são criadas em uma sessão de função chamável. Assim, é possível continuar posteriormente dentro do código do Airflow. Se houver atrasos significativos entre as consultas em uma sessão, a conexão poderá já estar fechada pelo servidor MySQL ou PostgreSQL. O tempo limite de conexão nos ambientes do Cloud Composer é definido como aproximadamente 10 minutos.

Correção:

  • Use o decorador airflow.utils.db.provide_session. Esse decorador fornece uma sessão válida para o banco de dados do Airflow no parâmetro session e fecha corretamente a sessão no final da função.
  • Não use uma única função de longa duração. Em vez disso, mova todas as consultas de banco de dados para funções separadas, de modo que haja várias funções com o decorador airflow.utils.db.provide_session. Nesse caso, as sessões são fechadas automaticamente depois de recuperar os resultados da consulta.