Resolva problemas de agendamento de tarefas

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Este tutorial explica como diagnosticar e resolver problemas de agendamento de tarefas e de análise que levam a um mau funcionamento do agendador, erros de análise e latência, bem como falhas de tarefas.

Introdução

O agendador do Airflow é afetado principalmente por dois fatores: o agendamento de tarefas e a análise de DAGs. Os problemas num destes fatores podem ter um impacto negativo no desempenho e na integridade do ambiente.

Por vezes, são agendadas demasiadas tarefas em simultâneo. Nesta situação, a fila fica cheia e as tarefas permanecem no estado "agendado" ou são reagendadas após serem colocadas em fila, o que pode causar falhas nas tarefas e latência no desempenho.

Outro problema comum é a latência de análise e os erros causados pela complexidade de um código DAG. Por exemplo, um código DAG que contenha variáveis do Airflow ao nível superior do código pode originar atrasos na análise, sobrecarga da base de dados, falhas no agendamento e limites de tempo do DAG.

Neste tutorial, vai diagnosticar os DAGs de exemplo e saber como resolver problemas de agendamento e análise, melhorar o agendamento de DAGs e otimizar o código DAG e as configurações do ambiente para melhorar o desempenho.

Objetivos

Esta secção lista os objetivos dos exemplos neste tutorial.

Exemplo: mau funcionamento do agendador e latência causados pela concorrência elevada de tarefas

  • Carregue o DAG de exemplo que é executado várias vezes em simultâneo e diagnostique o mau funcionamento do programador e os problemas de latência com o Cloud Monitoring.

  • Otimize o código DAG consolidando as tarefas e avalie o impacto no desempenho.

  • Distribua as tarefas de forma mais uniforme ao longo do tempo e avalie o impacto no desempenho.

  • Otimize as configurações do Airflow e as configurações do ambiente e avalie o impacto.

Exemplo: erros de análise de DAG e latência causados por código complexo

  • Carregue o DAG de exemplo com variáveis do Airflow e diagnostique problemas de análise com o Cloud Monitoring.

  • Otimize o código DAG evitando variáveis do Airflow no nível superior do código e avalie o impacto no tempo de análise.

  • Otimize as configurações do Airflow e as configurações do ambiente e avalie o impacto no tempo de análise.

Custos

Este tutorial usa os seguintes componentes faturáveis do Google Cloud:

Quando terminar este tutorial, pode evitar a faturação contínua eliminando os recursos que criou. Para mais detalhes, consulte o artigo Limpe.

Antes de começar

Esta secção descreve as ações necessárias antes de iniciar o tutorial.

Crie e configure um projeto

Para este tutorial, precisa de um Google Cloud projeto. Configure o projeto da seguinte forma:

  1. Na Google Cloud consola, selecione ou crie um projeto:

    Aceder ao seletor de projetos

  2. Certifique-se de que a faturação está ativada para o seu projeto. Saiba como verificar se a faturação está ativada num projeto.

  3. Certifique-se de que o utilizador do Google Cloud projeto tem as seguintes funções para criar os recursos necessários:

    • Administrador de objetos de ambiente e armazenamento (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador do Compute (roles/compute.admin)

Ative APIs para o seu projeto

Enable the Cloud Composer API.

Enable the API

Crie o seu ambiente do Cloud Composer

Crie um ambiente do Cloud Composer 2.

Como parte da criação do ambiente, concede a função Extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) à conta do agente de serviço do Composer. O Cloud Composer usa esta conta para realizar operações no seu Google Cloud projeto.

Exemplo: mau funcionamento do agendador e falha da tarefa devido a problemas de agendamento de tarefas

Este exemplo demonstra a depuração do mau funcionamento do programador e da latência causada pela elevada simultaneidade de tarefas.

Carregue o DAG de exemplo para o seu ambiente

Carregue o seguinte DAG de exemplo para o ambiente que criou nos passos anteriores. Neste tutorial, este DAG tem o nome dag_10_tasks_200_seconds_1.

