调试任务调度问题

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本教程将指导您诊断和排查导致调度程序故障、解析错误和延迟以及任务失败的任务调度和解析问题。

简介

Airflow 调度器主要受以下两个因素影响:任务调度和 DAG 解析。其中一个因素出现问题都可能会对环境健康和性能产生负面影响。

有时,同时调度的任务过多。在这种情况下,队列会被填满,任务会保持“已调度”状态,或者在加入队列后重新调度,这可能会导致任务失败和性能延迟。

另一个常见问题是解析延迟时间和由 DAG 代码复杂性导致的错误。例如,如果 DAG 代码在代码顶层包含 Airflow 变量,则可能会导致解析延迟、数据库过载、调度失败和 DAG 超时。

在本教程中,您将诊断示例 DAG 并学习如何 排查调度和解析问题,改进 DAG 调度,以及 优化 DAG 代码和环境配置以提高性能。

目标

本部分列出了本教程中示例的目标。

示例:任务并发性较高导致调度程序故障和延迟

  • 上传同时运行多次的示例 DAG 进行诊断 Cloud Monitoring 的调度器故障和延迟问题。

  • 整合任务并评估 性能影响

  • 随时间更均匀地分配任务并评估性能 影响。

  • 优化 Airflow 配置和环境配置, 评估其影响。

示例:复杂代码导致的 DAG 解析错误和延迟时间

  • 上传包含 Airflow 变量的示例 DAG,并使用 Cloud Monitoring 诊断解析问题。

  • 通过在 并评估对解析时间的影响。

  • 优化 Airflow 配置和环境配置,并评估对解析时间的影响。

费用

本教程使用 Google Cloud 的以下收费组件:

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

本部分介绍了在开始本教程之前需要执行的操作。

创建和配置项目

在本教程中,您需要一个 Google Cloud 项目。按以下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建项目

    前往“项目选择器”

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 确保您的 Google Cloud 项目用户具有以下角色,以便创建必要的资源:

    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)

为您的项目启用 API

Enable the Cloud Composer API.

Enable the API

创建 Cloud Composer 环境

创建一个 Cloud Composer 2 环境

在创建环境的过程中,您需要向 Composer Service Agent 账号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色。Cloud Composer 使用此账号执行操作 Google Cloud 项目中的资源。

示例:由于任务调度问题而导致的调度器故障和任务失败

此示例演示了调试调度程序故障以及高任务并发导致的延迟。

将示例 DAG 上传到您的环境

将以下示例 DAG 上传到环境中 您在之前步骤中创建的应用在本教程中,此 DAG 的名称为 dag_10_tasks_200_seconds_1

此 DAG 包含 200 个任务。每项任务都会等待 1 秒并显示“Complete!”。 DAG 会在上传后自动触发。Cloud Composer 会运行此 DAG 10 次,并且所有 DAG 运行都是并发进行的。

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

诊断调度器故障和任务失败问题

DAG 运行完成后,打开 Airflow 界面,然后点击 dag_10_tasks_200_seconds_1 DAG。您会看到,总共有 10 次 DAG 运行作业成功,并且每项作业都有 200 个成功的任务。

查看 Airflow 任务日志:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往日志标签页,然后依次选择所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看

在日志直方图中,您可以看到用红色指示的错误和警告 和橙色:

Airflow 工作器日志包含错误和警告的直方图
    以红色和橙色表示
图 1.Airflow 工作器日志直方图(点击可放大)

示例 DAG 产生了大约 130 条警告和 60 条错误。点击包含黄色和红色条形的任意列。您会在日志中看到以下某些警告和错误:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

这些日志可能表明资源用量超出了限制,并且工作器自行重启。

如果 Airflow 任务在队列中保留的时间过长,则调度器会将其标记为 failed 和 up_for_retry,并且将再次重新安排它以执行。观察这种情况的一种方法是查看显示已加入队列的任务数量的图表,如果此图表中的峰值在约 10 分钟内没有下降,则可能存在任务失败的情况(没有日志)。

