Depurar problemas de agendamento de tarefas

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Este tutorial orienta você a diagnosticar e resolver problemas de programação de tarefas e de análise que levam a um mau funcionamento do programador, erros de análise e latência e falha na tarefa.

Introdução

O programador do Airflow é afetado principalmente por dois fatores: programação de tarefas e análise de DAG. Problemas em um desses fatores podem ter um impacto negativo na integridade e desempenho do ambiente.

Às vezes, muitas tarefas são programadas ao mesmo tempo. Nessa situação, a fila fica cheia, e as tarefas permanecem no estado "programada" ou são reprogramadas depois de serem enfileiradas, o que pode causar falha na tarefa e latência de desempenho.

Outro problema comum é a latência de análise e os erros causados pela complexidade de um código DAG. Por exemplo, um código DAG que contenha variáveis do Airflow na parte superior pode gerar atrasos na análise, sobrecarga do banco de dados, e tempo limite do DAG.

Neste tutorial, você vai diagnosticar os DAGs de exemplo e aprender a resolver problemas de programação e análise, melhorar a programação de DAGs e otimizar o código e as configurações do ambiente de DAGs para melhorar o desempenho.

Objetivos

Esta seção lista os objetivos dos exemplos neste tutorial.

Exemplo: mau funcionamento do programador e latência causada por alta simultaneidade de tarefas

  • Faça upload do DAG de exemplo que é executado várias vezes simultaneamente e diagnostique o problema de funcionamento do programador e os problemas de latência com o Cloud Monitoring.

  • Otimize o código do DAG consolidando as tarefas e avaliando o impacto na performance.

  • Distribuir as tarefas de maneira mais uniforme ao longo do tempo e avaliar o desempenho. impacto.

  • otimizar as configurações do Airflow e do ambiente; avaliar o impacto.

Exemplo: latência e erros de análise do DAG causados por código complexo

  • Fazer upload do DAG de amostra com variáveis do Airflow e diagnosticar a análise problemas com o Cloud Monitoring.

  • Otimize o código DAG evitando variáveis do Airflow no nível superior código e avaliar o impacto no tempo de análise.

  • Otimize as configurações do Airflow e do ambiente e avalie o impacto no tempo de análise.

Custos

Neste tutorial, usamos o seguinte componente faturável do Google Cloud:

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Veja mais detalhes em Limpeza.

Antes de começar

Esta seção descreve as ações necessárias antes de iniciar o tutorial.

Criar e configurar um projeto

Para este tutorial, você precisa ter uma conta do Google Cloud projeto. Configure o projeto da seguinte maneira:

  1. No console do Google Cloud, selecione ou crie um projeto:

    Acessar o seletor de projetos

  2. Verifique se o faturamento foi ativado para o projeto. Saiba como verificar se o faturamento está ativado em um projeto.

  3. Verifique se o usuário do projeto do Google Cloud tem os seguintes papéis para criar os recursos necessários:

    • Administrador de ambiente e de objetos do Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador do Compute (roles/compute.admin)

Ativar as APIs do projeto

Enable the Cloud Composer API.

Enable the API

Criar seu ambiente do Cloud Composer

Crie um ambiente do Cloud Composer 2.

Como parte da criação do ambiente, você concede a extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) para o agente de serviço do Composer do Compute Engine. O Cloud Composer usa essa conta para realizar operações no projeto do Google Cloud.

Exemplo: falha no agendador e na tarefa devido a problemas de agendamento

Este exemplo demonstra a depuração do mau funcionamento do programador e a latência causada devido à alta simultaneidade de tarefas.

faça upload do DAG de amostra para o ambiente

Faça upload do DAG de exemplo abaixo para o ambiente criado nas etapas anteriores. Neste tutorial, o DAG é chamado de dag_10_tasks_200_seconds_1.

Este DAG tem 200 tarefas. Cada tarefa aguarda um segundo e exibe "Concluída!". O DAG é acionado automaticamente após o upload. O Cloud Composer executa esse DAG 10 vezes, e todas as execuções do DAG acontecem em paralelo.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnosticar o mau funcionamento do programador e problemas de falha nas tarefas

