Airflow 调度器问题排查

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面介绍了针对常见错误情况的问题排查步骤和信息 Airflow 调度器的常见问题

确定问题的来源

如需开始进行问题排查,请确定问题是出现在 DAG 解析时间还是在执行时处理任务的时候。如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异

检查 DAG 处理器日志

如果您有复杂的 DAG,则使用 DAG 处理器,该处理器由 调度器,因此可能不会解析所有 DAG。这可能会导致许多问题 会出现以下症状。

症状

  • 如果 DAG 处理器在解析 DAG 时遇到问题,则可能会导致下面列出的问题组合发生。如果 DAG 是动态生成的,则与静态 DAG 相比,这些问题可能更严重。

  • DAG 在 Airflow 界面和 DAG 界面中不可见。

  • DAG 未安排执行。

  • DAG 处理器日志中存在错误,例如:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Airflow 调度器遇到问题,会导致调度器重启。

  • 已安排执行的 Airflow 任务会被取消,DAG 会运行 对于无法解析的 DAG,可能会被标记为 failed。例如:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

解决方案:

  • 增加与 DAG 解析相关的参数:

  • 更正或移除导致 DAG 处理器出现问题的 DAG。

检查 DAG 解析时间

如需验证问题是否发生在 DAG 解析时间,请执行以下步骤。

控制台

在 Google Cloud 控制台中,您可以使用 Monitoring 页面和 日志标签页检查 DAG 解析时间。

使用 Cloud Composer 监控页面检查 DAG 解析时间:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。此时将打开 Monitoring 页面。

  3. Monitoring 标签页中,查看 DAG 运行部分中的所有 DAG 文件的总解析时间图表,并找出可能的问题。

    Composer Monitoring 标签页中的 DAG 运行部分显示您环境中 DAG 的健康状况指标

使用 Cloud Composer 日志标签页检查 DAG 解析时间:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。此时将打开 Monitoring 页面。

  3. 转到日志标签页,然后从所有日志导航树中 选择 DAG 处理器管理器部分。

  4. 查看 dag-processor-manager 日志并找出可能存在的问题。

    DAG 处理器日志将显示 DAG 解析时间

gcloud

使用 dags report 命令可查看所有 DAG 的解析时间。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

该命令的输出类似于以下内容:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

查找表中列出的每个日期的 duration 值。 值较大可能表示您的某个 DAG 未实现 优化应用通过输出表,您可以确定哪些 DAG 具有 解析时间较长。

监控正在运行和已加入队列的任务

如需检查是否有任务卡在队列中,请执行以下步骤。

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。 环境详情页面会打开。

  3. 转到监控标签页。

  4. 监控标签页中,查看 Airflow 任务图表 了解 DAG 运行部分,并找出可能存在的问题。Airflow 任务 在 Airflow 中处于排队状态的任务,它们可以 Celery 或 Kubernetes Executor 代理队列。Celery 队列中的任务是任务 添加到 Celery 代理队列中的实例。

DAG 解析时排查问题

以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。

受限线程数量

允许使用 DAG 处理器管理器(调度器的一部分, 处理 DAG 文件)以仅使用有限数量的线程,这可能会影响 DAG 解析时间。

如需解决此问题,请替换以下 Airflow 配置选项:

  • 对于 Airflow 1.10.12 及更低版本,请替换 max_threads 参数

    备注
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE 替换为核心数
    部署在工作器节点机器中
  • 对于 Airflow 1.10.14 及更高版本,请替换 parsing_processes 参数

    备注
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE 替换为核心数
    部署在工作器节点机器中

任务的数量和时间分布

已知 Airflow 在安排大量小任务时会出现问题。在这种情况下,您应该选择使用较少的整合任务。

同时调度大量 DAG 或任务也可能是问题的来源。如需避免此问题,请随着时间推移更平均地分配您的任务。

在正在运行和已加入队列的任务中排查问题

以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。

任务队列过长

在某些情况下,任务队列对于调度器来说可能太长。 如需了解如何优化工作器和 celcel 参数,请参阅将 Cloud Composer 环境与您的业务一起扩缩

使用 Airflow 调度器的 TimeTable 功能

从 Airflow 2.2 开始,您可以使用 名为 TimeTable 的新功能

您可以使用以下方法之一定义时间表:

受限集群资源

本部分仅适用于 Cloud Composer 1。