Este DAG tem 200 tarefas. Cada tarefa aguarda 1 segundo e imprime "Concluído!". O DAG é acionado automaticamente assim que é carregado. O Cloud Composer executa este DAG 10 vezes e todas as execuções do DAG ocorrem em paralelo.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Diagnostique o mau funcionamento do programador e os problemas de falha de tarefas

Após a conclusão da execução do DAG, abra a IU do Airflow e clique no DAG dag_10_tasks_200_seconds_1. Vai ver que foram executadas com êxito um total de 10 execuções de DAG e que cada uma tem 200 tarefas bem-sucedidas.

Reveja os registos de tarefas do Airflow:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Registos e, de seguida, a Todos os registos > Registos do Airflow > Trabalhadores > Ver no Explorador de registos.

No histograma de registos, pode ver os erros e os avisos indicados com as cores vermelha e laranja:

O histograma dos registos do trabalhador do Airflow com erros e avisos
    indicados com cores vermelhas e laranjas
Figura 1. Histograma dos registos do trabalhador do Airflow (clique para aumentar)

O DAG de exemplo resultou em cerca de 130 avisos e 60 erros. Clique em qualquer coluna que contenha barras amarelas e vermelhas. Vai ver alguns dos seguintes avisos e erros nos registos:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Estes registos podem indicar que a utilização de recursos excedeu os limites e o worker foi reiniciado automaticamente.

Se uma tarefa do Airflow for mantida na fila durante demasiado tempo, o agendador marca-a como falhada e up_for_retry, e vai reagendá-la novamente para execução. Uma forma de observar os sintomas desta situação é analisar o gráfico com o número de tarefas em fila e, se os picos neste gráfico não diminuírem em cerca de 10 minutos, é provável que ocorram falhas nas tarefas (sem registos).

Reveja as informações de monitorização:

  1. Aceda ao separador Monitorização e selecione Vista geral.

  2. Reveja o gráfico Tarefas do Airflow.

    O gráfico de tarefas do Airflow ao longo do tempo, que mostra um pico no número de tarefas em fila
    Figura 2. Gráfico de tarefas do Airflow (clique para ampliar)

    No gráfico de tarefas do Airflow, existe um pico nas tarefas em fila que dura mais de 10 minutos, o que pode significar que não existem recursos suficientes no seu ambiente para processar todas as tarefas agendadas.

  3. Reveja o gráfico Trabalhadores ativos:

    O gráfico de trabalhadores ativos do Airflow ao longo do tempo mostra que o número de trabalhadores ativos foi aumentado até ao limite máximo
    Figura 3. Gráfico de trabalhadores ativos (clique para aumentar)

    O gráfico Trabalhadores ativos indica que o DAG acionou o dimensionamento automático até ao limite máximo permitido de três trabalhadores durante a execução do DAG.

  4. Os gráficos de utilização de recursos podem indicar a falta de capacidade nos trabalhadores do Airflow para executar tarefas em fila. No separador Monitorização, selecione Trabalhadores e reveja os gráficos Utilização total da CPU do trabalhador e Utilização total da memória do trabalhador.

    O gráfico de utilização da CPU pelos trabalhadores do Airflow mostra a utilização da CPU a aumentar até ao limite máximo
    Figura 4. Gráfico de utilização da CPU dos trabalhadores totais (clique para aumentar)
    O gráfico de utilização de memória pelos trabalhadores do Airflow mostra que a utilização de memória está a aumentar, mas não atinge o limite máximo
    Figura 5. Gráfico de utilização de memória total dos trabalhadores (clique para aumentar)

    Os gráficos indicam que a execução de demasiadas tarefas em simultâneo resultou no atingimento do limite da CPU. Os recursos foram usados durante mais de 30 minutos, o que é ainda mais do que a duração total de 200 tarefas em 10 execuções de DAG executadas uma a uma.

