Depurar problemas de agendamento de tarefas

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Este tutorial orienta você a diagnosticar e resolver problemas de programação de tarefas e de análise que levam a um mau funcionamento do programador, erros de análise e latência e falha na tarefa.

Introdução

O programador do Airflow é afetado principalmente por dois fatores: programação de tarefas e análise de DAG. Problemas em um desses fatores podem ter um impacto negativo na saúde e no desempenho do ambiente.

Às vezes, muitas tarefas são programadas ao mesmo tempo. Nessa situação, a fila fica cheia, e as tarefas permanecem no estado "programada" ou são reprogramadas depois de serem enfileiradas, o que pode causar falha na tarefa e latência de desempenho.

Outro problema comum é a latência de análise e os erros causados pela complexidade de um código DAG. Por exemplo, um código DAG que contém variáveis do Airflow no nível superior do código pode levar a atrasos de análise, sobrecarga do banco de dados, falhas de programação e tempo limite do DAG.

Neste tutorial, você vai diagnosticar os DAGs de exemplo e aprender a solucionar problemas de programação e análise, melhorar a programação de DAGs e otimizar o código DAG e as configurações do ambiente para melhorar o desempenho.

Objetivos

Nesta seção, listamos os objetivos para exemplos neste tutorial.

Exemplo: mau funcionamento do programador e latência causada por alta simultaneidade de tarefas

  • fazer upload do DAG de amostra executado várias vezes simultaneamente e diagnosticar o mau funcionamento do programador e problemas de latência com o Cloud Monitoring.

  • Otimize seu código DAG consolidando as tarefas e avaliando impacto no desempenho.

  • Distribuir as tarefas de maneira mais uniforme ao longo do tempo e avaliar o desempenho. impacto.

  • otimizar as configurações do Airflow e do ambiente; avaliar o impacto.

Exemplo: latência e erros de análise do DAG causados por código complexo

  • Fazer upload do DAG de amostra com variáveis do Airflow e diagnosticar a análise problemas com o Cloud Monitoring.

  • Otimize o código do DAG evitando variáveis do Airflow no nível superior do código e avalie o impacto no tempo de análise.

  • otimizar as configurações do Airflow e do ambiente; e avaliar o impacto no tempo de análise.

Custos

Neste tutorial, usamos o seguinte componente faturável do Google Cloud:

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Veja mais detalhes em Limpeza.

Antes de começar

Esta seção descreve as ações necessárias antes de iniciar o tutorial.

Criar e configurar um projeto

Para este tutorial, você precisa de um projeto do Google Cloud. Configure o projeto da seguinte maneira:

  1. No console do Google Cloud, selecione ou crie um projeto:

    Acesse o seletor de projetos

  2. Verifique se o faturamento foi ativado para o projeto. Saiba como verificar se o faturamento está ativado em um projeto.

  3. Verifique se o usuário do projeto do Google Cloud tem os seguintes papéis para criar os recursos necessários:

    • Administrador de objetos do armazenamento e do ambiente (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador do Compute (roles/compute.admin)

Ativar as APIs do projeto

Enable the Cloud Composer API.

Enable the API

Criar seu ambiente do Cloud Composer

Crie um ambiente do Cloud Composer 2.

Como parte da criação do ambiente, conceda o papel Extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) à conta do agente de serviço do Composer. O Cloud Composer usa essa conta para realizar operações no projeto do Google Cloud.

Exemplo: falha no agendador e na tarefa devido a problemas de agendamento

Este exemplo demonstra o funcionamento incorreto do programador de depuração e a latência causada por alta simultaneidade de tarefas.

Fazer upload do DAG de exemplo para o ambiente

Faça upload do DAG de exemplo abaixo para o ambiente criado nas etapas anteriores. Neste tutorial, o DAG é chamado dag_10_tasks_200_seconds_1:

Este DAG tem 200 tarefas. Cada tarefa aguarda um segundo e exibe "Concluída!". O DAG é acionado automaticamente após o upload. O Cloud Composer executa esse DAG 10 vezes, e todas as execuções dele ocorrem em paralelo.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnosticar problemas de falhas de tarefas e mau funcionamento do programador

