Depurar problemas de agendamento de tarefas

Cloud Composer 1 | Cloud Composer 2

Este tutorial orienta você no diagnóstico e na solução de problemas de programação de tarefas e análise que causam mau funcionamento do programador, erros de análise e latência e falha de tarefas.

Introdução

O programador do Airflow é afetado principalmente por dois fatores: programação de tarefas e análise do DAG. Os problemas em um desses fatores podem ter um impacto negativo na integridade e no desempenho do ambiente.

Às vezes, muitas tarefas são agendadas simultaneamente. Nessa situação, a fila é preenchida e as tarefas permanecem no estado "programado" ou são reprogramadas após serem colocadas em fila, o que pode causar falha na tarefa e latência de desempenho.

Outro problema comum é a análise da latência e dos erros causados pela complexidade de um código DAG. Por exemplo, um código DAG que contenha variáveis do Airflow no nível superior do código pode levar à análise de atrasos, sobrecarga do banco de dados, falhas na programação e tempos limite do DAG.

Neste tutorial, você diagnosticará os DAGs de exemplo e aprenderá a resolver problemas de programação e análise, melhorar a programação de DAGs e otimizar o código DAG e as configurações do ambiente para melhorar o desempenho.

Objetivos

Nesta seção, listamos os objetivos dos exemplos deste tutorial.

Exemplo: latência e mau funcionamento do programador causados pela alta simultaneidade de tarefas

  • Faça upload do DAG de amostra que é executado várias vezes simultaneamente e diagnostique o mau funcionamento e os problemas de latência do programador com o Cloud Monitoring.

  • Consolide as tarefas e avalie o impacto no desempenho para otimizar seu código DAG.

  • Distribuir as tarefas de maneira mais uniforme ao longo do tempo e avaliar o impacto na performance.

  • Otimize as configurações do Airflow e do ambiente e avalie o impacto.

Exemplo: latência e erros de análise de DAG causados por código complexo

  • Fazer upload do DAG de amostra com variáveis do Airflow e diagnosticar problemas de análise com o Cloud Monitoring.

  • Otimizar o código DAG evitando variáveis Airflow no nível superior do código e avaliar o impacto no tempo de análise.

  • Otimizar as configurações do Airflow e do ambiente e avaliar o impacto no tempo de análise.

Custos

Neste tutorial, usamos o seguinte componente faturável do Google Cloud:

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Para mais detalhes, consulte Limpeza.

Antes de começar

Nesta seção, descrevemos as ações necessárias antes de iniciar o tutorial.

Criar e configurar um projeto

Para este tutorial, você precisa de um projeto do Google Cloud. Configure o projeto desta maneira:

  1. No console do Google Cloud, selecione ou crie um projeto:

    Acessar o seletor de projetos

  2. Verifique se o faturamento foi ativado para o projeto. Saiba como verificar se o faturamento está ativado em um projeto.

  3. Verifique se o usuário do projeto do Google Cloud tem os papéis a seguir para criar os recursos necessários:

    • Administrador de ambientes e de objetos do Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador do Compute (roles/compute.admin)

Ativar as APIs do projeto

Ative a API Cloud Composer.

Ative a API

criar o ambiente do Cloud Composer

Crie um ambiente do Cloud Composer 2.

Como parte da criação do ambiente, você concede o papel Extensão de agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) à conta do agente de serviço do Composer. O Cloud Composer usa essa conta para realizar operações no seu projeto do Google Cloud.

Exemplo: mau funcionamento do programador e falha de tarefa devido a problemas de agendamento

Este exemplo demonstra a depuração do mau funcionamento do programador e da latência causada pela alta simultaneidade de tarefas.

Fazer upload do DAG de amostra para o ambiente

Faça o upload do DAG de amostra a seguir para o ambiente que você criou nas etapas anteriores. Neste tutorial, este DAG é chamado de dag_10_tasks_200_seconds_1.

Este DAG tem 200 tarefas. Cada tarefa aguarda um segundo e exibe "Concluído". O DAG é acionado automaticamente após o upload. O Cloud Composer executa esse DAG 10 vezes, e todas as execuções ocorrem em paralelo.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnosticar o mau funcionamento do programador e problemas de falha de tarefas