Estes são os indicadores de que a fila está cheia e que não existem recursos para processar todas as tarefas agendadas.

Consolide as suas tarefas

O código atual cria muitos DAGs e tarefas sem recursos suficientes para processar todas as tarefas em paralelo, o que resulta no preenchimento da fila. Manter as tarefas na fila durante demasiado tempo pode fazer com que as tarefas sejam reagendadas ou falhem. Nestas situações, deve optar por um número menor de tarefas mais consolidadas.

O seguinte DAG de exemplo altera o número de tarefas no exemplo inicial de 200 para 20 e aumenta o tempo de espera de 1 para 10 segundos para imitar tarefas mais consolidadas que fazem a mesma quantidade de trabalho.

Carregue o seguinte exemplo de DAG para o ambiente que criou. Neste tutorial, este DAG tem o nome dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Avalie o impacto de tarefas mais consolidadas nos processos de agendamento:

  1. Aguarde até que as execuções do DAG estejam concluídas.

  2. Na IU do Airflow, na página DAGs, clique no DAG dag_10_tasks_20_seconds_10. Vai ver 10 execuções de DAG, cada uma com 20 tarefas concluídas com êxito.

  3. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  4. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  5. Aceda ao separador Registos e, de seguida, a Todos os registos > Registos do Airflow > Trabalhadores > Ver no Explorador de registos.

    O segundo exemplo com tarefas mais consolidadas resultou em aproximadamente 10 avisos e 7 erros. No histograma, pode comparar o número de erros e avisos no exemplo inicial (valores anteriores) e no segundo exemplo (valores posteriores).

    O histograma dos registos do trabalhador do Airflow com erros e avisos mostra a diminuição da quantidade de erros e avisos após a consolidação das tarefas
    Figura 6. Histograma dos registos do trabalhador do Airflow após a consolidação das tarefas (clique para aumentar)

    Quando compara o primeiro exemplo com o mais consolidado, pode ver que existem significativamente menos erros e avisos no segundo exemplo. No entanto, os mesmos erros relacionados com o encerramento parcial continuam a aparecer nos registos devido à sobrecarga de recursos.

  6. No separador Monitorização, selecione Trabalhadores e reveja os gráficos.

    Quando compara o gráfico de tarefas do Airflow do primeiro exemplo (valores anteriores) com o gráfico do segundo exemplo com tarefas mais consolidadas, pode ver que o pico de tarefas em fila durou menos tempo quando as tarefas estavam mais consolidadas. No entanto, durou quase 10 minutos, o que continua a não ser o ideal.

    O gráfico de tarefas do Airflow ao longo do tempo mostra que o pico de tarefas do Airflow durou menos tempo do que antes.
    Figura 7. Gráfico de tarefas do Airflow após a consolidação das tarefas (clique para aumentar)

    No gráfico Trabalhadores ativos, pode ver que o primeiro exemplo (no lado esquerdo do gráfico) usou recursos durante um período muito mais prolongado do que o segundo, apesar de ambos os exemplos imitarem a mesma quantidade de trabalho.

    O gráfico de trabalhadores ativos do Airflow ao longo do tempo mostra que o número de trabalhadores ativos foi aumentado durante um período mais curto do que antes.
    Figura 8. Gráfico de trabalhadores ativos após a consolidação das tarefas (clique para aumentar)

    Reveja os gráficos de consumo de recursos do trabalhador. Embora a diferença entre os recursos usados no exemplo com mais tarefas consolidadas e o exemplo inicial seja bastante significativa, a utilização da CPU continua a aumentar até 70% do limite.

    O gráfico de utilização da CPU pelos trabalhadores do Airflow mostra a utilização da CPU a aumentar até 70% do limite máximo
    Figura 9. Gráfico de utilização da CPU dos trabalhadores total após a consolidação das tarefas (clique para aumentar)
    O gráfico de utilização de memória pelos trabalhadores do Airflow mostra um aumento da utilização de memória, mas não atinge o limite máximo
    Figura 10. Gráfico de utilização de memória total dos trabalhadores após a consolidação das tarefas (clique para aumentar)