Depois que as execuções do DAG forem concluídas, abra a interface do Airflow e clique no DAG dag_10_tasks_200_seconds_1. Um total de 10 execuções do DAG foi bem-sucedidos, e cada um tem 200 tarefas bem-sucedidas.

Revise os registros de tarefas do Airflow:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.

No histograma de registros, é possível ver os erros e avisos indicados com cores vermelha e laranja:

O histograma de registros de workers do Airflow com erros e avisos
    indicados com cores vermelha e laranja
Figura 1. Histograma de registros do worker do Airflow (clique para ampliar)

O DAG de exemplo resultou em cerca de 130 avisos e 60 erros. Clique em qualquer que contém barras amarelas e vermelhas. Você vai encontrar alguns dos seguintes alertas e erros nos registros:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Esses registros podem indicar que o uso de recursos excedeu os limites e o worker foi reiniciado.

Se uma tarefa do Airflow for mantida na fila por muito tempo, o programador a marca como com falha e up_for_retry e a reprograma para execução. Uma maneira de observar os sintomas dessa situação é olhar o gráfico com o número de tarefas na fila e, se os picos nesse gráfico não diminuírem em cerca de 10 minutos, provavelmente haverá falhas nas tarefas (sem registros).

Revise as informações de monitoramento:

  1. Acesse a guia Monitoramento e selecione Visão geral.

  2. Analise o gráfico de tarefas do Airflow.

    Gráfico das tarefas do Airflow ao longo do tempo, mostrando um pico no número de tarefas na fila
    Figura 2. Gráfico de tarefas do Airflow (clique para ampliar)

    No gráfico de tarefas do Airflow, há um pico nas tarefas na fila que dura mais de 10 minutos, o que pode significar que não há recursos suficientes no ambiente para processar todas as tarefas programadas.

  3. Analise o gráfico Trabalhadores ativos:

    O gráfico de workers ativos do Airflow ao longo do tempo mostra que
    o número de workers ativos aumentou até o limite máximo
    Figura 3. Gráfico de workers ativos (clique para ampliar)

    O gráfico Workers ativos indica que o DAG acionou o escalonamento automático para o limite máximo permitido de três workers durante a execução do DAG.

  4. Os gráficos de uso de recursos podem indicar a falta de capacidade nos workers do Airflow para executar tarefas na fila. Na guia Monitoramento, selecione Workers e Analise os gráficos Uso total de CPU do worker e Uso total de memória do worker.

    O gráfico de uso da CPU por workers do Airflow mostra o uso da CPU
    aumentando até o limite máximo
    Figura 4. Gráfico de uso total da CPU dos workers (clique para ampliar)
    O gráfico de uso da memória pelos workers do Airflow mostra que o uso da memória
    aumenta, mas não atinge o limite máximo.
    Figura 5. Gráfico de uso total da memória dos workers (clique para ampliar)

    Os gráficos indicam que a execução de muitas tarefas ao mesmo tempo resultou no limite da CPU. Os recursos foram usados por mais de 30 minutos, o que é ainda maior do que a duração total de 200 tarefas em 10 execuções de DAG executadas uma por uma.

Esses são os indicadores de que a fila está cheia e de falta de recursos para processar todas as tarefas agendadas.

Consolidar suas tarefas

O código atual cria muitas DAGs e tarefas sem recursos suficientes para processar todas as tarefas em paralelo, o que resulta no preenchimento da fila. Manter tarefas na fila por muito tempo pode fazer com que elas sejam reprogramadas ou falhem. Nessas situações, você deve optar por um número menor de modelos tarefas.

O DAG de amostra a seguir altera o número de tarefas no exemplo inicial de 200 para 20 e aumentar o tempo de espera de 1 para 10 segundos para imitar mais e consolidadas que realizam a mesma quantidade de trabalho.

