Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este tutorial orienta você no diagnóstico e na solução de problemas de programação de tarefas e de análise que levam a erros de análise, latência e falha de tarefas e mau funcionamento do programador.
Introdução
O programador do Airflow é afetado principalmente por dois fatores: a programação de tarefas e a análise de DAG. Problemas em um desses fatores podem ter um impacto negativo na integridade e no desempenho do ambiente.
Às vezes, muitas tarefas são programadas simultaneamente. Nessa situação, a fila fica cheia, e as tarefas permanecem no estado "programado" ou são reprogramadas após serem enfileiradas, o que pode causar falha na tarefa e latência de desempenho.
Outro problema comum é a latência de análise e os erros causados pela complexidade de um código de DAG. Por exemplo, um código de DAG que contém variáveis do Airflow no nível superior do código pode causar atrasos na análise, sobrecarga do banco de dados, falhas no agendamento e tempos limite do DAG.
Neste tutorial, você vai diagnosticar os DAGs de exemplo e aprender a resolver problemas de programação e análise, melhorar a programação de DAGs e otimizar o código e as configurações de ambiente para melhorar o desempenho.
Objetivos
Esta seção lista os objetivos dos exemplos neste tutorial.
Exemplo: mau funcionamento do programador e latência causada por alta simultaneidade de tarefas
Faça upload do DAG de exemplo que é executado várias vezes simultaneamente e diagnostique o mau funcionamento do programador e os problemas de latência com o Cloud Monitoring.
Otimize o código DAG consolidando as tarefas e avalie o impacto na performance.
Distribua as tarefas de maneira mais uniforme ao longo do tempo e avalie o impacto na performance.
Otimize as configurações do Airflow e do ambiente e avalie o impacto.
Exemplo: erros de análise de DAG e latência causados por código complexo
Faça upload do DAG de exemplo com variáveis do Airflow e diagnostique problemas de análise com o Cloud Monitoring.
Otimize o código do DAG evitando variáveis do Airflow no nível superior do código e avalie o impacto no tempo de análise.
Otimize as configurações do Airflow e do ambiente e avalie o impacto no tempo de análise.
Custos
Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:
- Cloud Composer (consulte custos extras)
- Cloud Monitoring
Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Para mais detalhes, consulte Limpeza.
Antes de começar
Nesta seção, descrevemos as ações necessárias antes de iniciar o tutorial.
Criar e configurar um projeto
Para este tutorial, você precisa de um projeto Google Cloud. Configure o projeto da seguinte maneira:
No console do Google Cloud , selecione ou crie um projeto:
Verifique se o faturamento foi ativado para o projeto. Saiba como verificar se o faturamento está ativado em um projeto.
Verifique se o usuário do projeto Google Cloud tem as seguintes funções para criar os recursos necessários:
- Administrador de objetos do armazenamento e do ambiente
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador do Compute (
roles/compute.admin
)
- Administrador de objetos do armazenamento e do ambiente
(
Ativar as APIs do projeto
Enable the Cloud Composer API.
Criar o ambiente do Cloud Composer
Crie um ambiente do Cloud Composer 2.
Ao criar o ambiente, você concede o papel Extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext
) à conta do agente de serviço do Composer. O Cloud Composer usa essa conta para realizar operações no seu projeto Google Cloud .
Exemplo: mau funcionamento do programador e falha da tarefa devido a problemas de agendamento
Este exemplo demonstra a depuração de um agendador com mau funcionamento e latência causada por alta simultaneidade de tarefas.
Fazer upload do DAG de exemplo para o ambiente
Faça upload do DAG de exemplo a seguir para o ambiente
criado nas etapas anteriores. Neste tutorial, o DAG é chamado de
dag_10_tasks_200_seconds_1
.
Esse DAG tem 200 tarefas. Cada tarefa aguarda um segundo e imprime "Concluído!". O DAG é acionado automaticamente após o upload. O Cloud Composer executa esse DAG 10 vezes, e todas as execuções acontecem em paralelo.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnosticar problemas de mau funcionamento do programador e falha de tarefas
Depois que as execuções do DAG forem concluídas, abra a interface do Airflow e clique no DAG dag_10_tasks_200_seconds_1
. Você vai ver que 10 execuções de DAG foram concluídas com sucesso, e cada uma tem 200 tarefas concluídas.
Analise os registros de tarefas do Airflow:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e selecione Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.
No histograma de registros, você pode ver os erros e avisos indicados com cores vermelha e laranja:

O exemplo de DAG resultou em cerca de 130 avisos e 60 erros. Clique em qualquer coluna que contenha barras amarelas e vermelhas. Você vai encontrar alguns dos seguintes avisos e erros nos registros:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Esses registros podem indicar que o uso de recursos excedeu os limites e que o worker foi reiniciado automaticamente.
Se uma tarefa do Airflow ficar na fila por muito tempo, o programador a marcará como falha e up_for_retry e vai reagendar a execução mais uma vez. Uma maneira de observar os sintomas dessa situação é analisar o gráfico com o número de tarefas enfileiradas. Se os picos nesse gráfico não diminuírem em cerca de 10 minutos, provavelmente haverá falhas de tarefas (sem registros).
Revise as informações de monitoramento:
Acesse a guia Monitoring e selecione Visão geral.
Analise o gráfico de tarefas do Airflow.
Figura 2. Gráfico de tarefas do Airflow (clique para ampliar) No gráfico de tarefas do Airflow, há um pico de tarefas na fila que dura mais de 10 minutos. Isso pode significar que não há recursos suficientes no ambiente para processar todas as tarefas programadas.
Analise o gráfico Trabalhadores ativos:
Figura 3. Gráfico de workers ativos (clique para ampliar) O gráfico Workers ativos indica que o DAG acionou o escalonamento automático até o limite máximo permitido de três workers durante a execução do DAG.
Os gráficos de uso de recursos podem indicar a falta de capacidade nos workers do Airflow para executar tarefas enfileiradas. Na guia Monitoring, selecione Workers e analise os gráficos Uso total da CPU do worker e Uso total da memória do worker.
Figura 4. Gráfico do uso total da CPU dos workers (clique para ampliar) Figura 5. Gráfico de uso total da memória de workers (clique para ampliar) Os gráficos indicam que a execução de muitas tarefas simultaneamente resultou no atingimento do limite da CPU. Os recursos foram usados por mais de 30 minutos, o que é ainda mais longo do que a duração total de 200 tarefas em 10 execuções de DAG executadas uma por uma.
Esses são os indicadores de que a fila está sendo preenchida e há falta de recursos para processar todas as tarefas programadas.
Consolidar suas tarefas
O código atual cria muitos DAGs e tarefas sem recursos suficientes para processar todas as tarefas em paralelo, o que resulta no preenchimento da fila. Manter tarefas na fila por muito tempo pode fazer com que elas sejam reagendadas ou falhem. Nesses casos, opte por um número menor de tarefas mais consolidadas.
O exemplo de DAG a seguir muda o número de tarefas no exemplo inicial de 200 para 20 e aumenta o tempo de espera de 1 para 10 segundos para imitar mais tarefas consolidadas que fazem a mesma quantidade de trabalho.
Faça upload do exemplo de DAG a seguir para o ambiente
que você criou. Neste tutorial, o DAG é chamado de
dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Avalie o impacto de tarefas mais consolidadas nos processos de programação:
Aguarde até que as execuções do DAG sejam concluídas.
Na página DAGs da interface do Airflow, clique no DAG
dag_10_tasks_20_seconds_10
. Você verá 10 execuções de DAG, cada uma com 20 tarefas concluídas.No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e selecione Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.
O segundo exemplo, com tarefas mais consolidadas, resultou em aproximadamente 10 avisos e 7 erros. No histograma, é possível comparar o número de erros e avisos no exemplo inicial (valores anteriores) e no segundo exemplo (valores posteriores).
Figura 6. Histograma de registros do worker do Airflow após a consolidação das tarefas (clique para ampliar) Ao comparar o primeiro exemplo com o mais consolidado, é possível perceber que há muito menos erros e avisos no segundo exemplo. No entanto, os mesmos erros relacionados ao desligamento gradual ainda aparecem nos registros devido à sobrecarga de recursos.
Na guia Monitoring, selecione Workers e analise os gráficos.
Ao comparar o gráfico de tarefas do Airflow do primeiro exemplo (valores anteriores) com o gráfico do segundo exemplo, que tem tarefas mais consolidadas, é possível notar que o pico de tarefas na fila durou menos tempo quando as tarefas estavam mais consolidadas. No entanto, ele durou quase 10 minutos, o que ainda é abaixo do ideal.
Figura 7. Gráfico de tarefas do Airflow após a consolidação das tarefas (clique para ampliar) No gráfico "Workers ativos", é possível ver que o primeiro exemplo (à esquerda) usou recursos por um período muito mais longo do que o segundo, embora ambos imitem a mesma quantidade de trabalho.
Figura 8. Gráfico de workers ativos após a consolidação das tarefas (clique para ampliar) Analise os gráficos de consumo de recursos do worker. Embora a diferença entre os recursos usados no exemplo com mais tarefas consolidadas e o exemplo inicial seja bastante significativa, o uso da CPU ainda está aumentando para 70% do limite.
Figura 9. Gráfico do uso total da CPU dos workers após a consolidação das tarefas (clique para ampliar) Figura 10. Gráfico do uso total de memória de workers após a consolidação das tarefas (clique para ampliar)
Distribua as tarefas de maneira mais uniforme com o tempo
Muitas tarefas simultâneas fazem com que a fila fique cheia, o que leva a tarefas presas na fila ou reprogramadas. Nas etapas anteriores, você diminuiu o número de tarefas consolidando-as. No entanto, os registros de saída e o monitoramento indicaram que o número de tarefas simultâneas ainda está abaixo do ideal.
É possível controlar o número de execuções de tarefas simultâneas implementando um cronograma ou definindo limites para quantas tarefas podem ser executadas ao mesmo tempo.
Neste tutorial, você distribui as tarefas de maneira mais uniforme ao longo do tempo adicionando parâmetros no nível do DAG ao DAG dag_10_tasks_20_seconds_10
:
Adicione o argumento
max_active_runs=1
ao gerenciador de contexto do DAG. Esse argumento define um limite de apenas uma instância de uma execução de DAG em um determinado momento.Adicione o argumento
max_active_tasks=5
ao gerenciador de contexto do DAG. Esse argumento controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG.
Faça upload do exemplo de DAG a seguir para o ambiente
que você criou. Neste tutorial, o DAG é chamado de
dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Avalie o impacto da distribuição de tarefas ao longo do tempo nos processos de programação:
Aguarde até que as execuções do DAG sejam concluídas.
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e selecione Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.
No histograma, é possível ver que o terceiro DAG com um número limitado de tarefas e execuções ativas não gerou avisos nem erros, e a distribuição de registros parece mais uniforme em comparação com os valores anteriores.
Figura 11. Histograma de registros de worker do Airflow depois que as tarefas foram consolidadas e distribuídas ao longo do tempo (clique para ampliar)
As tarefas no exemplo dag_10_tasks_20_seconds_10_scheduled
, que tem um número limitado de tarefas e execuções ativas, não causaram pressão de recursos porque foram enfileiradas de maneira uniforme.
Depois de realizar as etapas descritas, você otimizou o uso de recursos consolidando pequenas tarefas e distribuindo-as de maneira mais uniforme ao longo do tempo.
Otimizar as configurações do ambiente
É possível ajustar as configurações do ambiente para garantir que sempre haja capacidade nos workers do Airflow para executar tarefas enfileiradas.
Número de workers e simultaneidade de workers
É possível ajustar o número máximo de workers para que o Cloud Composer faça o escalonamento automático do ambiente dentro dos limites definidos.
O parâmetro [celery]worker_concurrency
define o número máximo de tarefas
que um único worker pode extrair da fila de tarefas. Alterar esse parâmetro ajusta o número de tarefas que um único worker pode executar ao mesmo tempo.
É possível mudar essa opção de configuração do Airflow substituindo-a. Por padrão, a simultaneidade do worker é definida com base no número de instâncias de tarefas simultâneas leves que um worker pode acomodar. Isso significa que o valor depende dos limites de recursos do worker.
O valor da simultaneidade do worker não depende do número de workers no ambiente.
O número de workers e a simultaneidade de workers funcionam em combinação, e o desempenho do ambiente depende muito dos dois parâmetros. Use as seguintes considerações para escolher a combinação correta:
Várias tarefas rápidas em execução em paralelo. É possível aumentar a simultaneidade de workers quando há tarefas aguardando na fila e seus workers usam uma porcentagem baixa de CPUs e memória ao mesmo tempo. No entanto, em determinadas circunstâncias, a fila pode nunca ser preenchida, fazendo com que o escalonamento automático nunca seja acionado. Se as tarefas pequenas terminarem a execução quando os novos workers estiverem prontos, um worker atual poderá assumir as tarefas restantes, e não haverá tarefas para os workers recém-criados.
Nessas situações, é recomendável aumentar o número mínimo de workers e a simultaneidade deles para evitar um escalonamento excessivo.
Várias tarefas longas em execução em paralelo. A alta simultaneidade de workers impede que o sistema dimensione o número de workers. Se várias tarefas exigirem muitos recursos e demorarem muito para serem concluídas, uma alta simultaneidade de workers poderá fazer com que a fila nunca seja preenchida e todas as tarefas sejam selecionadas por apenas um worker, o que resulta em problemas de desempenho. Nessas situações, é recomendável aumentar o número máximo de workers e diminuir a simultaneidade deles.
A importância do paralelismo
Os programadores do Airflow controlam a programação de execuções de DAG e tarefas individuais de
DAGs. A opção de configuração [core]parallelism
do Airflow controla quantas
tarefas o programador do Airflow pode enfileirar na fila do executor após todas as
dependências dessas tarefas serem atendidas.
O paralelismo é um mecanismo de proteção do Airflow que determina quantas tarefas podem ser executadas ao mesmo tempo por cada programador, independente da contagem de workers. O valor de paralelismo, multiplicado pelo número de programadores no cluster, é o número máximo de instâncias de tarefa que seu ambiente pode enfileirar.
Normalmente, [core]parallelism
é definido como um produto de um número máximo de workers e [celery]worker_concurrency
. Ela também é afetada pelo pool.
É possível mudar essa opção de configuração do Airflow substituindo-a. Para mais informações sobre como ajustar as configurações do Airflow relacionadas ao escalonamento, consulte Escalonar a configuração do Airflow.
Encontrar configurações de ambiente ideais
A maneira recomendada de corrigir problemas de programação é consolidar tarefas pequenas em tarefas maiores e distribuir as tarefas de maneira mais uniforme ao longo do tempo. Além de otimizar o código do DAG, também é possível otimizar as configurações do ambiente para ter capacidade suficiente para executar várias tarefas simultaneamente.
Por exemplo, suponha que você consolide tarefas no seu DAG o máximo possível, mas limitar as tarefas ativas para distribuí-las de maneira mais uniforme ao longo do tempo não seja uma solução preferida para seu caso de uso específico.
É possível ajustar o paralelismo, o número de workers e os parâmetros de simultaneidade de workers
para executar o DAG dag_10_tasks_20_seconds_10
sem limitar as tarefas
ativas. Neste exemplo, o DAG é executado 10 vezes, e cada execução contém 20 tarefas pequenas.
Se você quiser executar todos ao mesmo tempo:
Você vai precisar de um ambiente maior, porque ele controla os parâmetros de desempenho da infraestrutura gerenciada do Cloud Composer.
Os workers do Airflow precisam executar 20 tarefas simultaneamente. Portanto, defina a simultaneidade de workers como 20.
Os workers precisam de CPU e memória suficientes para processar todas as tarefas. A simultaneidade do worker é afetada pela CPU e pela memória dele. Portanto, você precisa de pelo menos
worker_concurrency / 12
de CPU eleast worker_concurrency / 8
de memória.Você precisará aumentar o paralelismo para corresponder à maior simultaneidade do worker. Para que os workers selecionem 20 tarefas da fila, o programador precisará agendar essas 20 tarefas primeiro.
Ajuste as configurações do ambiente da seguinte maneira:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Configuração do ambiente.
Encontre a configuração Recursos > Cargas de trabalho e clique em Editar.
Na seção Worker, no campo Memória, especifique o novo limite de memória para os workers do Airflow. Neste tutorial, use 4 GB.
No campo CPU, especifique o novo limite de CPU para workers do Airflow. Neste tutorial, use 2 vCPUs.
Salve as mudanças e aguarde alguns minutos para que os workers do Airflow sejam reiniciados.
Em seguida, substitua as opções de configuração de paralelismo e simultaneidade de workers do Airflow:
Acesse a guia Modificações de configuração do Airflow.
Clique em Editar e em Adicionar substituição da configuração do Airflow.
Substitua a configuração de paralelismo:
Seção Chave Valor core
parallelism
20
Clique em Adicionar substituição da configuração do Airflow e substitua a configuração de simultaneidade do worker:
Seção Chave Valor celery
worker_concurrency
20
Clique em Salvar e aguarde até que o ambiente atualize a configuração.
Acione o mesmo exemplo de DAG novamente com as configurações ajustadas:
Na interface do Airflow, acesse a página DAGs.
Encontre e exclua o DAG
dag_10_tasks_20_seconds_10
.Depois que o DAG é excluído, o Airflow verifica a pasta de DAGs no bucket do seu ambiente e executa automaticamente o DAG novamente.
Depois que as execuções do DAG forem concluídas, revise o histograma de registros novamente. No diagrama, é possível ver que o exemplo dag_10_tasks_20_seconds_10
com tarefas mais consolidadas não gerou erros nem avisos ao ser executado com a configuração de ambiente ajustada. Compare os resultados com os dados anteriores no diagrama, em que o mesmo exemplo gerou erros e avisos ao ser executado com a configuração padrão do ambiente.