Depois que as execuções do DAG forem concluídas, abra a interface do Airflow e clique no DAG dag_10_tasks_200_seconds_1. Você vai notar que 10 execuções de DAG foram concluídas e cada uma delas tem 200 tarefas concluídas.

Revise os registros de tarefas do Airflow:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.

No histograma de registros, é possível ver os erros e avisos indicados com cores vermelha e laranja:

O histograma de registros de workers do Airflow com erros e avisos
    indicados com cores vermelha e laranja
Figura 1. Histograma de registros de workers do Airflow (clique para ampliar)

O DAG de exemplo resultou em cerca de 130 avisos e 60 erros. Clique em qualquer que contém barras amarelas e vermelhas. Você verá algumas e erros nos registros:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Esses registros podem indicar que o uso de recursos excedeu os limites o worker reiniciou por conta própria.

Se uma tarefa do Airflow é mantida na fila por muito tempo, o programador marca como falhou e up_for_retry e vai reprogramá-lo novamente para execução. Uma maneira de observar os sintomas dessa situação é olhar para o gráfico com o número de tarefas na fila e se os picos do gráfico não cair em cerca de 10 minutos, provavelmente haverá falhas nas tarefas (sem registros).

Revise as informações de monitoramento:

  1. Acesse a guia Monitoramento e selecione Visão geral.

  2. Analise o gráfico de tarefas do Airflow.

    Gráfico das tarefas do Airflow ao longo do tempo, mostrando um pico no número de tarefas na fila
    Figura 2. Gráfico de tarefas do Airflow (clique para ampliar)

    No gráfico de tarefas do Airflow, há um pico nas tarefas na fila que dura cerca de mais de 10 minutos, o que pode significar que não há recursos suficientes em seu ambiente para processar todas as tarefas agendadas.

  3. Analise o gráfico Trabalhadores ativos:

    O gráfico de workers ativos do Airflow ao longo do tempo mostra que o
    número de workers ativos foi aumentado até o limite máximo.
    Figura 3. Gráfico de workers ativos (clique para ampliar)

    O gráfico Workers ativos indica que o DAG acionou o escalonamento automático para o limite máximo permitido de três workers durante a execução do DAG.

  4. Os gráficos de uso de recursos podem indicar a falta de capacidade em workers do Airflow para executar tarefas na fila. Na guia Monitoring, selecione Workers e analise os gráficos Total worker CPU usage e Total worker memory usage.

    O gráfico de uso da CPU por workers do Airflow mostra o uso da CPU
    aumentando até o limite máximo
    Figura 4. Gráfico de uso da CPU dos workers (clique para ampliar)
    O gráfico do uso da memória por workers do Airflow mostra o uso da memória
    aumentando, mas não atingindo o limite máximo
    Figura 5. Gráfico do uso total de memória de workers (clique para ampliar)

    Os gráficos indicam que a execução de muitas tarefas ao mesmo tempo resultou no limite da CPU. Os recursos foram usados por mais de 30 minutos, o que é ainda mais do que a duração total de 200 tarefas em 10 O DAG é executado um por um.

Esses são os indicadores de que a fila está cheia e de falta de recursos para processar todas as tarefas agendadas.

Consolidar suas tarefas

O código atual cria muitos DAGs e tarefas sem recursos suficientes para processar todas as tarefas em paralelo, o que resulta na preenchimento da fila. Manter as tarefas na fila por muito tempo pode fazer com que elas sejam reagendadas ou falhem. Nessas situações, você deve optar por um número menor de modelos tarefas.

O DAG de exemplo a seguir muda o número de tarefas no exemplo inicial de 200 para 20 e aumenta o tempo de espera de 1 para 10 segundos para imitar tarefas mais consolidadas que fazem a mesma quantidade de trabalho.

