Depurar problemas de DAG sem memória e sem armazenamento

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Este tutorial mostra as etapas para depurar um DAG com falha no Airflow Cloud Composer e diagnostique problemas relacionados a recursos do worker, como falta de memória ou de espaço de armazenamento no worker, com a ajuda de registros e o monitoramento.

Introdução

Este tutorial se concentra em problemas relacionados a recursos para demonstrar maneiras de depurar um DAG.

A falta de recursos de worker alocados causa falhas no DAG. Se uma tarefa do Airflow ficar sem memória ou armazenamento, talvez apareça uma exceção do Airflow, como:

WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.

ou

Task exited with return code Negsignal.SIGKILL

Nesses casos, a recomendação geral é aumentar o número de workers do Airflow recursos ou reduzir o número de tarefas por worker. No entanto, como o Airflow exceções podem ser genéricas, pode ser difícil identificar recurso específico que está causando o problema.

Neste tutorial, explicamos como diagnosticar o motivo de uma falha do DAG e identificar o tipo de recurso que causa problemas depurando dois exemplos de DAGs que falham devido à falta de memória e armazenamento do worker.

Objetivos

  • Execute exemplos de DAGs que falham pelos seguintes motivos:

    • Falta de memória do worker
    • Falta de armazenamento do worker
  • Diagnosticar os motivos da falha

  • Aumentar os recursos de workers alocados

  • Teste os DAGs com novos limites de recursos

Custos

Neste tutorial, usamos o seguinte componente faturável do Google Cloud:

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Veja mais detalhes em Limpeza.

Antes de começar

Esta seção descreve as ações necessárias antes de iniciar o tutorial.

Criar e configurar um projeto

Para este tutorial, você precisa ter uma conta do Google Cloud projeto. Configure o projeto da seguinte maneira:

  1. No console do Google Cloud, selecione ou crie um projeto:

    Acesse o seletor de projetos

  2. Verifique se o faturamento foi ativado para o projeto. Saiba como verificar se o faturamento está ativado em um projeto.

  3. Verifique se o usuário do projeto do Google Cloud tem os seguintes papéis para criar os recursos necessários:

    • Administrador de ambiente e de objetos do Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador do Compute (roles/compute.admin)
    • Editor do Monitoring (roles/monitoring.editor)

Ativar as APIs do projeto

Ative a API Cloud Composer.

Ative a API

criar o ambiente do Cloud Composer

Crie um ambiente do Cloud Composer 2.

Como parte da criação do ambiente, você concede a extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) para o agente de serviço do Composer do Compute Engine. O Cloud Composer usa essa conta para realizar operações no projeto do Google Cloud.

Verificar os limites de recursos do worker

Verifique os limites de recursos de worker do Airflow no seu ambiente:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Acesse Recursos > Configuração de cargas de trabalho. > Worker.

  5. Verifique se os valores são 0,5 vCPU, 1,875 GB de memória e 1 GB de armazenamento. Estes são os limites de recursos de worker do Airflow com que você vai trabalhar nas próximas etapas deste tutorial.

Exemplo: diagnosticar problemas de falta de memória

Faça upload do DAG de amostra a seguir no ambiente que você criou nas etapas anteriores. Neste tutorial, o DAG é chamado create_list_with_many_strings:

Esse DAG contém uma tarefa que executa as seguintes etapas:

  1. Cria uma lista vazia s.
  2. Executa um ciclo para anexar a string More à lista.
  3. Mostra quanta memória a lista consome e aguarda 1 segundo a cada 1 por 1 minuto.
.
import time

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_list_with_many_strings',
    default_args=default_args,
    schedule_interval=None)


def consume():
    s = []
    for i in range(120):
        for j in range(1000000):
            s.append("More")
        print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
        time.sleep(1)


t1 = PythonOperator(
    task_id='task0',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0
)

Acionar o DAG de amostra

Acione o DAG de amostra, create_list_with_many_strings:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.

  3. Na interface da Web do Airflow, na página DAGs, na coluna Links do DAG, clique no botão Acionar DAG.

  4. Clique em Gatilho.

  5. Na página DAGs, clique na tarefa que você acionou e revise a saída. registros para garantir que o DAG começou a ser executado.

Enquanto a tarefa estiver em execução, os registros de saída imprimirão o tamanho da memória em GB que o DAG está usando.

Após vários minutos, a tarefa falhará porque excede o worker do Airflow de 1,875 GB.

Diagnosticar o DAG com falha

Se você estava executando várias tarefas no momento da falha, considere executar a apenas uma tarefa e diagnosticar a pressão de recursos durante esse tempo para identificar quais tarefas causam pressão de recursos e quais recursos você precisa aumentar.

