调试任务调度问题

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本教程将指导您诊断和排查导致调度程序故障、解析错误和延迟以及任务失败的任务调度和解析问题。

简介

Airflow 调度器主要受两个因素影响:任务调度和 DAG 解析。其中一个因素出现问题可能会对环境健康度和性能产生负面影响。

有时,同时调度的任务过多。在这种情况下,队列会填满,任务会保持“已安排”状态或在排队后重新安排,这可能会导致任务失败和性能延迟。

另一个常见问题是因 DAG 代码的复杂性而导致的解析延迟和错误。例如,如果 DAG 代码在代码的顶层包含 Airflow 变量,可能会导致解析延迟、数据库过载、调度失败和 DAG 超时。

在本教程中,您将诊断示例 DAG,并学习如何排查调度和解析问题、改进 DAG 调度,以及优化 DAG 代码和环境配置以提高性能。

目标

本部分列出了本教程中示例的目标。

示例:调度程序故障和高任务并发导致的延迟

  • 上传同时运行多次的示例 DAG,并使用 Cloud Monitoring 诊断调度程序故障和延迟问题。

  • 通过整合任务来优化 DAG 代码,并评估性能影响。

  • 随着时间的推移更均匀地分配任务,并评估效果影响。

  • 优化 Airflow 配置和环境配置,并评估影响。

示例:由复杂代码导致的 DAG 解析错误和延迟时间

  • 上传包含 Airflow 变量的示例 DAG,并使用 Cloud Monitoring 诊断解析问题。

  • 通过避免在代码的顶层使用 Airflow 变量来优化 DAG 代码,并评估对解析时间的影响。

  • 优化 Airflow 配置和环境配置,并评估对解析时间的影响。

费用

本教程使用 Google Cloud的以下收费组件:

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

本部分介绍了在开始本教程之前需要执行的操作。

创建和配置项目

在本教程中,您需要一个 Google Cloud 项目。 按如下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建一个项目

    前往“项目选择器”

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 确保您的 Google Cloud 项目用户拥有以下角色,以便创建必要的资源:

    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)

为您的项目启用 API

Enable the Cloud Composer API.

Enable the API

创建 Cloud Composer 环境

创建 Cloud Composer 2 环境

在创建环境的过程中,您会向 Composer 服务代理账号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色。Cloud Composer 使用此账号在您的 Google Cloud 项目中执行操作。

示例:调度器出现故障,并且由于任务调度问题导致任务失败

此示例演示了如何调试因任务并发性过高而导致的调度程序故障和延迟。

将示例 DAG 上传到您的环境

将以下示例 DAG 上传到您在之前步骤中创建的环境。在本教程中,此 DAG 的名称为 dag_10_tasks_200_seconds_1

此 DAG 有 200 个任务。每个任务等待 1 秒,然后打印“完成!”。 上传后,系统会自动触发 DAG。Cloud Composer 会运行此 DAG 10 次,所有 DAG 运行都会并行进行。

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

诊断调度器故障和任务失败问题

DAG 运行完成后,打开 Airflow 界面,然后点击 dag_10_tasks_200_seconds_1 DAG。您会看到总共有 10 个 DAG 运行作业成功完成,每个作业都有 200 个任务成功完成。

查看 Airflow 任务日志:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往日志标签页,然后依次前往所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看

在日志直方图上,您可以看到以红色和橙色表示的错误和警告:

Airflow 工作器日志的直方图,其中错误和警告分别以红色和橙色表示
图 1. Airflow 工作器日志直方图(点击可放大)

示例 DAG 产生了大约 130 条警告和 60 条错误。点击包含黄色和红色条形的任意列。您会在日志中看到以下部分警告和错误:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

这些日志可能表明资源使用量超出了限制,并且工作器自行重启。

