Depure problemas de DAG de falta de memória e falta de armazenamento

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Este tutorial fornece passos para depurar um DAG do Airflow com falhas no Cloud Composer e diagnosticar problemas relacionados com recursos de trabalhadores, como a falta de memória ou espaço de armazenamento dos trabalhadores, com a ajuda de registos e monitorização do ambiente.

Introdução

Este tutorial foca-se em problemas relacionados com recursos para demonstrar formas de depurar um DAG.

A falta de recursos de trabalhadores atribuídos provoca falhas no DAG. Se uma tarefa do Airflow ficar sem memória ou armazenamento, pode ver uma exceção do Airflow, como:

WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.

ou

Task exited with return code Negsignal.SIGKILL

Nestes casos, a recomendação geral é aumentar os recursos do Airflow worker ou reduzir o número de tarefas por worker. No entanto, uma vez que as exceções do Airflow podem ser genéricas, pode ser difícil identificar o recurso específico que está a causar o problema.

Este tutorial explica como pode diagnosticar o motivo de uma falha do DAG e identificar o tipo de recurso que causa problemas depurando dois DAGs de exemplo que falham devido à falta de memória e armazenamento do trabalhador.

Objetivos

  • Executar DAGs de exemplo que falham pelos seguintes motivos:

    • Falta de memória do trabalhador
    • Falta de armazenamento do trabalhador
  • Diagnostique os motivos da falha

  • Aumente os recursos de trabalhadores atribuídos

  • Teste os DAGs com novos limites de recursos

Custos

Este tutorial usa os seguintes componentes faturáveis do Google Cloud:

Quando terminar este tutorial, pode evitar a faturação contínua eliminando os recursos que criou. Para mais detalhes, consulte o artigo Limpe.

Antes de começar

Esta secção descreve as ações necessárias antes de iniciar o tutorial.

Crie e configure um projeto

Para este tutorial, precisa de um Google Cloud projeto. Configure o projeto da seguinte forma:

  1. Na Google Cloud consola, selecione ou crie um projeto:

    Aceder ao seletor de projetos

  2. Certifique-se de que a faturação está ativada para o seu projeto. Saiba como verificar se a faturação está ativada num projeto.

  3. Certifique-se de que o utilizador do Google Cloud projeto tem as seguintes funções para criar os recursos necessários:

    • Administrador de objetos de ambiente e armazenamento (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador do Compute (roles/compute.admin)
    • Editor de monitorização (roles/monitoring.editor)

Ative APIs para o seu projeto

Enable the Cloud Composer API.

Enable the API

Crie o seu ambiente do Cloud Composer

Crie um ambiente do Cloud Composer 2.

Como parte da criação do ambiente, concede a função Extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) à conta do agente de serviço do Composer. O Cloud Composer usa esta conta para realizar operações no seu Google Cloud projeto.

Verifique os limites de recursos dos trabalhadores

Verifique os limites de recursos do trabalhador do Airflow no seu ambiente:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Configuração do ambiente.

  4. Aceda a Recursos > Configuração das cargas de trabalho > Trabalhador.

  5. Verifique se os valores são 0,5 vCPUs, 1,875 GB de memória e 1 GB de armazenamento. Estes são os limites de recursos do worker do Airflow com os quais vai trabalhar nos passos seguintes deste tutorial.

Exemplo: diagnostique problemas de falta de memória

Carregue o seguinte DAG de exemplo para o ambiente que criou nos passos anteriores. Neste tutorial, este DAG tem o nome create_list_with_many_strings.

Este DAG contém uma tarefa que executa os seguintes passos:

  1. Cria uma lista vazia s.
  2. Executa um ciclo para acrescentar a string More à lista.
  3. Imprime a quantidade de memória que a lista consome e aguarda 1 segundo em cada iteração de 1 minuto.
import time

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_list_with_many_strings',
    default_args=default_args,
    schedule_interval=None)


def consume():
    s = []
    for i in range(120):
        for j in range(1000000):
            s.append("More")
        print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
        time.sleep(1)


t1 = PythonOperator(
    task_id='task0',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0
)

Acione o DAG de amostra

Acione o DAG de exemplo, create_list_with_many_strings:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na coluna Servidor Web do Airflow, siga o link do Airflow para o seu ambiente.

  3. Na interface Web do Airflow, na página DAGs, na coluna Links do seu DAG, clique no botão Acionar DAG.

  4. Clique em Acionador.

  5. Na página DAGs, clique na tarefa que acionou e reveja os registos de saída para se certificar de que o DAG começou a ser executado.

Enquanto a tarefa está em execução, os registos de saída imprimem o tamanho da memória em GB que o DAG está a usar.

Após vários minutos, a tarefa falha porque excede o limite de memória do worker do Airflow de 1,875 GB.

Diagnostique o DAG com falhas