Faça upload do DAG de amostra a seguir no ambiente que você criou. Neste tutorial, o DAG é chamado de dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Avaliar o impacto de tarefas mais consolidadas nos processos de agendamento:

  1. Aguarde até que as execuções do DAG sejam concluídas.

  2. Na interface do Airflow, na página DAGs, clique em DAG dag_10_tasks_20_seconds_10. Você vai encontrar 10 execuções de DAG, cada uma com 20 tarefas bem-sucedidas.

  3. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  4. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  5. Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Visualizar no Explorer de registros.

    O segundo exemplo com tarefas mais consolidadas resultou em aproximadamente 10 avisos e 7 erros. No histograma, é possível comparar o número de erros e avisos no exemplo inicial (valores anteriores) e no segundo exemplo (valores posteriores).

    O histograma de registros de worker do Airflow com erros e avisos
    mostra a diminuição da quantidade de erros e avisos após as tarefas serem
    consolidados
    Figura 6. Histograma de registros de workers do Airflow depois que as tarefas foram consolidadas (clique para ampliar)

    Ao comparar o primeiro exemplo com o mais consolidado, é possível observar que há muito menos erros e avisos no segundo exemplo. No entanto, os mesmos erros relacionados ao encerramento com estado salvo ainda aparecem devido à sobrecarga de recursos.

  6. Na guia Monitoramento, selecione Workers e analise os gráficos.

    Ao comparar o gráfico Tarefas do Airflow do primeiro exemplo (valores anteriores) com o gráfico do segundo exemplo com tarefas mais consolidadas, é possível notar que o pico nas tarefas em fila durou um período mais curto quando as tarefas estavam mais consolidadas. No entanto, ela durou cerca de 10 minutos, o que ainda é abaixo do ideal.

    O gráfico de tarefas do Airflow ao longo do tempo mostra que o pico em
    As tarefas do Airflow duraram um período menor do que antes.
    Figura 7. Gráfico de tarefas do Airflow depois que as tarefas foram consolidadas (clique para ampliar)

    No gráfico "Workers ativos", confira o primeiro exemplo (à esquerda) do lado do gráfico) usou os recursos por um período muito mais longo que o segundo, embora os dois exemplos imitem a mesma quantidade de funcionam.

    O gráfico de workers ativos do Airflow ao longo do tempo mostra que o
    número de workers ativos aumentou por um período mais curto
    do que antes.
    Figura 8. Gráfico de workers ativos após tarefas foram consolidadas (clique para ampliar)

    Analisar os gráficos de consumo de recursos dos workers. Mesmo que a diferença entre os recursos usados no exemplo com tarefas mais consolidadas e exemplo inicial é bastante significativo, o uso da CPU ainda está aumentando a 70% do limite.

    O gráfico de uso da CPU pelos workers do Airflow mostra o uso da CPU
    aumentando até 70% do limite máximo.
    Figura 9. Gráfico de uso total da CPU pelos workers após tarefas foram consolidadas (clique para ampliar)
    O gráfico do uso da memória por workers do Airflow mostra o uso da memória aumentando, mas não atingindo o limite máximo
    Figura 10. Gráfico do uso total de memória dos workers depois que as tarefas foram consolidadas (clique para ampliar)

Distribuir as tarefas de maneira mais uniforme ao longo do tempo

Muitas tarefas simultâneas resultam no preenchimento da fila, o que faz com que as tarefas fiquem presas na fila ou sejam reprogramadas. Nas etapas anteriores, você reduziu o número de tarefas ao consolidar essas tarefas. No entanto, os logs de saída e o monitoramento indicaram que o número de tarefas simultâneas ainda é subótimo.

É possível controlar o número de execuções de tarefas simultâneas implementando uma programação ou definir limites para quantas tarefas podem ser executadas simultaneamente.

Neste tutorial, você vai distribuir as tarefas de maneira mais uniforme ao longo do tempo adicionando parâmetros no nível do DAG ao DAG dag_10_tasks_20_seconds_10:

  1. Adicione o argumento max_active_runs=1 ao gerenciador de contexto do DAG. Este argumento define o limite de uma única instância da execução de um DAG em um determinado momento.

  2. Adicione o argumento max_active_tasks=5 ao gerenciador de contexto do DAG. Esse argumento controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG.