如果 Airflow 任务在队列中保留的时间过长,则调度器会将其标记为 failed 和 up_for_retry,并且将再次重新安排它以执行。观察此情况症状的一种方法是查看包含排队任务数量的图表,如果此图表中的峰值在约 10 分钟内没有下降,则很可能会出现任务失败(没有日志)。

查看监控信息:

  1. 前往监控标签页,然后选择概览

  2. 查看 Airflow 任务图表。

    显示 Airflow 任务随时间变化的图表,其中显示了排队任务数量的峰值
    图 2. Airflow 任务图表(点击可放大)

    在 Airflow 任务图中,排队任务数量出现持续时间超过 10 分钟的峰值,这可能意味着您的环境中没有足够的资源来处理所有计划任务。

  3. 查看活跃工作器数图表:

    活跃 Airflow 工作器数量随时间变化的图表显示,活跃工作器的数量已增加到上限
    图 3. 活跃工作器数图表(点击可放大)

    活跃工作器图表表明,在 DAG 运行期间,DAG 触发了自动扩缩,将工作器数量扩缩到允许的上限(3 个)。

  4. 资源使用情况图表可以指示 Airflow 工作器是否缺乏运行排队任务的容量。在监控标签页上,选择工作器,然后查看工作器 CPU 总用量工作器内存总用量图表。

    Airflow 工作器的 CPU 使用率图表显示 CPU 使用率增加到上限
    图 4. 工作器的总 CPU 使用情况图表(点击可放大)
    Airflow 工作器的内存用量图显示内存用量在增加,但未达到上限
    图 5. 工作器内存总用量图表(点击可放大)

    这些图表表明,同时执行过多任务会导致达到 CPU 上限。资源的使用时间超过 30 分钟,甚至比 10 次 DAG 运行中 200 个任务逐个运行的总时长还要长。

这些指标表明队列正在被填满,并且缺少处理所有已调度任务的资源。

合并任务

当前代码创建了许多 DAG 和任务,但没有足够的资源来并行处理所有任务,导致队列被填满。如果任务在队列中保留的时间过长,可能会导致任务重新安排或失败。 在这种情况下,您应该选择使用较少的整合任务。

以下示例 DAG 将初始示例中的任务数从 200 更改为 20,并将等待时间从 1 秒增加到 10 秒,以模拟执行相同工作量的更整合的任务。

将以下示例 DAG 上传到您创建的环境。在本教程中,此 DAG 的名称为 dag_10_tasks_20_seconds_10

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

评估任务合并对安排流程的影响:

  1. 等待 DAG 运行完成。

  2. 在 Airflow 界面中的 DAGs 页面上,点击 dag_10_tasks_20_seconds_10 DAG。您将看到 10 个 DAG 运行作业,每个作业都有 20 个成功完成的任务。

  3. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  4. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  5. 前往日志标签页,然后依次前往所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看

    第二个示例的任务更加整合,结果产生了大约 10 个警告和 7 个错误。在直方图上,您可以比较初始示例(较早的值)和第二个示例(较晚的值)中的错误和警告数量。

    包含错误和警告的 Airflow 工作器日志直方图显示了任务合并后错误和警告数量的减少
    图 6. 合并任务后的 Airflow 工作器日志直方图(点击可放大)

    将第一个示例与更精简的示例进行比较,您会发现第二个示例中的错误和警告明显减少。不过,由于资源过载,日志中仍会显示与暖关机相关的相同错误。

  6. 监控标签页上,选择工作器,然后查看图表。

    将第一个示例(较早的值)的 Airflow 任务图表与第二个示例(任务更精简)的图表进行比较,您会发现,当任务更精简时,排队任务的峰值持续时间更短。不过,该视频持续了近 10 分钟,这仍然不太理想。

    Airflow 任务随时间变化的图表显示,Airflow 任务的峰值持续时间比以前短。
    图 7. 合并任务后的 Airflow 任务图表(点击可放大)

    在“活跃工作器”图表中,您可以看到第一个示例(位于图表左侧)使用的资源的时间比第二个示例长得多,即使这两个示例模拟的工作量相同。

    从活跃 Airflow 工作器的随时间变化图可以看出,活跃工作器的数量增加的时间段比之前短。
    图 8. 任务合并后的活跃工作器数图表(点击可放大)

    查看工作器资源消耗图。虽然在任务更整合的示例中使用的资源与初始示例之间存在相当大的差异,但 CPU 使用率仍会飙升至限值的 70%。

    Airflow 工作器的 CPU 使用率图表显示 CPU 使用率最高可达到上限的 70%
    图 9. 任务合并后的总工作器 CPU 使用情况图表(点击可放大)
    Airflow 工作器的内存用量图显示内存用量在增加,但未达到上限
    图 10. 任务合并后的工作器内存总用量图表(点击可放大)

