调试任务调度问题

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本教程将指导您对任务安排进行诊断和问题排查 以及解析导致调度程序故障、解析错误和 延迟时间和任务失败

简介

Airflow 调度器主要受以下两个因素影响:任务调度和 DAG 解析。其中一个因素出现问题都可能会对环境健康和性能产生负面影响。

有时,系统会同时安排过多任务。在这种情况下, 队列已填满,但任务会保留在“已安排”状态或变为 在排队后重新调度,这可能会导致任务失败和性能 延迟时间

另一个常见问题是解析延迟时间和错误,这是由 DAG 代码例如,如果 DAG 代码在代码顶层包含 Airflow 变量,则可能会导致解析延迟、数据库过载、调度失败和 DAG 超时。

在本教程中,您将诊断示例 DAG,并学习如何排查调度和解析问题、改进 DAG 调度,以及优化 DAG 代码和环境配置以提升性能。

目标

本部分列出了本教程中示例的目标。

示例:高任务并发导致的调度器故障和延迟

  • 上传同时运行多次的示例 DAG,并使用 Cloud Monitoring 诊断调度程序故障和延迟问题。

  • 通过合并任务来优化 DAG 代码,并评估性能影响。

  • 随时间更均匀地分配任务并评估性能 影响。

  • 优化 Airflow 配置和环境配置, 评估其影响。

示例:由复杂代码导致的 DAG 解析错误和延迟

  • 上传包含 Airflow 变量的示例 DAG 并诊断解析问题 关于 Cloud Monitoring 的问题。

  • 通过在 并评估对解析时间的影响。

  • 优化 Airflow 配置和环境配置,并评估对解析时间的影响。

费用

本教程使用 Google Cloud 的以下收费组件:

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

本部分介绍了在开始本教程之前需要执行的操作。

创建和配置项目

在本教程中,您需要一个 Google Cloud 项目。按以下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建项目

    前往“项目选择器”

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 如需创建必要的资源,请确保您的 Google Cloud 项目用户具有以下角色

    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)

为您的项目启用 API

Enable the Cloud Composer API.

Enable the API

创建 Cloud Composer 环境

创建一个 Cloud Composer 2 环境

在创建环境的过程中 您授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色分配给 Composer Service Agent 。Cloud Composer 使用此账号执行操作 Google Cloud 项目中的资源。

示例:由于任务调度问题而导致的调度器故障和任务失败

此示例演示了调试调度程序故障以及高任务并发导致的延迟。

将示例 DAG 上传到您的环境

将以下示例 DAG 上传到环境中 您在之前步骤中创建的应用在本教程中,此 DAG 的名称为 dag_10_tasks_200_seconds_1

此 DAG 包含 200 个任务。每项任务都会等待 1 秒并显示“Complete!”。 DAG 会在上传后自动触发。Cloud Composer 会运行此 DAG 10 次,并且所有 DAG 运行都是并发进行的。

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

诊断调度器故障和任务失败问题

DAG 运行完成后,打开 Airflow 界面,然后点击 dag_10_tasks_200_seconds_1 DAG。您会看到,总共有 10 次 DAG 运行作业成功,并且每项作业都有 200 个成功的任务。

查看 Airflow 任务日志:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到日志标签页,然后转到所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看

在日志直方图中,您可以看到用红色指示的错误和警告 和橙色:

Airflow 工作器日志的直方图,其中错误和警告用红色和橙色表示
图 1.Airflow 工作器日志直方图(点击可放大)

示例 DAG 产生了大约 130 条警告和 60 条错误。点击包含黄色和红色条形的任意列。您会在日志中看到以下某些警告和错误:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

这些日志可能表明资源使用量超出了限制, 工作器自行重启。

如果 Airflow 任务在队列中保留的时间过长,调度器就会 执行失败,然后 up_for_retry 处理请求,并将再次重新安排 执行。观察这种情况的症状的方法之一是查看 显示已加入队列的任务数的图表, 则任务可能会出现失败(没有 日志)。