Faça o upload do DAG de exemplo a seguir para o ambiente criado. Neste tutorial, o DAG é chamado de dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Avaliar o impacto da distribuição de tarefas ao longo do tempo nos processos de agendamento:

  1. Aguarde até que as execuções do DAG sejam concluídas.

  2. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  3. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  4. Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.

  5. No histograma, é possível notar que o terceiro DAG com um número limitado de tarefas e execuções ativas não geraram avisos ou erros, e dos registros parece mais uniforme em comparação com os valores anteriores.

    O histograma de registros de workers do Airflow com erros e avisos
    não mostra erros ou avisos depois que as tarefas foram consolidadas e
    distribuídas ao longo do tempo.
    Figura 11. O worker do Airflow registra o histograma após o as tarefas foram consolidadas e distribuídas ao longo do tempo (clique para ampliar)

As tarefas no exemplo dag_10_tasks_20_seconds_10_scheduled que têm um número limitado de tarefas ativas e execuções não causaram pressão de recursos porque as tarefas foram enfileiradas de maneira uniforme.

Depois de executar as etapas descritas, você otimizou o uso de recursos consolidar pequenas tarefas e distribuí-las de modo mais uniforme ao longo do tempo.

Otimizar as configurações do ambiente

Você pode ajustar as configurações do ambiente para garantir que os workers do Airflow sempre tenham capacidade para executar tarefas na fila.

Número de workers e simultaneidade de workers

É possível ajustar o número máximo de workers para que o Cloud Composer escalone automaticamente o ambiente dentro dos limites definidos.

O parâmetro [celery]worker_concurrency define o número máximo de tarefas que um único worker pode pegar da fila de tarefas. Alterar esse parâmetro ajusta o número de tarefas que um único worker pode executar ao mesmo tempo. É possível mudar essa opção de configuração do Airflow substituindo-a. Por padrão, a simultaneidade do worker é definida como um mínimo de 32, 12 * worker_CPU, 8 * worker_memory, o que significa que ela depende dos limites de recursos do worker. Consulte Otimizar ambientes para mais informações sobre os valores padrão de simultaneidade do worker.

O número de workers e a simultaneidade dos workers trabalham em conjunto, e o desempenho do ambiente é altamente dependente de ambos os parâmetros. Use as seguintes considerações para escolher a combinação correta:

  • Várias tarefas rápidas em execução em paralelo. É possível aumentar o número de workers simultaneidade quando há tarefas aguardando na fila, e seus workers usam uma pequena porcentagem das CPUs e da memória ao mesmo tempo. No entanto, em determinadas circunstâncias, a fila pode nunca ficar cheia, fazendo com que o escalonamento automático nunca seja acionado. Se tarefas pequenas terminam a execução no momento em que os novos workers estão prontos, um worker existente pode assumir as tarefas restantes não haverá tarefas para workers recém-criados.

    Nessas situações, é recomendável aumentar o número mínimo de workers e a simultaneidade de workers. para evitar um escalonamento excessivo.

  • Várias tarefas longas em execução em paralelo. A alta simultaneidade de workers impede que o sistema escalone o número de workers. Se várias tarefas usam muitos recursos e demoram muito para serem concluídas, um alto nível de a simultaneidade pode fazer com que a fila nunca seja preenchida e todas as tarefas sejam por apenas um worker, o que causa problemas de desempenho. Nessas situações, é recomendável aumentar o número máximo de workers e diminuir a simultaneidade.

A importância do paralelismo

Os programadores do Airflow controlam a programação de execuções de DAGs e tarefas individuais os DAGs. A opção de configuração [core]parallelism do Airflow controla quantas tarefas o programador do Airflow pode enfileirar na fila do executor após todas as dependências dessas tarefas serem atendidas.

O paralelismo é um mecanismo de proteção do Airflow que determina quantas tarefas podem ser executadas ao mesmo tempo por cada programador, independentemente da contagem de workers. O valor de paralelismo, multiplicado pelo número de programadores no cluster, é o número máximo de instâncias de tarefas que o ambiente pode enfileirar.

Normalmente, [core]parallelism é definido como um produto de um número máximo de workers e [celery]worker_concurrency. Ele também é afetado pelo pool. É possível mudar essa opção de configuração do Airflow substituindo-a. Para mais informações sobre como ajustar as configurações do Airflow relacionadas à escala, consulte Como ajustar a configuração do Airflow.

Encontrar as configurações de ambiente ideais