随着时间的推移更均匀地分配任务

过多的并发任务会导致队列被填满,从而导致任务卡在队列中或被重新安排。在之前的步骤中,您通过合并任务减少了任务数量,但输出日志和监控结果表明,并发任务数量仍未达到最佳状态。

您可以通过实现调度或设置可同时运行的任务数量上限来控制并发任务运行的数量。

在本教程中,您将通过向 dag_10_tasks_20_seconds_10 DAG 添加 DAG 级参数,随着时间的推移更均匀地分配任务:

  1. 向 DAG 上下文管理器添加 max_active_runs=1 实参。此实参用于设置限制,即在给定时刻只能运行一个 DAG 实例。

  2. 向 DAG 上下文管理器添加 max_active_tasks=5 实参。此实参用于控制每个 DAG 中可以同时运行的任务实例数上限。

将以下示例 DAG 上传到您创建的环境。在本教程中,此 DAG 的名称为 dag_10_tasks_20_seconds_10_scheduled.py

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

评估随时间推移分配任务对调度流程的影响:

  1. 等待 DAG 运行完成。

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  4. 前往日志标签页,然后依次前往所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看

  5. 在直方图中,您可以看到,第三个 DAG 的有效任务和运行次数有限,但未生成任何警告或错误,并且日志的分布与之前的值相比看起来更均匀。

    包含错误和警告的 Airflow 工作器日志的直方图显示,在任务合并并随时间分布后,没有错误或警告。
    图 11.在任务整合并随时间分布后,Airflow 工作器日志直方图(点击可放大)

dag_10_tasks_20_seconds_10_scheduled 示例中,由于活跃任务和运行次数有限,且任务均匀排队,因此不会造成资源压力。

执行上述步骤后,您可以通过整合小型任务并在一段时间内更均匀地分布这些任务来优化资源使用情况。

优化环境配置

您可以调整环境配置,确保 Airflow 工作器始终具有容量来运行已加入队列的任务。

工作器数量和工作器并发数

您可以调整工作器数量上限,使 Cloud Composer 在设定的限制内自动扩缩您的环境。

[celery]worker_concurrency 参数定义了单个工作器可以从任务队列中提取的任务数量上限。更改此参数可调整单个工作器可以同时执行的任务量。您可以通过替换此 Airflow 配置选项来更改它。默认情况下,工作器并发数是根据工作器可容纳的轻量级并发任务实例数设置的。这意味着,其值取决于工作器资源限制。 工作器并发值不取决于环境中的工作器数量。

工作器数量和工作器并发性会相互配合,而环境性能在很大程度上取决于这两个参数。您可以根据以下注意事项选择正确的组合:

  • 并行运行多个快速任务。当队列中有任务在等待,同时工作器在使用一小部分 CPU 和内存时,您可以提高工作器并发性。不过,在某些情况下,队列可能永远不会填满,导致自动扩缩永远不会触发。如果小型任务在新的工作器准备就绪之前完成执行,则现有工作器可以接手剩余的任务,而新创建的工作器将没有任务可执行。

    在这些情况下,建议增加工作器的最小数量并提高工作器并发性,以避免过度热切的伸缩。

  • 并行运行多个长时间运行的任务。工作器并发数量过高会阻止系统伸缩工作器数量。如果多个任务消耗大量资源且需要很长时间才能完成,那么较高的工作器并发数可能会导致队列永远无法填满,并且所有任务都只由一个工作器拾取,从而导致性能问题。在这些情况下,建议增加工作器数量上限并降低工作器并发数量