Faça upload do DAG de amostra a seguir no ambiente que você criou. Neste tutorial, o DAG é chamado dag_10_tasks_20_seconds_10:

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Avaliar o impacto de tarefas mais consolidadas nos processos de agendamento:

  1. Aguarde a conclusão das execuções do DAG.

  2. Na interface do Airflow, na página DAGs, clique em DAG dag_10_tasks_20_seconds_10. Você vai encontrar 10 execuções de DAG, cada uma com 20 tarefas bem-sucedidas.

  3. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  4. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  5. Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.

    O segundo exemplo com tarefas mais consolidadas resultou em aproximadamente 10 avisos e 7 erros. No histograma, você pode comparar o número de e avisos no exemplo inicial (valores anteriores) e no segundo exemplo (valores posteriores).

    O histograma de registros de worker do Airflow com erros e avisos
    mostra a diminuição da quantidade de erros e avisos após as tarefas serem
    consolidados
    Figura 6. O worker do Airflow registra o histograma após o tarefas foram consolidadas (clique para ampliar)

    Ao comparar o primeiro exemplo com o mais consolidado, é possível há muito menos erros e avisos na segunda exemplo. No entanto, os mesmos erros relacionados ao desligamento parcial ainda aparecem nos registros devido à sobrecarga de recursos.

  6. Na guia Monitoramento, selecione Workers e analise os gráficos.

    Ao comparar o gráfico de tarefas do Airflow no primeiro exemplo (anterior valores) com o gráfico do segundo exemplo com tarefas mais consolidadas, você pode ver que o pico nas tarefas na fila durou um período menor de momento em que as tarefas estavam mais consolidadas. No entanto, durou quase 10 minutos, o que ainda não é o ideal.

    O gráfico de tarefas do Airflow ao longo do tempo mostra que o pico em
    As tarefas do Airflow duraram um período menor do que antes.
    Figura 7. Gráfico de tarefas do Airflow depois que as tarefas foram consolidadas (clique para ampliar)

    No gráfico "Workers ativos", confira o primeiro exemplo (à esquerda) do lado do gráfico) usou os recursos por um período muito mais longo que o segundo, embora os dois exemplos imitem a mesma quantidade de funcionam.

    O gráfico de workers ativos do Airflow ao longo do tempo mostra que o
    número de workers ativos aumentou por um período mais curto
    do que antes.
    Figura 8. Gráfico de workers ativos após tarefas foram consolidadas (clique para ampliar)

    Analise os gráficos de consumo de recursos do worker. Embora a diferença entre os recursos usados no exemplo com tarefas mais consolidadas e o exemplo inicial seja bastante significativa, o uso da CPU ainda está aumentando para 70% do limite.

    O gráfico de uso da CPU pelos workers do Airflow mostra o uso da CPU
    aumentando até 70% do limite máximo.
    Figura 9. Gráfico de uso da CPU pelos workers após a consolidação das tarefas (clique para ampliar)
    O gráfico do uso da memória por workers do Airflow mostra o uso da memória aumentando, mas não atingindo o limite máximo
    Figura 10. Gráfico do uso total de memória dos workers depois que as tarefas foram consolidadas (clique para ampliar)

Distribuir as tarefas de maneira mais uniforme ao longo do tempo

Muitas tarefas simultâneas resultam no preenchimento da fila, o que leva a que ficam presas na fila ou são reprogramadas. Nas etapas anteriores, você reduziu o número de tarefas ao consolidar essas tarefas. No entanto, os logs de saída e o monitoramento indicaram que o número de tarefas simultâneas ainda é subótimo.

É possível controlar o número de execuções de tarefas simultâneas implementando uma programação ou definir limites para quantas tarefas podem ser executadas simultaneamente.

Neste tutorial, você vai distribuir as tarefas de maneira mais uniforme ao longo do tempo adicionando parâmetros no nível do DAG ao DAG dag_10_tasks_20_seconds_10:

  1. Adicione o argumento max_active_runs=1 ao gerenciador de contexto do DAG. Esse argumento define um limite de apenas uma instância de uma execução de DAG em um determinado momento.

  2. Adicione o argumento max_active_tasks=5 ao gerenciador de contexto do DAG. Esse argumento controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG.

Faça upload do DAG de amostra a seguir no ambiente que você criou. Neste tutorial, o DAG é chamado dag_10_tasks_20_seconds_10_scheduled.py:

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Avaliar o impacto da distribuição de tarefas ao longo do tempo nos processos de agendamento:

  1. Aguarde até que as execuções do DAG sejam concluídas.

  2. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  3. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  4. Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Visualizar no Explorer de registros.

  5. No histograma, é possível ver que o terceiro DAG com um número limitado de tarefas e execuções ativas não gerou avisos nem erros, e a distribuição de registros parece mais uniforme em comparação com os valores anteriores.

    O histograma de registros de workers do Airflow com erros e avisos
    não mostra erros ou avisos depois que as tarefas foram consolidadas e
    distribuídas ao longo do tempo.
    Figura 11. Histograma de registros de workers do Airflow depois que as tarefas foram consolidadas e distribuídas ao longo do tempo (clique para ampliar)