Se estava a executar várias tarefas no momento da falha, considere executar apenas uma tarefa e diagnosticar a pressão dos recursos durante esse período para identificar que tarefas causam pressão dos recursos e que recursos tem de aumentar.

Reveja os registos de tarefas do Airflow

Observe que a tarefa do DAG create_list_with_many_strings tem um estado Failed.

Reveja os registos da tarefa. É apresentada a seguinte entrada de registo:

```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```

`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.

Reveja as cargas de trabalho

Reveja as cargas de trabalho para verificar se a carga da tarefa não faz com que o nó onde o pod é executado exceda o limite de consumo de memória:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Configuração do ambiente.

  4. Em Recursos > Cluster do GKE > Cargas de trabalho, clique em ver cargas de trabalho do cluster.

  5. Verifique se alguns dos pods de carga de trabalho têm estados semelhantes aos seguintes:

    Error with exit code 137 and 1 more issue.
    ContainerStatusUnknown with exit code 137 and 1 more issue
    

    Exit code 137 significa que um contentor ou um Pod está a tentar usar mais memória do que o permitido. O processo é terminado para evitar a utilização de memória.

Reveja a monitorização do estado do ambiente e do consumo de recursos

Reveja a monitorização do estado do ambiente e do consumo de recursos:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Monitorização e selecione Vista geral.

  4. No painel Vista geral do ambiente, localize o gráfico Estado do ambiente (DAG de monitorização do Airflow). Contém uma área vermelha, que corresponde à hora em que os registos começaram a imprimir erros.

  5. Selecione Trabalhadores e encontre o gráfico Utilização de memória total dos trabalhadores. Observe que a linha Utilização de memória tem um pico no momento em que a tarefa estava a ser executada.

A linha de utilização de memória tem um pico no momento em que a tarefa estava em execução
Figura 1. Gráfico de utilização de memória total dos trabalhadores (clique para aumentar)

Embora a linha de utilização de memória no gráfico não atinja o limite, ao diagnosticar os motivos da falha, tem de ter em conta a utilização de memória por trabalhadores individuais. Cada worker usa uma parte da respetiva memória para executar outros contentores que realizam ações necessárias para o funcionamento do worker, como sincronizar os respetivos ficheiros DAG com o contentor do ambiente. A quantidade real de memória disponível para um worker executar tarefas do Airflow é inferior ao limite de memória. Se um trabalhador atingir o limite da memória real disponível, a tarefa executada pode falhar devido a memória insuficiente do trabalhador. Nestes casos, pode observar falhas de tarefas, mesmo que a linha no gráfico de utilização de memória total dos trabalhadores não atinja o limite de memória.

Aumente o limite de memória do trabalhador

Atribua memória adicional ao trabalhador para que o DAG de exemplo seja bem-sucedido:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Configuração do ambiente.

  4. Encontre a configuração Resources > Workloads e clique em Editar.

  5. Na secção Worker, no campo Memória, especifique o novo limite de memória para os trabalhadores do Airflow. Neste tutorial, use 3 GB.

  6. Guarde as alterações e aguarde alguns minutos para que os trabalhadores do Airflow sejam reiniciados.

Teste o DAG com o novo limite de memória

Acione novamente o DAG create_list_with_many_strings e aguarde até que termine a execução.

  1. Nos registos de saída da execução do DAG, verá Marking task as SUCCESS e o estado da tarefa indicará Êxito.

  2. Reveja a secção Vista geral do ambiente no separador Monitorização e certifique-se de que não existem áreas vermelhas.

  3. Clique na secção Trabalhadores e encontre o gráfico Utilização de memória total dos trabalhadores. Vai ver que a linha Limite de memória reflete a alteração no limite de memória e que a linha Utilização de memória está muito abaixo do limite de memória alocável real.

Exemplo: diagnosticar problemas de falta de armazenamento

Neste passo, carrega dois DAGs que criam ficheiros grandes. O primeiro DAG cria um ficheiro grande. O segundo DAG cria um ficheiro grande e imita uma operação de longa duração.

O tamanho do ficheiro em ambos os DAGs excede o limite de armazenamento do trabalhador do Airflow predefinido de 1 GB, mas o segundo DAG tem uma tarefa de espera adicional para prolongar artificialmente a respetiva duração.

Vai investigar as diferenças no comportamento de ambos os DAGs nos passos seguintes.

Carregue um DAG que crie um ficheiro grande

Carregue o seguinte DAG de exemplo para o ambiente que criou nos passos anteriores. Neste tutorial, este DAG tem o nome create_large_txt_file_print_logs.

Este DAG contém uma tarefa que executa os seguintes passos:

  1. Escreve um ficheiro de 1,5 GB localfile.txt no armazenamento do worker do Airflow.
  2. Imprime o tamanho do ficheiro criado através do módulo os do Python.
  3. Imprime a duração da execução do DAG a cada 1 minuto.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

Carregue um DAG que crie um ficheiro grande numa operação de execução prolongada

Para imitar um DAG de execução prolongada e investigar o impacto da duração da tarefa no estado final, carregue o segundo DAG de exemplo para o seu ambiente. Neste tutorial, este DAG tem o nome long_running_create_large_txt_file_print_logs.

Este DAG contém uma tarefa que executa os seguintes passos:

  1. Escreve um ficheiro de 1,5 GB localfile.txt no armazenamento do worker do Airflow.
  2. Imprime o tamanho do ficheiro criado através do módulo os do Python.
  3. Aguardar 1 hora e 15 minutos para imitar algum tempo necessário para operações com o ficheiro, por exemplo, ler a partir do ficheiro.
  4. Imprime a duração da execução do DAG a cada 1 minuto.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'long_running_create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    for k in range(75):
        time.sleep(60)
        print(f"{k+1} minute")

    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

Acione DAGs de amostra

Acione o primeiro DAG, create_large_txt_file_print_logs:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na coluna Servidor Web do Airflow, siga o link do Airflow para o seu ambiente.

  3. Na interface Web do Airflow, na página DAGs, na coluna Links do seu DAG, clique no botão Acionar DAG.

  4. Clique em Acionador.

  5. Na página DAGs, clique na tarefa que acionou e reveja os registos de saída para se certificar de que o DAG começou a ser executado.

  6. Aguarde até que a tarefa criada com o DAG create_large_txt_file_print_logs esteja concluída. Pode demorar vários minutos.

  7. Na página DAGs, clique na execução do DAG. A tarefa tem o estado Success, mesmo que o limite de armazenamento tenha sido excedido.

Reveja os registos do Airflow da tarefa:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

    1. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

    2. Aceda ao separador Registos e, de seguida, a Todos os registos > Registos do Airflow > Trabalhadores > Ver no Explorador de registos.

    3. Filtre os registos por tipo: mostre apenas mensagens de Erro.

Nos registos, verá mensagens semelhantes às seguintes:

Worker: warm shutdown (Main Process)

ou

A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

Estes registos indicam que o pod iniciou o processo de "encerramento a quente" porque o armazenamento usado excedeu o limite e foi removido em 1 hora. No entanto, a execução do DAG não falhou porque foi concluída dentro do período de tolerância de encerramento do Kubernetes, o que é explicado mais detalhadamente neste tutorial.

Para ilustrar o conceito do período de tolerância de rescisão, reveja o resultado do segundo DAG de exemplo, long_running_create_large_txt_file_print_logs.

Acione o segundo DAG, long_running_create_large_txt_file_print_logs:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na coluna Servidor Web do Airflow, siga o link do Airflow para o seu ambiente.

  3. Na interface Web do Airflow, na página DAGs, na coluna Links do seu DAG, clique no botão Acionar DAG.

  4. Clique em Acionador.

  5. Na página DAGs, clique na tarefa que acionou e reveja os registos de saída para se certificar de que o DAG começou a ser executado.

  6. Aguarde até que a execução do DAG long_running_create_large_txt_file_print_logs falhe. Este processo demora cerca de uma hora.

Reveja os resultados da execução do DAG:

  1. Na página DAGs, clique na execução do DAG long_running_create_large_txt_file_print_logs. Vai ver que a tarefa tem o estado Failed e que a duração da execução foi exatamente 1 hora e 5 minutos, o que é inferior ao período de espera da tarefa de 1 hora e 15 minutos.

  2. Reveja os registos da tarefa. Depois de o DAG criar o ficheiro localfile.txt no contentor do worker do Airflow, o registo indica que o DAG começou a aguardar e a duração da execução é apresentada nos registos de tarefas a cada 1 minuto. Neste exemplo, o DAG imprime o registo localfile.txt size: e o tamanho do ficheiro localfile.txt é de 1,5 GB.

Quando o ficheiro escrito no contentor do trabalhador do Airflow excede o limite de armazenamento, a execução do DAG deve falhar. No entanto, a tarefa não falha imediatamente e continua a ser executada até atingir a duração de 1 hora e 5 minutos. Isto acontece porque o Kubernetes não termina a tarefa imediatamente e mantém-na em execução para permitir 1 hora de tempo de recuperação, conhecido como o "período de tolerância de terminação". Quando um nó fica sem recursos, o Kubernetes não termina o pod imediatamente para processar a terminação de forma elegante, de modo a que o impacto no utilizador final seja mínimo.

O período de tolerância de encerramento ajuda os utilizadores a recuperar ficheiros após falhas de tarefas. No entanto, pode gerar confusão ao diagnosticar DAGs. Quando o limite de armazenamento do worker do Airflow é excedido, o estado da tarefa final depende da duração da execução do DAG:

  • Se a execução do DAG exceder o limite de armazenamento do trabalhador, mas for concluída em menos de 1 hora, a tarefa é concluída com o estado Success porque foi concluída dentro do período de tolerância de encerramento. No entanto, o Kubernetes termina o pod e o ficheiro escrito é eliminado imediatamente do contentor.

  • Se o DAG exceder o limite de armazenamento do trabalhador e for executado durante mais de 1 hora, o DAG continua a ser executado durante 1 hora e pode exceder o limite de armazenamento em milhares de por cento antes de o Kubernetes eliminar o pod e o Airflow marcar a tarefa como Failed.

Diagnostique o DAG com falhas

Se estava a executar várias tarefas no momento da falha, considere executar apenas uma tarefa e diagnosticar a pressão dos recursos durante esse período para identificar que tarefas causam pressão dos recursos e que recursos tem de aumentar.

Reveja os registos de tarefas do segundo DAG, long_running_create_large_txt_file_print_logs:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Registos e, de seguida, a Todos os registos > Registos do Airflow > Trabalhadores > Ver no Explorador de registos.

  4. Filtre os registos por tipo: mostre apenas mensagens de Erro.

Nos registos, verá mensagens semelhantes às seguintes:

Container storage usage of worker reached 155.7% of the limit.

This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.

You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.

ou

Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.

Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.

Estas mensagens indicam que, à medida que a tarefa progredia, os registos do Airflow começaram a imprimir erros quando o tamanho dos ficheiros gerados pelo seu DAG excedeu o limite de armazenamento do worker e o período de tolerância de encerramento começou. Durante o período de tolerância de encerramento, o consumo de armazenamento não regressou ao limite, o que levou à remoção do pod após o fim do período de tolerância de encerramento.

Reveja a monitorização do estado do ambiente e do consumo de recursos:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Monitorização e selecione Vista geral.

  4. No painel Vista geral do ambiente, localize o gráfico Estado do ambiente (DAG de monitorização do Airflow). Contém uma área vermelha, que corresponde à hora em que os registos começaram a imprimir erros.

  5. Selecione Trabalhadores e encontre o gráfico Utilização de disco total dos trabalhadores. Observe que a linha Utilização do disco tem um pico e excede a linha Limite do disco no momento em que a sua tarefa estava em execução.

A linha de utilização do disco tem um pico e excede a linha de limite do disco no momento em que a sua tarefa estava em execução
Figura 2. Gráfico de utilização do disco dos trabalhadores totais (clique para ampliar)

Aumente o limite de armazenamento do trabalhador

Atribua armazenamento adicional ao trabalhador do Airflow para que o DAG de exemplo seja bem-sucedido:

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

  3. Aceda ao separador Configuração do ambiente.

  4. Encontre a configuração Resources > Workloads e clique em Editar.

  5. Na secção Worker, no campo Armazenamento, especifique o novo limite de armazenamento para os workers do Airflow. Neste tutorial, defina-o como 2 GB.

  6. Guarde as alterações e aguarde alguns minutos para que os trabalhadores do Airflow sejam reiniciados.

Teste o seu DAG com o novo limite de armazenamento

Acione novamente o DAG long_running_create_large_txt_file_print_logs e aguarde 1 hora e 15 minutos até terminar a execução.

  1. Nos registos de saída da execução do DAG, verá Marking task as SUCCESS e o estado da tarefa vai indicar Sucesso, com uma duração de 1 hora e 15 minutos, o que equivale ao tempo de espera definido no código do DAG.

  2. Reveja a secção Vista geral do ambiente no separador Monitorização e certifique-se de que não existem áreas vermelhas.

  3. Clique na secção Trabalhadores e encontre o gráfico Utilização do disco total dos trabalhadores. Vai ver que a linha Limite do disco reflete a alteração no limite de armazenamento e que a linha Utilização do disco está dentro do intervalo permitido.

Resumo

Neste tutorial, diagnosticou o motivo de uma falha do DAG e identificou o tipo de recurso que causa pressão ao depurar dois DAGs de exemplo que falham devido à falta de memória e armazenamento do trabalhador. Em seguida, executou os DAGs com êxito depois de atribuir mais memória e armazenamento aos seus trabalhadores. No entanto, é recomendado que otimize os seus DAGs (fluxos de trabalho) para reduzir o consumo de recursos dos trabalhadores em primeiro lugar, porque não é possível aumentar os recursos além de um determinado limite.

Limpar

Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.

Elimine o projeto

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimine recursos individuais

Se planeia explorar vários tutoriais e inícios rápidos, a reutilização de projetos pode ajudar a evitar exceder os limites de quota do projeto.

Elimine o ambiente do Cloud Composer. Também elimina o contentor do ambiente durante este procedimento.

O que se segue?