查看监控信息:

  1. 转到监控标签页,然后选择概览

  2. 查看 Airflow 任务图表。

    Airflow 任务随时间变化的图,其中显示了
    已加入队列的任务数
    图 2. Airflow 任务图(点击可放大)

    在 Airflow 任务图中,排队等待的任务出现了一个峰值 超过 10 分钟,这可能意味着没有足够的资源 来处理所有计划任务。

  3. 查看活跃工作器图表:

    随时间变化的活跃 Airflow 工作器图表显示,
    活跃工作器数量已增加至上限
    图 3.活跃工作器图表(点击可放大)

    活跃工作器图表指示 DAG 触发了自动扩缩 设置为 DAG 运行期间允许的最大工作器数(3 个)。

  4. 资源使用情况图表可能会表明 Airflow 工作器没有足够的容量来运行队列中的任务。在监控标签页上,选择工作器并 查看工作器 CPU 总使用率工作器内存总使用量图表。

    Airflow 工作器 CPU 使用率图表显示 CPU 使用率增加到上限
    图 4. 工作器总 CPU 用量图表(点击可放大)
    Airflow 工作器的内存用量图显示了内存用量
    有所增加,但未达到上限
    图 5. 工作器总内存用量图(点击可放大)

    图表表明,由于同时执行的任务过多,导致 CPU 用量达到上限。资源的使用时间已超过 30 分钟,这甚至比 10 个 DAG 依次运行的 200 个任务的总时长还要长。

这些指标表示队列已满,并且缺少资源来处理所有已安排的任务。

合并任务

当前代码会创建许多 DAG 和任务,但没有足够的资源来并行处理所有任务,这会导致队列被填满。将任务在队列中保留的时间过长可能会导致任务重新安排或失败。 在此类情况下,您应当选择较少的 任务。

以下示例 DAG 更改了初始示例中的任务数量 并将等待时间从 1 秒增加到 10 秒, 执行相同工作量的综合性任务。

将以下示例 DAG 上传到您创建的环境。在本教程中,此 DAG 名为 dag_10_tasks_20_seconds_10

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

评估更多合并的任务对调度流程的影响:

  1. 等待 DAG 运行完成。

  2. 在 Airflow 界面的 DAG 页面上,点击 dag_10_tasks_20_seconds_10 DAG。您会看到 10 个 DAG 运行作业,每个作业都有 20 个成功的任务。

  3. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  4. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  5. 转到日志标签页,然后转到所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看

    第二个例子中的任务整合程度更高, 10 个警告和 7 个错误。在直方图上,您可以 第一个示例(之前的值)和第二个示例中的 示例(之后的值)。

    Airflow 工作器日志包含错误和警告的直方图
    显示了完成任务后,错误和警告的数量减少了
    合并
    图 6. Airflow 工作器记录了 任务已合并(点击可放大)

    将第一个示例与整合程度更高的示例进行比较时,您可以 可以看到,在第二个事件中,错误和警告 示例。但是,与温关闭相关的相同错误仍会显示在 导致日志崩溃的问题

  6. 监控标签页中,选择工作器,然后查看图表。

    当您将第一个示例(之前的示例)的 Airflow 任务图表进行比较时, 第二个示例的图表,其中包含更整合的任务, 可以看到,排队任务激增所持续的时间较短 任务整合程度的提升情况。不过,它持续了近 10 分钟,仍然不太理想。

    Airflow 任务随时间变化的图表显示,
    Airflow 任务的持续时间比之前短。
    图 7. 任务合并后的 Airflow 任务图表(点击可放大)

    在“活跃工作器数”图表中,您可以看到第一个示例(图表左侧)使用资源的时间比第二个示例长得多,即使这两个示例模拟的工作量相同。

    随时间变化的活跃 Airflow 工作器图表显示,
    活跃工作器数量在更短时间内增加
    图 8.任务合并后的活跃工作器数图表(点击可放大)

    查看工作器资源消耗图表。尽管两者之间的差异 示例中使用的资源是整合程度更高的任务, 初始示例相当有意义,CPU 使用率仍然很高, 上限的 70%

    Airflow 工作器的 CPU 使用率图表会显示 CPU 使用率
    最高可增加到上限的 70%
    图 9.之后的工作器 CPU 总使用率图表 任务已合并(点击可放大)
    Airflow 工作器内存用量图表显示内存用量在增加,但未达到上限
    图 10.任务合并后的总工作器内存用量图表(点击可放大)

