Airflow 调度器问题排查

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本页面介绍 Airflow 调度程序常见问题和问题排查步骤和信息。

确定问题的来源

如需开始进行问题排查,请确定问题是出现在 DAG 解析时间还是在执行时处理任务的时候。如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异

检查 DAG 处理器日志

如果您有复杂的 DAG,由调度器运行的 DAG 处理器可能无法解析您的所有 DAG。这可能会导致许多问题 会出现以下症状。

症状

  • 如果 DAG 处理器在解析 DAG 时遇到问题,则可能由下面列出的问题组合导致。与静态 DAG 相比,如果 DAG 是动态生成的,这些问题的影响可能会更大。

  • DAG 在 Airflow 界面和 DAG 界面中不可见。

  • DAG 不会安排执行时间。

  • DAG 处理器日志中存在错误,例如:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Airflow 调度器遇到问题,会导致调度器重启。

  • 已安排执行的 Airflow 任务会被取消,DAG 会运行 对于无法解析的 DAG,可能会被标记为 failed。例如:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

解决方案:

  • 增加与 DAG 解析相关的参数:

  • 更正或移除会导致 DAG 处理器出现问题的 DAG。

检查 DAG 解析时间

如需验证问题是否发生在 DAG 解析时间,请执行以下步骤。

控制台

在 Google Cloud 控制台中,您可以使用 Monitoring 页面和 Logs 标签页来检查 DAG 解析时间。

使用 Cloud Composer 监控页面检查 DAG 解析时间:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。系统随即会打开监控页面。

  3. Monitoring 标签页中,查看 DAG 运行部分中的所有 DAG 文件的总解析时间图表,并找出可能的问题。

    Composer Monitoring 标签页中的 DAG 运行部分显示您环境中 DAG 的健康状况指标

使用 Cloud Composer 日志标签页检查 DAG 解析时间:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。系统随即会打开监控页面。

  3. 转到日志标签页,然后从所有日志导航树中 选择 DAG 处理器管理器部分。

  4. 查看 dag-processor-manager 日志并找出可能存在的问题。

    DAG 处理器日志将显示 DAG 解析时间

gcloud - Airflow 1

使用带有 -r 标志的 list_dags 命令可查看所有 DAG 的解析时间。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    list_dags -- -r

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

该命令的输出类似于以下内容:

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

查找 DagBag 解析时间值。如果值较大,则可能表示某个 DAG 未以最佳方式实施。从输出表中,您可以确定哪些 DAG 的解析时间较长。

gcloud - Airflow 2

使用 dags report 命令可查看所有 DAG 的解析时间。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

该命令的输出类似于以下内容:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

查找表中列出的每个日期的 duration 值。 如果值较大,则可能表示某个 DAG 未以最佳方式实施。从输出表中,您可以确定哪些 DAG 的解析时间较长。

监控正在运行和已加入队列的任务

如需检查是否有任务卡在队列中,请执行以下步骤。

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页。

  4. Monitoring 标签页中,查看 DAG 运行部分中的 Airflow 任务图表,以确定潜在问题。Airflow 任务 在 Airflow 中处于排队状态的任务,它们可以 Celery 或 Kubernetes Executor 代理队列。“Celery 队列中的任务数”是指已进入 Celery 代理队列的任务实例数。

DAG 解析时排查问题

以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。

Cloud Composer 1 和 Airflow 1 中的 DAG 解析和调度

Airflow 2 中的 DAG 解析效率显著提高。如果您遇到与 DAG 解析和调度相关的性能问题,请考虑迁移到 Airflow 2。

在 Cloud Composer 1 中,调度程序与其他 Cloud Composer 组件一起在集群节点上运行。因此,与其他节点相比,单个集群节点的负载可能会更高或更低。调度程序的性能(DAG 解析和调度)可能会因调度程序运行的节点而异。此外,由于升级或维护操作,运行调度程序的单个节点可能会发生变化。此限制在 Cloud Composer 2 中已得到解决,您可以向调度程序分配 CPU 和内存资源,并且调度程序的性能不依赖于集群节点的负载。

受限线程数量

仅允许 DAG 处理器管理器(用于处理 DAG 文件的调度器部分)使用有限数量的线程可能会影响您的 DAG 解析时间。

如需解决此问题,请替换以下 Airflow 配置选项:

  • 对于 Airflow 1.10.12 及更低版本,请替换 max_threads 参数

    备注
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE 替换为核心数
    部署在工作器节点机器中
  • 对于 Airflow 1.10.14 及更高版本,请替换 parsing_processes 参数

    备注
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE 替换为工作器节点中核心的数量

任务的数量和时间分布

已知 Airflow 在安排大量小任务时会出现问题。在这种情况下,您应该选择使用较少的整合任务。

