Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este tutorial fornece passos para depurar um DAG do Airflow com falhas no Cloud Composer e diagnosticar problemas relacionados com recursos de trabalhadores, como a falta de memória ou espaço de armazenamento dos trabalhadores, com a ajuda de registos e monitorização do ambiente.
Introdução
Este tutorial foca-se em problemas relacionados com recursos para demonstrar formas de depurar um DAG.
A falta de recursos de trabalhadores atribuídos provoca falhas no DAG. Se uma tarefa do Airflow ficar sem memória ou armazenamento, pode ver uma exceção do Airflow, como:
WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.
ou
Task exited with return code Negsignal.SIGKILL
Nestes casos, a recomendação geral é aumentar os recursos do Airflow worker ou reduzir o número de tarefas por worker. No entanto, uma vez que as exceções do Airflow podem ser genéricas, pode ser difícil identificar o recurso específico que está a causar o problema.
Este tutorial explica como pode diagnosticar o motivo de uma falha do DAG e identificar o tipo de recurso que causa problemas depurando dois DAGs de exemplo que falham devido à falta de memória e armazenamento do trabalhador.
Objetivos
Executar DAGs de exemplo que falham pelos seguintes motivos:
- Falta de memória do trabalhador
- Falta de armazenamento do trabalhador
Diagnostique os motivos da falha
Aumente os recursos de trabalhadores atribuídos
Teste os DAGs com novos limites de recursos
Custos
Este tutorial usa os seguintes componentes faturáveis do Google Cloud:
- Cloud Composer (consulte os custos adicionais)
- Cloud Monitoring
Quando terminar este tutorial, pode evitar a faturação contínua eliminando os recursos que criou. Para mais detalhes, consulte o artigo Limpe.
Antes de começar
Esta secção descreve as ações necessárias antes de iniciar o tutorial.
Crie e configure um projeto
Para este tutorial, precisa de um Google Cloud projeto. Configure o projeto da seguinte forma:
Na Google Cloud consola, selecione ou crie um projeto:
Certifique-se de que a faturação está ativada para o seu projeto. Saiba como verificar se a faturação está ativada num projeto.
Certifique-se de que o utilizador do Google Cloud projeto tem as seguintes funções para criar os recursos necessários:
- Administrador de objetos de ambiente e armazenamento
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador do Compute (
roles/compute.admin
) - Editor de monitorização (
roles/monitoring.editor
)
- Administrador de objetos de ambiente e armazenamento
(
Ative APIs para o seu projeto
Enable the Cloud Composer API.
Crie o seu ambiente do Cloud Composer
Crie um ambiente do Cloud Composer 2.
Como parte da criação do ambiente,
concede a função Extensão do agente de serviço da API Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) à conta do agente de serviço do Composer. O Cloud Composer usa esta conta para realizar operações no seu Google Cloud projeto.
Verifique os limites de recursos dos trabalhadores
Verifique os limites de recursos do trabalhador do Airflow no seu ambiente:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Configuração do ambiente.
Aceda a Recursos > Configuração das cargas de trabalho > Trabalhador.
Verifique se os valores são 0,5 vCPUs, 1,875 GB de memória e 1 GB de armazenamento. Estes são os limites de recursos do worker do Airflow com os quais vai trabalhar nos passos seguintes deste tutorial.
Exemplo: diagnostique problemas de falta de memória
Carregue o seguinte DAG de exemplo para o ambiente
que criou nos passos anteriores. Neste tutorial, este DAG tem o nome create_list_with_many_strings
.
Este DAG contém uma tarefa que executa os seguintes passos:
- Cria uma lista vazia
s
. - Executa um ciclo para acrescentar a string
More
à lista. - Imprime a quantidade de memória que a lista consome e aguarda 1 segundo em cada iteração de 1 minuto.
import time
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_list_with_many_strings',
default_args=default_args,
schedule_interval=None)
def consume():
s = []
for i in range(120):
for j in range(1000000):
s.append("More")
print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
time.sleep(1)
t1 = PythonOperator(
task_id='task0',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0
)
Acione o DAG de amostra
Acione o DAG de exemplo, create_list_with_many_strings
:
Na Google Cloud consola, aceda à página Ambientes.
Na coluna Servidor Web do Airflow, siga o link do Airflow para o seu ambiente.
Na interface Web do Airflow, na página DAGs, na coluna Links do seu DAG, clique no botão Acionar DAG.
Clique em Acionador.
Na página DAGs, clique na tarefa que acionou e reveja os registos de saída para se certificar de que o DAG começou a ser executado.
Enquanto a tarefa está em execução, os registos de saída imprimem o tamanho da memória em GB que o DAG está a usar.
Após vários minutos, a tarefa falha porque excede o limite de memória do worker do Airflow de 1,875 GB.
Diagnostique o DAG com falhas
Se estava a executar várias tarefas no momento da falha, considere executar apenas uma tarefa e diagnosticar a pressão dos recursos durante esse período para identificar que tarefas causam pressão dos recursos e que recursos tem de aumentar.
Reveja os registos de tarefas do Airflow
Observe que a tarefa do DAG create_list_with_many_strings
tem um estado Failed
.
Reveja os registos da tarefa. É apresentada a seguinte entrada de registo:
```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```
`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.
Reveja as cargas de trabalho
Reveja as cargas de trabalho para verificar se a carga da tarefa não faz com que o nó onde o pod é executado exceda o limite de consumo de memória:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Configuração do ambiente.
Em Recursos > Cluster do GKE > Cargas de trabalho, clique em ver cargas de trabalho do cluster.
Verifique se alguns dos pods de carga de trabalho têm estados semelhantes aos seguintes:
Error with exit code 137 and 1 more issue. ContainerStatusUnknown with exit code 137 and 1 more issue
Exit code 137
significa que um contentor ou um Pod está a tentar usar mais memória do que o permitido. O processo é terminado para evitar a utilização de memória.
Reveja a monitorização do estado do ambiente e do consumo de recursos
Reveja a monitorização do estado do ambiente e do consumo de recursos:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Monitorização e selecione Vista geral.
No painel Vista geral do ambiente, localize o gráfico Estado do ambiente (DAG de monitorização do Airflow). Contém uma área vermelha, que corresponde à hora em que os registos começaram a imprimir erros.
Selecione Trabalhadores e encontre o gráfico Utilização de memória total dos trabalhadores. Observe que a linha Utilização de memória tem um pico no momento em que a tarefa estava a ser executada.

Embora a linha de utilização de memória no gráfico não atinja o limite, ao diagnosticar os motivos da falha, tem de ter em conta a utilização de memória por trabalhadores individuais. Cada worker usa uma parte da respetiva memória para executar outros contentores que realizam ações necessárias para o funcionamento do worker, como sincronizar os respetivos ficheiros DAG com o contentor do ambiente. A quantidade real de memória disponível para um worker executar tarefas do Airflow é inferior ao limite de memória. Se um trabalhador atingir o limite da memória real disponível, a tarefa executada pode falhar devido a memória insuficiente do trabalhador. Nestes casos, pode observar falhas de tarefas, mesmo que a linha no gráfico de utilização de memória total dos trabalhadores não atinja o limite de memória.
Aumente o limite de memória do trabalhador
Atribua memória adicional ao trabalhador para que o DAG de exemplo seja bem-sucedido:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Configuração do ambiente.
Encontre a configuração Resources > Workloads e clique em Editar.
Na secção Worker, no campo Memória, especifique o novo limite de memória para os trabalhadores do Airflow. Neste tutorial, use 3 GB.
Guarde as alterações e aguarde alguns minutos para que os trabalhadores do Airflow sejam reiniciados.
Teste o DAG com o novo limite de memória
Acione novamente o DAG create_list_with_many_strings
e aguarde até que termine a execução.
Nos registos de saída da execução do DAG, verá
Marking task as SUCCESS
e o estado da tarefa indicará Êxito.Reveja a secção Vista geral do ambiente no separador Monitorização e certifique-se de que não existem áreas vermelhas.
Clique na secção Trabalhadores e encontre o gráfico Utilização de memória total dos trabalhadores. Vai ver que a linha Limite de memória reflete a alteração no limite de memória e que a linha Utilização de memória está muito abaixo do limite de memória alocável real.
Exemplo: diagnosticar problemas de falta de armazenamento
Neste passo, carrega dois DAGs que criam ficheiros grandes. O primeiro DAG cria um ficheiro grande. O segundo DAG cria um ficheiro grande e imita uma operação de longa duração.
O tamanho do ficheiro em ambos os DAGs excede o limite de armazenamento do trabalhador do Airflow predefinido de 1 GB, mas o segundo DAG tem uma tarefa de espera adicional para prolongar artificialmente a respetiva duração.
Vai investigar as diferenças no comportamento de ambos os DAGs nos passos seguintes.
Carregue um DAG que crie um ficheiro grande
Carregue o seguinte DAG de exemplo para o ambiente
que criou nos passos anteriores. Neste tutorial, este DAG tem o nome create_large_txt_file_print_logs
.
Este DAG contém uma tarefa que executa os seguintes passos:
- Escreve um ficheiro de 1,5 GB
localfile.txt
no armazenamento do worker do Airflow. - Imprime o tamanho do ficheiro criado através do módulo
os
do Python. - Imprime a duração da execução do DAG a cada 1 minuto.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
Carregue um DAG que crie um ficheiro grande numa operação de execução prolongada
Para imitar um DAG de execução prolongada e investigar o impacto da duração da tarefa no estado final, carregue o segundo DAG de exemplo para o seu ambiente. Neste tutorial, este DAG tem o nome long_running_create_large_txt_file_print_logs
.
Este DAG contém uma tarefa que executa os seguintes passos:
- Escreve um ficheiro de 1,5 GB
localfile.txt
no armazenamento do worker do Airflow. - Imprime o tamanho do ficheiro criado através do módulo
os
do Python. - Aguardar 1 hora e 15 minutos para imitar algum tempo necessário para operações com o ficheiro, por exemplo, ler a partir do ficheiro.
- Imprime a duração da execução do DAG a cada 1 minuto.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'long_running_create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
for k in range(75):
time.sleep(60)
print(f"{k+1} minute")
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
Acione DAGs de amostra
Acione o primeiro DAG, create_large_txt_file_print_logs
:
Na Google Cloud consola, aceda à página Ambientes.
Na coluna Servidor Web do Airflow, siga o link do Airflow para o seu ambiente.
Na interface Web do Airflow, na página DAGs, na coluna Links do seu DAG, clique no botão Acionar DAG.
Clique em Acionador.
Na página DAGs, clique na tarefa que acionou e reveja os registos de saída para se certificar de que o DAG começou a ser executado.
Aguarde até que a tarefa criada com o DAG
create_large_txt_file_print_logs
esteja concluída. Pode demorar vários minutos.Na página DAGs, clique na execução do DAG. A tarefa tem o estado
Success
, mesmo que o limite de armazenamento tenha sido excedido.
Reveja os registos do Airflow da tarefa:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Registos e, de seguida, a Todos os registos > Registos do Airflow > Trabalhadores > Ver no Explorador de registos.
Filtre os registos por tipo: mostre apenas mensagens de Erro.
Nos registos, verá mensagens semelhantes às seguintes:
Worker: warm shutdown (Main Process)
ou
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
Estes registos indicam que o pod iniciou o processo de "encerramento a quente" porque o armazenamento usado excedeu o limite e foi removido em 1 hora. No entanto, a execução do DAG não falhou porque foi concluída dentro do período de tolerância de encerramento do Kubernetes, o que é explicado mais detalhadamente neste tutorial.
Para ilustrar o conceito do período de tolerância de rescisão, reveja o resultado do segundo DAG de exemplo, long_running_create_large_txt_file_print_logs
.
Acione o segundo DAG, long_running_create_large_txt_file_print_logs
:
Na Google Cloud consola, aceda à página Ambientes.
Na coluna Servidor Web do Airflow, siga o link do Airflow para o seu ambiente.
Na interface Web do Airflow, na página DAGs, na coluna Links do seu DAG, clique no botão Acionar DAG.
Clique em Acionador.
Na página DAGs, clique na tarefa que acionou e reveja os registos de saída para se certificar de que o DAG começou a ser executado.
Aguarde até que a execução do DAG
long_running_create_large_txt_file_print_logs
falhe. Este processo demora cerca de uma hora.
Reveja os resultados da execução do DAG:
Na página DAGs, clique na execução do DAG
long_running_create_large_txt_file_print_logs
. Vai ver que a tarefa tem o estadoFailed
e que a duração da execução foi exatamente 1 hora e 5 minutos, o que é inferior ao período de espera da tarefa de 1 hora e 15 minutos.Reveja os registos da tarefa. Depois de o DAG criar o ficheiro
localfile.txt
no contentor do worker do Airflow, o registo indica que o DAG começou a aguardar e a duração da execução é apresentada nos registos de tarefas a cada 1 minuto. Neste exemplo, o DAG imprime o registolocalfile.txt size:
e o tamanho do ficheirolocalfile.txt
é de 1,5 GB.
Quando o ficheiro escrito no contentor do trabalhador do Airflow excede o limite de armazenamento, a execução do DAG deve falhar. No entanto, a tarefa não falha imediatamente e continua a ser executada até atingir a duração de 1 hora e 5 minutos. Isto acontece porque o Kubernetes não termina a tarefa imediatamente e mantém-na em execução para permitir 1 hora de tempo de recuperação, conhecido como o "período de tolerância de terminação". Quando um nó fica sem recursos, o Kubernetes não termina o pod imediatamente para processar a terminação de forma elegante, de modo a que o impacto no utilizador final seja mínimo.
O período de tolerância de encerramento ajuda os utilizadores a recuperar ficheiros após falhas de tarefas. No entanto, pode gerar confusão ao diagnosticar DAGs. Quando o limite de armazenamento do worker do Airflow é excedido, o estado da tarefa final depende da duração da execução do DAG:
Se a execução do DAG exceder o limite de armazenamento do trabalhador, mas for concluída em menos de 1 hora, a tarefa é concluída com o estado
Success
porque foi concluída dentro do período de tolerância de encerramento. No entanto, o Kubernetes termina o pod e o ficheiro escrito é eliminado imediatamente do contentor.Se o DAG exceder o limite de armazenamento do trabalhador e for executado durante mais de 1 hora, o DAG continua a ser executado durante 1 hora e pode exceder o limite de armazenamento em milhares de por cento antes de o Kubernetes eliminar o pod e o Airflow marcar a tarefa como
Failed
.
Diagnostique o DAG com falhas
Se estava a executar várias tarefas no momento da falha, considere executar apenas uma tarefa e diagnosticar a pressão dos recursos durante esse período para identificar que tarefas causam pressão dos recursos e que recursos tem de aumentar.
Reveja os registos de tarefas do segundo DAG,
long_running_create_large_txt_file_print_logs
:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Registos e, de seguida, a Todos os registos > Registos do Airflow > Trabalhadores > Ver no Explorador de registos.
Filtre os registos por tipo: mostre apenas mensagens de Erro.
Nos registos, verá mensagens semelhantes às seguintes:
Container storage usage of worker reached 155.7% of the limit.
This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.
You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.
ou
Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.
Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.
Estas mensagens indicam que, à medida que a tarefa progredia, os registos do Airflow começaram a imprimir erros quando o tamanho dos ficheiros gerados pelo seu DAG excedeu o limite de armazenamento do worker e o período de tolerância de encerramento começou. Durante o período de tolerância de encerramento, o consumo de armazenamento não regressou ao limite, o que levou à remoção do pod após o fim do período de tolerância de encerramento.
Reveja a monitorização do estado do ambiente e do consumo de recursos:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Monitorização e selecione Vista geral.
No painel Vista geral do ambiente, localize o gráfico Estado do ambiente (DAG de monitorização do Airflow). Contém uma área vermelha, que corresponde à hora em que os registos começaram a imprimir erros.
Selecione Trabalhadores e encontre o gráfico Utilização de disco total dos trabalhadores. Observe que a linha Utilização do disco tem um pico e excede a linha Limite do disco no momento em que a sua tarefa estava em execução.

Aumente o limite de armazenamento do trabalhador
Atribua armazenamento adicional ao trabalhador do Airflow para que o DAG de exemplo seja bem-sucedido:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Configuração do ambiente.
Encontre a configuração Resources > Workloads e clique em Editar.
Na secção Worker, no campo Armazenamento, especifique o novo limite de armazenamento para os workers do Airflow. Neste tutorial, defina-o como 2 GB.
Guarde as alterações e aguarde alguns minutos para que os trabalhadores do Airflow sejam reiniciados.
Teste o seu DAG com o novo limite de armazenamento
Acione novamente o DAG long_running_create_large_txt_file_print_logs
e aguarde 1 hora e 15 minutos até terminar a execução.
Nos registos de saída da execução do DAG, verá
Marking task as SUCCESS
e o estado da tarefa vai indicar Sucesso, com uma duração de 1 hora e 15 minutos, o que equivale ao tempo de espera definido no código do DAG.Reveja a secção Vista geral do ambiente no separador Monitorização e certifique-se de que não existem áreas vermelhas.
Clique na secção Trabalhadores e encontre o gráfico Utilização do disco total dos trabalhadores. Vai ver que a linha Limite do disco reflete a alteração no limite de armazenamento e que a linha Utilização do disco está dentro do intervalo permitido.
Resumo
Neste tutorial, diagnosticou o motivo de uma falha do DAG e identificou o tipo de recurso que causa pressão ao depurar dois DAGs de exemplo que falham devido à falta de memória e armazenamento do trabalhador. Em seguida, executou os DAGs com êxito depois de atribuir mais memória e armazenamento aos seus trabalhadores. No entanto, é recomendado que otimize os seus DAGs (fluxos de trabalho) para reduzir o consumo de recursos dos trabalhadores em primeiro lugar, porque não é possível aumentar os recursos além de um determinado limite.
Limpar
Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.
Elimine o projeto
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Elimine recursos individuais
Se planeia explorar vários tutoriais e inícios rápidos, a reutilização de projetos pode ajudar a evitar exceder os limites de quota do projeto.
Elimine o ambiente do Cloud Composer. Também elimina o contentor do ambiente durante este procedimento.