随着时间推移更平均地分配任务

太多并发任务会导致队列被填满,从而导致 卡在队列中或重新调度的任务。在前面的步骤中, 通过整合这些任务减少了任务数量,但 日志和监控表明 次优。

您可以通过实施时间表来控制并发任务运行的数量 或设置可同时运行的任务数量限制。

在本教程中,您可以通过添加 DAG 级参数输入 dag_10_tasks_20_seconds_10 DAG 中:

  1. max_active_runs=1 参数添加到 DAG 上下文管理器。此参数会限制在给定时间内仅运行单个 DAG 实例。

  2. 向 DAG 上下文管理器添加 max_active_tasks=5 参数。此参数用于控制每个 DAG 中可以同时运行的任务实例数上限。

将以下示例 DAG 上传到您创建的环境。在本教程中,此 DAG 名为 dag_10_tasks_20_seconds_10_scheduled.py

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

评估随时间分配任务对调度流程的影响:

  1. 等待 DAG 运行完成。

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  4. 转到日志标签页,然后转到所有日志 > Airflow 日志 > 工作器 > 在日志浏览器中查看

  5. 在直方图上,您可以看到,具有数量有限的第三个 DAG 有效任务和运行未生成任何警告或错误,并且 与之前的值相比,日志分布看起来更均匀。

    Airflow 工作器日志包含错误和警告的直方图
    合并任务后,不会显示任何错误或警告,
    随时间变化的情况。
    图 11.任务随时间推移进行整合和分发后,Airflow 工作器日志直方图(点击可放大)

dag_10_tasks_20_seconds_10_scheduled 示例中的任务具有 有限数量的活动任务和运行不会导致资源压力, 任务已均匀地排入队列。

执行完所述步骤后,您优化了资源使用情况,具体方法如下: 整合小任务并随着时间的推移更均匀地分配。

优化环境配置

您可以调整环境配置,确保 Airflow 工作器始终具有容量来运行已加入队列的任务。

工作器数量和工作器并发数

您可以调整工作器数量上限,让 Cloud Composer 在设定的限制内自动扩缩您的环境。

[celery]worker_concurrency 参数定义了最大任务数 单个工作器可以从任务队列中选取更改此参数 调整单个工作器可以同时执行的任务数量。 您可以通过替换此 Airflow 配置选项来更改此选项。默认情况下,工作器并发设置被设置为 32, 12 * worker_CPU, 8 * worker_memory,表示 取决于工作器资源限制。如需详细了解默认工作器并发设置值,请参阅优化环境

工作器数量和工作器并发性相互作用,并且环境性能在很大程度上取决于这两个参数。您可以参考以下注意事项选择正确的组合:

  • 并行运行多个快速任务。你可以增加工作器的数量 当队列中有任务等待处理,并且您的工作器使用 同时占用较低比例的 CPU 和内存。不过,在某些情况下,队列可能永远不会填满,导致自动扩缩功能永远不会触发。如果小任务在新工作器前完成执行 现有工作器可以继续处理剩余的任务 对于新创建的工作器,将不会有任何任务。

    在这种情况下,我们建议您 增加工作器数量下限并提高工作器并发性 以避免过度伸缩

  • 并行运行多个长时间运行的任务。较高的工作器并发性会阻止系统扩缩工作器数量。如果多个任务需要大量资源且需要很长时间才能完成,工作器并发性较高可能会导致队列永远不会填满,并且所有任务都由一个工作器处理,从而导致性能问题。在这些情况下,建议增加工作器数量上限并降低工作器并发数量

