Como solucionar problemas do programador do Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Esta página fornece etapas para solução de problemas e informações para problemas problemas com os programadores do Airflow.

Identificar a origem do problema

Para começar a solução de problemas, identifique se o problema acontece no tempo de análise do DAG ou durante o processamento de tarefas no momento da execução. Para mais informações sobre o tempo de análise e o tempo de execução do DAG, leia Diferença entre o tempo de análise do DAG e o tempo de execução do DAG.

Como inspecionar registros do processador de DAG

Se você tem DAGs complexos, o processador de DAG, que é executado pelo programador, pode não analisar todos os DAGs. Isso pode causar muitos problemas apresentar os sintomas a seguir.

Sintomas:

  • Se o processador de DAG encontrar problemas ao analisar os DAGs, isso poderá levar a uma combinação dos problemas listados abaixo. Se os DAGs forem gerados dinamicamente, esses problemas poderão ter mais impacto em comparação aos DAGs estáticos.

  • Os DAGs não são visíveis na interface do Airflow e do DAG.

  • Os DAGs não são programados para execução.

  • Há erros nos registros do processador de DAG, por exemplo:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    ou

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Os programadores do Airflow enfrentam problemas que levam a reinicializações do programador.

  • As tarefas do Airflow programadas para execução são canceladas e o DAG é executado para DAGs que não foram analisados podem ser marcados como failed. Exemplo:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Solução:

  • Aumente os parâmetros relacionados à análise do DAG:

  • Corrija ou remova os DAGs que causam problemas ao processador de DAG.

Como inspecionar tempos de análise do DAG

Para verificar se o problema acontece no momento da análise do DAG, siga estas etapas.

Console

No console do Google Cloud, é possível usar a página Monitoramento e a guia Registros para inspecionar os tempos de análise do DAG.

Inspecione os tempos de análise do DAG com a página de monitoramento do Cloud Composer:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Monitoramento é aberta.

  3. Na guia Monitoramento, consulte o gráfico Tempo total de análise de todos os arquivos DAG na seção Execuções do DAG e identifique possíveis problemas.

    A seção &quot;Execuções do DAG&quot; na guia &quot;Monitoramento&quot; do Composer mostra métricas de integridade para os DAGs no ambiente

Inspecione os tempos de análise do DAG com a guia Registros do Cloud Composer:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Monitoramento é aberta.

  3. Acesse a guia Registros e a árvore de navegação Todos os registros. Selecione a seção Gerenciador de processadores DAG.

  4. Analise os registros dag-processor-manager e identifique possíveis problemas.

    Os registros do processador de DAG vão mostrar os tempos de análise do DAG

gcloud

Use o comando dags report para conferir o tempo de análise de todos os DAGs.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;

A saída do comando será semelhante a esta:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Procure o valor de duration para cada um dos DAGs listados na tabela. Um valor alto pode indicar que um dos DAGs não está implementado da melhor forma possível. Na tabela de saída, é possível identificar quais DAGs têm um longo tempo de análise.

Como monitorar tarefas em execução ou na fila

Para verificar se há tarefas travadas em uma fila, siga estas etapas.

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Monitoramento.

  4. Na guia Monitoramento, consulte o gráfico Tarefas do Airflow. na seção Execuções de DAG para identificar possíveis problemas. Tarefas do Airflow são tarefas que estão em fila no Airflow, elas podem ir Fila de agentes do Celery ou do Kubernetes Executor. As tarefas na fila do Celery são tarefas que são colocadas na fila de agentes do Celery.

Solução de problemas no momento da análise do DAG

As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns no tempo de análise do DAG.

Número limitado de linhas de execução

Permitir o gerenciador do processador de DAG (a parte do programador que processa arquivos DAG) usar apenas um número limitado de linhas de execução pode afetar o tempo de análise do DAG.

Para resolver o problema, substitua as seguintes opções de configuração do Airflow:

  • Para o Airflow 1.10.12 e versões anteriores, modifique o Parâmetro max_threads:

    Seção Chave Valor Observações
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 Substitua NUMBER_OF_CORES_IN_MACHINE pelo número de núcleos
    nas máquinas de nós de trabalho.
  • Para o Airflow 1.10.14 e versões mais recentes, modifique o Parâmetro parsing_processes:

    Seção Chave Valor Observações
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 Substitua NUMBER_OF_CORES_IN_MACHINE pelo número de núcleos
    nas máquinas de nós de trabalho.
