调试内存不足和存储空间不足 DAG 问题

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本教程介绍如何在 Cloud Composer 并诊断工作器资源相关问题,例如 工作器内存不足或存储空间不足;借助日志和环境 监控。

简介

本教程重点介绍资源相关问题,以演示调试 DAG。

缺少分配的工作器资源会导致 DAG 故障。如果一个 Airflow 任务 则可能会出现 Airflow 异常 例如:

WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.

Task exited with return code Negsignal.SIGKILL

在这种情况下,一般建议增加 Airflow 工作器数量。 或减少每个工作器的任务数。不过,由于 Airflow 例外情况可能比较宽泛,因此您可能很难确定 导致问题的特定资源。

本教程介绍了如何诊断 DAG 失败的原因,以及 通过调试两个示例 DAG 来确定导致问题的资源类型 会因为缺乏工作器内存和存储空间而失败。

目标

  • 运行由于以下原因失败的示例 DAG:

    • 缺少工作器内存
    • 缺少工作器存储空间
  • 诊断失败原因

  • 增加分配的工作器资源

  • 测试具有新资源限制的 DAG

费用

本教程使用 Google Cloud 的以下收费组件:

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

本部分介绍了在开始学习本教程之前必须执行的操作。

创建和配置项目

在本教程中,您需要 project。 按以下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建项目

    转到项目选择器

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 如需创建必要的资源,请确保您的 Google Cloud 项目用户具有以下角色

    • 环境和存储对象管理员 (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)
    • Monitoring Editor (roles/monitoring.editor)

为您的项目启用 API

启用 Cloud Composer API。

启用 API

创建 Cloud Composer 环境

创建一个 Cloud Composer 2 环境

在创建环境的过程中 您授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色分配给 Composer Service Agent 。Cloud Composer 使用此账号执行操作 Google Cloud 项目中的资源。

检查工作器资源限制

检查环境中的 Airflow 工作器资源限制:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 转到资源 > 工作负载配置 > 工作器。

  5. 检查这些值是否为 0.5 个 vCPU、1.875 GB 内存和 1 GB 存储空间。 这些是您在下一步中要处理的 Airflow 工作器资源限制 本教程中各步骤的说明。

示例:诊断内存不足问题

将以下示例 DAG 上传到环境中 您在之前步骤中创建的应用在本教程中,此 DAG 名为 create_list_with_many_strings.

此 DAG 包含一项执行以下步骤的任务:

  1. 创建一个空列表 s
  2. 运行一个循环,将 More 字符串附加到列表中。
  3. 输出列表占用的内存量,每 1 秒钟等待 1 秒 分钟迭代。
import time

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_list_with_many_strings',
    default_args=default_args,
    schedule_interval=None)


def consume():
    s = []
    for i in range(120):
        for j in range(1000000):
            s.append("More")
        print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
        time.sleep(1)


t1 = PythonOperator(
    task_id='task0',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0
)

触发示例 DAG

触发示例 DAG create_list_with_many_strings

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。

  4. 点击触发器

  5. DAG 页面上,点击您触发的任务并查看输出 日志,确保您的 DAG 已经开始运行。

任务运行时,输出日志将打印内存大小(以 GB 为单位) DAG 正在使用的网络

几分钟后,任务将失败,因为它超出了 Airflow 工作器 内存上限为 1.875 GB

诊断失败的 DAG

如果失败时您正在运行多个任务,请考虑 仅完成一项任务并诊断在此期间的资源压力,以确定 哪些任务会造成资源压力,以及您需要增加哪些资源。

查看 Airflow 任务日志

您会发现,create_list_with_many_strings DAG 中的任务具有 Failed 状态。

查看任务的日志。您将看到以下日志条目:

```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```