并行处理的重要性

Airflow 调度器可通过 DAG。[core]parallelism Airflow 配置选项用于控制 Airflow 调度器可以将所有任务加入执行器的队列中, 这些任务的依赖项。

并行处理是 Airflow 的一种保护机制, 可以按每个调度器同时运行,而不考虑工作器数量。 并行处理值乘以集群中的调度器数量, 是您的环境可以加入队列的任务实例数上限。

通常,[core]parallelism 会设置为工作器数量上限和 [celery]worker_concurrency 的乘积。它还会受到广告资源池的影响。您可以通过替换此 Airflow 配置选项来更改此选项。如需详细了解如何调整与扩缩相关的 Airflow 配置,请参阅扩缩 Airflow 配置

找到最佳环境配置

建议您将小任务合并为更大的任务,并随着时间推移更均匀地分配任务,以解决调度问题。除了优化 DAG 代码之外,您还可以优化环境配置,以便有足够的容量来并行运行多个任务。

例如,假设您整合 DAG 中的任务 同时限制活跃任务,使其更均匀地分布在 而不是您的特定用例的首选解决方案。

您可以调整并行性、工作器数量和工作器并发数 在不限制活跃状态的情况下运行 dag_10_tasks_20_seconds_10 DAG 的参数 任务。在此示例中,DAG 会运行 10 次,每次运行包含 20 个小任务。如果您想同时运行这些测试,请执行以下操作:

  • 您需要更大的环境大小,因为它用于控制环境的代管式 Cloud Composer 基础架构的性能参数。

  • Airflow 工作器必须能够同时运行 20 个任务,这意味着您需要将工作器并发设置为 20。

  • 工作器需要足够的 CPU 和内存来处理所有任务。工作器 并发受工作器 CPU 和内存的影响,因此,您需要 CPU 占用率至少为 worker_concurrency / 12,在 least worker_concurrency / 8 中运行 内存中。

  • 您需要增加并行性,以匹配较高的工作器并发数。 为了让工作器从队列中挑选 20 个任务,调度器 需要先安排这 20 个任务。

您可以通过以下方式调整您的环境配置:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 找到资源 > 工作负载配置,然后点击修改

  5. 工作器部分的内存字段中,为 Airflow 工作器指定新的内存限制。在本教程中,请使用 4 GB。

  6. CPU 字段中,为 Airflow 工作器指定新的 CPU 限制。在本教程中,请使用 2 个 vCPU。

  7. 保存更改,并等待 Airflow 工作器几分钟 重启。

接下来,替换并行性和工作器并发性 Airflow 配置选项:

  1. 前往 Airflow 配置替换标签页。

  2. 点击修改,然后点击添加 Airflow 配置替换

  3. 替换并行处理配置:

    core parallelism 20
  4. 点击添加 Airflow 配置替换并替换工作器 并发配置:

    celery worker_concurrency 20
  5. 点击保存,然后等待环境更新其配置。

使用调整后的配置再次触发同一示例 DAG:

  1. 在 Airflow 界面中,前往 DAG 页面。

  2. 找到 dag_10_tasks_20_seconds_10 DAG 并将其删除。

    删除 DAG 后,Airflow 会检查环境存储桶中的 DAG 文件夹,并自动再次运行 DAG。

DAG 运行完成后,再次查看日志直方图。上图中 可以看到,dag_10_tasks_20_seconds_10 示例包含更多 使用 调整后的环境配置。将结果与先前的数据进行比较 同一示例生成了错误和警告, 以 tge 默认环境配置运行的