A maneira recomendada de corrigir problemas de programação é consolidar tarefas pequenas em tarefas maiores e distribuir as tarefas de maneira mais uniforme ao longo do tempo. Além de otimizar o código do DAG, você também pode otimizar as configurações do ambiente para ter uma capacidade suficiente para executar várias tarefas simultaneamente.

Por exemplo, suponha que você consolide tarefas no seu DAG o máximo possível, mas limitando as tarefas ativas para distribuí-las de modo mais uniforme tempo não é uma solução preferida para seu caso de uso específico.

É possível ajustar o paralelismo, o número e a simultaneidade de workers. parâmetros para executar o DAG dag_10_tasks_20_seconds_10 sem limitar o acesso tarefas. Neste exemplo, o DAG é executado 10 vezes, e cada execução contém 20 pequenas tarefas. Se você quiser executar todos simultaneamente:

  • Você vai precisar de um tamanho de ambiente maior, porque ele controla os parâmetros de desempenho da infraestrutura gerenciada do Cloud Composer do seu ambiente.

  • Os workers do Airflow precisam ser capazes de executar 20 tarefas simultaneamente, o que significa que você precisa definir a simultaneidade do worker como 20.

  • Os workers precisam de CPU e memória suficientes para processar todas as tarefas. A simultaneidade do worker é afetada pela CPU e pela memória do worker. Portanto, você vai precisar de pelo menos worker_concurrency / 12 na CPU e least worker_concurrency / 8 na memória.

  • Você vai precisar aumentar o paralelismo para corresponder à maior simultaneidade de workers. Para que os workers peguem 20 tarefas da fila, o programador precisa agendar essas 20 tarefas primeiro.

Ajuste as configurações do ambiente da seguinte maneira:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Encontre a configuração Recursos > Cargas de trabalho e clique em Editar.

  5. Na seção Worker, no campo Memory, especifique a nova memória. para workers do Airflow. Neste tutorial, use 4 GB.

  6. No campo CPU, especifique o novo limite de CPU para os workers do Airflow. Neste use duas vCPUs.

  7. Salve as mudanças e aguarde alguns minutos para que os workers do Airflow reiniciar.

Em seguida, substitua as opções de configuração do Airflow para paralelismo e simultaneidade de workers:

  1. Acesse a guia Modificações de configuração do Airflow.

  2. Clique em Edit e em Add Airflow Configuration Override.

  3. Modifique a configuração de parralismo:

    Seção Chave Valor
    core parallelism 20
  4. Clique em Add Airflow Configuration Override e substitua a configuração de concorrência do worker:

    Seção Chave Valor
    celery worker_concurrency 20
  5. Clique em Salvar e aguarde até que o ambiente atualize a configuração.

Acione o mesmo DAG de exemplo novamente com as configurações ajustadas:

  1. Na interface do Airflow, acesse a página DAGs.

  2. Encontre e exclua o DAG dag_10_tasks_20_seconds_10.

    Depois que o DAG é excluído, o Airflow verifica a pasta de DAGs no bucket do ambiente e executa o DAG novamente de forma automática.

Depois que as execuções de DAG forem concluídas, analise o histograma de registros novamente. No diagrama, observe que o exemplo dag_10_tasks_20_seconds_10 com mais tarefas consolidadas não geravam nenhum erro e aviso ao serem executadas com a configuração do ambiente ajustada. Compare os resultados com os dados anteriores no diagrama, em que o mesmo exemplo gerou erros e avisos ao em execução com a configuração de ambiente padrão.

O histograma de registros de worker do Airflow com erros e avisos
        não mostra nenhum erro e aviso após a configuração do ambiente
        ajustado
Figura 12. Histograma de registros de workers do Airflow depois que a configuração do ambiente foi ajustada (clique para ampliar)

As configurações de ambiente e do Airflow desempenham um papel crucial na programação de tarefas. No entanto, não é possível aumentar as configurações além de determinados limites.

Recomendamos otimizar o código DAG, consolidar tarefas e usar a programação para otimizar a performance e a eficiência.

Exemplo: erros de análise e latência de DAG devido a um código complexo

Neste exemplo, você investiga a latência de análise de uma amostra de DAG que imita um excesso de variáveis do Airflow.

Criar uma variável do Airflow