revisar os registros de tarefas do Airflow

A tarefa do DAG create_list_with_many_strings tem um Failed.

Analise os registros da tarefa. A seguinte entrada de registro vai aparecer:

```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```

`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.

Revise as cargas de trabalho

Revise as cargas de trabalho para verificar se a carga da tarefa não faz com que o nó em que o pod é executado para exceder o limite de consumo de memória:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Em Recursos > Cluster do GKE > Cargas de trabalho, clique em Ver cargas de trabalho do cluster.

  5. Verifique se alguns dos pods de carga de trabalho têm status semelhante a este:

    Error with exit code 137 and 1 more issue.
    ContainerStatusUnknown with exit code 137 and 1 more issue
    

    Exit code 137 significa que um contêiner ou pod está tentando usar mais memória do que o permitido. O processo é encerrado para evitar o uso da memória.

Analisar a integridade do ambiente e o monitoramento do consumo de recursos

Revise a integridade do ambiente e o monitoramento do consumo de recursos:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Monitoramento e selecione Visão geral.

  4. No painel Visão geral do ambiente, localize o Gráfico Integridade do ambiente (DAG de monitoramento do Airflow). Ela contém área, que corresponde à hora em que os registros começaram a mostrar erros.

  5. Selecione Workers e encontre o gráfico Uso total da memória dos workers. Observe que a linha Uso da memória tem um pico no momento em que estava em execução.

.
A linha de uso da memória tem um pico no momento em que
    a tarefa estava em execução
Figura 1. Gráfico de uso total da memória dos workers (clique para ampliar)
.

Mesmo que a linha de uso da memória no gráfico não atinja o limite, ao diagnosticar os motivos da falha, é preciso levar em conta somente memória alocável, e a linha Limite de memória no gráfico representa a memória total. disponíveis, incluindo a capacidade reservada pelo GKE.

Neste exemplo, o limite de memória do worker é de 1,875 GB. GKE reserva 25% dos primeiros 4 GiB de memória. O GKE também reserva uma eviction-threshold: 100 MiB de memória em cada nó para remoção de kubelet.

A memória alocável é calculada da seguinte maneira:

ALLOCATABLE = CAPACITY - RESERVED - EVICTION-THRESHOLD

Se o limite de memória for de 1,875 GB, a memória alocável real será:

1.75 GiB (1.875GB) - 0.44 (25% GiB reserved) - 0.1 = 1.21 GiB (~1.3 GB).

Ao anexar esse limite real ao gráfico de uso da memória, você verá que o pico do uso de memória da tarefa atinge a memória real e concluir que a tarefa falhou devido à insuficiência de workers memória.

Aumentar o limite de memória do worker

Aloque mais memória do worker para que o DAG de amostra funcione:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Encontre a configuração Resources > Workloads e Clique em Editar.

  5. Na seção Worker, no campo Memory, especifique a nova memória. de workers do Airflow. Neste tutorial, use 3 GB.

  6. Salve as mudanças e aguarde alguns minutos para que os workers do Airflow reiniciar.

Teste o DAG com o novo limite de memória

Acione o DAG create_list_with_many_strings novamente e aguarde até que ele termina de ser executado.

  1. Nos registros de saída da execução do DAG, você verá Marking task as SUCCESS, e o estado da tarefa vai indicar Sucesso.

  2. Revise a seção Visão geral do ambiente na guia Monitoramento e verifique se não há áreas vermelhas.

  3. Clique na seção Workers e localize o Uso total da memória dos workers. gráfico. Você vai notar que a linha Limite de memória reflete a mudança no o limite de memória, e a linha Uso da memória está muito abaixo do limite de memória alocável.

Exemplo: diagnosticar problemas de falta de armazenamento

Nesta etapa, você faz upload de dois DAGs que criam arquivos grandes. O primeiro DAG cria um arquivo grande. O segundo DAG cria um arquivo grande e imita uma uma operação de longa duração.

O tamanho do arquivo nos dois DAGs excede o armazenamento de workers padrão do Airflow máximo de 1 GB, mas o segundo DAG tem uma tarefa de espera adicional para artificialmente.

Você investigará as diferenças no comportamento de ambos os DAGs nos próximos etapas.

Fazer upload de um DAG que crie um arquivo grande

Faça upload do DAG de amostra a seguir no ambiente que você criou nas etapas anteriores. Neste tutorial, o DAG é chamado create_large_txt_file_print_logs:

Esse DAG contém uma tarefa que executa as seguintes etapas:

  1. Grava um arquivo localfile.txt de 1,5 GB no armazenamento de workers do Airflow.
  2. Mostra o tamanho do arquivo criado usando o módulo os do Python.
  3. Imprime a duração da execução do DAG a cada um minuto.
.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

Fazer upload de um DAG que crie um arquivo grande em uma operação de longa duração

Para imitar um DAG de longa duração e investigar o impacto da duração da tarefa No estado final, faça upload do segundo DAG de amostra para de nuvem. Neste tutorial, o DAG é chamado long_running_create_large_txt_file_print_logs:

Esse DAG contém uma tarefa que executa as seguintes etapas:

  1. Grava um arquivo localfile.txt de 1,5 GB no armazenamento de workers do Airflow.
  2. Mostra o tamanho do arquivo criado usando o módulo os do Python.
  3. Espera 1 hora e 15 minutos para imitar o tempo necessário para operações com do arquivo, por exemplo, lendo do arquivo.
  4. Imprime a duração da execução do DAG a cada um minuto.
.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'long_running_create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    for k in range(75):
        time.sleep(60)
        print(f"{k+1} minute")

    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

Acionar DAGs de amostra

Acione o primeiro DAG, create_large_txt_file_print_logs:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.

  3. Na interface da Web do Airflow, na página DAGs, na coluna Links do DAG, clique no botão Acionar DAG.

  4. Clique em Gatilho.

  5. Na página DAGs, clique na tarefa que você acionou e revise a saída. registros para garantir que o DAG começou a ser executado.

  6. Aguarde até que a tarefa que você criou com o create_large_txt_file_print_logs DAG foi concluído. Isso pode levar vários minutos.

  7. Na página DAGs, clique na execução do DAG. Você verá sua tarefa tem um estado Success, mesmo que o limite de armazenamento tenha sido excedido.

Revise os registros da tarefa do Airflow:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

    1. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

    2. Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.

    3. Filtre os registros por tipo: mostre apenas mensagens de Erro.

Nos registros, serão exibidas mensagens semelhantes às seguintes:

Worker: warm shutdown (Main Process)

ou

A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

Esses registros indicam que o pod iniciou o "encerramento com estado salvo". processo, porque o o armazenamento usado excedeu o limite e foi removido em 1 hora. No entanto, a execução do DAG não falhou porque foi concluída dentro do período de carência de encerramento do Kubernetes que é explicado com mais detalhes neste tutorial.

Para ilustrar o conceito do período de carência da rescisão, analise o resultado. do segundo DAG de amostra, long_running_create_large_txt_file_print_logs.

Acione o segundo DAG, long_running_create_large_txt_file_print_logs:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.

  3. Na interface da Web do Airflow, na página DAGs, na coluna Links do DAG, clique no botão Acionar DAG.

  4. Clique em Gatilho.

  5. Na página DAGs, clique na tarefa que você acionou e revise a saída. registros para garantir que o DAG começou a ser executado.

  6. Aguarde a execução do DAG long_running_create_large_txt_file_print_logs falhar. Isso vai levar cerca de uma hora.

Revise os resultados da execução do DAG:

  1. Na página DAGs, clique no long_running_create_large_txt_file_print_logs execução do DAG. Você verá que a tarefa tem um estado Failed e que a duração da execução foi exatamente 1 hora e 5 minutos, que é menos do que o período de espera da tarefa de 1 hora e 15 minutos.

  2. Analise os registros da tarefa. Depois que o DAG criou o arquivo localfile.txt o contêiner do worker do Airflow, o registro imprime que o DAG iniciou aguardando e a duração da execução é mostrada nos registros de tarefas a cada 1 minuto. Neste exemplo, o DAG imprime o registro localfile.txt size: e o tamanho do arquivo localfile.txt será 1,5 GB.

Quando o arquivo gravado no contêiner do worker do Airflow excede o armazenamento limite, a execução do DAG vai falhar. No entanto, a tarefa não falha imediatamente e continua em execução até atingir 1 hora e 5 minutos. Isso acontece porque o Kubernetes não encerra a tarefa imediatamente e mantém em execução para permitir uma hora de tempo de recuperação, conhecida como "graça de encerramento". período". Quando um nó fica sem recursos, o Kubernetes não encerra a Pod imediatamente para lidar com o encerramento de modo normal, a fim de que haja o mínimo sobre o usuário final.

O período de carência do encerramento ajuda os usuários a recuperar arquivos após falhas em tarefas, No entanto, pode causar confusão ao diagnosticar os DAGs. Quando o worker do Airflow quando o limite de armazenamento é excedido, o estado final da tarefa depende da duração do Execução do DAG:

  • Se a execução do DAG exceder o limite de armazenamento do worker, mas for concluída em menos de 1 hora, a tarefa é concluída com um status Success porque foi concluída. dentro do período de carência da rescisão. No entanto, o Kubernetes encerra o pod e o arquivo gravado é excluído do contêiner imediatamente.

  • Se o DAG exceder o limite de armazenamento do worker e for executado por mais de uma hora, o DAG continua em execução por uma hora e pode exceder o limite de armazenamento em milhares antes que o Kubernetes elimine as marcações de pod e Airflow a tarefa como Failed.

.

Diagnosticar o DAG com falha

Se você estava executando várias tarefas no momento da falha, considere executar a apenas uma tarefa e diagnosticar a pressão de recursos durante esse tempo para identificar quais tarefas causam pressão de recursos e quais recursos você precisa aumentar.

Revise os registros de tarefas do segundo DAG, long_running_create_large_txt_file_print_logs:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Registros e depois Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.

  4. Filtre os registros por tipo: mostre apenas mensagens de Erro.

Nos registros, serão exibidas mensagens semelhantes às seguintes:

Container storage usage of worker reached 155.7% of the limit.

This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.

You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.

ou

Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.

Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.

Essas mensagens indicam que, à medida que a tarefa avançava, os registros do Airflow eram iniciados erros de impressão quando o tamanho dos arquivos gerados pelo DAG excede o o limite de armazenamento dos workers e o início do período de carência do encerramento. Durante o o período de carência do encerramento, o consumo de armazenamento não voltou ao limite o que levou à remoção de pods após o término do período de carência do encerramento.

Revise a integridade do ambiente e o monitoramento do consumo de recursos:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Monitoramento e selecione Visão geral.

  4. No painel Visão geral do ambiente, localize o Gráfico Integridade do ambiente (DAG de monitoramento do Airflow). Ela contém área, que corresponde à hora em que os registros começaram a mostrar erros.

  5. Selecione Workers e encontre o gráfico Uso total do disco de workers. Observar se a linha Uso de disco tem um pico e excede o limite de disco linha no momento em que a tarefa estava em execução.

.
A linha de uso do disco tem um pico e excede o limite do disco
    linha no momento em que a tarefa estava em execução
Figura 2. Gráfico de uso total do disco de workers (clique para ampliar)
.

Aumentar o limite de armazenamento do worker

Alocar mais armazenamento de workers do Airflow para que o DAG de amostra bem-sucedidos:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Configuração do ambiente.

  4. Encontre a configuração Resources > Workloads e Clique em Editar.

  5. Na seção Worker, no campo Armazenamento, especifique o novo armazenamento. de workers do Airflow. Neste tutorial, defina-o como 2 GB.

  6. Salve as mudanças e aguarde alguns minutos para que os workers do Airflow reiniciar.

Teste o DAG com o novo limite de armazenamento

Acione o DAG long_running_create_large_txt_file_print_logs de novo e aguarde 1 hora e 15 minutos até que a execução termine.

  1. Nos registros de saída da execução do DAG, você verá Marking task as SUCCESS, e o estado da tarefa vai indicar Sucesso, com duração de uma hora e 15 minutos, que é igual ao tempo de espera definido no código DAG.

  2. Revise a seção Visão geral do ambiente na guia Monitoramento e verifique se não há áreas vermelhas.

  3. Clique na seção Workers e encontre o Uso total do disco de workers. gráfico. Você verá que a linha Limite de disco reflete a alteração na o limite de armazenamento e a linha Uso do disco está dentro do intervalo permitido.

Resumo

Neste tutorial, você diagnosticou o motivo da falha do DAG e identificou tipo de recurso que causa pressão depurando dois exemplos de DAGs que falham devido à falta de memória e armazenamento do worker. Depois, você executou os DAGs depois de alocar mais memória e armazenamento para os workers. No entanto, recomendado para otimizar os DAGs (fluxos de trabalho) reduzir o consumo de recursos dos workers, porque não é possível aumentar os recursos além de um certo limite.

Limpar

Para evitar cobranças dos recursos na sua conta do Google Cloud usados neste tutorial, exclua o projeto que contém os recursos ou manter o projeto e excluir os recursos individuais.

Exclua o projeto

  1. No Console do Google Cloud, acesse a página Gerenciar recursos.

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Excluir recursos individuais

Se você planeja ver vários tutoriais e guias de início rápido, a reutilização de projetos pode evitar que você exceda os limites da cota do projeto.

Exclua o ambiente do Cloud Composer. Você também excluir o bucket do ambiente durante este procedimento.

A seguir