Airflow 工作器日志包含错误和警告的直方图
        启用环境配置后,
        已调整
图 12.调整环境配置后,Airflow 工作器日志直方图(点击可放大)

环境配置和 Airflow 配置在任务调度中发挥着至关重要的作用,但无法超出特定限制来增加配置。

我们建议优化 DAG 代码、合并任务,并使用调度功能来优化性能和效率。

示例:由于 DAG 代码复杂而导致的 DAG 解析错误和延迟

在本例中,您将调查模拟过多 Airflow 变量的示例 DAG 的解析延迟时间。

创建新的 Airflow 变量

在上传示例代码之前,请创建一个新的 Airflow 变量。

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. Airflow Web 服务器列中,点击与您的环境对应的 Airflow 链接。

  3. 依次前往管理 > 变量 >添加新记录

  4. 设置以下值:

    • 键:example_var
    • val:test_airflow_variable

将示例 DAG 上传到您的环境

将以下示例 DAG 上传到您在前面步骤中创建的环境。在本教程中,此 DAG 名为 dag_for_loop_airflow_variable

此 DAG 包含一个运行 1,000 次的 for 循环,模拟了过多的 Airflow 变量。每次迭代都会读取 example_var 变量 生成任务。每个任务都包含一个用于输出变量值的命令。

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

诊断解析问题

DAG 解析时间是 Airflow 调度器读取和解析 DAG 文件所需的时间。在 Airflow 调度器可以安排 DAG 中的任何任务之前,调度器必须解析 DAG 文件,以发现 DAG 和定义的任务的结构。

如果 DAG 需要很长时间来解析,则会消耗调度器的容量, 可能会降低 DAG 运行的性能。

如需监控 DAG 解析时间,请执行以下操作:

  1. 在 gcloud CLI 中运行 dags report Airflow CLI 命令,查看所有 DAG 的解析时间:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    替换以下内容:

    • ENVIRONMENT_NAME:您的环境的名称。
    • LOCATION:环境所在的区域。
  2. 在命令的输出中,查找 dag_for_loop_airflow_variables DAG 的时长值。值较大可能表示 此 DAG 的实现方式并不理想。如果您有多个 DAG,则可以从输出表中确定哪些 DAG 的解析时间较长。

    示例:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. 在 Google Cloud 控制台中检查 DAG 解析时间:

    1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  4. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  5. 前往日志标签页,然后依次选择所有日志 > DAG 处理器管理器

  6. 查看 dag-processor-manager 日志并找出可能存在的问题。

    示例 DAG 的日志条目显示 DAG 解析时间为 46.3 秒
    图 13.DAG 处理器管理器日志显示 DAG 解析时间(点击可放大)

如果 DAG 总解析时间超过约 10 秒,则调度器可能因 DAG 解析而过载,因而无法有效运行 DAG。

优化 DAG 代码

时间是 建议 以避免不必要的“顶级”DAG 中的 Python 代码。包含多个 Deployment 的 DAG 之外的导入、变量和函数可以提高解析度 Airflow 调度器的运行时间。这会降低 Cloud Composer 和 Airflow。过多读取 Airflow 变量会导致解析时间过长和数据库负载过高。如果此代码位于 DAG 中 则这些函数会在每个调度程序检测信号发生时执行,这可能会很慢。

Airflow 的模板字段可让您合并来自 Airflow 的值 变量和 Jinja 模板导入您的 DAG。这可防止在调度程序心跳期间执行不必要的函数。

如需更好地实现 DAG 示例,请避免在 DAG 的顶级 Python 代码中使用 Airflow 变量。而是通过 Jinja 模板将 Airflow 变量传递给现有运算符,这样系统会延迟读取值,直到任务执行时再读取。

将新版示例 DAG 上传到您的环境。在本教程中,此 DAG 名为 dag_for_loop_airflow_variable_optimized

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