.

Número e distribuição de tempo das tarefas

O Airflow é conhecido por ter problemas com a programação de um grande número de tarefas pequenas. Nesses casos, opte por um número menor de tarefas mais consolidadas.

Programar um grande número de DAGs ou tarefas ao mesmo tempo também pode ser uma fonte de problemas. Para evitar esse problema, distribua suas tarefas de maneira mais uniforme com o tempo.

Solução de problemas com tarefas em execução e na fila

As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns com tarefas em execução e na fila.

Filas de tarefas são muito longas

Em alguns casos, uma fila de tarefas pode ser muito longa para o programador. Para informações sobre como otimizar parâmetros de worker e acelerado, leia sobre como escalonar o ambiente do Cloud Composer junto com a empresa.

Como usar o recurso TimeTable do programador do Airflow

A partir do Airflow 2.2, é possível definir uma tabela de horário para um DAG usando um um novo recurso chamado TimeTable.

Você pode definir uma tabela de horários usando um dos seguintes métodos:

.

Recursos de cluster limitados

Esta seção se aplica apenas ao Cloud Composer 1.

É possível que ocorram problemas de desempenho se o cluster do GKE do ambiente for muito pequeno para processar todos os DAGs e tarefas. Nesse caso, tente uma destas soluções:

  • Crie um novo ambiente com um tipo de máquina que ofereça mais desempenho e migre seus DAGs para ele.
  • Criar mais ambientes do Cloud Composer e dividir os DAGs entre eles
  • Altere o tipo de máquina dos nós do GKE, conforme descrito em Como fazer upgrade do tipo de máquina para os nós do GKE. Como esse procedimento é propenso a erros, é a opção menos recomendada.
  • Faça upgrade do tipo de máquina da instância do Cloud SQL que executa o banco de dados do Airflow no ambiente, por exemplo, usando os comandos gcloud composer environments update. O baixo desempenho do banco de dados do Airflow pode ser a razão o programador está lento.

Evite programar tarefas durante janelas de manutenção

É possível definir janelas de manutenção específicas para seus de nuvem. Durante esses períodos, ocorrem eventos de manutenção para o Cloud SQL e o GKE.

Fazer o programador do Airflow ignorar arquivos desnecessários

Melhore o desempenho do programador do Airflow ignorando arquivos desnecessários na pasta de DAGs. O programador do Airflow ignora arquivos e pastas especificados no arquivo .airflowignore.

Para fazer com que o programador do Airflow ignore arquivos desnecessários:

  1. Crie um arquivo .airflowignore.
  2. Nesse arquivo, liste as pastas e os arquivos que serão ignorados.
  3. Faça upload deste arquivo para a pasta /dags no seu bucket do seu ambiente de execução.

Para mais informações sobre o formato de arquivo .airflowignore, consulte Documentação do Airflow.

O programador do Airflow processa DAGs pausados

Os usuários do Airflow pausam os DAGs para evitar a execução. Dessa forma, os workers do Airflow ciclos de processamento de dados.

O programador do Airflow continuará analisando DAGs pausados. Se você realmente quiser melhorar o desempenho do programador do Airflow, usar .airflowignore ou excluir pausados DAGs da pasta de DAGs.

Uso de "wait_for_downstream" nos DAGs

Se você definir o parâmetro wait_for_downstream como True nos DAGs, para que uma tarefa seja bem-sucedida, todas as tarefas que estiverem imediatamente downstream também serão bem-sucedidas. de dados. Isso significa que a execução de tarefas pertencentes a uma determinada execução do DAG pode ser reduzida pela execução de tarefas da execução anterior do DAG. Leia mais sobre isso em a documentação do Airflow.

As tarefas que estiverem na fila por muito tempo serão canceladas e reprogramadas

Se uma tarefa do Airflow for mantida na fila por muito tempo, o programador o reprogramará para execução (nas versões do Airflow anteriores à 2.3.1, a tarefa também será marcada como com falha e será repetida se for qualificada para uma nova tentativa).

Uma maneira de observar os sintomas da situação é olhar para o gráfico com o número de tarefas na fila (Guia "Monitoramento" na interface do Cloud Composer) Se os picos do gráfico não diminuírem em cerca de duas horas, as tarefas provavelmente serão reprogramadas (sem registros) seguidas por "As tarefas adotadas ainda estavam pendentes ..." nos registros do programador. Nesses casos, você verá a mensagem "O arquivo de registros não foi encontrado...". mensagem nos registros de tarefas do Airflow porque a tarefa não foi executada.