查看监控信息:

  1. 前往监控标签页,然后选择概览

  2. 查看 Airflow 任务图表。

    一段时间内 Airflow 任务的图表,显示了已加入队列的任务数量出现了激增
    图 2. Airflow 任务图(点击可放大)

    在 Airflow 任务图中,排队等待的任务出现了一个峰值 超过 10 分钟,这可能意味着没有足够的资源 来处理所有计划任务。

  3. 查看活跃工作器数图表:

    随时间变化的活跃 Airflow 工作器图表显示,
    活跃工作器数量已增加至上限
    图 3. 活跃工作器数图表(点击可放大)

    活跃工作器图表指示 DAG 触发了自动扩缩 设置为 DAG 运行期间允许的最大工作器数(3 个)。

  4. 资源使用情况图表可能会表明 Airflow 工作器没有足够的容量来运行队列中的任务。在监控标签页上,选择工作器并 查看工作器 CPU 总使用率工作器内存总使用量图表。

    Airflow 工作器的 CPU 使用率图表会显示 CPU 使用率
    增加到上限
    图 4.工作器总 CPU 用量图表(点击可放大)
    Airflow 工作器的内存用量图显示了内存用量
    有所增加,但未达到上限
    图 5. 工作器内存总用量图表(点击可放大)

    从这些图表可以看出,同时执行了太多任务 会导致达到 CPU 限制资源的使用时间已超过 30 分钟,这甚至比 10 个 DAG 依次运行的 200 个任务的总时长还要长。

这些指标表示队列已满,并且缺少资源来处理所有已安排的任务。

合并任务

当前代码会创建许多 DAG 和任务,但没有足够的资源来并行处理所有任务,这会导致队列被填满。将任务在队列中保留的时间过长可能会导致任务重新安排或失败。在此类情况下,您应当选择较少的 任务。

以下示例 DAG 更改了初始示例中的任务数量 并将等待时间从 1 秒增加到 10 秒, 执行相同工作量的综合性任务。

将以下示例 DAG 上传到您创建的环境。在本教程中,此 DAG 名为 dag_10_tasks_20_seconds_10

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

评估更多合并的任务对调度流程的影响:

  1. 等待 DAG 运行完成。

  2. 在 Airflow 界面的 DAG 页面上,点击 dag_10_tasks_20_seconds_10 DAG。您会看到 10 个 DAG 运行作业,每个作业都有 20 个成功的任务。

  3. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  4. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  5. 转到日志标签页,然后转到所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看

    第二个示例包含更多合并的任务,因此产生了大约 10 个警告和 7 个错误。在直方图中,您可以比较第一个示例(早期值)和第二个示例(后期值)中的错误和警告数量。

    包含错误和警告的 Airflow 工作器日志的直方图,显示了任务合并后错误和警告数量的减少
    图 6. 任务合并后的 Airflow 工作器日志直方图(点击可放大)

    将第一个示例与整合程度更高的示例进行比较时,您可以 可以看到,在第二个事件中,错误和警告 示例。但是,与温关闭相关的相同错误仍会显示在 导致日志崩溃的问题

  6. 监控标签页上,选择工作器并查看图表。

    将第一个示例(较早值)的 Airflow 任务图表与第二个示例(任务更集中)的图表进行比较后,您会发现,当任务更集中时,队列中任务的激增持续时间更短。不过,它持续了近 10 分钟,仍然不太理想。

    Airflow 任务随时间变化的图表显示,
    Airflow 任务的持续时间比之前短。
    图 7.任务合并后的 Airflow 任务图表(点击可放大)

    在“活跃工作器数”图表中,您可以看到第一个示例(图表左侧)使用资源的时间比第二个示例长得多,即使这两个示例模拟的工作量相同。

    随时间变化的活跃 Airflow 工作器图表显示,
    活跃工作器数量在更短时间内增加
    图 8.任务合并后的活跃工作器数图表(点击可放大)

    查看工作器资源消耗图表。尽管两者之间的差异 示例中使用的资源是整合程度更高的任务, 初始示例相当有意义,CPU 使用率仍然很高, 上限的 70%

    Airflow 工作器 CPU 使用率图表显示 CPU 使用率最高可达到上限的 70%
    图 9. 之后的工作器 CPU 总使用率图表 任务已合并(点击可放大)
    Airflow 工作器内存用量图表显示内存用量在增加,但未达到上限
    图 10.任务合并后的总工作器内存用量图表(点击可放大)

随时间更均匀地分配任务

并发任务过多会导致队列被填满,进而导致任务卡在队列中或重新调度。在前面的步骤中,您通过合并这些任务来减少了任务数量,但输出日志和监控结果表明,并发任务的数量仍然不太理想。

您可以通过实施时间表来控制并发任务运行的数量 或设置可同时运行的任务数量限制。

在本教程中,您将向 dag_10_tasks_20_seconds_10 DAG 添加 DAG 级参数,以便随着时间推移更平均地分配任务:

  1. 向 DAG 上下文管理器添加 max_active_runs=1 参数。此参数 设置在给定时刻仅运行的单个 DAG 实例的限制。

  2. max_active_tasks=5 参数添加到 DAG 上下文管理器。此参数 控制系统中可以并发运行的任务实例数上限, 每个 DAG