As configurações de ambiente e do Airflow são cruciais no agendamento de tarefas, mas não é possível aumentar as configurações além de determinados limites.
Recomendamos otimizar o código do DAG, consolidar tarefas e usar o agendamento para melhorar a performance e a eficiência.
Exemplo: erros de análise de DAG e latência devido a um código DAG complexo
Neste exemplo, você investiga a latência de análise de um DAG de amostra que imita um excesso de variáveis do Airflow.
Criar uma variável do Airflow
Antes de fazer upload do exemplo de código, crie uma variável do Airflow.
No console Google Cloud , acesse a página Ambientes.
Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.
Acesse Administrador > Variáveis > Adicionar um novo registro.
Defina os seguintes valores:
- chave:
example_var
- val:
test_airflow_variable
- chave:
Fazer upload do DAG de exemplo para o ambiente
Faça upload do DAG de exemplo a seguir para o ambiente
criado nas etapas anteriores. Neste tutorial, o DAG é chamado de
dag_for_loop_airflow_variable
.
Esse DAG contém um loop "for" que é executado 1.000 vezes e imita um excesso de variáveis do Airflow. Cada iteração lê a variável example_var
e gera uma tarefa. Cada tarefa contém um comando que imprime o valor da variável.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnosticar problemas de análise
O tempo de análise do DAG é o tempo que leva para o programador do Airflow ler e analisar um arquivo DAG. Antes de o programador do Airflow programar qualquer tarefa de um DAG, ele precisa analisar o arquivo DAG para descobrir a estrutura do DAG e as tarefas definidas.
Se a análise de um DAG demorar muito, isso consumirá a capacidade do programador e poderá reduzir o desempenho das execuções do DAG.
Para monitorar o tempo de análise do DAG:
Execute o comando da CLI do Airflow
dags report
na CLI gcloud para conferir o tempo de análise de todos os seus DAGs:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Substitua:
ENVIRONMENT_NAME
: o nome do ambiente;LOCATION
: a região em que o ambiente está localizado.
Na saída do comando, procure o valor da duração do DAG
dag_for_loop_airflow_variables
. Um valor grande pode indicar que esse DAG não está implementado da maneira ideal. Se você tiver vários DAGs, na tabela de respostas, é possível identificar quais deles têm um longo tempo de análise.Exemplo:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Inspecionar tempos de análise do DAG no console do Google Cloud :
- No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e clique em Todos os registros > Gerenciador do processador DAG.
Analise os registros
dag-processor-manager
e identifique possíveis problemas.Figura 13. Os registros do gerenciador do processador de DAG mostram os tempos de análise do DAG (clique para ampliar)
Se o tempo total de análise do DAG exceder cerca de 10 segundos, seus programadores poderão ficar sobrecarregados com a análise do DAG e não poderão executar DAGs de maneira eficaz.
Otimizar o código do DAG
É recomendado evitar código Python desnecessário de "nível superior" nos seus DAGs. DAGs com muitas importações, variáveis e funções fora do DAG aumentam os tempos de análise para o programador do Airflow. Isso reduz o desempenho e a escalonabilidade do Cloud Composer e do Airflow. O excesso de leitura de variáveis do Airflow resulta em um longo tempo de análise e alta carga do banco de dados. Se esse código estiver em um arquivo DAG, essas funções serão executadas em cada pulsação do programador, o que pode ser lento.
Os campos de modelo do Airflow permitem incorporar valores de variáveis do Airflow e modelos Jinja aos seus DAGs. Isso evita a execução desnecessária de funções durante os heartbeats do programador.
Para implementar o exemplo de DAG de uma maneira melhor, evite usar variáveis do Airflow no código Python de nível superior dos DAGs. Em vez disso, transmita variáveis do Airflow para operadores existentes usando um modelo Jinja, que vai atrasar a leitura do valor até a execução da tarefa.
Faça upload da nova versão do DAG de amostra para o
ambiente. Neste tutorial, o DAG é chamado de
dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Inspecione o novo tempo de análise do DAG:
Aguarde a conclusão da execução do DAG.
Execute o comando
dags report
novamente para ver o tempo de análise de todos os seus DAGs:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Revise os registros
dag-processor-manager
novamente e analise a duração da análise.Figura 14. Os registros do gerenciador do processador de DAG mostram os tempos de análise do DAG depois que o código foi otimizado (clique para ampliar)
Ao substituir as variáveis de ambiente por modelos do Airflow, você simplificou o código do DAG e reduziu a latência de análise em cerca de dez vezes.
Otimizar as configurações do ambiente do Airflow
O programador do Airflow tenta constantemente acionar novas tarefas e analisa todos os DAGs no bucket do ambiente. Se os DAGs tiverem um tempo de análise longo e o programador consumir muitos recursos, otimize as configurações do programador do Airflow para que ele use os recursos de maneira mais eficiente.
Neste tutorial, os arquivos DAG levam muito tempo para serem analisados, e os ciclos de análise
começam a se sobrepor, o que esgota a capacidade do programador. No nosso exemplo, o primeiro DAG leva mais de 5 segundos para ser analisado. Por isso, você vai configurar o programador para ser executado com menos frequência e usar os recursos de maneira mais eficiente. Você vai substituir a opção de configuração do Airflow scheduler_heartbeat_sec
. Essa configuração define a frequência com que o
agendador deve ser executado (em segundos). Por padrão, o valor é definido como 5 segundos.
É possível mudar essa opção de configuração do Airflow substituindo-a.
Substitua a opção de configuração do Airflow scheduler_heartbeat_sec
:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Modificações de configuração do Airflow.
Clique em Editar e em Adicionar substituição da configuração do Airflow.
Substitua a opção de configuração do Airflow:
Seção Chave Valor scheduler
scheduler_heartbeat_sec
10
Clique em Salvar e aguarde até que o ambiente atualize a configuração.
Verifique as métricas do programador:
Acesse a guia Monitoring e selecione Agendadores.
No gráfico Heartbeat do Scheduler, clique no botão Mais opções (três pontos) e em Ver no Metrics Explorer.