Distribuir as tarefas de forma mais uniforme ao longo do tempo

Demasiadas tarefas em simultâneo fazem com que a fila fique cheia, o que leva a que as tarefas fiquem bloqueadas na fila ou sejam reagendadas. Nos passos anteriores, diminuiu o número de tarefas consolidando-as. No entanto, os registos de saída e a monitorização indicaram que o número de tarefas simultâneas continua a ser inferior ao ideal.

Pode controlar o número de execuções de tarefas simultâneas implementando uma programação ou definindo limites para o número de tarefas que podem ser executadas em simultâneo.

Neste tutorial, distribui as tarefas de forma mais uniforme ao longo do tempo adicionando parâmetros ao nível do DAG no dag_10_tasks_20_seconds_10 DAG:

  1. Adicione o argumento max_active_runs=1 ao gestor de contexto DAG. Este argumento define um limite de apenas uma instância de uma execução de DAG num determinado momento.

  2. Adicione o argumento max_active_tasks=5 ao gestor de contexto DAG. Este argumento controla o número máximo de instâncias de tarefas que podem ser executadas em simultâneo em cada DAG.

Carregue o seguinte exemplo de DAG para o ambiente que criou. Neste tutorial, este DAG tem o nome dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Avalie o impacto da distribuição de tarefas ao longo do tempo nos processos de agendamento:

  1. Aguarde até que as execuções do DAG estejam concluídas.

  2. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  3. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  4. Aceda ao separador Registos e, de seguida, a Todos os registos > Registos do Airflow > Trabalhadores > Ver no Explorador de registos.

  5. No histograma, pode ver que o terceiro DAG com um número limitado de tarefas e execuções ativas não gerou avisos nem erros, e a distribuição dos registos parece mais uniforme em comparação com os valores anteriores.

    O histograma dos registos do trabalhador do Airflow com erros e avisos
    não mostra erros nem avisos depois de as tarefas terem sido consolidadas e
    distribuídas ao longo do tempo.
    Figura 11. Histograma dos registos do trabalhador do Airflow após a consolidação e distribuição das tarefas ao longo do tempo (clique para aumentar)

As tarefas no exemplo dag_10_tasks_20_seconds_10_scheduled que tem um número limitado de tarefas ativas e execuções não causaram pressão sobre os recursos porque as tarefas foram colocadas em fila de forma uniforme.

Depois de executar os passos descritos, otimizou a utilização de recursos consolidando pequenas tarefas e distribuindo-as de forma mais uniforme ao longo do tempo.

Otimize as configurações do ambiente

Pode ajustar as configurações do ambiente para garantir que existe sempre capacidade nos trabalhadores do Airflow para executar tarefas em fila.

Número de trabalhadores e concorrência de trabalhadores

Pode ajustar o número máximo de trabalhadores para que o Cloud Composer dimensione automaticamente o seu ambiente dentro dos limites definidos.

O parâmetro [celery]worker_concurrency define o número máximo de tarefas que um único trabalhador pode obter da fila de tarefas. A alteração deste parâmetro ajusta o número de tarefas que um único trabalhador pode executar em simultâneo. Pode alterar esta opção de configuração do Airflow substituindo-a. Por predefinição, a simultaneidade do trabalhador é definida com base no número de instâncias de tarefas simultâneas simples que um trabalhador pode acomodar. Isto significa que o respetivo valor depende dos limites de recursos do trabalhador. O valor de simultaneidade do trabalhador não depende do número de trabalhadores no seu ambiente.

