Resolver problemas do programador do Airflow

Cloud Composer 1 | Cloud Composer 2

Nesta página, você verá etapas de solução de problemas e informações sobre problemas comuns com os programadores do Airflow.

Identificar a origem do problema

Para começar a solução de problemas, identifique se o problema acontece no tempo de análise do DAG ou durante o processamento de tarefas no momento da execução. Para mais informações sobre o tempo de análise e o tempo de execução do DAG, leia Diferença entre o tempo de análise do DAG e o tempo de execução do DAG.

Como inspecionar tempos de análise do DAG

Para verificar se o problema acontece no momento da análise do DAG, siga estas etapas.

Console

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente será aberta.

  3. Acesse a guia Monitoramento.

  4. Na guia Monitoramento, consulte o gráfico Tempo total de análise de todos os arquivos DAG na seção Execuções do DAG e identifique possíveis problemas.

    A seção "Execuções DAG" na guia "Monitoramento" do Composer mostra métricas de integridade dos DAGs no ambiente

gcloud

Use o comando list_dags com a sinalização -r para ver o tempo de análise de todos os seus DAGs.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    list_dags -- -r

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;

A saída do comando será semelhante a esta:

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

Procure o valor de Tempo de análise do DagBag. Um valor grande pode indicar que um dos seus DAGs não está implementado da maneira ideal. Na tabela de respostas, é possível identificar quais DAGs têm um longo tempo de análise.

Como monitorar tarefas em execução ou na fila

Para verificar se há tarefas travadas em uma fila, siga estas etapas.

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente será aberta.

  3. Acesse a guia Monitoramento.

  4. Na guia Monitoramento, analise o gráfico Tarefas em execução e na fila na seção Execuções DAG e identifique possíveis problemas.

Solução de problemas no momento da análise do DAG

As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns no tempo de análise do DAG.

Número limitado de linhas de execução

Permitir que o gerenciador do processador de DAG (a parte do programador que processa arquivos DAG) use apenas um número limitado de linhas de execução pode afetar o tempo de análise do DAG.

Para resolver o problema, aplique as seguintes alterações ao arquivo de configuração air.cfg:

  • Para o Airflow 1.10.12 e versões anteriores, use o parâmetro max_threads:

    [scheduler]
    max_threads = <NUMBER_OF_CORES_IN_MACHINE - 1>
    
  • No Airflow 1.10.14 e versões mais recentes, use o parâmetro parsing_processes:

    [scheduler]
    parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>
    

Substitua NUMBER_OF_CORES_IN_MACHINE pelo número de núcleos nas máquinas de nós de trabalho.

Número e distribuição de tempo das tarefas

O Airflow é conhecido por ter problemas com a programação de um grande número de tarefas pequenas. Nesses casos, opte por um número menor de tarefas mais consolidadas.

Programar um grande número de DAGs ou tarefas ao mesmo tempo também pode ser uma fonte de problemas. Para evitar esse problema, distribua suas tarefas de maneira mais uniforme com o tempo.

Solução de problemas com tarefas em execução e na fila

As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns com tarefas em execução e na fila.

Filas de tarefas são muito longas

Em alguns casos, uma fila de tarefas pode ser muito longa para o programador. Para informações sobre como otimizar parâmetros de worker e acelerado, leia sobre como escalonar o ambiente do Cloud Composer junto com a empresa.

Recursos de cluster limitados

Esta seção se aplica apenas ao Cloud Composer 1.

É possível que ocorram problemas de desempenho se o cluster do GKE do ambiente for muito pequeno para processar todos os DAGs e tarefas. Nesse caso, tente uma destas soluções:

  • Crie um novo ambiente com um tipo de máquina que ofereça mais desempenho e migre seus DAGs para ele.
  • Criar mais ambientes do Cloud Composer e dividir os DAGs entre eles
  • Altere o tipo de máquina dos nós do GKE, conforme descrito em Como fazer upgrade do tipo de máquina para os nós do GKE. Como esse procedimento é propenso a erros, é a opção menos recomendada.
  • Faça upgrade do tipo de máquina da instância do Cloud SQL que executa o banco de dados do Airflow no ambiente, por exemplo, usando os comandos gcloud composer environments update. O baixo desempenho do banco de dados do Airflow pode ser o motivo da lentidão do programador.

