Airflow 调度器问题排查

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页面介绍 Airflow 调度程序和 DAG 处理器常见问题的问题排查步骤和信息。

确定问题的来源

如需开始进行问题排查,请确定问题是出现在以下时间:

  • 在 DAG 解析时,当 Airflow DAG 处理器解析 DAG 时
  • 在执行时,当 DAG 由 Airflow 调度器处理时

如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异

检查 DAG 处理问题

  1. 检查 DAG 处理器日志
  2. 检查 DAG 解析时间

监控正在运行和已加入队列的任务

如需检查是否有任务卡在队列中,请执行以下步骤。

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。 环境详情页面会打开。

  3. 转到监控标签页。

  4. Monitoring 标签页中,查看 DAG 运行部分中的 Airflow 任务图表,以确定潜在问题。Airflow 任务是指在 Airflow 中处于排队状态的任务,它们可以进入 Celery 或 Kubernetes Executor 代理队列。“Celery 队列中的任务数”是指已进入 Celery 代理队列的任务实例数。

DAG 解析时排查问题

以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。

任务的数量和时间分布

如果同时调度大量 DAG 或任务,Airflow 可能会出现问题。为避免出现安排方面的问题,您可以:

  • 调整 DAG,以使用较少的整合任务。
  • 调整 DAG 的调度间隔,以便在一段时间内更均匀地分布 DAG 运行。

