Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本教程提供了相关步骤,可帮助您在 Cloud Composer 中调试失败的 Airflow DAG,并借助日志和环境监控来诊断与工作器资源相关的问题,例如工作器内存或存储空间不足。
简介
本教程重点介绍与资源相关的问题,以演示如何调试 DAG。
未分配工作器资源会导致 DAG 失败。如果 Airflow 任务耗尽内存或存储空间,您可能会看到 Airflow 异常,例如:
WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.
或
Task exited with return code Negsignal.SIGKILL
在这种情况下,一般建议是增加 Airflow 工作器资源或减少每个工作器的任务数量。不过,由于 Airflow 异常可能比较笼统,因此可能难以确定导致问题的特定资源。
本教程将通过调试两个因工作器内存和存储空间不足而失败的示例 DAG,说明如何诊断 DAG 失败的原因并确定导致问题的资源类型。
目标
运行因以下原因而失败的示例 DAG:
- 工作器内存不足
- 工作器存储空间不足
诊断失败原因
增加分配的工作器资源
测试具有新资源限制的 DAG
费用
本教程使用 Google Cloud的以下收费组件:
完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理。
准备工作
本部分介绍了在开始本教程之前需要执行的操作。
创建和配置项目
在本教程中,您需要一个 Google Cloud 项目。 按如下方式配置项目:
在 Google Cloud 控制台中,选择或创建一个项目:
确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能。
确保您的 Google Cloud 项目用户拥有以下角色,以便创建必要的资源:
- Environment and Storage Object Administrator
(
roles/composer.environmentAndStorageObjectAdmin
) - Compute Admin (
roles/compute.admin
) - Monitoring Editor (
roles/monitoring.editor
)
- Environment and Storage Object Administrator
(
为您的项目启用 API
Enable the Cloud Composer API.
创建 Cloud Composer 环境
在创建环境的过程中,您会向 Composer 服务代理账号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext
) 角色。Cloud Composer 使用此账号在您的 Google Cloud 项目中执行操作。
检查工作器资源限制
检查您环境中的 Airflow 工作器资源限制:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到环境配置标签页。
依次前往资源 > 工作负载配置 > Worker。
检查这些值是否为 0.5 个 vCPU、1.875 GB 内存和 1 GB 存储空间。这些是您将在本教程的后续步骤中使用的 Airflow worker 资源限制。
示例:诊断内存不足问题
将以下示例 DAG 上传到您在之前步骤中创建的环境。在本教程中,此 DAG 的名称为 create_list_with_many_strings
。
此 DAG 包含一个执行以下步骤的任务:
- 创建空列表
s
。 - 运行一个循环,将
More
字符串附加到列表中。 - 打印列表消耗的内存量,并在每分钟的迭代中等待 1 秒。
import time
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_list_with_many_strings',
default_args=default_args,
schedule_interval=None)
def consume():
s = []
for i in range(120):
for j in range(1000000):
s.append("More")
print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
time.sleep(1)
t1 = PythonOperator(
task_id='task0',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0
)
触发示例 DAG
触发示例 DAG create_list_with_many_strings
:
在 Google Cloud 控制台中,前往环境页面。
在 Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。
在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。
点击触发器。
在 DAG 页面上,点击您触发的任务,然后查看输出日志,确保您的 DAG 已开始运行。
任务运行时,输出日志会打印 DAG 正在使用的内存大小(以 GB 为单位)。
几分钟后,该任务将因超出 Airflow 工作器的 1.875 GB 内存限制而失败。
诊断失败的 DAG
如果您在发生故障时运行了多个任务,请考虑仅运行一个任务,并在该时间段内诊断资源压力,以确定哪些任务会导致资源压力,以及需要增加哪些资源。
查看 Airflow 任务日志
观察到 create_list_with_many_strings
DAG 中的任务处于 Failed
状态。
查看任务的日志。您将看到以下日志条目:
```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```
`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.
查看工作负载
检查工作负载,确保任务的负载不会导致运行 Pod 的节点超出内存消耗限制:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到环境配置标签页。
在资源 > GKE 集群 > 工作负载中,点击查看集群工作负载。
检查部分工作负载 pod 的状态是否类似于以下状态:
Error with exit code 137 and 1 more issue. ContainerStatusUnknown with exit code 137 and 1 more issue
Exit code 137
表示容器或 Pod 尝试使用的内存超出允许的范围。进程会被终止,以防止内存使用。
查看环境健康状况和资源消耗监控
查看环境健康状况和资源消耗监控:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往监控标签页,然后选择概览。
在环境概览面板上,找到环境健康情况(Airflow 监控 DAG)图表。它包含一个红色区域,对应于日志开始打印错误的时间。
选择工作器,然后找到工作器内存总用量图表。 请注意,在任务运行时,内存用量线出现峰值。

即使图表上的内存用量线未达到限制,在诊断失败原因时,您也需要考虑各个工作器的内存用量。每个工作器都会使用一部分内存来运行其他容器,这些容器会执行工作器运行所需的必要操作,例如将其 DAG 文件与环境的存储桶同步。工作器可用于执行 Airflow 任务的实际内存量小于其内存限制。如果工作器达到其可用实际内存的上限,则执行的任务可能会因工作器内存不足而失败。在这种情况下,即使工作器内存总用量图表中的线未达到内存上限,您也可能会看到任务失败。
提高工作器内存上限
分配额外的工作器内存,以使示例 DAG 成功运行:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到环境配置标签页。
找到资源 > 工作负载配置,然后点击修改。
在工作器部分的内存字段中,指定 Airflow 工作器的新内存限制。在本教程中,请使用 3 GB。
保存更改,并等待几分钟,让 Airflow 工作器重新启动。
使用新的内存限制测试 DAG
再次触发 create_list_with_many_strings
DAG,然后等待其运行完成。
在 DAG 运行的输出日志中,您会看到
Marking task as SUCCESS
,并且任务的状态会显示为 Success。查看监控标签页中的环境概览部分,确保没有红色区域。
点击工作器部分,然后找到工作器内存总用量图表。您会看到,内存上限线反映了内存上限的变化,而内存用量线远低于实际可分配的内存上限。
示例:诊断存储空间不足问题
在此步骤中,您将上传两个用于创建大型文件的 DAG。第一个 DAG 会创建一个大文件。第二个 DAG 会创建一个大文件并模拟长时间运行的操作。
两个 DAG 中的文件大小均超过了默认的 Airflow 工作器存储空间限制 (1 GB),但第二个 DAG 具有额外的等待任务,可人为延长其持续时间。
在接下来的步骤中,您将调查这两个 DAG 在行为方面的差异。
上传可创建大型文件的 DAG
将以下示例 DAG 上传到您在之前步骤中创建的环境。在本教程中,此 DAG 的名称为 create_large_txt_file_print_logs
。
此 DAG 包含一个执行以下步骤的任务:
- 将 1.5 GB 的
localfile.txt
文件写入 Airflow 工作器存储空间。 - 使用 Python
os
模块输出所创建文件的大小。 - 每 1 分钟打印一次 DAG 运行的时长。
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
上传在长时间运行的操作中创建大文件的 DAG
如需模拟长时间运行的 DAG 并调查任务时长对最终状态的影响,请将第二个示例 DAG 上传到您的环境中。在本教程中,此 DAG 的名称为 long_running_create_large_txt_file_print_logs
。
此 DAG 包含一个执行以下步骤的任务:
- 将 1.5 GB 的
localfile.txt
文件写入 Airflow 工作器存储空间。 - 使用 Python
os
模块输出所创建文件的大小。 - 等待 1 小时 15 分钟,以模拟文件操作所需的时间,例如从文件中读取数据。
- 每 1 分钟打印一次 DAG 运行的时长。
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'long_running_create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
for k in range(75):
time.sleep(60)
print(f"{k+1} minute")
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
触发示例 DAG
触发第一个 DAG create_large_txt_file_print_logs
:
在 Google Cloud 控制台中,前往环境页面。
在 Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。
在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。
点击触发器。
在 DAG 页面上,点击您触发的任务,然后查看输出日志,确保您的 DAG 已开始运行。
等待您使用
create_large_txt_file_print_logs
DAG 创建的任务完成。这可能需要几分钟的时间。在 DAG 页面上,点击相应 DAG 运行。即使存储空间用量已超出上限,您也会看到相应任务处于
Success
状态。
查看任务 Airflow 日志:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往日志标签页,然后依次前往所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看。
按类型过滤日志:仅显示错误消息。
在日志中,您会看到类似如下内容的消息:
Worker: warm shutdown (Main Process)
或
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
这些日志表明,由于使用的存储空间超出限制,Pod 启动了“温关闭”流程,并将在 1 小时后被逐出。不过,由于 DAG 运行在 Kubernetes 终止宽限期内完成,因此并未失败,本教程中将对此进行进一步说明。
为了说明终止宽限期的概念,请查看第二个示例 DAG long_running_create_large_txt_file_print_logs
的结果。
触发第二个 DAG long_running_create_large_txt_file_print_logs
:
在 Google Cloud 控制台中,前往环境页面。
在 Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。
在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。
点击触发器。
在 DAG 页面上,点击您触发的任务,然后查看输出日志,确保您的 DAG 已开始运行。
等待
long_running_create_large_txt_file_print_logs
DAG 运行失败。此过程大约需要 1 小时。
查看 DAG 运行结果:
在 DAG 页面上,点击
long_running_create_large_txt_file_print_logs
DAG 运行。您会看到,该任务处于Failed
状态,并且运行时长正好为 1 小时 5 分钟,小于任务的等待时间 1 小时 15 分钟。查看任务的日志。DAG 在 Airflow 工作器的容器中创建
localfile.txt
文件后,日志会输出 DAG 开始等待,并且每 1 分钟在任务日志中输出一次运行时长。在此示例中,DAG 会输出localfile.txt size:
日志,并且localfile.txt
文件的大小将为 1.5 GB。
一旦写入 Airflow 工作器容器的文件超出存储空间限制,DAG 运行就会失败。不过,该任务不会立即失败,而是会继续运行,直到其时长达到 1 小时 5 分钟。这是因为 Kubernetes 不会立即终止任务,而是会继续运行,以便留出 1 小时的恢复时间,即“终止宽限期”。一旦节点耗尽资源,Kubernetes 不会立即终止 Pod 以便妥善处理终止,从而尽可能减少对最终用户的影响。
终止宽限期有助于用户在任务失败后恢复文件,但在诊断 DAG 时可能会造成混淆。当 Airflow worker 存储空间超出限额时,结束任务状态取决于 DAG 运行的时长:
如果 DAG 运行超出工作器存储空间上限,但在 1 小时内完成,则任务会以
Success
状态完成,因为任务是在终止宽限期内完成的。不过,Kubernetes 会终止 Pod,并且写入的文件会立即从容器中删除。如果 DAG 超出工作器存储空间限制且运行时间超过 1 小时,则 DAG 会继续运行 1 小时,并且在 Kubernetes 消除 Pod 且 Airflow 将任务标记为
Failed
之前,DAG 超出的存储空间限制可能会达到数千个百分点。
诊断失败的 DAG
如果您在发生故障时运行了多个任务,请考虑仅运行一个任务,并在该时间段内诊断资源压力,以确定哪些任务会导致资源压力,以及需要增加哪些资源。
查看第二个 DAG(long_running_create_large_txt_file_print_logs
)的任务日志:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往日志标签页,然后依次前往所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看。
按类型过滤日志:仅显示错误消息。
在日志中,您会看到类似如下内容的消息:
Container storage usage of worker reached 155.7% of the limit.
This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.
You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.
或
Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.
Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.
这些消息表明,随着任务的进行,当 DAG 生成的文件大小超过工作器存储空间限制时,Airflow 日志开始打印错误,并且终止宽限期开始。在终止宽限期内,存储空间用量未恢复到限制以下,导致在终止宽限期结束后 pod 被逐出。
查看环境健康状况和资源消耗监控:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往监控标签页,然后选择概览。
在环境概览面板上,找到环境健康情况(Airflow 监控 DAG)图表。它包含一个红色区域,对应于日志开始打印错误的时间。
选择工作器,然后找到工作器磁盘总用量图表。请注意,在任务运行时,“磁盘使用率”线出现峰值并超过“磁盘限制”线。

提高工作器存储空间上限
分配额外的 Airflow 工作器存储空间,以确保示例 DAG 成功运行:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到环境配置标签页。
找到资源 > 工作负载配置,然后点击修改。
在工作器部分的存储空间字段中,指定 Airflow 工作器的新存储空间限制。在本教程中,请将其设置为 2 GB。
保存更改,并等待几分钟,让 Airflow 工作器重新启动。
使用新的存储空间限额测试 DAG
再次触发 long_running_create_large_txt_file_print_logs
DAG,等待 1 小时 15 分钟,直到它运行完成。
在 DAG 运行的输出日志中,您会看到
Marking task as SUCCESS
,并且任务的状态将指示成功,持续时间为 1 小时 15 分钟,这等于在 DAG 代码中设置的等待时间。查看监控标签页中的环境概览部分,确保没有红色区域。
点击工作器部分,找到工作器磁盘总用量图表。您会看到,磁盘限额行反映了存储空间限额的变化,而磁盘使用情况行则在允许的范围内。
摘要
在本教程中,您通过调试两个因工作器内存和存储空间不足而失败的示例 DAG,诊断了 DAG 失败的原因,并确定了导致压力的资源类型。然后,您为工作器分配了更多内存和存储空间,并成功运行了 DAG。不过,建议您先优化 DAG(工作流)以减少工作器资源消耗,因为资源无法增加到超出某个阈值。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留该项目但删除各个资源。
删除项目
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
删除各个资源
如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。
删除 Cloud Composer 环境。在此过程中,您还可以删除环境的存储桶。