As tarefas no exemplo dag_10_tasks_20_seconds_10_scheduled que têm um um número limitado de tarefas e execuções ativas não causou pressão de recursos porque as tarefas foram enfileiradas uniformemente.

Depois de realizar as etapas descritas, você otimiza o uso de recursos ao consolidar pequenas tarefas e distribuí-las de maneira mais uniforme ao longo do tempo.

Otimizar as configurações do ambiente

Você pode ajustar as configurações do ambiente para garantir que os workers do Airflow sempre tenham capacidade para executar tarefas na fila.

Número de workers e simultaneidade

É possível ajustar o número máximo de workers. para que o Cloud Composer escalone automaticamente seu ambiente dentro os limites definidos.

O parâmetro [celery]worker_concurrency define o número máximo de tarefas um único worker pode selecionar na fila de tarefas. Alterar esse parâmetro ajusta o número de tarefas que um único worker pode executar ao mesmo tempo. É possível mudar essa opção de configuração do Airflow substituindo-a. Por padrão, a simultaneidade do worker é definida como um mínimo de 32, 12 * worker_CPU, 8 * worker_memory, o que significa que ela depende dos limites de recursos do worker. Consulte Otimizar ambientes para mais informações sobre o padrão valores de simultaneidade de workers.

o número de workers e a simultaneidade de workers em combinação com cada e o desempenho do ambiente depende muito dos dois parâmetros. Use as considerações a seguir para escolher a combinação correta:

  • Várias tarefas rápidas em execução em paralelo. É possível aumentar o número de workers simultaneidade quando há tarefas aguardando na fila, e seus workers usam uma pequena porcentagem das CPUs e da memória ao mesmo tempo. No entanto, em determinadas circunstâncias, a fila pode nunca ser preenchida, fazendo com que o escalonamento automático nunca serão acionadas. Se tarefas pequenas terminam a execução no momento em que os novos workers estão prontos, um worker existente pode assumir as tarefas restantes não haverá tarefas para workers recém-criados.

    Nessas situações, é recomendável aumentar o número mínimo de workers e a simultaneidade deles para evitar o escalonamento excessivo.

  • Várias tarefas longas em execução em paralelo. A alta simultaneidade de workers impede que o sistema escalone o número de workers. Se várias tarefas exigem muitos recursos e levam muito tempo para serem concluídas, uma alta simultaneidade de workers pode fazer com que a fila nunca seja preenchida e todas as tarefas sejam capturadas por apenas um worker, o que resulta em problemas de desempenho. Nessas em diferentes situações, é recomendável aumentar o número máximo de workers e diminuir a simultaneidade de workers.

A importância do paralelismo

Os programadores do Airflow controlam a programação de execuções de DAGs e tarefas individuais os DAGs. A opção de configuração [core]parallelism do Airflow controla quantas tarefas o programador do Airflow pode enfileirar na fila do executor após todas as dependências dessas tarefas serem atendidas.

O paralelismo é um mecanismo de proteção do Airflow que determina quantas tarefas podem ser executadas ao mesmo tempo por cada programador, independentemente da contagem de workers. O valor de paralelismo multiplicado pelo número de programadores no cluster é o número máximo de instâncias de tarefa que seu ambiente pode enfileirar.

Normalmente, [core]parallelism é definido como um produto de um número máximo de workers e [celery]worker_concurrency. Ela também é afetada pool. É possível mudar essa opção de configuração do Airflow substituindo-a. Para mais informações sobre como ajustar as configurações do Airflow relacionadas à escala, consulte Como ajustar a configuração do Airflow.

Encontrar as configurações ideais de ambiente

A maneira recomendada de corrigir problemas de programação é consolidar tarefas pequenas em tarefas maiores e distribuir as tarefas de maneira mais uniforme ao longo do tempo. Além de com otimização do código DAG, também é possível otimizar as configurações do ambiente para capacidade suficiente para executar várias tarefas simultaneamente.