如果环境的 GKE 集群太小,无法处理所有 DAG 和任务,您可能会遇到性能问题。在这种情况下,请尝试以下解决方案之一:

  • 使用性能更高的机器类型创建一个新环境,并将 DAG 迁移到该环境中。
  • 创建更多 Cloud Composer 环境并在它们之间拆分 DAG。
  • 按照升级 GKE 节点的机器类型中的说明更改 GKE 节点的机器类型。由于此过程容易出错,因此这是最不建议的选项。
  • 升级在您的环境中运行 Airflow 数据库的 Cloud SQL 实例的机器类型,例如,使用 gcloud composer environments update 命令。Airflow 数据库性能低可能是导致 调度程序运行缓慢

避免在维护窗口内进行任务调度

您可以为自己的集群定义特定的维护窗口 环境在这些时间段内,会出现 Cloud SQL 和 GKE 的维护事件。

让 Airflow 调度器忽略不必要的文件

您可以通过跳过 DAGs 文件夹中的不必要文件来提高 Airflow 调度器的性能。Airflow 调度器会忽略 .airflowignore 文件中指定的文件和文件夹。

如需让 Airflow 调度器忽略不必要的文件,请执行以下操作:

  1. 创建 .airflowignore 文件。
  2. 在此文件中,列出应忽略的文件和文件夹。
  3. 将此文件上传到以下位置的“/dags”文件夹: 环境的存储桶中。

如需详细了解 .airflowignore 文件格式,请参阅 Airflow 文档

Airflow 调度器进程暂停 DAG

Airflow 用户会暂停 DAG 以避免执行。这样可以节省 Airflow 工作器 处理周期。

Airflow 调度器会继续解析已暂停的 DAG。如果你确实想 提高 Airflow 调度器性能,请使用 .airflowignore 或删除已暂停项 DAGs 文件夹中的 DAG。

在 DAG 中使用“wait_for_downstream”

如果您将 DAG 中的参数 wait_for_downstream 设置为 True,那么如果您希望某个任务成功,则该任务的所有直接下行任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。有关详情,请访问 Airflow 文档

排队时间过长的任务将被取消并重新安排

如果 Airflow 任务在队列中保留的时间过长,则调度器 将重新安排执行作业(在 Airflow 2.3.1 之前的版本中, 该任务也会被标记为失败,并会在符合重试条件时重试)。

观察这种情况的症状的一种方式 查看包含排队任务数量的图表 (Cloud Composer 界面中的“监控”标签页) 如果此图表中的峰值在大约两个小时内没有下降 这些任务很可能会重新安排(没有日志),然后 “已采用的任务仍待处理 ...”调度器日志中的日志条目。 在这种情况下,您可能会看到“未找到日志文件...”消息 Airflow 任务日志中,因为该任务未执行。

一般情况下,此行为符合预期,已安排的 按计划执行任务如果您观察到大量 遇到这种情况时,可能意味着 您的环境中没有足够的 Airflow 工作器来处理所有 计划任务。

解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加 或 worker_concurrency 属性。您还可以将并行处理或池调整为 避免在排队时超出您的容量

有时,过时的任务可能会阻止特定 DAG 的执行

在一般情况下,Airflow 调度器应该能够处理 因为队列中有过时的任务,但由于某种原因, 正确执行这些任务(例如,过时任务所属的 DAG) 已删除)。

如果调度器未完全清除这些过时任务,您可能需要 手动将其删除例如,在 Airflow 界面中,您可以 导航到(菜单 &gt; 浏览器 &gt;Task 实例),找到属于过时 DAG 的已加入队列的任务并将其删除。

如需解决此问题,请将您的环境升级到 Cloud Composer 2.1.12 或更高版本。

使用 Cloud Composer 处理 [scheduler]min_file_process_interval 参数的方法

Cloud Composer 更改了 Airflow 调度器使用 [scheduler]min_file_process_interval 的方式。

Airflow 1

如果 Cloud Composer 使用 Airflow 1,则用户可以设置 [scheduler]min_file_process_interval 的取值范围为 0 到 600 秒。值大于 与将 [scheduler]min_file_process_interval 设置为 600 秒时得到的结果相同。

Airflow 2