O número de trabalhadores e a concorrência de trabalhadores funcionam em combinação entre si, e o desempenho do seu ambiente depende muito de ambos os parâmetros. Pode usar as seguintes considerações para escolher a combinação correta:

  • Várias tarefas rápidas executadas em paralelo. Pode aumentar a concorrência de trabalhadores quando existem tarefas à espera na fila e os seus trabalhadores usam uma percentagem baixa das respetivas CPUs e memória ao mesmo tempo. No entanto, em determinadas circunstâncias, a fila pode nunca ficar cheia, o que faz com que o ajuste de escala automático nunca seja acionado. Se as tarefas pequenas terminarem a execução quando os novos trabalhadores estiverem prontos, um trabalhador existente pode assumir as tarefas restantes e não haverá tarefas para os trabalhadores recém-criados.

    Nestas situações, recomenda-se que aumente o número mínimo de trabalhadores e aumente a concorrência de trabalhadores para evitar o escalonamento excessivo.

  • Várias tarefas longas em execução em paralelo. A elevada simultaneidade de trabalhadores impede o sistema de dimensionar o número de trabalhadores. Se várias tarefas forem exigentes em termos de recursos e demorarem muito tempo a ser concluídas, uma simultaneidade de trabalhadores elevada pode fazer com que a fila nunca seja preenchida e que todas as tarefas sejam processadas por apenas um trabalhador, o que resulta em problemas de desempenho. Nestas situações, recomenda-se aumentar o número máximo de trabalhadores e diminuir a concorrência de trabalhadores.

A importância do paralelismo

Os programadores do Airflow controlam o agendamento de execuções de DAGs e tarefas individuais de DAGs. A opção de configuração do fluxo de ar [core]parallelism controla quantas tarefas o programador do fluxo de ar pode colocar em fila na fila do executor depois de todos os requisitos destas tarefas serem cumpridos.

O paralelismo é um mecanismo de proteção do Airflow que determina quantas tarefas podem ser executadas em simultâneo por cada programador, independentemente do número de trabalhadores. O valor de paralelismo, multiplicado pelo número de programadores no cluster, é o número máximo de instâncias de tarefas que o seu ambiente pode colocar em fila.

Normalmente, [core]parallelism é definido como um produto de um número máximo de trabalhadores e [celery]worker_concurrency. Também é afetado pelo conjunto. Pode alterar esta opção de configuração do Airflow substituindo-a. Para mais informações sobre o ajuste das configurações do Airflow relacionadas com o dimensionamento, consulte o artigo Dimensionar a configuração do Airflow.

Encontre configurações de ambiente ideais

A forma recomendada de corrigir problemas de agendamento é consolidar tarefas pequenas em tarefas maiores e distribuir as tarefas de forma mais uniforme ao longo do tempo. Além de otimizar o código DAG, também pode otimizar as configurações do ambiente para ter uma capacidade suficiente para executar várias tarefas em simultâneo.

Por exemplo, suponha que consolida tarefas no seu DAG o máximo possível, mas limitar as tarefas ativas para as distribuir de forma mais uniforme ao longo do tempo não é uma solução preferencial para o seu exemplo de utilização específico.

Pode ajustar o paralelismo, o número de trabalhadores e os parâmetros de simultaneidade dos trabalhadores para executar o DAG dag_10_tasks_20_seconds_10 sem limitar as tarefas ativas. Neste exemplo, o DAG é executado 10 vezes e cada execução contém 20 tarefas pequenas. Se quiser executá-los todos em simultâneo:

  • Precisa de um tamanho do ambiente maior, porque controla os parâmetros de desempenho da infraestrutura do Cloud Composer gerida do seu ambiente.

  • Os trabalhadores do Airflow têm de conseguir executar 20 tarefas em simultâneo, o que significa que tem de definir a simultaneidade dos trabalhadores como 20.

  • Os trabalhadores precisam de CPU e memória suficientes para processar todas as tarefas. A concorrência de trabalhadores é afetada pela CPU e pela memória dos trabalhadores. Por isso, precisa de, pelo menos, worker_concurrency / 12 na CPU e least worker_concurrency / 8 na memória.

  • Tem de aumentar o paralelismo para corresponder à concorrência de trabalhadores mais elevada. Para que os trabalhadores possam selecionar 20 tarefas da fila, o planeador tem de agendar primeiro essas 20 tarefas.