Por exemplo, suponha que você consolide tarefas no seu DAG o máximo possível, mas limitando as tarefas ativas para distribuí-las de modo mais uniforme tempo não é uma solução preferida para seu caso de uso específico.

É possível ajustar o paralelismo, o número e a simultaneidade de workers. parâmetros para executar o DAG dag_10_tasks_20_seconds_10 sem limitar o acesso tarefas. Neste exemplo, o DAG é executado 10 vezes, e cada execução contém 20 pequenas tarefas. Se você quiser executar todos simultaneamente:

  • Você precisará de um tamanho de ambiente maior, porque ele controla o desempenho da infraestrutura gerenciada do Cloud Composer do seu ambiente.

  • Os workers do Airflow precisam ser capazes de executar 20 tarefas simultaneamente, o que significa que você precisa definir a simultaneidade do worker como 20.

  • Os workers precisam de CPU e memória suficientes para processar todas as tarefas. Trabalhador a simultaneidade é afetada pela CPU e memória do worker. Portanto, você precisará de no mínimo worker_concurrency / 12 em CPU e em least worker_concurrency / 8 na memória.

  • Você vai precisar aumentar o paralelismo para corresponder à maior simultaneidade de workers. Para que os workers peguem 20 tarefas da fila, o programador precisa agendar essas 20 tarefas primeiro.

Ajuste as configurações do ambiente da seguinte maneira:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Encontre a configuração Recursos > Cargas de trabalho e clique em Editar.

  5. Na seção Worker, no campo Memory, especifique o novo limite de memória para workers do Airflow. Neste tutorial, use 4 GB.

  6. No campo CPU, especifique o novo limite de CPU para os workers do Airflow. Neste tutorial, use duas vCPUs.

  7. Salve as mudanças e aguarde alguns minutos para que os workers do Airflow sejam reiniciados.

Em seguida, substitua as opções de configuração do Airflow para paralelismo e simultaneidade de workers:

  1. Acesse a guia Substituições de configuração do Airflow.

  2. Clique em Edit e em Add Airflow Configuration Override.

  3. Modifique a configuração de parralismo:

    Seção Chave Valor
    core parallelism 20
  4. Clique em Add Airflow Configuration Override e substitua a configuração de concorrência do worker:

    Seção Chave Valor
    celery worker_concurrency 20
  5. Clique em Salvar e aguarde a atualização da configuração do ambiente.

Acione o mesmo DAG de exemplo novamente com as configurações ajustadas:

  1. Na interface do Airflow, acesse a página DAGs.

  2. Encontre e exclua o DAG dag_10_tasks_20_seconds_10.

    Depois que o DAG é excluído, o Airflow verifica a pasta de DAGs nos do ambiente de execução e executa automaticamente o DAG outra vez.

Após a conclusão das execuções do DAG, revise o histograma de registros novamente. No diagrama, observe que o exemplo dag_10_tasks_20_seconds_10 com mais tarefas consolidadas não geravam nenhum erro e aviso ao serem executadas com a configuração do ambiente ajustada. Compare os resultados com os dados anteriores no diagrama, em que o mesmo exemplo gerou erros e avisos ao ser executado com a configuração de ambiente padrão.

O histograma de registros de worker do Airflow com erros e avisos
        não mostra nenhum erro e aviso após a configuração do ambiente
        ajustado
Figura 12. Histograma de registros do worker do Airflow depois que a configuração do ambiente foi ajustada (clique para ampliar)

As configurações de ambiente e do Airflow desempenham um papel crucial na programação de tarefas. No entanto, não é possível aumentar as configurações além de determinados limites.

Recomendamos otimizar o código DAG, consolidar tarefas e usar a programação para otimizar a performance e a eficiência.

Exemplo: latência e erros de análise do DAG devido a um código DAG complexo

Neste exemplo, você investiga a latência de análise de um DAG de exemplo que imita um excesso de variáveis do Airflow.

Criar uma nova variável do Airflow

Antes de fazer upload do exemplo de código, crie uma nova variável do Airflow.

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.

  3. Acesse Administrador > Variáveis > Adicionar um novo registro.

  4. Defina os seguintes valores:

    • chave: example_var
    • valor: test_airflow_variable