Depois que o DAG for executado, abra a IU do Airflow e clique no DAG dag_10_tasks_200_seconds_1. Você verá que 10 execuções de DAG foram bem-sucedidas e que cada uma tem 200 tarefas bem-sucedidas.

Revise os registros de tarefas do Airflow:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Registros e clique em Todos os registros > Registros do Airflow > Workers > Visualizar no Explorador de registros.

No histograma de registros, é possível ver os erros e avisos indicados com as cores vermelha e laranja:

O histograma dos registros de worker do Airflow com erros e avisos
    indicados com as cores vermelha e laranja
Figura 1. Histograma de registros de worker do Airflow (clique para ampliar)

O DAG de exemplo resultou em cerca de 130 avisos e 60 erros. Clique em qualquer coluna que contenha barras amarelas e vermelhas. Você verá alguns dos seguintes avisos e erros nos registros:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Esses registros podem indicar que o uso de recursos excedeu os limites e que o worker se reiniciou.

Se uma tarefa do Airflow for mantida na fila por muito tempo, o programador a marcará como com falha e up_for_retry e a reprogramará para execução. Uma maneira de observar os sintomas dessa situação é observar o gráfico com o número de tarefas na fila. Se os picos dele não cairem em cerca de 10 minutos, provavelmente haverá falhas nas tarefas (sem registros).

Revise as informações de monitoramento:

  1. Acesse a guia Monitoramento e selecione Visão geral.

  2. Analise o gráfico Tarefas do Airflow.

    Gráfico de tarefas do Airflow ao longo do tempo, mostrando um pico no
 número de tarefas na fila
    Figura 2. Gráfico de tarefas do Airflow (clique para ampliar)

    No gráfico de tarefas do Airflow, há um pico nas tarefas na fila que duram mais de 10 minutos, o que pode significar que não há recursos suficientes no ambiente para processar todas as tarefas programadas.

  3. Analise o gráfico Workers ativos:

    O gráfico de workers ativos do Airflow ao longo do tempo mostra que o
    numebr de workers ativos aumentou até o limite máximo
    Figura 3. Gráfico de workers ativos (clique para ampliar)

    O gráfico Workers ativos indica que o DAG acionou o escalonamento automático para o limite máximo permitido de três workers durante a execução do DAG.

  4. Os gráficos de uso de recursos podem indicar a falta de capacidade nos workers do Airflow para executar tarefas na fila. Na guia Monitoramento, selecione Workers e consulte os gráficos Uso total da CPU do worker e Uso total da memória do worker.

    O gráfico de uso da CPU por workers do Airflow mostra o uso da CPU
 até o limite máximo
    Figura 4. Gráfico de uso da CPU do total de workers (clique para ampliar)
    O gráfico de uso da memória por workers do Airflow mostra o aumento no uso, mas não atinge o limite máximo
    Figura 5. Gráfico de uso da memória total de workers (clique para ampliar)

    Os gráficos indicam que a execução de muitas tarefas simultaneamente resultou no limite da CPU. Os recursos foram usados por mais de 30 minutos, o que é ainda maior do que a duração total de 200 tarefas em 10 execuções de DAG executadas uma a uma.

Esses são os indicadores de que a fila está sendo preenchida e a falta de recursos para processar todas as tarefas programadas.

Consolide suas tarefas

O código atual cria muitos DAGs e tarefas sem recursos suficientes para processar todas as tarefas em paralelo, o que resulta no preenchimento da fila. Manter as tarefas na fila por muito tempo pode fazer com que elas sejam reprogramadas ou falhem. Nessas situações, opte por um número menor de tarefas mais consolidadas.

O DAG de amostra a seguir altera o número de tarefas no exemplo inicial de 200 para 20 e aumenta o tempo de espera de 1 para 10 segundos para imitar tarefas mais consolidadas que fazem a mesma quantidade de trabalho.