将以下示例 DAG 上传到您创建的环境。在本教程中,此 DAG 的名称为 dag_10_tasks_20_seconds_10_scheduled.py

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

评估随时间分配任务对调度流程的影响:

  1. 等待 DAG 运行完成。

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  4. 前往日志标签页,然后依次选择所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看

  5. 在直方图上,您可以看到,具有数量有限的第三个 DAG 有效任务和运行未生成任何警告或错误,并且 与之前的值相比,日志分布看起来更均匀。

    包含错误和警告的 Airflow 工作器日志的直方图显示,随着时间的推移,任务经过整合和分发后,没有出现错误或警告。
    图 11.Airflow 工作器记录了 任务经过合并和分布一段时间(点击可放大)

dag_10_tasks_20_seconds_10_scheduled 示例中的任务具有有限的活动任务数和运行次数,但由于任务均匀加入队列,因此并未造成资源压力。

执行上述步骤后,您通过合并小任务并在一段时间内更均匀地分布这些任务,优化了资源使用情况。

优化环境配置

您可以调整环境配置,以确保 Airflow 工作器始终有容量来运行排队的任务。

工作器数量和工作器并发数

您可以调整工作器数量上限 让 Cloud Composer 在 YAML 文件中 不超出所设的限制

[celery]worker_concurrency 参数定义了最大任务数 单个工作器可以从任务队列中选取更改此参数可调整单个工作器可以同时执行的任务量。您可以通过以下方式更改此 Airflow 配置选项: 覆盖它。默认情况下,工作器并发设置被设置为 32, 12 * worker_CPU, 8 * worker_memory,表示 取决于工作器资源限制。如需详细了解默认工作器并发设置值,请参阅优化环境

工作器数量和工作器并发性相互作用,并且环境性能在很大程度上取决于这两个参数。您可以参考以下注意事项选择正确的组合:

  • 并行运行多个快速任务。当队列中有任务在等待,同时工作器在使用一小部分 CPU 和内存时,您可以提高工作器并发性。不过,在某些情况下,队列可能永远不会填满,导致自动扩缩功能永远不会触发。如果小任务在新工作器前完成执行 现有工作器可以继续处理剩余的任务 对于新创建的工作器,将不会有任何任务。

    在这些情况下,建议增加工作器数量下限并提高工作器并发数量,以避免过度扩缩。

  • 并行运行多个长时间运行的任务。工作器并发数高 可防止系统扩缩工作器数量。如果多个任务需要大量资源且需要很长时间才能完成,工作器并发性较高可能会导致队列永远不会填满,并且所有任务都由一个工作器处理,从而导致性能问题。在这些情况下,建议增加工作器数量上限并降低工作器并发数量

并行的重要性

Airflow 调度器可通过 DAG。[core]parallelism Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。

并行性是 Airflow 的一种保护机制,用于确定每个调度器可以同时运行多少任务,而不管工作器数量如何。并行处理值乘以集群中的调度器数量, 是您的环境可以加入队列的任务实例数上限。

通常,[core]parallelism 设置为工作器数量上限的乘积 和 [celery]worker_concurrency。还会受到 pool。 您可以通过替换此 Airflow 配置选项来更改此选项。如需详细了解如何调整与扩缩相关的 Airflow 配置,请参阅扩缩 Airflow 配置

找到最佳环境配置

解决调度问题的推荐方法是将一些小任务整合为 大型任务,以及随着时间的推移更均匀地分配任务。除了 优化 DAG 代码,还可以优化环境配置, 有足够的容量来同时运行多个任务。

例如,假设您整合 DAG 中的任务 同时限制活跃任务,使其更均匀地分布在 而不是您的特定用例的首选解决方案。

您可以调整并行性、工作器数量和工作器并发数 在不限制活跃状态的情况下运行 dag_10_tasks_20_seconds_10 DAG 的参数 任务。在此示例中,DAG 运行了 10 次,每次运行包含 20 个小任务。 如果您想同时运行这些测试,请执行以下操作:

  • 您需要扩大环境大小,因为它可控制性能 您环境中的托管式 Cloud Composer 基础架构的参数。

  • Airflow 工作器必须能够同时运行 20 个任务,这意味着您需要将工作器并发设置为 20。

  • 工作器需要足够的 CPU 和内存来处理所有任务。工作器并发性会受到工作器 CPU 和内存的影响,因此您需要至少有 worker_concurrency / 12 个 CPU 和 least worker_concurrency / 8 内存。

  • 您需要增加并行性,以匹配较高的工作器并发数。 为了让工作器从队列中提取 20 个任务,调度程序需要先调度这 20 个任务。