Fazer upload do DAG de exemplo para o ambiente

Faça upload do DAG de amostra a seguir no ambiente que você criou nas etapas anteriores. Neste tutorial, o DAG é chamado dag_for_loop_airflow_variable:

Esse DAG contém uma repetição "for" que é executada 1.000 vezes e imita um excesso de Variáveis do Airflow. Cada iteração lê a variável example_var e gera uma tarefa. Cada tarefa contém um comando que imprime o valor da variável.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnosticar os problemas de análise

O tempo de análise do DAG é o tempo que o programador do Airflow leva para ler um arquivo DAG e fazer a análise dele. Antes que o programador do Airflow possa programar qualquer tarefa de um DAG, o programador precisa analisar o arquivo DAG para descobrir a estrutura sobre o DAG e as tarefas definidas.

Se a análise de um DAG demorar muito, isso consumirá a capacidade do programador e pode reduzir o desempenho das execuções de DAGs.

Para monitorar o tempo de análise do DAG, faça o seguinte:

  1. Execute o comando da CLI do Airflow dags report na CLI do gcloud para conferir o tempo de análise de todos os seus DAGs:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Substitua:

    • ENVIRONMENT_NAME: o nome do ambiente;
    • LOCATION: a região em que o ambiente está localizado.
  2. Na saída do comando, procure o valor de duração da DAG dag_for_loop_airflow_variables. Um valor grande pode indicar que para que o DAG não seja implementado da melhor maneira. Se você tiver vários DAGs, na tabela de saída, é possível identificar quais DAGs têm um longo tempo de análise.

    Exemplo:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Inspecione os tempos de análise do DAG no console do Google Cloud:

    1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  4. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  5. Acesse a guia Registros e depois Todos os registros > Gerenciador de processador DAG.

  6. Analise os registros do dag-processor-manager e identifique possíveis problemas.

    Uma entrada de registro do DAG de exemplo mostra que o tempo de análise do DAG é de 46,3 segundos.
    Figura 13. Os registros do gerenciador do processador de DAG mostram o DAG tempos de análise (clique para ampliar)

Se o tempo total de análise do DAG exceder cerca de 10 segundos, talvez os programadores sobrecarregadas com análise de DAGs e não podem executá-los de maneira eficaz.

Otimizar o código DAG

É recomendado evitar o código Python "de nível superior" desnecessário nos DAGs. DAGs com muitas importações, variáveis e funções fora do DAG introduzem tempos de análise maiores para o programador do Airflow. Isso reduz o desempenho e a escalonabilidade Cloud Composer e Airflow. O excesso de leitura de variáveis do Airflow leva a um tempo de análise longo e a uma carga alta do banco de dados. Se esse código estiver em um arquivo DAG, essas funções serão executadas em cada batimento cardíaco do programador, o que pode ser lento.

Os campos de modelo do Airflow permitem incorporar valores do Airflow variáveis e modelos Jinja nos seus DAGs. Isso evita a execução desnecessária de funções durante os batimentos do programador.

Para implementar o exemplo de DAG de uma maneira melhor, evite usar variáveis do Airflow em código Python de nível superior dos DAGs. Em vez disso, transmita as variáveis do Airflow para operadores usam um modelo Jinja, que atrasa a leitura do valor até a execução da tarefa.

Faça o upload da nova versão do DAG de exemplo para o ambiente. Neste tutorial, o DAG é chamado de dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Inspecione o novo tempo de análise do DAG:

  1. Aguarde até que a execução do DAG seja concluída.

  2. Execute o comando dags report novamente para conferir o tempo de análise de todos os seus DAGs:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Revise os registros dag-processor-manager outra vez e a duração da análise.

    Uma entrada de registro para o DAG de amostra mostra que o tempo de análise do DAG é 4,21
    segundos
    Figura 14. Os registros do gerenciador do processador de DAG mostram o DAG tempos de análise após a otimização do código DAG (clique para ampliar)

Ao substituir as variáveis de ambiente por modelos do Airflow, você simplificou o código DAG e reduziu a latência da análise em cerca de dez vezes.

Otimizar as configurações do ambiente do Airflow