Faça o upload do DAG de amostra a seguir para o ambiente que você criou. Neste tutorial, este DAG é chamado de dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Avalie o impacto de tarefas mais consolidadas nos processos de agendamento:

  1. Aguarde até que as execuções do DAG sejam concluídas.

  2. Na IU do Airflow, na página DAGs, clique no DAG dag_10_tasks_20_seconds_10. Você verá 10 execuções de DAG, cada uma com 20 tarefas concluídas.

  3. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  4. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  5. Acesse a guia Registros e clique em Todos os registros > Registros do Airflow > Workers > Visualizar no Explorador de registros.

    O segundo exemplo, com tarefas mais consolidadas, resultou em aproximadamente 10 avisos e 7 erros. No histograma, é possível comparar o número de erros e avisos no exemplo inicial (valores anteriores) e no segundo exemplo (valores posteriores).

    O histograma de registros de worker do Airflow com erros e avisos
    mostra a quantidade reduzida de erros e avisos após a consolidação das
    tarefas
    Figura 6. Histograma de registros do worker do Airflow após a consolidação das tarefas (clique para ampliar)

    Ao comparar o primeiro exemplo com o mais consolidado, é possível ver que há significativamente menos erros e avisos no segundo exemplo. No entanto, os mesmos erros relacionados ao desligamento com estado salvo ainda aparecem nos registros devido à sobrecarga de recursos.

  6. Na guia Monitoramento, selecione Workers e analise os gráficos.

    Ao comparar o gráfico de tarefas do Airflow do primeiro exemplo (valores anteriores) com o gráfico do segundo exemplo com tarefas mais consolidadas, é possível observar que o pico nas tarefas na fila durou um período menor quando as tarefas estavam mais consolidadas. No entanto, durou perto de 10 minutos, o que ainda é abaixo do ideal.

    O gráfico de tarefas do Airflow ao longo do tempo mostra que o pico nas tarefas do Airflow durou um período mais curto do que antes.
    Figura 7. Gráfico de tarefas do Airflow após a consolidação das tarefas (clique para ampliar)

    No gráfico de workers ativos, é possível ver que o primeiro exemplo (à esquerda do gráfico) usou recursos por um período muito mais longo do que o segundo, embora os dois exemplos imitem a mesma quantidade de trabalho.

    O gráfico de workers ativos do Airflow ao longo do tempo mostra que o número de workers ativos aumentou por um período mais curto do que antes.
    Figura 8. Gráfico de workers ativos após a consolidação das tarefas (clique para ampliar)

    Analise os gráficos de consumo de recursos do worker. Embora a diferença entre os recursos usados no exemplo com tarefas mais consolidadas e o exemplo inicial seja bastante significativa, o uso da CPU ainda está atingindo 70% do limite.

    O gráfico de uso da CPU por workers do Airflow mostra o aumento de até 70% do limite máximo
    Figura 9. Gráfico de uso da CPU do total de workers após a consolidação das tarefas (clique para ampliar)
    O gráfico de uso da memória por workers do Airflow mostra o aumento no uso, mas não atinge o limite máximo
    Figura 10. Gráfico de uso da memória total de workers após a consolidação das tarefas (clique para ampliar)

Distribuir tarefas de maneira mais uniforme ao longo do tempo

Muitas tarefas simultâneas resultam no preenchimento da fila, o que faz com que tarefas fiquem presas na fila ou reprogramadas. Nas etapas anteriores, você diminuiu o número de tarefas consolidando-as. No entanto, os registros de saída e o monitoramento indicaram que o número de tarefas simultâneas ainda não é o ideal.

É possível controlar o número de execuções de tarefas simultâneas implementando uma programação ou definindo limites para quantas tarefas podem ser executadas simultaneamente.

Neste tutorial, você distribui tarefas de maneira mais uniforme ao longo do tempo, adicionando parâmetros no nível do DAG ao DAG dag_10_tasks_20_seconds_10:

  1. Adicione o argumento max_active_runs=1 ao gerenciador de contexto do DAG. Esse argumento define um limite de apenas uma instância de uma execução de DAG em um determinado momento.

  2. Adicione o argumento max_active_tasks=5 ao gerenciador de contexto do DAG. Esse argumento controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG.