Em geral, esse comportamento é esperado, e a próxima instância do deve ser executada de acordo com a programação. Se você observar muitas casos assim nos seus ambientes do Cloud Composer, isso pode significar que não há workers do Airflow suficientes no ambiente para processar todos as tarefas agendadas.

Resolução: para resolver esse problema, é necessário garantir que sempre haja capacidade em workers do Airflow para executar tarefas na fila. Por exemplo, você pode aumentar o número de workers ou worker_concurrency. Você também pode ajustar o paralelismo ou pools para evitar enfileirar tarefas além da sua capacidade.

Esporadicamente, tarefas desatualizadas podem bloquear a execução de um DAG específico

Em casos regulares, o programador do Airflow deve ser capaz de lidar com situações em que há tarefas desatualizadas na fila e, por algum motivo, não estão possível executá-las corretamente (por exemplo, um DAG ao qual pertencem as tarefas desatualizadas foi excluído).

Se essas tarefas desatualizadas não forem limpas pelo programador, talvez seja necessário excluí-los manualmente. É possível fazer isso, por exemplo, na interface do Airflow. É possível acesse (Menu &gt; Navegador &gt; Instâncias de tarefas), encontre tarefas na fila de um DAG desatualizado e as exclua.

Para resolver esse problema, faça upgrade do ambiente para o Cloud Composer versão 2.1.12 ou posterior.

Abordagem do Cloud Composer para o parâmetro [scheduler]min_file_process_interval

O Cloud Composer muda o modo como o [scheduler]min_file_process_interval é usado pelo programador do Airflow.

Airflow 1

Caso o Cloud Composer use o Airflow 1, os usuários podem definir o valor de [scheduler]min_file_process_interval entre 0 e 600 segundos. Valores maiores que 600 segundos trazem os mesmos resultados que se [scheduler]min_file_process_interval estivesse definido como 600 segundos.

Airflow 2

No Airflow 2, o [scheduler]min_file_process_interval só pode ser usado com 1.19.9 e 2.0.26 ou mais recentes

  • Versões do Cloud Composer anteriores à 1.19.9 e 2.0.26

    Nessas versões, [scheduler]min_file_process_interval é ignorado.

  • Cloud Composer versões 1.19.9 ou 2.0.26 ou versões mais recentes

    O programador do Airflow é reiniciado após um determinado número de vezes que todos os DAGs estão programados, e o parâmetro [scheduler]num_runs controla quantas vezes isso é feito pelo programador. Quando o programador atinge [scheduler]num_runs loops de programação, é reiniciado - O programador é um componente sem estado e essa reinicialização é um mecanismo de recuperação automática para quaisquer problemas que o Programador possa ter. Quando não é especificado, o padrão valor de [scheduler]num_runs é aplicado, que é 5000.

    [scheduler]min_file_process_interval pode ser usada para configurar a frequência A análise do DAG ocorre, mas o parâmetro não pode ser maior que o tempo necessário para um programador executar [scheduler]num_runs loops ao programar os DAGs.

Como escalonar a configuração do Airflow