并行性的重要性

Airflow 调度器控制 DAG 运行和 DAG 中各项任务的时间安排。[core]parallelism Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。

并行性是 Airflow 的一种保护机制,用于确定每个调度器可以同时运行的任务数,而与工作器数量无关。并行值乘以集群中的调度器数量,即为环境可排队的最大任务实例数。

通常,[core]parallelism 设置为工作器数量上限与 [celery]worker_concurrency 的乘积。它还受的影响。 您可以通过替换此 Airflow 配置选项来更改它。如需详细了解如何调整与伸缩相关的 Airflow 配置,请参阅伸缩 Airflow 配置

找到最佳环境配置

解决调度问题的推荐方法是将小任务整合为大任务,并随着时间的推移更均匀地分配任务。除了优化 DAG 代码之外,您还可以优化环境配置,以便有足够的容量来同时运行多个任务。

例如,假设您尽可能在 DAG 中整合任务,但限制活跃任务以使它们在时间上更均匀地分布并不是您特定使用情形的首选解决方案。

您可以调整并行性、工作器数量和工作器并发性参数,以运行 dag_10_tasks_20_seconds_10 DAG,而不会限制有效任务。在此示例中,DAG 运行 10 次,每次运行包含 20 个小型任务。如果您想同时运行所有测试,请执行以下操作:

  • 您需要更大的环境规模,因为该参数会控制环境的代管式 Cloud Composer 基础架构的性能参数。

  • Airflow 工作器必须能够同时运行 20 个任务,这意味着您需要将工作器并发数设置为 20。

  • 工作器需要有足够的 CPU 和内存来处理所有任务。工作器并发性受工作器 CPU 和内存的影响,因此您至少需要 worker_concurrency / 12 个 CPU 和 least worker_concurrency / 8 内存。

  • 您需要提高并行度,以匹配更高的工作器并发数。为了让工作器从队列中提取 20 个任务,调度程序需要先调度这 20 个任务。

按以下方式调整环境配置:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 找到资源 > 工作负载配置,然后点击修改

  5. 工作器部分的内存字段中,指定 Airflow 工作器的新内存限制。在本教程中,请使用 4 GB。

  6. CPU 字段中,指定 Airflow 工作器的新 CPU 限制。在本教程中,请使用 2 个 vCPU。

  7. 保存更改,并等待几分钟,让 Airflow 工作器重新启动。

接下来,替换并行性和工作器并发性 Airflow 配置选项:

  1. 前往 Airflow 配置替换标签页。

  2. 依次点击修改添加 Airflow 配置替换

  3. 替换并行处理配置:

    部分
    core parallelism 20
  4. 点击添加 Airflow 配置替换,然后替换工作线程并发配置:

    部分
    celery worker_concurrency 20
  5. 点击保存,然后等待环境更新其配置。

使用调整后的配置再次触发同一示例 DAG:

  1. 在 Airflow 界面中,前往 DAG 页面。

  2. 找到 dag_10_tasks_20_seconds_10 DAG 并将其删除。

    删除 DAG 后,Airflow 会检查您环境的存储桶中的 DAG 文件夹,并自动再次运行该 DAG。

DAG 运行完成后,再次查看“日志”直方图。在图表中,您可以看到,在运行调整后的环境配置时,任务更加整合的 dag_10_tasks_20_seconds_10 示例未生成任何错误和警告。将结果与图表中之前的数据进行比较,在之前的数据中,同一示例在以默认环境配置运行时生成了错误和警告。