Ajuste as configurações do ambiente da seguinte forma:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Configuração do ambiente.

  4. Encontre a configuração Resources > Workloads e clique em Editar.

  5. Na secção Worker, no campo Memória, especifique o novo limite de memória para os trabalhadores do Airflow. Neste tutorial, use 4 GB.

  6. No campo CPU, especifique o novo limite de CPU para os trabalhadores do Airflow. Neste tutorial, use 2 vCPU.

  7. Guarde as alterações e aguarde alguns minutos para que os trabalhadores do Airflow sejam reiniciados.

Em seguida, substitua as opções de configuração de paralelismo e simultaneidade de trabalhadores do Airflow:

  1. Aceda ao separador Substituições da configuração do Airflow.

  2. Clique em Editar e, de seguida, em Adicionar substituição da configuração do Airflow.

  3. Substitua a configuração de paralelismo:

    Secção Chave Valor
    core parallelism 20
  4. Clique em Adicionar substituição da configuração do Airflow e substitua a configuração de concorrência do worker:

    Secção Chave Valor
    celery worker_concurrency 20
  5. Clique em Guardar e aguarde até que o ambiente atualize a respetiva configuração.

Acione novamente o mesmo exemplo de DAG com as configurações ajustadas:

  1. Na IU do Airflow, aceda à página DAGs.

  2. Encontre o dag_10_tasks_20_seconds_10 DAG e elimine-o.

    Depois de o DAG ser eliminado, o Airflow verifica a pasta DAGs no contentor do seu ambiente e executa automaticamente o DAG novamente.

Depois de as execuções do DAG estarem concluídas, reveja novamente o histograma de registos. No diagrama, pode ver que o exemplo dag_10_tasks_20_seconds_10 com mais tarefas consolidadas não gerou erros nem avisos quando executado com a configuração do ambiente ajustada. Compare os resultados com os dados anteriores no diagrama, onde o mesmo exemplo gerou erros e avisos quando foi executado com a configuração do ambiente predefinido.

O histograma dos registos do trabalhador do Airflow com erros e avisos
        não mostra erros nem avisos após o ajuste da configuração do ambiente
Figura 12. Histograma dos registos do worker do Airflow após o ajuste da configuração do ambiente (clique para aumentar)

As configurações do ambiente e as configurações do Airflow desempenham um papel crucial na programação de tarefas. No entanto, não é possível aumentar as configurações para além de determinados limites.

Recomendamos que otimize o código DAG, consolide tarefas e use a programação para um desempenho e uma eficiência otimizados.

Exemplo: erros de análise DAG e latência devido a código DAG complexo

Neste exemplo, investiga a latência de análise de um DAG de amostra que imita um excesso de variáveis do Airflow.

Crie uma nova variável do Airflow

Antes de carregar o código de exemplo, crie uma nova variável do Airflow.

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na coluna Servidor Web do Airflow, siga o link do Airflow para o seu ambiente.

  3. Aceda a Administração > Variáveis > Adicionar um novo registo.

  4. Defina os seguintes valores:

    • chave: example_var
    • val: test_airflow_variable

Carregue o DAG de exemplo para o seu ambiente

Carregue o seguinte DAG de exemplo para o ambiente que criou nos passos anteriores. Neste tutorial, este DAG tem o nome dag_for_loop_airflow_variable.

Este DAG contém um ciclo for que é executado 1000 vezes e imita um excesso de variáveis do Airflow. Cada iteração lê a variável example_var e gera uma tarefa. Cada tarefa contém um comando que imprime o valor da variável.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Diagnostique os problemas de análise