Faça o upload do DAG de amostra a seguir para o ambiente que você criou. Neste tutorial, este DAG é chamado de dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Avalie o impacto da distribuição de tarefas ao longo do tempo nos processos de agendamento:

  1. Aguarde até que as execuções do DAG sejam concluídas.

  2. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  3. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  4. Acesse a guia Registros e clique em Todos os registros > Registros do Airflow > Workers > Visualizar no Explorador de registros.

  5. No histograma, observe que o terceiro DAG com um número limitado de tarefas e execuções ativas não gera avisos ou erros, e a distribuição de registros parece mais uniforme em comparação com os valores anteriores.

    O histograma de registros de worker do Airflow com erros e avisos não mostra nenhum erro ou aviso depois que as tarefas foram consolidadas e distribuídas ao longo do tempo.
    Figura 11. Histograma de registros do worker do Airflow após as tarefas serem consolidadas e distribuídas ao longo do tempo (clique para ampliar)

As tarefas no exemplo dag_10_tasks_20_seconds_10_scheduled que têm um número limitado de tarefas ativas e são executadas não causaram pressão de recursos porque as tarefas foram enfileiradas de maneira uniforme.

Depois de executar as etapas descritas, você otimizou o uso de recursos consolidando pequenas tarefas e distribuindo-as de maneira mais uniforme ao longo do tempo.

Otimizar as configurações do ambiente

É possível ajustar as configurações do ambiente para garantir que sempre haja capacidade nos workers do Airflow para executar tarefas na fila.

Número de workers e simultaneidade de workers

É possível ajustar o número máximo de workers para que o Cloud Composer faça o escalonamento automático do ambiente dentro dos limites definidos.

O parâmetro [celery]worker_concurrency define o número máximo de tarefas que um único worker pode selecionar na fila de tarefas. A alteração desse parâmetro ajusta o número de tarefas que um único worker pode executar ao mesmo tempo. Para alterar essa opção de configuração do Airflow, substitua-a. Por padrão, a simultaneidade de worker é definida para, no mínimo, um destes: 32, 12 * worker_CPU, 8 * worker_memory, o que significa que ela depende dos limites de recursos do worker. Consulte Ambientes do Optimize para mais informações sobre os valores padrão de simultaneidade de workers.

O número de workers e a simultaneidade de workers funcionam em conjunto, e o desempenho do ambiente depende muito dos dois parâmetros. Use as seguintes considerações para escolher a combinação correta:

  • Várias tarefas rápidas executadas em paralelo. É possível aumentar a simultaneidade de workers quando há tarefas esperando na fila e seus workers usam uma baixa porcentagem de CPUs e memória ao mesmo tempo. No entanto, em determinadas circunstâncias, a fila pode nunca ser preenchida, fazendo com que o escalonamento automático nunca seja acionado. Se a execução de pequenas tarefas for concluída quando os novos workers estiverem prontos, um worker existente poderá selecionar as tarefas restantes e não haverá tarefas para os recém-criados.

    Nessas situações, é recomendável aumentar o número mínimo de workers e aumentar a simultaneidade de workers para evitar escalonamento excessivo.

  • Várias tarefas longas em execução em paralelo. A alta simultaneidade de workers impede que o sistema escalone o número de workers. Se várias tarefas consomem muitos recursos e demoram muito para serem concluídas, uma alta simultaneidade de workers pode fazer com que a fila nunca seja preenchida e todas as tarefas sejam selecionadas por apenas um worker, resultando em problemas de desempenho. Nessas situações, é recomendável aumentar o número máximo de workers e diminuir a simultaneidade de workers.

A importância do paralelismo

Os programadores do Airflow controlam a programação de execuções de DAG e tarefas individuais deles. A opção de configuração [core]parallelism do Airflow controla quantas tarefas o programador do Airflow pode enfileirar na fila do executor depois que todas as dependências dessas tarefas forem atendidas.

O paralelismo é um mecanismo de proteção do Airflow que determina quantas tarefas podem ser executadas ao mesmo tempo para cada programador, independentemente da contagem de workers. O valor do paralelismo multiplicado pelo número de programadores no cluster é o número máximo de instâncias de tarefas que o ambiente pode enfileirar.

Normalmente, [core]parallelism é definido como produto de um número máximo de workers e [celery]worker_concurrency. Ele também é afetado pelo pool. Para alterar essa opção de configuração do Airflow, substitua-a. Para saber mais sobre como ajustar as configurações do Airflow relacionadas ao escalonamento, consulte Como escalonar a configuração do Airflow.

Encontrar as configurações de ambiente ideais