同时调度大量 DAG 或任务也可能是问题的来源。如需避免此问题,请随着时间推移更平均地分配您的任务。

在正在运行和已加入队列的任务中排查问题

以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。

任务队列过长

在某些情况下,调度器的任务队列可能会过长。如需了解如何优化工作器和 celcel 参数,请参阅将 Cloud Composer 环境与您的业务一起扩缩

使用 Airflow 调度器的时间表功能

从 Airflow 2.2 开始,您可以使用 名为 TimeTable 的新功能

您可以使用以下方法之一定义时间表:

受限集群资源

本部分仅适用于 Cloud Composer 1。

如果环境的 GKE 集群太小,无法处理所有 DAG 和任务,您可能会遇到性能问题。在这种情况下,请尝试以下解决方案之一:

  • 使用性能更高的机器类型创建一个新环境,并将 DAG 迁移到该环境中。
  • 创建更多 Cloud Composer 环境并在它们之间拆分 DAG。
  • 按照升级 GKE 节点的机器类型中的说明更改 GKE 节点的机器类型。由于此过程容易出错,因此这是最不建议的选项。
  • 升级在您的环境中运行 Airflow 数据库的 Cloud SQL 实例的机器类型,例如,使用 gcloud composer environments update 命令。Airflow 数据库性能低可能是导致 调度程序运行缓慢

避免在维护窗口内进行任务调度

您可以为您的 Web 应用定义特定的维护窗口 环境在这些时间段内,会出现 Cloud SQL 和 GKE 的维护事件。

让 Airflow 调度器忽略不必要的文件

您可以通过跳过 DAGs 文件夹中的不必要文件来提高 Airflow 调度器的性能。Airflow 调度器会忽略 .airflowignore 文件中指定的文件和文件夹。

如需让 Airflow 调度器忽略不必要的文件,请执行以下操作:

  1. 创建 .airflowignore 文件。
  2. 在此文件中,列出应忽略的文件和文件夹。
  3. 将此文件上传到环境的存储桶中的 /dags 文件夹。

如需详细了解 .airflowignore 文件格式,请参阅 Airflow 文档

Airflow 调度器进程暂停 DAG

Airflow 用户会暂停 DAG 以避免执行。这样可以节省 Airflow 工作器处理周期。

Airflow 调度器会继续解析已暂停的 DAG。如果您确实想要提高 Airflow 调度器性能,请使用 .airflowignore 或从 DAGs 文件夹中删除已暂停的 DAG。

在 DAG 中使用“wait_for_downstream”

如果您将 DAG 中的参数 wait_for_downstream 设置为 True,那么如果您希望某个任务成功,则该任务的所有直接下行任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。有关详情,请访问 Airflow 文档

排队时间过长的任务将被取消并重新安排

如果 Airflow 任务在队列中保留的时间过长,则调度器 将重新安排执行作业(在 Airflow 2.3.1 之前的版本中, 该任务也会被标记为失败,并会在符合重试条件时重试)。

观察这种情况的症状的一种方式 查看包含排队任务数量的图表 (Cloud Composer 界面中的“监控”标签页) 如果此图表中的峰值在大约两个小时内没有下降 这些任务很可能会重新安排(没有日志),然后 “已采用的任务仍待处理 ...”调度器日志中的日志条目。 在这种情况下,您可能会看到“未找到日志文件...”消息 Airflow 任务日志中,因为该任务未执行。

一般来说,预期此任务失败,并且计划任务的下一个实例应该根据时间表执行。如果您在 Cloud Composer 环境中观察到许多此类情况,则可能表示您的环境中没有足够的 Airflow 工作器来处理所有计划任务。

解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加 worker 或 worker_concurrency 属性。您还可以将并行处理或池调整为 避免在排队时超出您的容量

有时,过时的任务可能会阻止特定 DAG 的执行

在一般情况下,Airflow 调度器应该能够处理这样的情况:队列中存在过时任务,并且由于某些原因导致无法正确执行它们(例如,过时任务所属的 DAG 被删除)。

如果调度器未完全清除这些过时任务,您可能需要 手动将其删除例如,在 Airflow 界面中,您可以 导航到(菜单 &gt; 浏览器 &gt;Task 实例),找到属于过时 DAG 的已加入队列的任务并将其删除。

如需解决此问题,请将您的环境升级到 Cloud Composer 2.1.12 或更高版本。

Cloud Composer 方法 [scheduler]min_file_process_interval 参数

Cloud Composer 更改了 Airflow 调度器使用 [scheduler]min_file_process_interval 的方式。

Airflow 1

