Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Este tutorial orienta você a diagnosticar e resolver problemas de programação de tarefas e de análise que levam a um mau funcionamento do programador, erros de análise e latência e falha na tarefa.
Introdução
O programador do Airflow é afetado principalmente por dois fatores: programação de tarefas e análise de DAG. Problemas em um desses fatores podem ter um impacto negativo na integridade e desempenho do ambiente.
Às vezes, muitas tarefas são programadas ao mesmo tempo. Nessa situação, a fila fica cheia, e as tarefas permanecem no estado "programada" ou são reprogramadas depois de serem enfileiradas, o que pode causar falha na tarefa e latência de desempenho.
Outro problema comum é a latência de análise e os erros causados pela complexidade de um código DAG. Por exemplo, um código DAG que contenha variáveis do Airflow na parte superior pode gerar atrasos na análise, sobrecarga do banco de dados, e tempo limite do DAG.
Neste tutorial, você vai diagnosticar os DAGs de exemplo e aprender a resolver problemas de programação e análise, melhorar a programação de DAGs e otimizar o código e as configurações do ambiente de DAGs para melhorar o desempenho.
Objetivos
Esta seção lista os objetivos dos exemplos neste tutorial.
Exemplo: mau funcionamento do programador e latência causada por alta simultaneidade de tarefas
Faça upload do DAG de exemplo que é executado várias vezes simultaneamente e diagnostique o problema de funcionamento do programador e os problemas de latência com o Cloud Monitoring.
Otimize o código do DAG consolidando as tarefas e avaliando o impacto na performance.
Distribuir as tarefas de maneira mais uniforme ao longo do tempo e avaliar o desempenho. impacto.
otimizar as configurações do Airflow e do ambiente; avaliar o impacto.
Exemplo: latência e erros de análise do DAG causados por código complexo
Fazer upload do DAG de amostra com variáveis do Airflow e diagnosticar a análise problemas com o Cloud Monitoring.
Otimize o código DAG evitando variáveis do Airflow no nível superior código e avaliar o impacto no tempo de análise.
Otimize as configurações do Airflow e do ambiente e avalie o impacto no tempo de análise.
Custos
Neste tutorial, usamos o seguinte componente faturável do Google Cloud:
- Cloud Composer (consulte custos adicionais)
- Cloud Monitoring
Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Veja mais detalhes em Limpeza.
Antes de começar
Esta seção descreve as ações necessárias antes de iniciar o tutorial.
Criar e configurar um projeto
Para este tutorial, você precisa ter uma conta do Google Cloud projeto. Configure o projeto da seguinte maneira:
No console do Google Cloud, selecione ou crie um projeto:
Verifique se o faturamento foi ativado para o projeto. Saiba como verificar se o faturamento está ativado em um projeto.
Verifique se o usuário do projeto do Google Cloud tem os seguintes papéis para criar os recursos necessários:
- Administrador de ambiente e de objetos do Storage
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador do Compute (
roles/compute.admin
)
- Administrador de ambiente e de objetos do Storage
(
Ativar as APIs do projeto
Enable the Cloud Composer API.
Criar seu ambiente do Cloud Composer
Crie um ambiente do Cloud Composer 2.
Como parte da criação do ambiente,
você concede a extensão do agente de serviço da API Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) para o agente de serviço do Composer
do Compute Engine. O Cloud Composer usa essa conta para realizar operações
no projeto do Google Cloud.
Exemplo: falha no agendador e na tarefa devido a problemas de agendamento
Este exemplo demonstra a depuração do mau funcionamento do programador e a latência causada devido à alta simultaneidade de tarefas.
faça upload do DAG de amostra para o ambiente
Faça upload do DAG de exemplo abaixo para o ambiente
criado nas etapas anteriores. Neste tutorial, o DAG é chamado de
dag_10_tasks_200_seconds_1
.
Este DAG tem 200 tarefas. Cada tarefa aguarda um segundo e exibe "Concluída!". O DAG é acionado automaticamente após o upload. O Cloud Composer executa esse DAG 10 vezes, e todas as execuções do DAG acontecem em paralelo.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Diagnosticar o mau funcionamento do programador e problemas de falha nas tarefas
Depois que as execuções do DAG forem concluídas, abra a interface do Airflow e clique no
DAG dag_10_tasks_200_seconds_1
. Você vai notar que 10 execuções de DAG foram
concluídas e cada uma delas tem 200 tarefas concluídas.
Revise os registros de tarefas do Airflow:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.
No histograma de registros, é possível ver os erros e avisos indicados com cores vermelha e laranja:
O DAG de exemplo resultou em cerca de 130 avisos e 60 erros. Clique em qualquer que contém barras amarelas e vermelhas. Você verá algumas e erros nos registros:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Esses registros podem indicar que o uso de recursos excedeu os limites o worker reiniciou por conta própria.
Se uma tarefa do Airflow é mantida na fila por muito tempo, o programador marca como falhou e up_for_retry e vai reprogramá-lo novamente para execução. Uma maneira de observar os sintomas dessa situação é olhar para o gráfico com o número de tarefas na fila e se os picos do gráfico não cair em cerca de 10 minutos, provavelmente haverá falhas nas tarefas (sem registros).
Revise as informações de monitoramento:
Acesse a guia Monitoramento e selecione Visão geral.
Analise o gráfico de tarefas do Airflow.
No gráfico de tarefas do Airflow, há um pico nas tarefas na fila que dura cerca de mais de 10 minutos, o que pode significar que não há recursos suficientes em seu ambiente para processar todas as tarefas agendadas.
Analise o gráfico Trabalhadores ativos:
O gráfico Workers ativos indica que o DAG acionou o escalonamento automático para o limite máximo permitido de três workers durante a execução do DAG.
Os gráficos de uso de recursos podem indicar a falta de capacidade em workers do Airflow para executar tarefas na fila. Na guia Monitoring, selecione Workers e analise os gráficos Total worker CPU usage e Total worker memory usage.
Os gráficos indicam que a execução de muitas tarefas ao mesmo tempo resultou no limite da CPU. Os recursos foram usados por mais de 30 minutos, o que é ainda mais do que a duração total de 200 tarefas em 10 O DAG é executado um por um.
Esses são os indicadores de que a fila está cheia e de falta de recursos para processar todas as tarefas agendadas.
Consolidar suas tarefas
O código atual cria muitos DAGs e tarefas sem recursos suficientes para processar todas as tarefas em paralelo, o que resulta na preenchimento da fila. Manter as tarefas na fila por muito tempo pode fazer com que elas sejam reagendadas ou falhem. Nessas situações, você deve optar por um número menor de modelos tarefas.
O DAG de exemplo a seguir muda o número de tarefas no exemplo inicial de 200 para 20 e aumenta o tempo de espera de 1 para 10 segundos para imitar tarefas mais consolidadas que fazem a mesma quantidade de trabalho.
Faça upload do DAG de amostra a seguir no ambiente
que você criou. Neste tutorial, o DAG é chamado de
dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Avaliar o impacto de tarefas mais consolidadas nos processos de agendamento:
Aguarde até que as execuções do DAG sejam concluídas.
Na interface do Airflow, na página DAGs, clique em DAG
dag_10_tasks_20_seconds_10
. Você vai encontrar 10 execuções de DAG, cada uma com 20 tarefas bem-sucedidas.No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Visualizar no Explorer de registros.
O segundo exemplo com tarefas mais consolidadas resultou em aproximadamente 10 avisos e 7 erros. No histograma, é possível comparar o número de erros e avisos no exemplo inicial (valores anteriores) e no segundo exemplo (valores posteriores).
Ao comparar o primeiro exemplo com o mais consolidado, é possível observar que há muito menos erros e avisos no segundo exemplo. No entanto, os mesmos erros relacionados ao encerramento com estado salvo ainda aparecem devido à sobrecarga de recursos.
Na guia Monitoramento, selecione Workers e analise os gráficos.
Ao comparar o gráfico Tarefas do Airflow do primeiro exemplo (valores anteriores) com o gráfico do segundo exemplo com tarefas mais consolidadas, é possível notar que o pico nas tarefas em fila durou um período mais curto quando as tarefas estavam mais consolidadas. No entanto, ela durou cerca de 10 minutos, o que ainda é abaixo do ideal.
No gráfico "Workers ativos", confira o primeiro exemplo (à esquerda) do lado do gráfico) usou os recursos por um período muito mais longo que o segundo, embora os dois exemplos imitem a mesma quantidade de funcionam.
Analisar os gráficos de consumo de recursos dos workers. Mesmo que a diferença entre os recursos usados no exemplo com tarefas mais consolidadas e exemplo inicial é bastante significativo, o uso da CPU ainda está aumentando a 70% do limite.
Distribuir as tarefas de maneira mais uniforme ao longo do tempo
Muitas tarefas simultâneas resultam no preenchimento da fila, o que faz com que as tarefas fiquem presas na fila ou sejam reprogramadas. Nas etapas anteriores, você reduziu o número de tarefas ao consolidar essas tarefas. No entanto, os logs de saída e o monitoramento indicaram que o número de tarefas simultâneas ainda é subótimo.
É possível controlar o número de execuções de tarefas simultâneas implementando uma programação ou definir limites para quantas tarefas podem ser executadas simultaneamente.
Neste tutorial, você vai distribuir as tarefas de maneira mais uniforme ao longo do tempo adicionando
parâmetros no nível do DAG ao DAG dag_10_tasks_20_seconds_10
:
Adicione o argumento
max_active_runs=1
ao gerenciador de contexto do DAG. Este argumento define o limite de uma única instância da execução de um DAG em um determinado momento.Adicione o argumento
max_active_tasks=5
ao gerenciador de contexto do DAG. Esse argumento controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG.
Faça o upload do DAG de exemplo a seguir para o ambiente
criado. Neste tutorial, o DAG é chamado de
dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Avaliar o impacto da distribuição de tarefas ao longo do tempo nos processos de agendamento:
Aguarde até que as execuções do DAG sejam concluídas.
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.
No histograma, é possível notar que o terceiro DAG com um número limitado de tarefas e execuções ativas não geraram avisos ou erros, e dos registros parece mais uniforme em comparação com os valores anteriores.
As tarefas no exemplo dag_10_tasks_20_seconds_10_scheduled
que têm um
número limitado de tarefas ativas e execuções não causaram pressão de recursos porque
as tarefas foram enfileiradas de maneira uniforme.
Depois de executar as etapas descritas, você otimizou o uso de recursos consolidar pequenas tarefas e distribuí-las de modo mais uniforme ao longo do tempo.
Otimizar as configurações do ambiente
Você pode ajustar as configurações do ambiente para garantir que os workers do Airflow sempre tenham capacidade para executar tarefas na fila.
Número de workers e simultaneidade de workers
É possível ajustar o número máximo de workers para que o Cloud Composer escalone automaticamente o ambiente dentro dos limites definidos.
O parâmetro [celery]worker_concurrency
define o número máximo de tarefas
que um único worker pode pegar da fila de tarefas. Alterar esse parâmetro
ajusta o número de tarefas que um único worker pode executar ao mesmo tempo.
É possível mudar essa opção de configuração do Airflow
substituindo-a. Por padrão, a simultaneidade do worker é definida como um
mínimo de 32, 12 * worker_CPU, 8 * worker_memory
, o que significa que
ela depende dos limites de recursos do worker. Consulte
Otimizar ambientes para mais informações sobre os valores padrão
de simultaneidade do worker.
O número de workers e a simultaneidade dos workers trabalham em conjunto, e o desempenho do ambiente é altamente dependente de ambos os parâmetros. Use as seguintes considerações para escolher a combinação correta:
Várias tarefas rápidas em execução em paralelo. É possível aumentar o número de workers simultaneidade quando há tarefas aguardando na fila, e seus workers usam uma pequena porcentagem das CPUs e da memória ao mesmo tempo. No entanto, em determinadas circunstâncias, a fila pode nunca ficar cheia, fazendo com que o escalonamento automático nunca seja acionado. Se tarefas pequenas terminam a execução no momento em que os novos workers estão prontos, um worker existente pode assumir as tarefas restantes não haverá tarefas para workers recém-criados.
Nessas situações, é recomendável aumentar o número mínimo de workers e a simultaneidade de workers. para evitar um escalonamento excessivo.
Várias tarefas longas em execução em paralelo. A alta simultaneidade de workers impede que o sistema escalone o número de workers. Se várias tarefas usam muitos recursos e demoram muito para serem concluídas, um alto nível de a simultaneidade pode fazer com que a fila nunca seja preenchida e todas as tarefas sejam por apenas um worker, o que causa problemas de desempenho. Nessas situações, é recomendável aumentar o número máximo de workers e diminuir a simultaneidade.
A importância do paralelismo
Os programadores do Airflow controlam a programação de execuções de DAGs e tarefas individuais
os DAGs. A opção de configuração [core]parallelism
do Airflow controla quantas
tarefas o programador do Airflow pode enfileirar na fila do executor após todas as
dependências dessas tarefas serem atendidas.
O paralelismo é um mecanismo de proteção do Airflow que determina quantas tarefas podem ser executadas ao mesmo tempo por cada programador, independentemente da contagem de workers. O valor de paralelismo, multiplicado pelo número de programadores no cluster, é o número máximo de instâncias de tarefas que o ambiente pode enfileirar.
Normalmente, [core]parallelism
é definido como um produto de um número máximo de workers
e [celery]worker_concurrency
. Ele também é afetado pelo
pool.
É possível mudar essa opção de configuração do Airflow
substituindo-a. Para mais informações sobre como ajustar as configurações do Airflow relacionadas à escala, consulte Como ajustar a configuração do Airflow.
Encontrar as configurações de ambiente ideais
A maneira recomendada de corrigir problemas de programação é consolidar tarefas pequenas em tarefas maiores e distribuir as tarefas de maneira mais uniforme ao longo do tempo. Além de otimizar o código do DAG, você também pode otimizar as configurações do ambiente para ter uma capacidade suficiente para executar várias tarefas simultaneamente.
Por exemplo, suponha que você consolide tarefas no seu DAG o máximo possível, mas limitando as tarefas ativas para distribuí-las de modo mais uniforme tempo não é uma solução preferida para seu caso de uso específico.
É possível ajustar o paralelismo, o número e a simultaneidade de workers.
parâmetros para executar o DAG dag_10_tasks_20_seconds_10
sem limitar o acesso
tarefas. Neste exemplo, o DAG é executado 10 vezes, e cada execução contém 20 pequenas tarefas.
Se você quiser executar todos simultaneamente:
Você vai precisar de um tamanho de ambiente maior, porque ele controla os parâmetros de desempenho da infraestrutura gerenciada do Cloud Composer do seu ambiente.
Os workers do Airflow precisam ser capazes de executar 20 tarefas simultaneamente, o que significa que você precisa definir a simultaneidade do worker como 20.
Os workers precisam de CPU e memória suficientes para processar todas as tarefas. A simultaneidade do worker é afetada pela CPU e pela memória do worker. Portanto, você vai precisar de pelo menos
worker_concurrency / 12
na CPU eleast worker_concurrency / 8
na memória.Você vai precisar aumentar o paralelismo para corresponder à maior simultaneidade de workers. Para que os workers peguem 20 tarefas da fila, o programador precisa agendar essas 20 tarefas primeiro.
Ajuste as configurações do ambiente da seguinte maneira:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Configuração do ambiente.
Encontre a configuração Recursos > Cargas de trabalho e clique em Editar.
Na seção Worker, no campo Memory, especifique a nova memória. para workers do Airflow. Neste tutorial, use 4 GB.
No campo CPU, especifique o novo limite de CPU para os workers do Airflow. Neste use duas vCPUs.
Salve as mudanças e aguarde alguns minutos para que os workers do Airflow reiniciar.
Em seguida, substitua as opções de configuração do Airflow para paralelismo e simultaneidade de workers:
Acesse a guia Modificações de configuração do Airflow.
Clique em Edit e em Add Airflow Configuration Override.
Modifique a configuração de parralismo:
Seção Chave Valor core
parallelism
20
Clique em Add Airflow Configuration Override e substitua a configuração de concorrência do worker:
Seção Chave Valor celery
worker_concurrency
20
Clique em Salvar e aguarde até que o ambiente atualize a configuração.
Acione o mesmo DAG de exemplo novamente com as configurações ajustadas:
Na interface do Airflow, acesse a página DAGs.
Encontre e exclua o DAG
dag_10_tasks_20_seconds_10
.Depois que o DAG é excluído, o Airflow verifica a pasta de DAGs no bucket do ambiente e executa o DAG novamente de forma automática.
Depois que as execuções de DAG forem concluídas, analise o histograma de registros novamente. No diagrama,
observe que o exemplo dag_10_tasks_20_seconds_10
com mais
tarefas consolidadas não geravam nenhum erro e aviso ao serem executadas com
a configuração do ambiente ajustada. Compare os resultados com os dados anteriores
no diagrama, em que o mesmo exemplo gerou erros e avisos ao
em execução com a configuração de ambiente padrão.
As configurações de ambiente e do Airflow desempenham um papel crucial na programação de tarefas. No entanto, não é possível aumentar as configurações além de determinados limites.
Recomendamos otimizar o código DAG, consolidar tarefas e usar a programação para otimizar a performance e a eficiência.
Exemplo: erros de análise e latência de DAG devido a um código complexo
Neste exemplo, você investiga a latência de análise de uma amostra de DAG que imita um excesso de variáveis do Airflow.
Criar uma variável do Airflow
Antes de fazer upload do exemplo de código, crie uma nova variável do Airflow.
No console do Google Cloud, acesse a página Ambientes.
Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.
Acesse Administrador > Variáveis > Adicionar um novo registro.
Defina os seguintes valores:
- chave:
example_var
- val:
test_airflow_variable
- chave:
faça upload do DAG de amostra para o ambiente
Faça upload do DAG de exemplo abaixo para o ambiente
criado nas etapas anteriores. Neste tutorial, o DAG é chamado
dag_for_loop_airflow_variable
:
Esse DAG contém um loop for que é executado 1.000 vezes e imita um excesso de
variáveis do Airflow. Cada iteração lê a variável example_var
e
gera uma tarefa. Cada tarefa contém um comando que imprime o valor da
variável.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Diagnosticar os problemas de análise
O tempo de análise do DAG é o tempo que leva para o programador do Airflow ler um arquivo DAG e analisá-lo. Antes que o programador do Airflow possa programar qualquer tarefa de um DAG, o programador precisa analisar o arquivo DAG para descobrir a estrutura sobre o DAG e as tarefas definidas.
Se a análise de um DAG demorar muito, isso consumirá a capacidade do programador e pode reduzir o desempenho das execuções de DAGs.
Para monitorar o tempo de análise do DAG:
Execute o comando da CLI do Airflow
dags report
em CLI gcloud para conferir o tempo de análise de todos os DAGs:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Substitua:
ENVIRONMENT_NAME
: o nome do ambiente;LOCATION
: a região em que o ambiente está localizado.
Na saída do comando, procure o valor de duração do valor-chave DAG
dag_for_loop_airflow_variables
. Um valor grande pode indicar que para que o DAG não seja implementado da melhor maneira. Se você tiver vários DAGs, na tabela de saída, é possível identificar quais DAGs têm um longo tempo de análise.Exemplo:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
Inspecione os tempos de análise do DAG no console do Google Cloud:
- No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e depois Todos os registros > Gerenciador do processador de DAG.
Analise os registros do
dag-processor-manager
e identifique possíveis problemas.
Se o tempo de análise total do DAG exceder cerca de 10 segundos, seus programadores poderão estar sobrecarregados com a análise do DAG e não poderão executar DAGs de maneira eficaz.
Otimizar o código DAG
É recomendado evitar o código Python "de nível superior" desnecessário nos DAGs. DAGs com muitos importações, variáveis e funções fora do DAG introduzem maior análise para o programador do Airflow. Isso reduz a performance e a escalabilidade do Cloud Composer e do Airflow. O excesso de leitura de variáveis do Airflow leva a um tempo de análise longo e a uma carga alta do banco de dados. Se esse código estiver em um arquivo DAG, essas funções serão executadas em cada batimento cardíaco do programador, o que pode ser lento.
Os campos de modelo do Airflow permitem incorporar valores do Airflow variáveis e modelos Jinja nos seus DAGs. Isso evita o uso desnecessário execução da função durante os sinais de funcionamento do programador.
Para implementar o exemplo de DAG de uma maneira melhor, evite usar variáveis do Airflow no código Python de nível superior dos DAGs. Em vez disso, transmita as variáveis do Airflow para operadores usam um modelo Jinja, que atrasa a leitura do valor até a execução da tarefa.
Faça o upload da nova versão do DAG de exemplo para o
ambiente. Neste tutorial, o DAG é chamado de
dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Inspecione o novo tempo de análise do DAG:
Aguarde até que a execução do DAG seja concluída.
Execute o comando
dags report
novamente para ver a o tempo de análise de todos os DAGs:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Revise os registros
dag-processor-manager
outra vez e a duração da análise.
Ao substituir as variáveis de ambiente por modelos do Airflow, você simplificou o código DAG e reduziu a latência de análise em cerca de dez vezes.
Otimizar as configurações do ambiente do Airflow
O programador do Airflow tenta constantemente acionar novas tarefas e analisar todos os DAGs no bucket do ambiente. Se os DAGs tiverem um tempo de análise longo e o programador consumir muitos recursos, será possível otimizar as configurações do programador do Airflow para que ele use os recursos de maneira mais eficiente.
Neste tutorial, os arquivos DAG levam muito tempo para analisar e analisar ciclos.
começam a se sobrepor, o que esgota a capacidade do programador. Em nosso exemplo,
o primeiro DAG de exemplo leva mais de 5 segundos para analisar, então você vai configurar
que o programador seja executado com menos frequência para usar os recursos com mais eficiência. Você
vai substituir a opção de configuração do Airflow
scheduler_heartbeat_sec
. Essa configuração define com que frequência o
programador precisa ser executado (em segundos). Por padrão, o valor é definido como 5 segundos.
Para alterar essa opção de configuração do Airflow,
substituí-lo.
Substitua a opção de configuração do Airflow scheduler_heartbeat_sec
:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Substituições de configuração do Airflow.
Clique em Editar e em Adicionar substituição da configuração do Airflow.
Modifique a opção de configuração do Airflow:
Seção Chave Valor scheduler
scheduler_heartbeat_sec
10
Clique em Salvar e aguarde a atualização da configuração do ambiente.
Verifique as métricas do programador:
Acesse a guia Monitoramento e selecione Agendadores.
No gráfico Frequência de funcionamento do programador, clique no botão Mais opções (três pontos) e, em seguida, clique em Visualizar no Metrics Explorer.
No gráfico, você vai notar que o programador é executado com metade da frequência depois que você mudou a configuração padrão de 5 segundos para 10 segundos. Ao reduzir a frequência de batimentos, você garante que o programador não comece a ser executado enquanto o ciclo de análise anterior estiver em andamento e a capacidade de recursos do programador não estiver esgotada.
Atribuir mais recursos ao programador
No Cloud Composer 2, é possível alocar mais recursos de CPU e memória para o programador. Dessa forma, você pode aumentar a performance do programador e para acelerar o tempo de análise do DAG.
Aloque mais CPU e memória para o programador:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Configuração do ambiente.
Encontre a configuração Resources > Workloads e Clique em Editar.
Na seção Programador, no campo Memória, especifique o novo limite de memória. Neste tutorial, use 4 GB.
No campo CPU, especifique o novo limite de CPU. Neste tutorial, use duas vCPUs.
Salve as mudanças e aguarde alguns minutos para que os programadores do Airflow reiniciar.
Acesse a guia Registros e depois Todos os registros > Gerenciador de processador DAG.
Revise os registros
dag-processor-manager
e compare a duração da análise dos exemplos de DAGs:
Ao atribuir mais recursos ao programador, você aumentou a capacidade dele e reduziu a latência de análise significativamente em comparação com as configurações padrão do ambiente. Com mais recursos, o programador pode analisar os DAGs mais rapidamente. No entanto, os custos associados aos recursos do Cloud Composer também vão aumentar. Além disso, não é possível aumentar recursos além de um determinado limite.
Recomendamos alocar recursos somente após o possível código DAG e As otimizações de configuração do Airflow foram implementadas.
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados neste tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.
Exclua o projeto
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Excluir recursos individuais
Se você planeja ver vários tutoriais e guias de início rápido, a reutilização de projetos pode evitar que você exceda os limites da cota do projeto.
Exclua o ambiente do Cloud Composer. Você também excluir o bucket do ambiente durante este procedimento.