Evite programar tarefas durante janelas de manutenção

É possível definir janelas de manutenção específicas para seu ambiente. Durante esses períodos, ocorrem eventos de manutenção para o Cloud SQL e o GKE.

Fazer o programador do Airflow ignorar arquivos desnecessários

Melhore o desempenho do programador do Airflow ignorando arquivos desnecessários na pasta de DAGs. O programador do Airflow ignora arquivos e pastas especificados no arquivo .airflowignore.

Para fazer com que o programador do Airflow ignore arquivos desnecessários:

  1. Crie um arquivo .airflowignore.
  2. Nesse arquivo, liste as pastas e os arquivos que serão ignorados.
  3. Faça o upload desse arquivo para a pasta /dags no seu ambiente.

Para mais informações sobre o formato de arquivo .airflowignore, consulte a documentação do Airflow.

O programador do Airflow processa DAGs pausados

Os usuários do Airflow pausam os DAGs para evitar a execução. Isso salva os ciclos de processamento dos workers do Airflow.

O programador do Airflow continuará analisando DAGs pausados. Se você realmente quiser melhorar o desempenho do programador do Airflow, use .airflowignore ou exclua os DAGs pausados da pasta DAGs.

Uso de "wait_for_downstream" nos DAGs

Se você definir o parâmetro wait_for_downstream como True nos DAGs, para que uma tarefa seja bem-sucedida, todas as tarefas que estiverem imediatamente downstream também serão bem-sucedidas. de dados. Isso significa que a execução de tarefas pertencentes a uma determinada execução do DAG pode ser reduzida pela execução de tarefas da execução anterior do DAG. Leia mais sobre isso na documentação do Airflow.

As tarefas na fila por muito tempo serão canceladas e reprogramadas

Se uma tarefa do Airflow for mantida na fila por muito tempo, o programador irá marcá-la como failed/up_for_retry e reprogramá-la para execução. Uma maneira de observar os sintomas dessa situação é analisar o gráfico com o número de tarefas na fila (guia "Monitoramento" na IU do Cloud Composer) e, se os picos do gráfico não cairem em cerca de 10 minutos, é provável que haja falhas nas tarefas (sem registros) seguidas de "Tarefas pendentes ainda estão pendentes... Nesses casos, você verá a mensagem "O arquivo de registros não foi encontrado". Os registros de tarefas do Airflow não foram executados nos registros de tarefas do Airflow.

Em geral, essa falha de tarefa é esperada e a próxima instância da tarefa programada deve ser executada de acordo com a programação. Se você observar muitos desses casos nos ambientes do Cloud Composer, isso poderá significar que não há workers suficientes do Airflow no ambiente para processar todas as tarefas programadas.

Resolução: para resolver esse problema, é necessário garantir que sempre haja capacidade nos workers do Airflow para executar tarefas na fila. Por exemplo, é possível aumentar o número de workers ou worker_concurrency. Também é possível ajustar o paralelismo ou os pools para evitar tarefas de enfileiramento mais do que a capacidade que você tem.

Esporadicamente, tarefas desatualizadas podem bloquear a execução de um DAG específico

Em casos regulares, o programador do Airflow consegue lidar com situações em que há tarefas desatualizadas na fila e, por algum motivo, não é possível executá-las corretamente (por exemplo, um DAG a que as tarefas desatualizadas pertencem foi excluído).

Se essas tarefas desatualizadas não forem limpas pelo programador, talvez seja necessário excluí-las manualmente. Você pode fazer isso, por exemplo, na IU do Airflow, acessar (Menu > Navegador > Instâncias de tarefas) para encontrar tarefas na fila que pertencem a um DAG desatualizado e excluí-las.

Abordagem do Cloud Composer para o parâmetro min_file_process_interval