如果 Cloud Composer 使用 Airflow 1,则用户可以设置 [scheduler]min_file_process_interval 的取值范围为 0 到 600 秒。值大于 与将 [scheduler]min_file_process_interval 设置为 600 秒时得到的结果相同。

Airflow 2

在 Airflow 2 中,[scheduler]min_file_process_interval 只能用于 1.19.9 和 2.0.26 或更高版本

  • 1.19.9 和 2.0.26 之前的 Cloud Composer 版本

    在这些版本中,系统会忽略 [scheduler]min_file_process_interval

  • Cloud Composer 1.19.9 版或 2.0.26 版或更高版本

    Airflow 调度器会在所有 DAG 达到一定次数后重启 [scheduler]num_runs 参数用于控制调度器执行该任务的次数。当调度程序到达 [scheduler]num_runs 调度循环时,它会重启 - 调度程序是无状态组件,此类重启是对调度程序可能遇到的任何问题的自动修复机制。如果未指定,系统会应用 [scheduler]num_runs 的默认值,即 5000。

    [scheduler]min_file_process_interval 可用于配置 DAG 解析的频率,但此参数不得超过调度器在调度 DAG 时执行 [scheduler]num_runs 循环所需的时间。

扩缩 Airflow 配置

Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。要设置这些配置选项 根据您的环境替换它们的值。

  • 工作器并发

    参数 [celery]worker_concurrency控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Cloud Composer 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。这个 该数量受 [core]parallelism Airflow 配置选项的限制, 我们将进一步介绍这一点。

    在 Cloud Composer 2 环境中,默认值为 系统会自动计算[celery]worker_concurrency

    • 对于 Airflow 版本 2.3.3 及更高版本,设置了 [celery]worker_concurrency 32 * worker_CPU 和 8 * worker_memory 中的最小值。

    • 对于 Airflow 版本 2.2.5 或更低版本,[celery]worker_concurrency 设为 12 * 工作器的 CPU 数量。

  • 活跃 DAG 运行次数上限

    [core]max_active_runs_per_dag Airflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。

    如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。

  • 每个 DAG 的有效任务数上限

    [core]max_active_tasks_per_dag Airflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。它是 DAG 级别的参数。

    如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。

    解决方案:增加 [core]max_active_tasks_per_dag

  • 最大并行数量和池大小

    [core]parallelism Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。

    这是整个 Airflow 设置的全局参数。

    任务会排入队列并在池中执行。Cloud Composer 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值([core]parallelism 配置选项和 [celery]worker_concurrency 配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。

    您可以在 Airflow 界面中配置池大小(菜单 > 管理 > )。将池大小调整为您的环境中预期的最大并行数量。

    通常,[core]parallelism 设置为工作器数量上限的乘积 和 [celery]worker_concurrency

由于 DAG 处理器超时,调度器未调度 DAG

如需详细了解此问题,请参阅 DAG 问题排查

在达到 dagrun_timeout 后将任务标记为失败

调度器会标记尚未完成的任务(正在运行、已安排和已加入队列) 如果 DAG 运行未在 dagrun_timeout(一个 DAG 参数)。

解决方案:

Airflow 数据库面临负载压力的症状

有时,您可能会在 Airflow 调度程序日志中看到以下警告日志条目:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Airflow 工作器日志中可能还会出现类似症状:

对于 MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

对于 PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

此类错误或警告可能是 Airflow 数据库遭到 打开的连接数或查询数 同时执行(由调度器或其他 Airflow 组件) 例如 worker、触发器和 Web 服务器。

可能的解决方案:

Web 服务器显示“调度器似乎未运行”警告

调度器定期向 Airflow 报告其检测信号 数据库。Airflow Web 服务器会根据这些信息确定调度程序是否处于活动状态。

有时,如果调度程序负载过重,则可能无法每隔 [scheduler]scheduler-heartbeat-sec 报告一次心跳。

在这种情况下,Airflow Web 服务器可能会显示以下警告:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

可能的解决方案:

回填 DAG 期间遇到的问题的解决方法

有时,您可能需要重新运行已执行的 DAG。您可以通过以下方式使用 Airflow 命令行工具执行此操作:

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

如需仅针对特定 DAG 重新运行失败的任务,还可以使用 --rerun_failed_tasks 参数。

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

如需仅重新运行特定 DAG 的失败任务,还应使用 --rerun-failed-tasks 参数。

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • START_DATE,其中包含 start_date DAG 参数的值,格式为 YYYY-MM-DD
  • END_DATE 替换为 end_date DAG 参数的值,位于 YYYY-MM-DD 格式。
  • DAG_NAME 替换为 DAG 名称。

回填操作有时可能会产生死锁情况, 无法回填,因为任务已锁定。例如:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

在某些情况下,您可以使用以下权宜解决方法来解决死锁问题:

后续步骤