在 Airflow 2 中,[scheduler]min_file_process_interval 只能用于 1.19.9 和 2.0.26 或更高版本

  • 1.19.9 和 2.0.26 之前的 Cloud Composer 版本

    在这些版本中,[scheduler]min_file_process_interval 会被忽略。

  • Cloud Composer 1.19.9 版或 2.0.26 版或更高版本

    Airflow 调度器会在所有 DAG 达到一定次数后重启 [scheduler]num_runs 参数用于控制调度器执行该任务的次数。当调度器时 达到 [scheduler]num_runs 个调度循环后, 调度程序是一个无状态组件,此类重启 自动修复机制,用于解决调度程序可能遇到的任何问题。 如果未指定,则使用默认值 [scheduler]num_runs 的值 即 5000

    [scheduler]min_file_process_interval 可用于配置 发生 DAG 解析,但此参数不得超过所需的时间 让调度器执行 [scheduler]num_runs 在安排 DAG 时可能出现循环。

扩缩 Airflow 配置

Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。要设置这些配置选项 根据您的环境替换它们的值。

  • 工作器并发

    参数 [celery]worker_concurrency控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Cloud Composer 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。本次 该数量受 [core]parallelism Airflow 配置选项的限制, 我们将进一步介绍这一点。

    在 Cloud Composer 2 环境中,默认值为 系统会自动计算[celery]worker_concurrency

    • 对于 Airflow 版本 2.3.3 及更高版本,设置了 [celery]worker_concurrency 32 * worker_CPU 和 8 * worker_memory 中的最小值。

    • 对于 Airflow 2.2.5 或更低版本,[celery]worker_concurrency 为 设置为 12 * 工作器数量CPU。

  • 活跃 DAG 运行次数上限

    [core]max_active_runs_per_dag Airflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。

    如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。

  • 每个 DAG 的有效任务数上限

    [core]max_active_tasks_per_dag Airflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。它是 DAG 级别的参数。

    如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。

    解决方案:增加 [core]max_active_tasks_per_dag

  • 最大并行数量和池大小

    [core]parallelism Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。

    这是整个 Airflow 设置的全局参数。

    任务会排入队列并在池中执行。Cloud Composer 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值([core]parallelism 配置选项和 [celery]worker_concurrency 配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。

    您可以在 Airflow 界面中配置池大小(菜单 > 管理 > )。将池大小调整为您的环境中预期的最大并行数量。

    通常,[core]parallelism 设置为工作器数量上限的乘积 和 [celery]worker_concurrency

由于 DAG 处理器超时,调度器无法调度 DAG

如需详细了解此问题,请参阅 DAG 问题排查

达到 dagrun_timeout 后将任务标记为失败

调度器会标记尚未完成的任务(正在运行、已安排和已加入队列) 如果 DAG 运行未在 dagrun_timeout(一个 DAG 参数)。

解决方案:

Airflow 数据库面临负载压力的症状

有时,您可能会在 Airflow 调度器日志中看到 以下警告日志条目:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Airflow 工作器日志中可能还会出现类似症状:

对于 MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

对于 PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

此类错误或警告可能是 Airflow 数据库遭到 打开的连接数或查询数 同时执行(由调度器或其他 Airflow 组件) 例如 worker、触发器和 Web 服务器。

可能的解决方案:

Web 服务器显示“调度程序似乎未运行”警告

调度器定期向 Airflow 报告其检测信号 数据库。Airflow Web 服务器会根据这些信息确定 调度程序是否处于活动状态。

有时,如果调度器负载过重,则可能就无法 报告检测信号时间间隔 [scheduler]scheduler-heartbeat-sec

在这种情况下,Airflow Web 服务器可能会显示以下警告:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

可能的解决方案:

回填 DAG 期间遇到的问题的解决方法

有时,您可能希望重新运行已执行的 DAG。 您可以通过以下方式使用 Airflow 命令行工具执行此操作:

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

如需仅针对特定 DAG 重新运行失败的任务,还可以使用 --rerun_failed_tasks 参数。

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

如需仅针对特定 DAG 重新运行失败的任务,还可以使用 --rerun-failed-tasks 参数。

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • START_DATE 替换为 start_date DAG 参数的值,位于 YYYY-MM-DD 格式。
  • END_DATE 替换为 end_date DAG 参数的值,位于 YYYY-MM-DD 格式。
  • DAG_NAME 替换为 DAG 名称。

回填操作有时可能会产生死锁情况, 无法回填,因为任务已锁定。例如:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

在某些情况下,您可以使用以下解决方法来解决死锁问题:

后续步骤