O Cloud Composer altera a maneira como min_file_process_interval é usado pelo programador do Airflow.

No caso do Cloud Composer usando o Airflow 1, os usuários podem definir o valor de min_file_process_interval entre 0 e 600 segundos. Os valores maiores que 600 segundos trarão os mesmos resultados de min_file_process_interval definido como 600 segundos.

No caso do Cloud Composer usando o Airflow 2, os usuários podem definir o valor de min_file_process_interval como um valor entre 0 e 1.200 segundos. Os valores acima de 1.200 segundos gerarão os mesmos resultados como se min_file_process_interval estivesse definido como 1.200 segundos.

Como escalonar a configuração do Airflow

O Airflow oferece opções de configuração que controlam quantas tarefas e DAGs o Airflow pode executar ao mesmo tempo. Para definir essas opções de configuração, modifique os valores do ambiente.

  • Simultaneidade do worker

    O parâmetro [celery]worker_concurrency controla o número máximo de tarefas que um worker do Airflow pode executar ao mesmo tempo. Se você multiplicar o valor desse parâmetro pelo número de workers do Airflow no ambiente do Cloud Composer, você receberá o número máximo de tarefas que podem ser executadas em um determinado momento no ambiente. Esse número é limitado pela opção de configuração [core]parallelism do Airflow, que é descrita em mais detalhes.

  • Máximo de execuções de DAGs ativas

    A opção de configuração [core]max_active_runs_per_dag do Airflow controla o número máximo de execuções ativas de DAGs por DAG. O programador não criará mais execuções de DAGs se atingir esse limite.

    Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que o programador restringe a execução do DAG, porque não é possível criar mais instâncias de execução do DAG em um determinado momento.

  • Máximo de tarefas ativas por DAG

    A opção de configuração [core]max_active_tasks_per_dag do Airflow controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG. É um parâmetro no nível do DAG.

    Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que a execução de uma única instância do DAG é lenta porque há apenas um número limitado de tarefas do DAG que podem ser executadas em um determinado momento

    Solução: aumente o [core]max_active_tasks_per_dag para atender às suas necessidades

  • Paralelismo e tamanho do pool

    A opção de configuração [core]parallelism do Airflow controla quantas tarefas o programador do Airflow pode enfileirar na fila do executor após todas as dependências dessas tarefas serem atendidas.

    Este é um parâmetro global para toda a configuração do Airflow.

    As tarefas são enfileiradas e executadas em um pool. Os ambientes do Cloud Composer usam apenas um pool. O tamanho desse pool controla quantas tarefas podem ser enfileiradas pelo programador para execução em um determinado momento. Se o tamanho do pool for muito pequeno, o programador não poderá enfileirar tarefas para execução, mesmo que os limites sejam definidos pela opção de configuração [core]parallelism e pelo [celery]worker_concurrency. opção de configuração multiplicada pelo número de workers do Airflow ainda não foi atendida.

    É possível configurar o tamanho do pool na IU do Airflow (Menu > Administrador > Pools). Ajuste o tamanho do pool com o nível de paralelismo esperado no ambiente.

Os DAGs não foram programados pelo Scheduler devido aos tempos limite do processador de DAGs

Leia mais sobre esse problema na seção de solução de problemas de DAGs

Marcar tarefas como reprovadas após chegar a dagrun_timeout

O Scheduler marcará as tarefas não concluídas (em execução, programadas e na fila) como falhas se uma execução do DAG não for concluída no dagrun_timeout (um parâmetro do DAG) Solução:

  • Ampliar dagrun_timeout (um parâmetro DAG) para atender às suas necessidades

  • Escalone verticalmente ou amplie os workers do Airflow para que o DAG seja executado mais rapidamente

Sintomas de que o Database Database está sob pressão

Às vezes, pode aparecer a seguinte entrada nos registros do programador do Airflow

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Esse erro ou alerta pode ser um sintoma do banco de dados de metadados do Airflow sobrecarregado com operações.

Soluções possíveis:

A seguir