A maneira recomendada de corrigir problemas de programação é consolidar tarefas pequenas em tarefas maiores e distribuir tarefas de maneira mais uniforme ao longo do tempo. Além de otimizar o código DAG, também é possível otimizar as configurações do ambiente para ter uma capacidade suficiente para executar várias tarefas simultaneamente.

Por exemplo, suponha que você consolide tarefas no DAG o máximo possível, mas limitar as tarefas ativas para distribuí-las de maneira mais uniforme ao longo do tempo não é uma solução recomendada para seu caso de uso específico.

É possível ajustar o paralelismo, o número de workers e os parâmetros de simultaneidade de workers para executar o DAG dag_10_tasks_20_seconds_10 sem limitar as tarefas ativas. Neste exemplo, o DAG é executado 10 vezes, e cada execução contém 20 tarefas pequenas. Se você quiser executar todos simultaneamente:

  • Você precisará de um tamanho de ambiente maior, porque ele controla os parâmetros de desempenho da infraestrutura gerenciada do Cloud Composer do seu ambiente.

  • Os workers do Airflow precisam ser capazes de executar 20 tarefas simultaneamente, o que significa que você precisa definir a simultaneidade do worker como 20.

  • Os workers precisam de CPU e memória suficientes para processar todas as tarefas. A simultaneidade de workers é afetada pela CPU e pela memória do worker. Portanto, você precisará de pelo menos worker_concurrency / 12 na CPU e em least worker_concurrency / 8 na memória.

  • Será necessário aumentar o paralelismo para corresponder à simultaneidade de worker maior. Para que os workers coletem 20 tarefas da fila, o programador precisa programar essas 20 tarefas primeiro.

Ajuste as configurações do ambiente da seguinte maneira:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Encontre a configuração Recursos > Cargas de trabalho e clique em Editar.

  5. Na seção Worker, no campo Memória, especifique o novo limite de memória para os workers do Airflow. Neste tutorial, use 4 GB.

  6. No campo CPU, especifique o novo limite de CPU para os workers do Airflow. Neste tutorial, use duas vCPUs.

  7. Salve as alterações e aguarde alguns minutos até que os workers do Airflow sejam reiniciados.

Em seguida, modifique as opções de configuração do Airflow para paralelismo e simultaneidade de workers:

  1. Acesse a guia Substituições de configuração do Airflow.

  2. Clique em Editar e em Adicionar substituição de configuração do Airflow.

  3. Modifique a configuração do parralelismo:

    Seção Chave Valor
    core parallelism 20
  4. Clique em Adicionar substituição de configuração do Airflow e modifique a configuração de simultaneidade do worker:

    Seção Chave Valor
    celery worker_concurrency 20
  5. Clique em Salvar e aguarde até que a configuração do ambiente seja atualizada.

Acione o mesmo DAG de exemplo novamente com as configurações ajustadas:

  1. Na interface do Airflow, acesse a página DAGs.

  2. Encontre e exclua o DAG dag_10_tasks_20_seconds_10.

    Depois que o DAG é excluído, o Airflow verifica a pasta de DAGs no bucket do ambiente e executa o DAG de novo automaticamente.

Depois que as execuções do DAG forem concluídas, revise o histograma de registros novamente. No diagrama, é possível ver que o exemplo de dag_10_tasks_20_seconds_10 com tarefas mais consolidadas não gera erros e avisos quando executado com a configuração ajustada do ambiente. Compare os resultados com os dados anteriores no diagrama, em que o mesmo exemplo gerou erros e avisos ao executar com a configuração de ambiente padrão.

O histograma de registros de worker do Airflow com erros e avisos
        não mostra erros e avisos após o ajuste da configuração do
        ambiente
Figura 12. Histograma de registros do worker do Airflow após o ajuste da configuração do ambiente (clique para ampliar)

As configurações do ambiente e do Airflow desempenham um papel crucial na programação de tarefas. No entanto, não é possível aumentar as configurações além de determinados limites.

Recomendamos otimizar o código DAG, consolidar tarefas e usar a programação para otimizar o desempenho e a eficiência.

Exemplo: latência e erros de análise de DAG devido a um código DAG complexo

Neste exemplo, você investiga a latência de análise de um DAG de amostra que imita um excesso de variáveis do Airflow.