扩缩 Airflow 配置

Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。如需设置这些配置选项,请根据您的环境替换相应值。您还可以在 DAG 或任务级层设置其中一些值。

  • 工作器并发

    参数 [celery]worker_concurrency控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Cloud Composer 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。此数字受 [core]parallelism Airflow 配置选项的限制,之后将进一步说明此内容。

    在 Cloud Composer 3 环境中,[celery]worker_concurrency 的默认值会根据工作器可容纳的轻量级并发任务实例数量自动计算。这意味着其值取决于工作器资源限制。工作器并发值不取决于环境中的工作器数量。

  • 活跃 DAG 运行次数上限

    [core]max_active_runs_per_dag Airflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。

    如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。

    您还可以使用 max_active_runs 参数在 DAG 级层设置此值。

  • 每个 DAG 的活跃任务数上限

    [core]max_active_tasks_per_dag Airflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。

    如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。在这种情况下,您可以增加此配置选项的值。

    您还可以使用 max_active_tasks 参数在 DAG 级层设置此值。

    您可以在任务级别使用 max_active_tis_per_dagmax_active_tis_per_dagrun 参数来控制每个 DAG 和每次 DAG 运行允许运行的具有特定任务 ID 的实例数量。

  • 最大并行数量和池大小

    [core]parallelism Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。

    这是整个 Airflow 设置的全局参数。

    任务会排入队列并在池中执行。Cloud Composer 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值([core]parallelism 配置选项和 [celery]worker_concurrency 配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。

    您可以在 Airflow 界面中配置池大小(菜单 > 管理 > )。将池大小调整为您的环境中预期的最大并行数量。

    通常,[core]parallelism 设置为工作器数量上限与 [celery]worker_concurrency 的乘积。

在正在运行和已加入队列的任务中排查问题

以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。

DAG 运行未执行

具体情况:

如果动态设置 DAG 的调度日期,可能会导致各种意外的副作用。例如:

  • DAG 执行始终在未来,并且 DAG 从未执行。

  • 即使未执行,过去的 DAG 运行也会被标记为已执行且成功。

如需了解详情,请参阅 Apache Airflow 文档

可能的解决方案:

  • 请遵循 Apache Airflow 文档中的建议。

  • 为 DAG 设置静态 start_date。您也可以使用 catchup=False 停用针对过去日期的 DAG 运行。

  • 除非您了解此方法的副作用,否则请避免使用 datetime.now()days_ago(<number of days>)

使用 Airflow 调度程序的 TimeTable 功能

Airflow 2.2 及更高版本可以使用时间表。

您可以使用以下方法之一为 DAG 定义时间表:

您还可以使用内置时间表

避免在维护窗口内进行任务调度

您可以为环境定义维护窗口,以便在您运行 DAG 的时间之外进行环境维护。您仍然可以在维护窗口期间运行 DAG,只要可以接受某些任务可能会中断并重试即可。如需详细了解维护窗口对环境的影响,请参阅指定维护窗口

在 DAG 中使用“wait_for_downstream”

如果您将 DAG 中的参数 wait_for_downstream 设置为 True,那么如果您希望某个任务成功,则该任务的所有直接下行任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。如需了解详情,请参阅 Airflow 文档

排队时间过长的任务将被取消并重新安排

如果 Airflow 任务在队列中保留的时间过长,则调度器会在 [scheduler]task_queued_timeout Airflow 配置选项中设置的时间量过去后,再次重新安排该任务以执行。默认值为 2400。 在 2.3.1 之前的 Airflow 版本中,如果任务符合重试条件,则也会被标记为失败并进行重试。

观察此情况的一种方法是查看包含排队任务数的图表(Cloud Composer 界面中的“监控”标签页),如果此图表中的峰值在约两小时内没有下降,则任务很可能会被重新安排(没有日志),随后调度程序日志中会出现“Adopted tasks were still pending ...”日志条目。在这种情况下,您可能会在 Airflow 任务日志中看到“Log file not found...”消息,因为该任务未执行。

一般来说,预期此行为,并且计划任务的下一个实例应该根据时间表执行。如果您在 Cloud Composer 环境中观察到许多此类情况,则可能表示您的环境中没有足够的 Airflow 工作器来处理所有计划任务。

解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加工作器数量或提高 worker_concurrency 值。您还可以调整并行性或池,以防止队列任务超出您具有的容量。

卡在队列中的任务可能会阻止特定 DAG 的执行

如需解决此问题,请将环境升级到 Cloud Composer 2.1.12 版或更高版本。

在一般情况下,Airflow 调度器应该能够处理这样的情况:队列中存在任务,并且由于某些原因导致无法正确执行它们(例如,这些任务所属的 DAG 被删除)。

如果调度器未清除这些任务,则您可能需要手动删除这些任务。例如,您可以在 Airflow 界面中执行此操作(菜单 > 浏览器 > 任务实例),找到已加入队列的任务并将其删除。

Cloud Composer 方法 min_file_process_interval 参数

Cloud Composer 更改了 Airflow 调度器使用 [scheduler]min_file_process_interval 的方式。

在低于 2.0.26 的 Cloud Composer 版本中,系统会忽略 [scheduler]min_file_process_interval

在 2.0.26 之后的 Cloud Composer 版本中:

在所有 DAG 都被调度一定次数后,Airflow 调度器会重启,而 [scheduler]num_runs 参数可控制调度器执行此操作的次数。当调度程序达到 [scheduler]num_runs 个调度循环时,系统会重启调度程序。调度程序是一个无状态组件,因此这种重启是针对调度程序可能遇到的任何问题的自动修复机制。[scheduler]num_runs 的默认值为 5000。

[scheduler]min_file_process_interval 可用于配置 DAG 解析的频率,但此参数不能长于调度程序在调度 DAG 时执行 [scheduler]num_runs 循环所需的时间。

在达到 dagrun_timeout 后将任务标记为失败

如果 DAG 运行未在 dagrun_timeout(一个 DAG 参数)内完成,调度程序会将未完成(正在运行、已调度和已排队)的任务标记为失败。

解决方案:

Airflow 数据库处于高负载状态的症状

有时,您可能会在 Airflow 调度程序日志中看到以下警告日志条目:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

在 Airflow 工作器日志中也可能会观察到类似的症状:

对于 MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

对于 PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

此类错误或警告可能表明 Airflow 数据库因打开的连接数或在同一时间执行的查询数过多而过载,这些连接或查询可能是由调度器或其他 Airflow 组件(如工作器、触发器和 Web 服务器)发起的。

可能的解决方案:

Web 服务器显示“调度器似乎未运行”警告

调度程序会定期向 Airflow 数据库报告其检测信号。Airflow Web 服务器会根据此信息确定调度程序是否处于活跃状态。

有时,如果调度程序负载过重,可能无法每隔 [scheduler]scheduler_heartbeat_sec 报告一次心跳。

在这种情况下,Airflow Web 服务器可能会显示以下警告:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

可能的解决方案:

  • 增加调度器的 CPU 和内存资源。

  • 优化 DAG,使其解析和调度速度更快,并且不会消耗过多的调度程序资源。

  • 避免在 Airflow DAG 中使用全局变量。请改为使用环境变量Airflow 变量

  • 增加 [scheduler]scheduler_health_check_threshold Airflow 配置选项的值,以便 Web 服务器在报告调度器不可用之前等待更长时间。

针对回填 DAG 期间遇到的问题的解决方法

有时,您可能需要重新运行已执行的 DAG。您可以使用 Airflow CLI 命令按以下方式执行此操作:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

如需仅重新运行特定 DAG 的失败任务,也请使用 --rerun-failed-tasks 实参。

您需要进行如下替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • START_DATE,其中包含 start_date DAG 参数的值,采用 YYYY-MM-DD 格式。
  • END_DATE,其中包含 end_date DAG 参数的值,采用 YYYY-MM-DD 格式。
  • DAG_NAME 替换为 DAG 名称。

回填操作有时可能会产生死锁情况,即由于任务被锁定而无法进行回填。例如:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

在某些情况下,您可以使用以下解决方法来克服死锁:

  • 通过将 [core]schedule_after_task_execution 替换False 来停用迷你调度程序。

  • 针对较窄的日期范围运行回填。例如,设置 START_DATEEND_DATE 可指定仅 1 天的期限。

后续步骤