`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.

查看工作负载

查看工作负载,检查任务负载是否不会导致 Pod 运行以超出内存消耗限制:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 资源 > GKE 集群 >工作负载,请点击查看集群工作负载

  5. 检查某些工作负载 Pod 是否具有类似如下所示的状态:

    Error with exit code 137 and 1 more issue.
    ContainerStatusUnknown with exit code 137 and 1 more issue
    

    Exit code 137 表示容器或 Pod 正在尝试使用更多内存 超过允许的范围。系统会终止该进程以避免使用内存。

查看环境健康状况和资源消耗监控情况

查看环境健康状况和资源消耗监控情况:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页,然后选择概览

  4. 环境概览面板上,找到 Environment Health (Airflow Monitoring DAG) 图表。其中包含一个红色的 区域,该区域对应于日志开始输出错误的时间。

  5. 选择工作器,然后找到工作器内存总用量图表。 您会发现,内存用量线条 任务正在运行。

<ph type="x-smartling-placeholder"></ph> 内存使用量线在
    任务正在运行
图 1.工作器总内存用量图(点击可放大)

虽然图表上的内存用量线未达到上限, 在诊断失败原因时,你需要考虑以下因素: 仅限 可分配内存、 而图表上的内存上限线表示总内存 可用(包括 GKE 预留的容量)。

在此示例中,工作器内存限制设置为 1.875 GB。 GKE 预留前 4 GiB 内存的 25%。 GKE 还预留了 eviction-threshold: 每个节点上用于 kubelet 逐出的 100 MiB 内存。

可分配内存的计算方式如下:

ALLOCATABLE = CAPACITY - RESERVED - EVICTION-THRESHOLD

如果内存限制为 1.875 GB,则实际可分配内存为:

1.75 GiB (1.875GB) - 0.44 (25% GiB reserved) - 0.1 = 1.21 GiB (~1.3 GB).

将此实际限制附加到内存用量图后, 会看到任务的内存使用量峰值达到实际内存量 您可以断定是由于工作器不足而导致任务失败 内存。

提高工作器内存限制

请分配额外的工作器内存,以便示例 DAG 成功:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 找到 Resources &gt; Workloads 配置,然后 点击修改

  5. Worker 部分的 Memory 字段中,指定新内存 Airflow 工作器限制。在本教程中,请使用 3 GB。

  6. 保存更改,并等待 Airflow 工作器几分钟 重启。

使用新的内存限制测试 DAG

再次触发 create_list_with_many_strings DAG 并等待它 。

  1. 在 DAG 运行的输出日志中,您将看到 Marking task as SUCCESS。 任务的状态将显示为 Success

  2. 查看监控标签页中的环境概览部分,然后 请确认没有红色区域

  3. 点击 Workers 部分,然后找到 Total worker memory usage 图表。您会看到内存限制行反映了 内存用量行远低于实际内存用量, 可分配内存上限。

示例:诊断存储空间不足问题

在此步骤中,您将上传两个用于创建大型文件的 DAG。第一个 DAG 会创建一个大文件。第二个 DAG 创建一个大文件,并模拟 长时间运行的操作。

两个 DAG 中的文件大小都超过了默认的 Airflow 工作器存储空间 但第二个 DAG 有额外的等待任务来扩展其 。

在接下来的内容中,您将研究这两个 DAG 的行为差异 步骤。

上传用于创建大型文件的 DAG

将以下示例 DAG 上传到环境中 您在之前步骤中创建的应用在本教程中,此 DAG 名为 create_large_txt_file_print_logs.

此 DAG 包含一项执行以下步骤的任务:

  1. 将 1.5 GB localfile.txt 文件写入 Airflow 工作器存储空间。
  2. 使用 Python os 模块输出所创建文件的大小。
  3. 输出每 1 分钟输出的 DAG 运行时长。
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

上传用于在长时间运行的操作中创建大型文件的 DAG

模拟长时间运行的 DAG 并调查任务时长的影响 在最终状态下,将第二个示例 DAG 上传到 环境在本教程中,此 DAG 名为 long_running_create_large_txt_file_print_logs.

此 DAG 包含一项执行以下步骤的任务:

  1. 将 1.5 GB localfile.txt 文件写入 Airflow 工作器存储空间。
  2. 使用 Python os 模块输出所创建文件的大小。
  3. 等待 1 小时 15 分钟,以模拟 例如,从文件中读取内容。
  4. 输出每 1 分钟输出的 DAG 运行时长。
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'long_running_create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    for k in range(75):
        time.sleep(60)
        print(f"{k+1} minute")

    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

触发示例 DAG

触发第一个 DAG create_large_txt_file_print_logs

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。

  4. 点击触发器

  5. DAG 页面上,点击您触发的任务并查看输出 日志,确保您的 DAG 已经开始运行。

  6. 等待您使用 create_large_txt_file_print_logs DAG 完成。这可能需要几个时间 分钟。

  7. DAG 页面上,点击 DAG 运行。你会看到你的任务 具有 Success 状态,即使超出了存储空间上限也是如此。

查看任务 Airflow 日志:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

    1. 在环境列表中,点击您的环境名称。环境详情页面会打开。

    2. 转到日志标签页,然后转到所有日志 &gt; Airflow 日志 &gt; 工作器 &gt; 在日志浏览器中查看

    3. 按类型过滤日志:仅显示错误消息。

在日志中,您将看到类似于以下内容的消息:

Worker: warm shutdown (Main Process)

A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

这些日志表明 Pod 启动了“温关停”过程,因为 已使用的存储空间超出上限,并在 1 小时内被逐出。不过,DAG 从 1 到 8 搭载 没有失败,因为它是在 Kubernetes 终止宽限期内完成的 本教程将对此进行详细介绍。

为了解释终止宽限期的概念,请查看结果 第二个示例 DAG long_running_create_large_txt_file_print_logs 中。

触发第二个 DAG long_running_create_large_txt_file_print_logs

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 在 Airflow 网页界面中的 DAG 页面上,在您的 DAG 的链接列中,点击触发 Dag (Trigger Dag) 按钮。

  4. 点击触发器

  5. DAG 页面上,点击您触发的任务并查看输出 日志,确保您的 DAG 已经开始运行。

  6. 等待 long_running_create_large_txt_file_print_logs DAG 运行 失败。此过程大约需要一个小时

查看 DAG 运行结果:

  1. DAG 页面上,点击 long_running_create_large_txt_file_print_logs 个 DAG 运行。你会看到 任务处于 Failed 状态,且运行时长为 正好 1 小时 5 分钟,少于该任务的等待期 1 小时 15 分钟。

  2. 查看任务的日志。DAG 创建好 localfile.txt 文件后, Airflow 工作器的容器时,日志会输出 DAG 启动的 系统每隔 1 分钟就会将运行时长输出到任务日志中。 在此示例中,DAG 会输出 localfile.txt size: 日志和 大小将为 1.5 GB。localfile.txt

当写入 Airflow 工作器容器的文件超出存储空间大小时 DAG 运行应该会失败。不过,任务不会失败 并在达到 1 小时 5 分钟后持续投放 这是因为 Kubernetes 不会立即终止任务, 运行,以等待 1 小时的恢复时间,称为“终止宽限期” 期限”。当节点的资源耗尽时,Kubernetes 不会终止 Pod 立即妥善处理终止事宜,尽量减少 对最终用户的影响。

终止宽限期可帮助用户在任务失败后恢复文件。 但在诊断 DAG 时可能会引起混淆。Airflow worker 超出存储限制,则结束任务状态取决于 DAG 运行:

  • 如果 DAG 运行作业超出了工作器存储空间上限,但在 1 分钟内完成 小时,由于该任务已完成,并以 Success 状态完成 处于终止宽限期内。不过,Kubernetes 会将 Pod 写入的文件会立即从容器中删除。

  • 如果此 DAG 超出了工作器存储空间上限且运行时间超过 1 小时, DAG 会持续运行 1 小时,可能会超出存储空间上限 在 Kubernetes 消除 Pod 和 Airflow 标志 以 Failed 的形式指定任务。

诊断失败的 DAG

如果失败时您正在运行多个任务,请考虑 仅完成一项任务并诊断在此期间的资源压力,以确定 哪些任务会造成资源压力,以及您需要增加哪些资源。

查看第二个 DAG 的任务日志, long_running_create_large_txt_file_print_logs:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到日志标签页,然后转到所有日志 &gt; Airflow 日志 &gt; 工作器 &gt; 在日志浏览器中查看

  4. 按类型过滤日志:仅显示错误消息。

在日志中,您将看到类似于以下内容的消息:

Container storage usage of worker reached 155.7% of the limit.

This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.

You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.

Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.

Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.

这些消息表示,随着任务的进展,Airflow 日志已开始 在 DAG 生成的文件大小超过 工作器存储空间上限,并且终止宽限期已开始。在 终止宽限期,存储空间用量未恢复到限制, 这导致终止宽限期结束后 Pod 被逐出。

查看环境健康状况和资源消耗监控情况:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页,然后选择概览

  4. 环境概览面板上,找到 Environment Health (Airflow Monitoring DAG) 图表。其中包含一个红色的 区域,该区域对应于日志开始输出错误的时间。

  5. 选择工作器,然后找到工作器磁盘总用量图表。观察 磁盘使用率行出现峰值并超出磁盘限制

<ph type="x-smartling-placeholder"></ph> 磁盘用量行出现峰值,且超出磁盘上限
    添加代码行
图 2.工作器磁盘用量图表(点击可放大)

提高工作器存储空间上限

请分配额外的 Airflow 工作器存储空间,以便示例 DAG 成功:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 找到 Resources &gt; Workloads 配置,然后 点击修改

  5. Worker 部分的 Storage 字段中,指定新的存储空间 Airflow 工作器限制。在本教程中,应将其设置为 2 GB。

  6. 保存更改,并等待 Airflow 工作器几分钟 重启。

使用新的存储空间限制测试 DAG

再次触发 long_running_create_large_txt_file_print_logs DAG 请等待 1 小时 15 分钟,直到它完成运行。

  1. 在 DAG 运行的输出日志中,您将看到 Marking task as SUCCESS。 任务状态将显示为成功,持续时间为 1 小时 15 分钟,即 DAG 代码中设置的等待时间。

  2. 查看监控标签页中的环境概览部分,然后 请确认没有红色区域

  3. 点击工作器部分,然后找到工作器磁盘总用量 图表。您将看到磁盘限制行反映了 且磁盘使用量行在允许的范围内。

摘要

在本教程中,您诊断了 DAG 失败的原因并确定了 通过调试两个失败的示例 DAG 来产生压力的资源类型 而过度耗用资源。然后,您成功运行了 DAG 在向工作器分配更多内存和存储空间后决定如何操作不过, 推荐给 优化 DAG(工作流) 减少工作器资源消耗量 将资源增加到超过特定阈值。

清理

为避免系统因资源向您的 Google Cloud 账号收取费用 您可以删除包含这些资源的项目 或者保留项目而删除各个资源。

删除项目

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

删除 Cloud Composer 环境。您 删除环境的存储桶

后续步骤