Criar uma variável do Airflow

Antes de fazer upload do código de amostra, crie uma variável do Airflow.

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.

  3. Acesse Administrador > Variáveis > Adicionar um novo registro.

  4. Defina os seguintes valores:

    • chave: example_var
    • val: test_airflow_variable

Fazer upload do DAG de amostra para o ambiente

Faça o upload do DAG de amostra a seguir para o ambiente que você criou nas etapas anteriores. Neste tutorial, este DAG é chamado de dag_for_loop_airflow_variable.

Esse DAG contém um loop "for" que é executado mil vezes e imita um excesso de variáveis do Airflow. Cada iteração lê a variável example_var e gera uma tarefa. Cada tarefa contém um comando que mostra o valor da variável.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnosticar os problemas de análise

O tempo de análise do DAG é o tempo que o programador do Airflow leva para ler e analisar um arquivo DAG. Antes que o programador do Airflow possa programar qualquer tarefa de um DAG, ele precisa analisar o arquivo DAG para descobrir a estrutura do DAG e as tarefas definidas.

Se a análise de um DAG demorar muito, isso consumirá a capacidade do programador e poderá reduzir o desempenho das execuções do DAG.

Para monitorar o tempo de análise do DAG:

  1. Execute o comando da CLI do Airflow dags report na CLI gcloud para ver o tempo de análise de todos os DAGs:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Substitua:

    • ENVIRONMENT_NAME: o nome do ambiente;
    • LOCATION: a região onde o ambiente está localizado.
  2. Na saída do comando, procure o valor da duração do DAG dag_for_loop_airflow_variables. Um valor grande pode indicar que esse DAG não foi implementado da maneira ideal. Se você tiver vários DAGs, pode identificar quais têm um longo tempo de análise na tabela de saída.

    Exemplo:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Inspecione os tempos de análise do DAG no console do Google Cloud:

    1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  4. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  5. Acesse a guia Registros e clique em Todos os registros > gerenciador de processador DAG.

  6. Analise os registros dag-processor-manager e identifique possíveis problemas.

    Uma entrada de registro para o DAG de amostra mostra que o tempo de análise do DAG é de 46,3 segundos
    Figura 13. Os registros do gerenciador do processador de DAG mostram os tempos de análise do DAG (clique para ampliar)

Se o tempo total de análise do DAG exceder cerca de 10 segundos, seus programadores poderão ficar sobrecarregados com esse processo e não conseguir executá-los com eficácia.

Otimizar o código DAG

É recomendável evitar códigos Python de "nível superior" desnecessários nos DAGs. DAGs com muitas importações, variáveis e funções fora do DAG introduzem tempos de análise maiores para o programador do Airflow. Isso reduz o desempenho e a escalonabilidade do Cloud Composer e do Airflow. O excesso de leitura de variáveis do Airflow leva a um longo tempo de análise e uma alta carga do banco de dados. Se esse código estiver em um arquivo DAG, essas funções serão executadas em todos os batimentos de funcionamento do programador, o que pode ser lento.

Com os campos de modelo do Airflow, é possível incorporar valores de variáveis e modelos do Jinja aos DAGs. Isso impede a execução desnecessária de funções durante os batimentos cardíacos do programador.

Para implementar o exemplo de DAG de uma maneira melhor, evite usar variáveis do Airflow no código Python de nível superior dos DAGs. Em vez disso, transmita variáveis do Airflow para os operadores atuais usando um modelo Jinja, que atrasará a leitura do valor até a execução da tarefa.

Faça o upload da nova versão do DAG de amostra para seu ambiente. Neste tutorial, este DAG é chamado de dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Inspecione o novo tempo de análise do DAG:

  1. Aguarde até que a execução do DAG seja concluída.

  2. Execute o comando dags report novamente para ver o tempo de análise de todos os DAGs:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Revise os registros dag-processor-manager novamente e analise a duração da análise.

    Uma entrada de registro para o DAG de amostra mostra que o tempo de análise do DAG é de 4,21
    segundos
    Figura 14. Os registros do gerenciador do processador de DAG mostram os tempos de análise do DAG após a otimização do código DAG (clique para ampliar)

Ao substituir as variáveis de ambiente por modelos do Airflow, você simplificou o código DAG e reduziu a latência de análise em cerca de dez vezes.