调整环境配置后,包含错误和警告的 Airflow 工作器日志的直方图显示没有错误和警告
图 12. 调整环境配置后的 Airflow 工作器日志直方图(点击可放大)

环境配置和 Airflow 配置在任务调度中发挥着至关重要的作用,但是,无法将配置增加到超出某些限制。

建议优化 DAG 代码、整合任务并使用调度功能,以实现最佳性能和效率。

示例:由于 DAG 代码复杂而导致的 DAG 解析错误和延迟时间

在此示例中,您将调查模仿 Airflow 变量过多的示例 DAG 的解析延迟时间。

创建新的 Airflow 变量

在上传示例代码之前,请创建一个新的 Airflow 变量。

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 依次前往管理 > 变量 > 添加新记录

  4. 设置以下值:

    • 键:example_var
    • 值:test_airflow_variable

将示例 DAG 上传到您的环境

将以下示例 DAG 上传到您在之前步骤中创建的环境。在本教程中,此 DAG 的名称为 dag_for_loop_airflow_variable

此 DAG 包含一个运行 1,000 次的 for 循环,用于模拟过多的 Airflow 变量。每次迭代都会读取 example_var 变量并生成一个任务。每个任务都包含一个用于输出变量值的命令。

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

诊断解析问题

DAG 解析时间是指 Airflow 调度器读取和解析 DAG 文件所需的时间。在 Airflow 调度器可以调度 DAG 中的任何任务之前,调度器必须解析 DAG 文件,以发现 DAG 的结构和已定义的任务。

如果 DAG 需要很长时间才能解析,这会占用调度器的容量,并且可能会降低 DAG 运行的性能。

如需监控 DAG 解析时间,请执行以下操作:

  1. 在 gcloud CLI 中运行 dags report Airflow CLI 命令,以查看所有 DAG 的解析时间:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    替换以下内容:

    • ENVIRONMENT_NAME:您的环境的名称。
    • LOCATION:环境所在的区域。
  2. 在命令的输出中,查找 dag_for_loop_airflow_variables DAG 的时长值。如果值较大,则可能表示此 DAG 未以最佳方式实施。如果您有多个 DAG,可以从输出表中确定哪些 DAG 的解析时间较长。

    示例:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. 在 Google Cloud 控制台中检查 DAG 解析时间:

    1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  4. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  5. 前往日志标签页,然后依次前往所有日志 > DAG 处理器管理器

  6. 查看 dag-processor-manager 日志并找出可能存在的问题。

    示例 DAG 的日志条目显示,DAG 解析时间为 46.3 秒
    图 13. DAG 处理器管理器日志显示 DAG 解析时间(点击可放大)

如果 DAG 总解析时间超过约 10 秒,则调度器可能因 DAG 解析而过载,因而无法有效运行 DAG。

优化 DAG 代码

建议避免在 DAG 中使用不必要的“顶级”Python 代码。如果 DAG 在外部包含大量导入、变量和函数,Airflow 调度器的解析时间会更长。这会降低 Cloud Composer 和 Airflow 的性能和可伸缩性。读取过多的 Airflow 变量会导致解析时间过长和数据库负载过高。如果此代码位于 DAG 文件中,这些函数会在每次调度程序心跳时执行,这可能会很慢。

借助 Airflow 的模板字段,您可以将 Airflow 变量和 Jinja 模板中的值纳入到 DAG 中。这样可以防止在调度程序心跳期间执行不必要的函数。

为了更好地实现 DAG 示例,请避免在 DAG 的顶级 Python 代码中使用 Airflow 变量。而是通过 Jinja 模板将 Airflow 变量传递给现有运算符,这样会延迟读取值,直到任务执行时才读取。

将示例 DAG 的新版本上传到您的环境。在本教程中,此 DAG 的名称为 dag_for_loop_airflow_variable_optimized

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