O tempo de análise de DAG é o tempo que o programador do Airflow demora a ler um ficheiro DAG e a analisá-lo. Antes de o programador do Airflow poder agendar qualquer tarefa de um DAG, o programador tem de analisar o ficheiro DAG para descobrir a estrutura do DAG e as tarefas definidas.

Se um DAG demorar muito tempo a ser analisado, consome a capacidade do programador e pode reduzir o desempenho das execuções de DAGs.

Para monitorizar o tempo de análise do DAG:

  1. Execute o dags report comando da CLI do Airflow na CLI gcloud para ver o tempo de análise de todos os seus DAGs:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Substitua o seguinte:

    • ENVIRONMENT_NAME: o nome do seu ambiente.
    • LOCATION: a região onde o ambiente está localizado.
  2. No resultado do comando, procure o valor da duração do dag_for_loop_airflow_variables DAG. Um valor elevado pode indicar que este DAG não está implementado de forma ideal. Se tiver vários DAGs, na tabela de saída, pode identificar os DAGs com um longo tempo de análise.

    Exemplo:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Inspeccione os tempos de análise de DAG na Google Cloud consola:

    1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  4. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  5. Aceda ao separador Registos e, de seguida, a Todos os registos > Gestor do processador DAG.

  6. Reveja os registos dag-processor-manager e identifique possíveis problemas.

    Uma entrada de registo para o DAG de exemplo mostra que o tempo de análise do DAG é de 46,3 segundos
    Figura 13. Os registos do gestor do processador DAG mostram os tempos de análise do DAG (clique para aumentar)

Se o tempo total de análise DAG exceder cerca de 10 segundos, os seus programadores podem ficar sobrecarregados com a análise DAG e não conseguem executar DAGs de forma eficaz.

Otimize o código DAG

É recomendado evitar código Python "de nível superior" desnecessário nos seus DAGs. Os DAGs com muitas importações, variáveis e funções fora do DAG introduzem tempos de análise mais longos para o programador do Airflow. Isto reduz o desempenho e a escalabilidade do Cloud Composer e do Airflow. O excesso de leitura de variáveis do Airflow leva a um longo tempo de análise e a uma elevada carga da base de dados. Se este código estiver num ficheiro DAG, estas funções são executadas em cada pulsação do agendador, o que pode ser lento.

Os campos de modelo do Airflow permitem-lhe incorporar valores de variáveis do Airflow e modelos Jinja nos seus DAGs. Isto impede a execução de funções desnecessárias durante os sinais de pulsação do agendador.

Para implementar o exemplo de DAG de uma forma melhor, evite usar variáveis do Airflow no código Python de nível superior dos DAGs. Em alternativa, transmita variáveis do Airflow a operadores existentes através de um modelo Jinja, o que atrasa a leitura do valor até à execução da tarefa.

Carregue a nova versão do DAG de exemplo para o seu ambiente. Neste tutorial, este DAG tem o nome dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Inspeção do novo tempo de análise do DAG:

  1. Aguarde até que a execução do DAG esteja concluída.

  2. Execute novamente o comando dags report para ver o tempo de análise de todos os seus DAGs:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Reveja novamente os dag-processor-managerregistos e analise a duração da análise.

    Uma entrada de registo para o DAG de exemplo mostra que o tempo de análise do DAG é de 4,21 segundos
    Figura 14. Os registos do gestor do processador DAG mostram os tempos de análise do DAG após a otimização do código DAG (clique para aumentar)

Ao substituir as variáveis de ambiente por modelos do Airflow, simplificou o código do DAG e reduziu a latência de análise em cerca de dez vezes.

Otimize as configurações do ambiente do Airflow

O programador do Airflow tenta constantemente acionar novas tarefas e analisa todos os DAGs no seu bucket de ambiente. Se os seus DAGs tiverem um longo tempo de análise e o programador consumir muitos recursos, pode otimizar as configurações do programador do Airflow para que o programador use os recursos de forma mais eficiente.