Otimizar as configurações do ambiente do Airflow

O programador do Airflow sempre tenta acionar novas tarefas e analisa todos os DAGs no bucket do ambiente. Se os DAGs têm um longo tempo de análise e o programador consome muitos recursos, é possível otimizar as configurações do programador do Airflow para que o programador use os recursos com mais eficiência.

Neste tutorial, os arquivos DAG levam muito tempo para serem analisados, e os ciclos começam a se sobrepor, o que esgota a capacidade do programador. Como o primeiro DAG de exemplo leva mais de cinco segundos para ser analisado, você configura o programador para ser executado com menos frequência e usar os recursos com mais eficiência. Você modificará a opção de configuração scheduler_heartbeat_sec do Airflow. Essa configuração define a frequência de execução do programador (em segundos). Por padrão, o valor é definido como 5 segundos. Para alterar essa opção de configuração do Airflow, substitua-a.

Modifique a opção de configuração scheduler_heartbeat_sec do Airflow:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Substituições de configuração do Airflow.

  4. Clique em Editar e em Adicionar substituição de configuração do Airflow.

  5. Modifique a opção de configuração do Airflow:

    Seção Chave Valor
    scheduler scheduler_heartbeat_sec 10
  6. Clique em Salvar e aguarde até que a configuração do ambiente seja atualizada.

Verifique as métricas do programador:

  1. Acesse a guia Monitoramento e selecione Programadores.

  2. No gráfico de frequência do programador, clique no botão Mais opções (três pontos) e, em seguida, em Visualizar no Metrics Explorer.

O gráfico de batimentos cardíacos do programador mostra que o batimento cardíaco ocorre com menos frequência
Figura 15. Gráfico de sinal de funcionamento do programador (clique para ampliar)

No gráfico, você verá que o programador é executado duas vezes menos com frequência depois que você altera a configuração padrão de 5 para 10 segundos. Ao reduzir a frequência dos batimentos, você garante que o programador não seja executado enquanto o ciclo de análise anterior está em andamento e a capacidade de recursos do programador não foi esgotada.

Atribuir mais recursos ao programador

No Cloud Composer 2, é possível alocar mais recursos de CPU e memória para o programador. Dessa forma, é possível melhorar o desempenho do programador e acelerar o tempo de análise do DAG.

Aloque mais CPU e memória para o programador:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Encontre a configuração Recursos > Cargas de trabalho e clique em Editar.

  5. Na seção Programador, no campo Memória, especifique o novo limite de memória. Neste tutorial, use 4 GB.

  6. No campo CPU, especifique o novo limite de CPU. Neste tutorial, use duas vCPUs.

  7. Salve as alterações e aguarde alguns minutos até que os programadores do Airflow sejam reiniciados.

  8. Acesse a guia Registros e clique em Todos os registros > gerenciador de processador DAG.

  9. Revise os registros dag-processor-manager e compare a duração da análise dos DAGs de exemplo:

    Uma entrada de registro para o DAG de amostra mostra que o tempo de análise do DAG otimizado é de 1,5 segundo. Para o DAG não otimizado, o tempo de análise é de 28,71 segundos
    Figura 16. Os registros do gerenciador do processador de DAG mostram os tempos de análise do DAG depois que mais recursos são atribuídos ao programador (clique para ampliar)

Ao atribuir mais recursos ao programador, você aumentou a capacidade dele e reduziu significativamente a latência da análise em comparação com as configurações do ambiente padrão. Com mais recursos, o programador pode analisar os DAGs mais rapidamente, mas os custos associados aos recursos do Cloud Composer também aumentam. Além disso, não é possível aumentar os recursos além de um determinado limite.

Recomendamos alocar recursos somente após a implementação do código DAG e das otimizações de configuração do Airflow.

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados neste tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

Excluir o projeto

  1. No Console do Google Cloud, acesse a página Gerenciar recursos.

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Excluir recursos individuais

Se você planeja ver vários tutoriais e guias de início rápido, a reutilização de projetos pode evitar que você exceda os limites da cota do projeto.

Exclua o ambiente do Cloud Composer. Você também exclui o bucket do ambiente durante esse procedimento.

A seguir