O Airflow oferece opções de configuração que controlam quantas tarefas e DAGs o Airflow pode executar ao mesmo tempo. Para definir essas opções de configuração, substituir os valores deles no seu ambiente.

  • Simultaneidade do worker

    O parâmetro [celery]worker_concurrency controla o número máximo de tarefas que um worker do Airflow pode executar ao mesmo tempo. Se você multiplicar o valor desse parâmetro pelo número de workers do Airflow no ambiente do Cloud Composer, você receberá o número máximo de tarefas que podem ser executadas em um determinado momento no ambiente. Isso número é limitado pela opção de configuração [core]parallelism do Airflow, que é descrito mais detalhadamente.

    Nos ambientes do Cloud Composer 2, o valor padrão [celery]worker_concurrency é calculado automaticamente

    • Para as versões 2.3.3 e mais recentes do Airflow, [celery]worker_concurrency está definido com um valor mínimo de 32, 12 * worker_CPU e 8 * worker_memory.

    • Para as versões 2.2.5 ou anteriores do Airflow, [celery]worker_concurrency é Definido como 12 * número de workers e CPUs.

  • Máximo de execuções de DAGs ativas

    A opção de configuração [core]max_active_runs_per_dag do Airflow controla o número máximo de execuções ativas de DAGs por DAG. O programador não criará mais execuções de DAGs se atingir esse limite.

    Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que o programador restringe a execução do DAG, porque não é possível criar mais instâncias de execução do DAG em um determinado momento.

  • Máximo de tarefas ativas por DAG

    A opção de configuração [core]max_active_tasks_per_dag do Airflow controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG. É um parâmetro no nível do DAG.

    Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que a execução de uma única instância do DAG é lenta porque há apenas um número limitado de tarefas do DAG que podem ser executadas em um determinado momento

    Solução: aumentar [core]max_active_tasks_per_dag.

  • Paralelismo e tamanho do pool

    A opção de configuração [core]parallelism do Airflow controla quantas tarefas o programador do Airflow pode enfileirar na fila do executor após todas as dependências dessas tarefas serem atendidas.

    Este é um parâmetro global para toda a configuração do Airflow.

    As tarefas são enfileiradas e executadas em um pool. Os ambientes do Cloud Composer usam apenas um pool. O tamanho desse pool controla quantas tarefas podem ser enfileiradas pelo programador para execução em um determinado momento. Se o tamanho do pool for muito pequeno, o programador não poderá enfileirar tarefas para execução, mesmo que os limites sejam definidos pela opção de configuração [core]parallelism e pelo [celery]worker_concurrency. opção de configuração multiplicada pelo número de workers do Airflow ainda não foi atendida.

    É possível configurar o tamanho do pool na IU do Airflow (Menu > Administrador > Pools). Ajuste o tamanho do pool com o nível de paralelismo esperado no ambiente.

    Normalmente, [core]parallelism é definido como um produto do número máximo de workers. e [celery]worker_concurrency.

Os DAGs não são programados pelo programador devido aos tempos limite do processador de DAG

Para mais informações sobre esse problema, consulte Como solucionar problemas de DAGs.

Marcando tarefas como falhas após atingir dagrun_timeout

O programador marca tarefas que não estão concluídas (em execução, agendadas e na fila) falhou se uma execução do DAG não for concluída dentro dagrun_timeout (um parâmetro do DAG).

Solução:

Sintomas de pressão de carga no banco de dados do Airflow

Às vezes, nos registros do programador do Airflow, a seguinte entrada de registro de avisos:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Sintomas semelhantes também podem ser observados nos registros de workers do Airflow:

Para MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Para PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Tais erros ou avisos podem ser um sintoma do banco de dados do Airflow sendo sobrecarregado com o número de conexões abertas ou o número de consultas executados ao mesmo tempo, por programadores ou outros componentes do Airflow como workers, gatilhos e servidores da Web.

Soluções possíveis:

.

O servidor da Web mostra "O agendador parece não estar em execução" alerta

O programador relata o sinal de funcionamento regularmente para o Airflow no seu banco de dados. Com base nessas informações, o servidor da Web do Airflow determina se programador está ativo.

Às vezes, se o programador estiver sobrecarregado, talvez não consiga informar os batimentos a cada [scheduler]scheduler-heartbeat-sec.

Nesse caso, o servidor da Web do Airflow pode mostrar o seguinte aviso:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Soluções possíveis:

Alternativas para problemas encontrados durante o preenchimento de DAGs

Às vezes, talvez você queira executar novamente os DAGs que já foram executados. É possível fazer isso com a ferramenta de linha de comando do Airflow da seguinte maneira:

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

Para executar novamente apenas tarefas com falha em um DAG específico, use também o argumento --rerun_failed_tasks.

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Para executar novamente apenas tarefas com falha em um DAG específico, use também o argumento --rerun-failed-tasks.

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;
  • START_DATE com um valor para o parâmetro do DAG start_date, em o formato YYYY-MM-DD.
  • END_DATE com um valor para o parâmetro do DAG end_date, em o formato YYYY-MM-DD.
  • DAG_NAME pelo nome do DAG.

A operação de preenchimento pode, às vezes, gerar uma situação de impasse em que uma o preenchimento não é possível porque há um bloqueio em uma tarefa. Exemplo:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

Em alguns casos, você pode usar as seguintes soluções para superar os impasses:

  • Desative o mini-Programador substituindo o [core]schedule-after-task-execution para False.

  • Execute preenchimentos para períodos mais restritos. Por exemplo, defina START_DATE. e END_DATE para especificar um período de apenas um dia.

A seguir