No gráfico, você vai notar que o programador é executado duas vezes com menos frequência depois que você mudou a configuração padrão de 5 para 10 segundos. Ao reduzir a frequência dos heartbeats, você garante que o programador não comece a ser executado enquanto o ciclo de análise anterior está em andamento e que a capacidade de recursos do programador não se esgota.
Atribuir mais recursos ao programador
No Cloud Composer 2, é possível alocar mais recursos de CPU e memória para o scheduler. Dessa forma, você pode aumentar a performance do programador e acelerar o tempo de análise do DAG.
Alocar mais CPU e memória para o programador:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Configuração do ambiente.
Encontre a configuração Recursos > Cargas de trabalho e clique em Editar.
Na seção Scheduler, no campo Memory, especifique o novo limite de memória. Neste tutorial, use 4 GB.
No campo CPU, especifique o novo limite. Neste tutorial, use 2 vCPUs.
Salve as mudanças e aguarde alguns minutos para que os programadores do Airflow sejam reiniciados.
Acesse a guia Registros e clique em Todos os registros > Gerenciador do processador DAG.
Revise os registros
dag-processor-manager
e compare a duração da análise dos DAGs de exemplo:Figura 16. Os registros do gerenciador de processador de DAG mostram os tempos de análise do DAG depois que mais recursos foram atribuídos ao programador (clique para ampliar)
Ao atribuir mais recursos ao programador, você aumentou a capacidade dele e reduziu significativamente a latência de análise em comparação com as configurações padrão do ambiente. Com mais recursos, o programador pode analisar os DAGs mais rapidamente. No entanto, os custos associados aos recursos do Cloud Composer também aumentam. Além disso, não é possível aumentar os recursos além de um determinado limite.
Recomendamos alocar recursos somente depois que as possíveis otimizações de código DAG e configuração do Airflow forem implementadas.
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados neste tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.
Excluir o projeto
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Excluir recursos individuais
Se você planeja ver vários tutoriais e guias de início rápido, a reutilização de projetos pode evitar que você exceda os limites da cota do projeto.
Exclua o ambiente do Cloud Composer. Você também exclui o bucket do ambiente durante esse procedimento.