检查新的 DAG 解析时间:

  1. 等待 DAG 运行完成。

  2. 再次运行 dags report 命令,查看所有 DAG 的解析时间:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. 再次查看 dag-processor-manager 日志,并分析解析时长。

    示例 DAG 的日志条目显示,DAG 解析时间为 4.21 秒
    图 14. DAG 处理器管理器日志显示了优化 DAG 代码后的 DAG 解析时间(点击可放大)

通过将环境变量替换为 Airflow 模板,您简化了 DAG 代码,并将解析延迟时间缩短了大约 10 倍。

优化 Airflow 环境配置

Airflow 调度器会不断尝试触发新任务,并解析环境存储桶中的所有 DAG。如果您的 DAG 解析时间较长,并且调度器消耗大量资源,您可以优化 Airflow 调度器配置,以便调度器更高效地使用资源。

在本教程中,DAG 文件需要很长时间才能解析,并且解析周期开始重叠,然后耗尽调度器的容量。在我们的示例中,第一个示例 DAG 需要 5 秒以上的时间才能完成解析,因此您需要配置调度程序以降低运行频率,从而更高效地使用资源。您将替换 scheduler_heartbeat_sec Airflow 配置选项。此配置定义了调度程序的运行频率(以秒为单位)。默认情况下,该值设置为 5 秒。 您可以通过替换此 Airflow 配置选项来更改它。

替换 scheduler_heartbeat_sec Airflow 配置选项:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往 Airflow 配置替换标签页。

  4. 依次点击修改添加 Airflow 配置替换

  5. 替换 Airflow 配置选项:

    部分
    scheduler scheduler_heartbeat_sec 10
  6. 点击保存,然后等待环境更新其配置。

检查调度器指标:

  1. 前往监控标签页,然后选择调度程序

  2. 调度程序心跳图表中,点击更多选项按钮(三个点),然后点击在 Metrics Explorer 中查看

调度程序检测信号图显示检测信号的发生频率较低
图 15. 调度器检测信号图表(点击可放大)

在图表中,您会看到在将默认配置从 5 秒更改为 10 秒后,调度程序的运行频率降低了一半。通过降低心跳频率,您可以确保调度程序不会在之前的解析周期正在进行时开始运行,并且调度程序的资源容量不会耗尽。

为调度程序分配更多资源

在 Cloud Composer 2 中,您可以为调度程序分配更多 CPU 和内存资源。这样一来,您就可以提高调度器的性能,并缩短 DAG 的解析时间。

为调度器分配额外的 CPU 和内存:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 找到资源 > 工作负载配置,然后点击修改

  5. 调度程序部分的内存字段中,指定新的内存限制。在本教程中,请使用 4 GB。

  6. CPU 字段中,指定新的 CPU 上限。在本教程中,请使用 2 个 vCPU。

  7. 保存更改,并等待几分钟,让 Airflow 调度器重新启动。

  8. 前往日志标签页,然后依次前往所有日志 > DAG 处理器管理器

  9. 查看 dag-processor-manager 日志,并比较示例 DAG 的解析时长:

    示例 DAG 的日志条目显示,优化后的 DAG 的 DAG 解析时间为 1.5 秒。对于未优化的 DAG,解析时间为 28.71 秒
    图 16. DAG 处理器管理器日志显示了在为调度器分配更多资源后 DAG 的解析时间(点击可放大)

通过为调度程序分配更多资源,您提高了调度程序的容量,并与默认环境配置相比,显著降低了解析延迟时间。资源越多,调度程序解析 DAG 的速度就越快,但与 Cloud Composer 资源相关的费用也会增加。此外,资源无法超出一定限制。

建议仅在实施了可能的 DAG 代码和 Airflow 配置优化后才分配资源。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留该项目但删除各个资源。

删除项目

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

删除 Cloud Composer 环境。在此过程中,您还可以删除环境的存储桶。

后续步骤