Neste tutorial, os ficheiros DAG demoram muito tempo a ser analisados e os ciclos de análise começam a sobrepor-se, o que esgota a capacidade do programador. No nosso exemplo, o primeiro DAG demora mais de 5 segundos a analisar, pelo que vai configurar o agendador para ser executado com menos frequência para usar os recursos de forma mais eficiente. Vai substituir a opção de configuração do scheduler_heartbeat_sec Airflow. Esta configuração define a frequência com que o agendador deve ser executado (em segundos). Por predefinição, o valor está definido para 5 segundos. Pode alterar esta opção de configuração do Airflow substituindo-a.

Substitua a scheduler_heartbeat_secopção de configuração do Airflow:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Substituições da configuração do Airflow.

  4. Clique em Editar e, de seguida, em Adicionar substituição da configuração do Airflow.

  5. Substitua a opção de configuração do Airflow:

    Secção Chave Valor
    scheduler scheduler_heartbeat_sec 10
  6. Clique em Guardar e aguarde até que o ambiente atualize a respetiva configuração.

Verifique as métricas do programador:

  1. Aceda ao separador Monitorização e selecione Agendadores.

  2. No gráfico Sinal de pulsação do agendador, clique no botão Mais opções (três pontos) e, de seguida, clique em Ver no explorador de métricas.

O gráfico de pulsação do agendador mostra que a pulsação ocorre com menos frequência
Figura 15. Gráfico de heartbeat do programador (clique para aumentar)

No gráfico, verá que o agendador é executado duas vezes menos frequentemente depois de alterar a configuração predefinida de 5 segundos para 10 segundos. Ao reduzir a frequência dos sinais de pulsação, garante que o agendador não começa a ser executado enquanto o ciclo de análise anterior estiver em curso e a capacidade de recursos do agendador não estiver esgotada.

Atribua mais recursos ao agendador

No Cloud Composer 2, pode atribuir mais recursos de CPU e memória ao programador. Desta forma, pode aumentar o desempenho do seu programador e acelerar o tempo de análise do seu DAG.

Atribua CPU e memória adicionais ao programador:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Configuração do ambiente.

  4. Encontre a configuração Resources > Workloads e clique em Editar.

  5. Na secção Agendador, no campo Memória, especifique o novo limite de memória. Neste tutorial, use 4 GB.

  6. No campo CPU, especifique o novo limite da CPU. Neste tutorial, use 2 vCPU.

  7. Guarde as alterações e aguarde alguns minutos para que os programadores do Airflow sejam reiniciados.

  8. Aceda ao separador Registos e, de seguida, a Todos os registos > Gestor do processador DAG.

  9. Reveja os registos de dag-processor-manager e compare a duração da análise para os DAGs de exemplo:

    Uma entrada de registo para o DAG de exemplo mostra que o tempo de análise do DAG para o DAG otimizado é de 1,5 segundos. Para o DAG não otimizado, o tempo de análise é de 28,71 segundos
    Figura 16. Os registos do gestor do processador DAG mostram os tempos de análise do DAG após a atribuição de mais recursos ao programador (clique para aumentar)

Ao atribuir mais recursos ao programador, aumentou a capacidade do programador e reduziu significativamente a latência de análise em comparação com as configurações de ambiente predefinidas. Com mais recursos, o programador pode analisar os DAGs mais rapidamente. No entanto, os custos associados aos recursos do Cloud Composer também aumentam. Além disso, não é possível aumentar os recursos além de um determinado limite.

Recomendamos que atribua recursos apenas depois de implementar as possíveis otimizações do código DAG e da configuração do Airflow.

Limpar

Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.

Elimine o projeto

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimine recursos individuais

Se planeia explorar vários tutoriais e inícios rápidos, a reutilização de projetos pode ajudar a evitar exceder os limites de quota do projeto.

Elimine o ambiente do Cloud Composer. Também elimina o contentor do ambiente durante este procedimento.

O que se segue?