Antes de fazer upload do exemplo de código, crie uma nova variável do Airflow.

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.

  3. Acesse Administrador > Variáveis > Adicionar um novo registro.

  4. Defina os seguintes valores:

    • chave: example_var
    • val: test_airflow_variable

faça upload do DAG de amostra para o ambiente

Faça upload do DAG de exemplo abaixo para o ambiente criado nas etapas anteriores. Neste tutorial, o DAG é chamado dag_for_loop_airflow_variable:

Esse DAG contém um loop for que é executado 1.000 vezes e imita um excesso de variáveis do Airflow. Cada iteração lê a variável example_var e gera uma tarefa. Cada tarefa contém um comando que imprime o valor da variável.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnosticar os problemas de análise

O tempo de análise do DAG é o tempo que leva para o programador do Airflow ler um arquivo DAG e analisá-lo. Antes que o programador do Airflow possa programar qualquer tarefa de um DAG, o programador precisa analisar o arquivo DAG para descobrir a estrutura sobre o DAG e as tarefas definidas.

Se a análise de um DAG demorar muito, isso consumirá a capacidade do programador e pode reduzir o desempenho das execuções de DAGs.

Para monitorar o tempo de análise do DAG:

  1. Execute o comando da CLI do Airflow dags report em CLI gcloud para conferir o tempo de análise de todos os DAGs:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Substitua:

    • ENVIRONMENT_NAME: o nome do ambiente;
    • LOCATION: a região em que o ambiente está localizado.
  2. Na saída do comando, procure o valor de duração do valor-chave DAG dag_for_loop_airflow_variables. Um valor grande pode indicar que para que o DAG não seja implementado da melhor maneira. Se você tiver vários DAGs, na tabela de saída, é possível identificar quais DAGs têm um longo tempo de análise.

    Exemplo:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Inspecione os tempos de análise do DAG no console do Google Cloud:

    1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  4. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  5. Acesse a guia Registros e depois Todos os registros > Gerenciador do processador de DAG.

  6. Analise os registros do dag-processor-manager e identifique possíveis problemas.

    Uma entrada de registro do DAG de exemplo mostra que o tempo de análise do DAG é de 46,3 segundos.
    Figura 13. Os registros do gerenciador do processador DAG mostram os tempos de análise do DAG (clique para ampliar)

Se o tempo de análise total do DAG exceder cerca de 10 segundos, seus programadores poderão estar sobrecarregados com a análise do DAG e não poderão executar DAGs de maneira eficaz.

Otimizar o código DAG

É recomendado evitar o código Python "de nível superior" desnecessário nos DAGs. DAGs com muitos importações, variáveis e funções fora do DAG introduzem maior análise para o programador do Airflow. Isso reduz a performance e a escalabilidade do Cloud Composer e do Airflow. O excesso de leitura de variáveis do Airflow leva a um tempo de análise longo e a uma carga alta do banco de dados. Se esse código estiver em um arquivo DAG, essas funções serão executadas em cada batimento cardíaco do programador, o que pode ser lento.

Os campos de modelo do Airflow permitem incorporar valores do Airflow variáveis e modelos Jinja nos seus DAGs. Isso evita o uso desnecessário execução da função durante os sinais de funcionamento do programador.

Para implementar o exemplo de DAG de uma maneira melhor, evite usar variáveis do Airflow no código Python de nível superior dos DAGs. Em vez disso, transmita as variáveis do Airflow para operadores usam um modelo Jinja, que atrasa a leitura do valor até a execução da tarefa.

Faça o upload da nova versão do DAG de exemplo para o ambiente. Neste tutorial, o DAG é chamado de dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Inspecione o novo tempo de análise do DAG:

  1. Aguarde até que a execução do DAG seja concluída.

  2. Execute o comando dags report novamente para ver a o tempo de análise de todos os DAGs:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Revise os registros dag-processor-manager outra vez e a duração da análise.

    Uma entrada de registro para o DAG de amostra mostra que o tempo de análise do DAG é 4,21
    segundos
    Figura 14. Os registros do gerenciador do processador de DAG mostram o DAG tempos de análise após a otimização do código DAG (clique para ampliar)

Ao substituir as variáveis de ambiente por modelos do Airflow, você simplificou o código DAG e reduziu a latência de análise em cerca de dez vezes.