您可以通过以下方式调整您的环境配置:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 找到资源 > 工作负载配置,然后点击修改

  5. 工作器部分的内存字段中,为 Airflow 工作器指定新的内存限制。在本教程中,请使用 4 GB。

  6. CPU 字段中,为 Airflow 工作器指定新的 CPU 限制。在本教程中,请使用 2 个 vCPU。

  7. 保存更改,并等待 Airflow 工作器几分钟 重启。

接下来,替换并行性和工作器并发性 Airflow 配置选项:

  1. 前往 Airflow 配置替换标签页。

  2. 点击修改,然后点击添加 Airflow 配置替换

  3. 替换并行处理配置:

    core parallelism 20
  4. 点击添加 Airflow 配置替换,然后替换 worker 并发配置:

    celery worker_concurrency 20
  5. 点击保存,然后等待环境更新其配置。

使用调整后的配置再次触发同一示例 DAG:

  1. 在 Airflow 界面中,前往 DAG 页面。

  2. 找到 dag_10_tasks_20_seconds_10 DAG 并将其删除。

    删除 DAG 后,Airflow 检查 环境的存储桶并自动再次运行该 DAG。

DAG 运行完成后,请再次查看“日志”直方图。上图中 可以看到,dag_10_tasks_20_seconds_10 示例包含更多 使用 调整后的环境配置。将结果与先前的数据进行比较 同一示例生成了错误和警告, 以 tge 默认环境配置运行的

调整环境配置后,包含错误和警告的 Airflow 工作器日志的直方图显示没有错误和警告
图 12. 调整环境配置后的 Airflow 工作器日志直方图(点击可放大)

环境配置和 Airflow 配置在 但无法为集群内的 超过特定限制。

我们建议您优化 DAG 代码、整合任务并使用调度功能 以优化性能和效率。

示例:由于 DAG 代码复杂而导致 DAG 解析错误和延迟

在此示例中,您将调查一个示例 DAG 的解析延迟时间, 会模仿过多的 Airflow 变量。

创建新的 Airflow 变量

在上传示例代码之前,请创建一个新的 Airflow 变量。

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 依次前往管理 > 变量 >添加新记录

  4. 设置以下值:

    • 键:example_var
    • val:test_airflow_variable

将示例 DAG 上传到您的环境

将以下示例 DAG 上传到环境中 您在之前步骤中创建的应用在本教程中,此 DAG 名为 dag_for_loop_airflow_variable

此 DAG 包含一个运行 1,000 次的 for 循环,模拟了过多的 Airflow 变量。每次迭代都会读取 example_var 变量 生成任务。每个任务都包含一个用于输出变量值的命令。

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

诊断解析问题

DAG 解析时间是 Airflow 调度器读取所需时间 生成一个 DAG 文件并对其进行解析。在 Airflow 调度器可以安排 DAG 中的任何任务之前,调度器必须解析 DAG 文件,以发现 DAG 和定义的任务的结构。

如果 DAG 需要很长时间才能解析,这会占用调度器的容量,并且可能会降低 DAG 运行的性能。

如需监控 DAG 解析时间,请执行以下操作:

  1. 在 gcloud CLI 中运行 dags report Airflow CLI 命令,查看所有 DAG 的解析时间:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    替换以下内容:

    • ENVIRONMENT_NAME:您的环境的名称。
    • LOCATION:环境所在的区域。
  2. 在命令的输出中,查找 dag_for_loop_airflow_variables DAG 的时长值。值较大可能表示 此 DAG 的实现方式并不理想。如果您有多个 DAG 从输出表中,您可以确定哪些 DAG 的解析时间较长。

    示例:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. 在 Google Cloud 控制台中检查 DAG 解析时间:

    1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  4. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  5. 前往日志标签页,然后依次选择所有日志 > DAG 处理器管理器

  6. 查看 dag-processor-manager 日志并找出可能存在的问题。

    示例 DAG 的日志条目显示 DAG 解析时间为 46.3 秒
    图 13.DAG 处理器管理器日志显示 DAG 解析时间(点击可放大)

如果 DAG 总解析时间超过约 10 秒,则调度器可能因 DAG 解析而过载,因而无法有效运行 DAG。

优化 DAG 代码

时间是 建议 以避免不必要的“顶级”DAG 中的 Python 代码。如果 DAG 包含许多外部导入、变量和函数,则会导致 Airflow 调度器的解析时间更长。这会降低 Cloud Composer 和 Airflow 的性能和可伸缩性。过多读取 Airflow 变量会导致解析时间过长和数据库负载过高。如果此代码位于 DAG 中 则这些函数会在每个调度程序检测信号发生时执行,这可能会很慢。