O programador do Airflow tenta acionar novas tarefas constantemente e analisa todos os DAGs no bucket do ambiente. Se os DAGs tiverem um tempo de análise longo e o consumir muitos recursos, poderá otimizar o programador do Airflow configurações para que o programador use os recursos com mais eficiência.

Neste tutorial, os arquivos DAG levam muito tempo para analisar e analisar ciclos. começam a se sobrepor, o que esgota a capacidade do programador. No nosso exemplo, o primeiro DAG de exemplo leva mais de cinco segundos para ser analisado. Portanto, você vai configurar o agendador para ser executado com menos frequência e usar os recursos de maneira mais eficiente. Você vai substituir a opção de configuração do Airflow scheduler_heartbeat_sec. Essa configuração define com que frequência programador deve ser executado (em segundos). Por padrão, o valor é definido como 5 segundos. É possível mudar essa opção de configuração do Airflow substituindo-a.

Substitua a opção de configuração scheduler_heartbeat_sec do Airflow:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Substituições de configuração do Airflow.

  4. Clique em Editar e em Adicionar substituição da configuração do Airflow.

  5. Modifique a opção de configuração do Airflow:

    Seção Chave Valor
    scheduler scheduler_heartbeat_sec 10
  6. Clique em Salvar e aguarde a atualização da configuração do ambiente.

Verifique as métricas do programador:

  1. Acesse a guia Monitoramento e selecione Agendadores.

  2. No gráfico Frequência de funcionamento do programador, clique no botão Mais opções (três pontos) e, em seguida, clique em Visualizar no Metrics Explorer.

O gráfico do sinal de funcionamento do programador mostra que os batimentos cardíacos ocorrem com menos frequência
Figura 15. Gráfico de sinal de funcionamento do programador (clique para ampliar)

No gráfico, você vai notar que o programador será executado duas vezes menos depois mudou a configuração padrão de 5 segundos para 10 segundos. Ao reduzir o frequência dos batimentos cardíacos, você garante que o programador não inicie em execução enquanto o ciclo de análise anterior está em andamento e a configuração a capacidade de recursos não estiver esgotada.

Atribuir mais recursos ao programador

No Cloud Composer 2, é possível alocar mais recursos de CPU e memória para scheduler. Dessa forma, você pode aumentar a performance do programador e acelerar o tempo de análise do DAG.

Aloque mais CPU e memória para o programador:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Encontre a configuração Resources > Workloads e Clique em Editar.

  5. Na seção Programador, no campo Memória, especifique o novo limite de memória. Neste tutorial, use 4 GB.

  6. No campo CPU, especifique o novo limite de CPU. Neste use duas vCPUs.

  7. Salve as alterações e aguarde alguns minutos para que os programadores do Airflow sejam reiniciados.

  8. Acesse a guia Registros e depois Todos os registros > Gerenciador de processador DAG.

  9. Analise os registros dag-processor-manager e compare a duração da análise dos DAGs de exemplo:

    Uma entrada de registro para o DAG de exemplo mostra que o tempo de análise do DAG otimizado é de 1,5 segundo. Para o DAG não otimizado, o tempo de análise é de 28,71 segundos.
    Figura 16. Os registros do gerenciador do processador DAG mostram os tempos de análise do DAG depois que mais recursos foram atribuídos ao programador (clique para ampliar)

Ao atribuir mais recursos ao programador, você aumentou a capacidade dele e reduziu a latência de análise significativamente em comparação com as configurações padrão do ambiente. Com mais recursos, o programador pode analisar os os DAGs são gerados com mais rapidez, mas os custos associados ao Cloud Composer os recursos também aumentam. Além disso, não é possível aumentar os recursos além de um determinado limite.

Recomendamos alocar recursos somente depois que o possível código DAG e os otimizações de configuração do Airflow forem implementados.

Limpar

Para evitar cobranças dos recursos na sua conta do Google Cloud usados neste tutorial, exclua o projeto que contém os recursos ou manter o projeto e excluir os recursos individuais.

Exclua o projeto

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Excluir recursos individuais

Se você planeja ver vários tutoriais e guias de início rápido, a reutilização de projetos pode evitar que você exceda os limites da cota do projeto.

Exclua o ambiente do Cloud Composer. Você também exclui o bucket do ambiente durante esse procedimento.

A seguir