Otimizar as configurações do ambiente do Airflow

O programador do Airflow tenta constantemente acionar novas tarefas e analisar todos os DAGs no bucket do ambiente. Se os DAGs tiverem um tempo de análise longo e o programador consumir muitos recursos, será possível otimizar as configurações do programador do Airflow para que ele use os recursos de maneira mais eficiente.

Neste tutorial, os arquivos DAG levam muito tempo para analisar e analisar ciclos. começam a se sobrepor, o que esgota a capacidade do programador. Em nosso exemplo, o primeiro DAG de exemplo leva mais de 5 segundos para analisar, então você vai configurar que o programador seja executado com menos frequência para usar os recursos com mais eficiência. Você vai substituir a opção de configuração do Airflow scheduler_heartbeat_sec. Essa configuração define com que frequência o programador precisa ser executado (em segundos). Por padrão, o valor é definido como 5 segundos. Para alterar essa opção de configuração do Airflow, substituí-lo.

Substitua a opção de configuração do Airflow scheduler_heartbeat_sec:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Substituições de configuração do Airflow.

  4. Clique em Editar e em Adicionar substituição da configuração do Airflow.

  5. Modifique a opção de configuração do Airflow:

    Seção Chave Valor
    scheduler scheduler_heartbeat_sec 10
  6. Clique em Salvar e aguarde a atualização da configuração do ambiente.

Verifique as métricas do programador:

  1. Acesse a guia Monitoramento e selecione Agendadores.

  2. No gráfico Frequência de funcionamento do programador, clique no botão Mais opções (três pontos) e, em seguida, clique em Visualizar no Metrics Explorer.

O gráfico de batimento cardíaco do programador mostra que o batimento cardíaco ocorre com menos frequência
Figura 15. Gráfico de batimentos do programador (clique para ampliar)

No gráfico, você vai notar que o programador é executado com metade da frequência depois que você mudou a configuração padrão de 5 segundos para 10 segundos. Ao reduzir a frequência de batimentos, você garante que o programador não comece a ser executado enquanto o ciclo de análise anterior estiver em andamento e a capacidade de recursos do programador não estiver esgotada.

Atribuir mais recursos ao programador

No Cloud Composer 2, é possível alocar mais recursos de CPU e memória para o programador. Dessa forma, você pode aumentar a performance do programador e para acelerar o tempo de análise do DAG.

Aloque mais CPU e memória para o programador:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Encontre a configuração Resources > Workloads e Clique em Editar.

  5. Na seção Programador, no campo Memória, especifique o novo limite de memória. Neste tutorial, use 4 GB.

  6. No campo CPU, especifique o novo limite de CPU. Neste tutorial, use duas vCPUs.

  7. Salve as mudanças e aguarde alguns minutos para que os programadores do Airflow reiniciar.

  8. Acesse a guia Registros e depois Todos os registros > Gerenciador de processador DAG.

  9. Revise os registros dag-processor-manager e compare a duração da análise dos exemplos de DAGs:

    Uma entrada de registro do DAG de amostra mostra que o tempo de análise do DAG otimizado é de 1,5 segundo. Para o DAG não otimizado, o tempo de análise é de 28,71 segundos.
    Figura 16. Os registros do gerenciador do processador de DAG mostram o DAG tempos de análise após mais recursos terem sido atribuídos ao programador (clique para ampliar)

Ao atribuir mais recursos ao programador, você aumentou a capacidade dele e reduziu a latência de análise significativamente em comparação com as configurações padrão do ambiente. Com mais recursos, o programador pode analisar os DAGs mais rapidamente. No entanto, os custos associados aos recursos do Cloud Composer também vão aumentar. Além disso, não é possível aumentar recursos além de um determinado limite.

Recomendamos alocar recursos somente após o possível código DAG e As otimizações de configuração do Airflow foram implementadas.

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados neste tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

Exclua o projeto

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Excluir recursos individuais

Se você planeja ver vários tutoriais e guias de início rápido, a reutilização de projetos pode evitar que você exceda os limites da cota do projeto.

Exclua o ambiente do Cloud Composer. Você também excluir o bucket do ambiente durante este procedimento.

A seguir