借助 Airflow 的模板字段,您可以将 Airflow 变量和 Jinja 模板中的值纳入 DAG。这可防止在调度程序心跳期间执行不必要的函数。

如需更好地实现 DAG 示例,请避免在 DAG 的顶级 Python 代码中使用 Airflow 变量。而是通过 Jinja 模板将 Airflow 变量传递给现有运算符,这样系统会延迟读取值,直到任务执行时再读取。

将新版示例 DAG 上传到您的环境。在本教程中,此 DAG 的名称为 dag_for_loop_airflow_variable_optimized

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

检查新的 DAG 解析时间:

  1. 等待 DAG 运行完成。

  2. 再次运行 dags report 命令,查看所有 DAG 的解析时间:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. 再次查看 dag-processor-manager 日志并 分析解析时长。

    示例 DAG 的日志条目显示 DAG 解析时间为 4.21 秒
    图 14.DAG 处理器管理器日志显示 DAG 代码优化后的 DAG 解析时间(点击可放大)

通过将环境变量替换为 Airflow 模板,您简化了 DAG 代码,并将解析延迟时间缩短了约 10 倍。

优化 Airflow 环境配置

Airflow 调度器会不断尝试触发新任务并解析所有 DAG 创建 Deployment 清单如果 DAG 的解析时间很长,并且调度器会消耗大量资源,您可以优化 Airflow 调度器配置,以便调度器更高效地使用资源。

在本教程中,DAG 文件需要大量时间来解析和解析周期 开始重叠,然后耗尽调度器的容量。在我们的示例中,第一个示例 DAG 的解析时间超过 5 秒,因此您需要将调度程序配置为运行频率更低,以便更高效地使用资源。您 将覆盖 scheduler_heartbeat_sec Airflow 配置选项。此配置定义了调度程序应运行的频率(以秒为单位)。默认情况下,该值设置为 5 秒。 您可以通过替换此 Airflow 配置选项来更改此选项。

替换 scheduler_heartbeat_sec Airflow 配置选项:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到 Airflow 配置替换标签页。

  4. 点击修改,然后点击添加 Airflow 配置替换

  5. 替换 Airflow 配置选项:

    scheduler scheduler_heartbeat_sec 10
  6. 点击保存,然后等待环境更新其配置。

检查调度器指标:

  1. 转到监控标签页,然后选择调度器

  2. 调度程序心跳图表中,点击更多选项按钮(三点状图标),然后点击在 Metrics Explorer 中查看

调度器心跳图显示心跳发生频率更低
图 15. 调度器检测信号图表(点击可放大)

在图表上,您会发现在完成上述操作后,调度程序的运行频率会降低两倍 将默认配置从 5 秒更改为 10 秒。通过降低心跳频率,您可以确保在之前的解析周期进行且调度程序的资源容量未耗尽时,调度程序不会开始运行。

为调度程序分配更多资源

在 Cloud Composer 2 中,您可以向调度程序分配更多 CPU 和内存资源。通过这种方式,您可以提高调度器的性能, 缩短 DAG 解析时间。

向调度器分配额外的 CPU 和内存:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 找到资源 > 工作负载配置,然后点击修改

  5. 调度程序部分的内存字段中,指定新的内存限制。在本教程中,请使用 4 GB。

  6. CPU 字段中,指定新的 CPU 上限。在本课中, 使用 2 个 vCPU

  7. 保存更改,并等待 Airflow 调度器等待几分钟 重启。

  8. 前往日志标签页,然后依次选择所有日志 > DAG 处理器管理器

  9. 查看 dag-processor-manager 日志并比较 示例 DAG:

    示例 DAG 的日志条目显示,经过优化的 DAG 的 DAG 解析时间为 1.5 秒。对于未优化的 DAG,解析时间为 28.71 秒
    图 16. DAG 处理器管理器日志显示向调度器分配更多资源后的 DAG 解析时间(点击可放大)

与默认环境配置相比,通过向调度程序分配更多资源,您可以提高调度程序的容量并显著缩短解析延迟时间。资源越多,调度程序解析 DAG 的速度就越快,但与 Cloud Composer 资源相关的费用也会增加。此外,也无法增加 超过一定限制的资源。

我们建议您仅在可能的 DAG 代码分配和 实现了 Airflow 配置优化。

清理

为避免系统因资源向您的 Google Cloud 账号收取费用 您可以删除包含这些资源的项目 或者保留项目而删除各个资源。

删除项目

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

删除 Cloud Composer 环境。在此过程中,您还会删除环境的存储桶。

后续步骤