检查新的 DAG 解析时间:

  1. 等待 DAG 运行完成。

  2. 再次运行 dags report 命令,查看所有 DAG 的解析时间:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. 再次查看 dag-processor-manager 日志,并分析解析时长。

    示例 DAG 的日志条目显示 DAG 解析时间为 4.21 秒
    图 14. DAG 处理器管理器日志显示 DAG DAG 代码优化后的解析时间(点击可放大)

通过将环境变量替换为 Airflow 模板, DAG 代码,并将解析延迟时间缩短了约 10 倍。

优化 Airflow 环境配置

Airflow 调度器会不断尝试触发新任务并解析所有 DAG 创建 Deployment 清单如果您的 DAG 解析时间较长,并且 调度器会消耗大量资源,您可以优化 Airflow 调度器 配置,以便调度器更高效地使用资源。

在本教程中,DAG 文件需要大量时间来解析和解析周期 开始重叠,然后耗尽调度器的容量。在我们的示例中, 第一个示例 DAG 的解析时间超过 5 秒,因此您需要配置 降低调度器的运行频率,以便更高效地使用资源。您将替换 scheduler_heartbeat_sec Airflow 配置选项。此配置定义了调度程序应运行的频率(以秒为单位)。默认情况下,该值设置为 5 秒。 您可以通过以下方式更改此 Airflow 配置选项: 覆盖它

替换 scheduler_heartbeat_sec Airflow 配置选项:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到 Airflow 配置替换标签页。

  4. 点击修改,然后点击添加 Airflow 配置替换

  5. 替换 Airflow 配置选项:

    scheduler scheduler_heartbeat_sec 10
  6. 点击保存,然后等待环境更新其配置。

检查调度器指标:

  1. 前往 Monitoring(监控)标签页,然后选择 Schedulers(调度程序)。

  2. 调度器检测信号图表中,点击更多选项按钮 (三个点),然后点击在 Metrics Explorer 中查看

调度程序心跳图表显示心跳发生频率较低
图 15. 调度程序检测信号图表(点击可放大)

在图表中,您会看到将默认配置从 5 秒更改为 10 秒后,调度程序的运行频率降低了两倍。通过降低心跳频率,您可以确保在之前的解析周期进行且调度程序的资源容量未耗尽时,调度程序不会开始运行。

为调度器分配更多资源

在 Cloud Composer 2 中,您可以向 scheduler.通过这种方式,您可以提高调度器的性能, 缩短 DAG 解析时间。

为调度器分配额外的 CPU 和内存:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到环境配置标签页。

  4. 找到资源 > 工作负载配置,然后点击修改

  5. 调度器部分的内存字段中,指定新内存 上限。在本教程中,请使用 4 GB。

  6. CPU 字段中,指定新的 CPU 限制。在本课中, 使用 2 个 vCPU

  7. 保存更改,并等待几分钟,让 Airflow 调度器重启。

  8. 转到日志标签页,然后转到所有日志 > DAG 处理器管理器

  9. 查看 dag-processor-manager 日志,并比较示例 DAG 的解析时长:

    示例 DAG 的日志条目显示,优化 DAG 的 DAG 解析时间为 1.5 秒。对于未经优化的 DAG,解析时间为 28.71 秒
    图 16. DAG 处理器管理器日志显示向调度器分配更多资源后的 DAG 解析时间(点击可放大)

与默认环境配置相比,通过向调度程序分配更多资源,您可以提高调度程序的容量并显著缩短解析延迟时间。有了更多资源,调度程序就可以解析 但是,与 Cloud Composer 相关的费用 也会增加此外,资源数量无法超过一定的限制。

我们建议您仅在实现可能的 DAG 代码和 Airflow 配置优化后再分配资源。

清理

为避免系统因资源向您的 Google Cloud 账号收取费用 您可以删除包含这些资源的项目 或者保留项目而删除各个资源。

删除项目

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

删除 Cloud Composer 环境。您 删除环境的存储桶

后续步骤