Airflow 调度器问题排查

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本页面介绍 Airflow 调度程序常见问题和问题排查步骤和信息。

确定问题的来源

如需开始进行问题排查,请确定问题是出现在 DAG 解析时间还是在执行时处理任务的时候。如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异

检查 DAG 处理器日志

如果您有复杂的 DAG,由调度器运行的 DAG 处理器可能无法解析您的所有 DAG。这可能会导致许多问题,这些问题具有以下症状。

症状

  • 如果 DAG 处理器在解析 DAG 时遇到问题,则可能由下面列出的问题组合导致。与静态 DAG 相比,如果 DAG 是动态生成的,这些问题的影响可能会更大。

  • DAG 在 Airflow 界面和 DAG 界面中不可见。

  • DAG 不会安排执行时间。

  • DAG 处理器日志中存在错误,例如:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Airflow 调度器遇到问题,会导致调度器重启。

  • 已安排执行的 Airflow 任务会被取消,DAG 会运行 对于无法解析的 DAG,可能会被标记为 failed。例如:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

解决方案:

  • 增加与 DAG 解析相关的参数:

  • 更正或移除会导致 DAG 处理器出现问题的 DAG。

检查 DAG 解析时间

如需验证问题是否发生在 DAG 解析时间,请执行以下步骤。

控制台

在 Google Cloud 控制台中,您可以使用 Monitoring 页面和 日志标签页检查 DAG 解析时间。

使用 Cloud Composer“监控”页面检查 DAG 解析时间:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。此时将打开 Monitoring 页面。

  3. Monitoring 标签页中,查看 DAG 运行部分中的所有 DAG 文件的总解析时间图表,并找出可能的问题。

    “Composer Monitoring”标签页中的“DAG 运行”部分显示您所在环境中 DAG 的运行状况指标

使用 Cloud Composer 日志标签页检查 DAG 解析时间:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。系统随即会打开监控页面。

  3. 转到日志标签页,然后从所有日志导航树中 选择 DAG 处理器管理器部分。

  4. 查看 dag-processor-manager 日志并找出可能存在的问题。

    DAG 处理器日志将显示 DAG 解析时间

gcloud

使用 dags report 命令可查看所有 DAG 的解析时间。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

该命令的输出类似于以下内容:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

查找表中列出的每个日期的 duration 值。 值较大可能表示您的某个 DAG 未实现 优化应用从输出表中,您可以确定哪些 DAG 的解析时间较长。

监控正在运行和已加入队列的任务

如需检查是否有任务卡在队列中,请执行以下步骤。

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页。

  4. 监控标签页中,查看 Airflow 任务图表 DAG 运行部分中,并找出可能存在的问题。Airflow 任务是指在 Airflow 中处于队列状态的任务,它们可以进入 Celery 或 Kubernetes Executor 代理队列。“Celery 队列中的任务数”是指已进入 Celery 代理队列的任务实例数。

DAG 解析时排查问题

以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。

受限线程数量

允许使用 DAG 处理器管理器(调度器中 处理 DAG 文件)以仅使用有限数量的线程,这可能会影响 DAG 解析时间。

如需解决此问题,请替换以下 Airflow 配置选项:

  • 对于 Airflow 1.10.12 及更低版本,请替换 max_threads 参数

    备注
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE 替换为核心数
    部署在工作器节点机器中
  • 对于 Airflow 1.10.14 及更高版本,请替换 parsing_processes 参数

    备注
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE 替换为工作器节点中核心的数量

任务的数量和时间分布

已知 Airflow 在安排大量小任务时会出现问题。在这种情况下,您应该选择使用较少的整合任务。

同时调度大量 DAG 或任务也可能是问题的来源。如需避免此问题,请随着时间推移更平均地分配您的任务。

在正在运行和已加入队列的任务中排查问题

以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。

任务队列过长

在某些情况下,调度器的任务队列可能会过长。如需了解如何优化工作器和 celcel 参数,请参阅将 Cloud Composer 环境与您的业务一起扩缩

使用 Airflow 调度器的 TimeTable 功能

从 Airflow 2.2 开始,您可以使用名为 TimeTable 的新功能为 DAG 定义时间表。

您可以使用以下任一方法定义时间表:

受限集群资源

本部分仅适用于 Cloud Composer 1。

如果环境的 GKE 集群太小,无法处理所有 DAG 和任务,您可能会遇到性能问题。在这种情况下,请尝试以下解决方案之一:

  • 使用性能更高的机器类型创建一个新环境,并将 DAG 迁移到该环境中。
  • 创建更多 Cloud Composer 环境并在它们之间拆分 DAG。
  • 按照升级 GKE 节点的机器类型中的说明更改 GKE 节点的机器类型。由于此过程容易出错,因此这是最不建议的选项。
  • 升级在您的环境中运行 Airflow 数据库的 Cloud SQL 实例的机器类型,例如,使用 gcloud composer environments update 命令。Airflow 数据库性能低可能是导致 调度程序运行缓慢

避免在维护窗口内进行任务调度

您可以为环境定义特定的维护窗口。在这些时间段内,会出现 Cloud SQL 和 GKE 的维护事件。

让 Airflow 调度器忽略不必要的文件

您可以通过跳过 DAGs 文件夹中的不必要文件来提高 Airflow 调度器的性能。Airflow 调度器会忽略 .airflowignore 文件中指定的文件和文件夹。

如需让 Airflow 调度器忽略不必要的文件,请执行以下操作:

  1. 创建 .airflowignore 文件。
  2. 在此文件中,列出应忽略的文件和文件夹。
  3. 将此文件上传到环境的存储桶中的 /dags 文件夹。

如需详细了解 .airflowignore 文件格式,请参阅 Airflow 文档

Airflow 调度器进程暂停 DAG

Airflow 用户会暂停 DAG 以避免执行。这样可以节省 Airflow 工作器 处理周期。

Airflow 调度器会继续解析已暂停的 DAG。如果你确实想 提升 Airflow 调度器性能,请使用 .airflowignore 或删除已暂停项 DAGs 文件夹中的 DAG。

在 DAG 中使用“wait_for_downstream”

如果您将 DAG 中的参数 wait_for_downstream 设置为 True,那么如果您希望某个任务成功,则该任务的所有直接下行任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。如需了解详情,请参阅 Airflow 文档

排队时间过长的任务将被取消并重新安排

如果 Airflow 任务在队列中保留的时间过长,则调度器会再次重新安排它以执行(在 2.3.1 之前的 Airflow 版本中,如果任务符合重试条件,系统还会将其标记为失败并重试)。

观察这种情况的症状的一种方式 查看包含排队任务数量的图表 (Cloud Composer 界面中的“监控”标签页) 如果此图表中的峰值在大约两个小时内没有下降 这些任务很可能会重新安排(没有日志),然后 “已采用的任务仍待处理 ...”调度器日志中的日志条目。 在这种情况下,您可能会在 Airflow 任务日志中看到“Log file not found...”消息,因为该任务未执行。

一般来说,此行为符合预期,已安排的 按计划执行任务如果您观察到大量 遇到这种情况时,可能意味着 您的环境中没有足够的 Airflow 工作器来处理所有 计划任务。

解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加工作器数量或提高工作器并发 (worker_concurrency) 值。您还可以调整并行性或池,以防止队列任务超出您具有的容量。

有时,过时的任务可能会阻止特定 DAG 的执行

在一般情况下,Airflow 调度器应该能够处理这样的情况:队列中存在过时任务,并且由于某些原因导致无法正确执行它们(例如,过时任务所属的 DAG 被删除)。

如果调度器未清除这些过时的任务,则您可能需要手动删除这些任务。例如,在 Airflow 界面中,您可以 导航到(菜单 &gt; 浏览器 &gt;Task 实例),找到属于过时 DAG 的已加入队列的任务并将其删除。

如需解决此问题,请将您的环境升级到 Cloud Composer 2.1.12 或更高版本。

使用 Cloud Composer 处理 [scheduler]min_file_process_interval 参数的方法

Cloud Composer 更改了 Airflow 调度器使用 [scheduler]min_file_process_interval 的方式。

Airflow 1

对于使用 Airflow 1 的 Cloud Composer,用户可以将 [scheduler]min_file_process_interval 的值设置为 0 到 600 秒。高于 600 秒的值将产生与将 [scheduler]min_file_process_interval 设置为 600 秒相同的结果。

Airflow 2

在 Airflow 2 中,[scheduler]min_file_process_interval 只能用于 1.19.9 和 2.0.26 或更高版本

  • 1.19.9 和 2.0.26 之前的 Cloud Composer 版本

    在这些版本中,[scheduler]min_file_process_interval 会被忽略。

  • Cloud Composer 1.19.9 版或 2.0.26 版或更高版本

    在调度所有 DAG 的次数达到一定数目后,Airflow 调度器会重启,并且 [scheduler]num_runs 参数用于控制调度器执行此操作的次数。当调度程序到达 [scheduler]num_runs 调度循环时,它会重启 - 调度程序是无状态组件,此类重启是对调度程序可能遇到的任何问题的自动修复机制。如果未指定,系统会应用 [scheduler]num_runs 的默认值,即 5000。

    [scheduler]min_file_process_interval 可用于配置 DAG 解析的频率,但此参数不得超过调度器在调度 DAG 时执行 [scheduler]num_runs 循环所需的时间。

扩缩 Airflow 配置

Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。要设置这些配置选项 根据您的环境替换它们的值。

  • 工作器并发

    参数 [celery]worker_concurrency控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Cloud Composer 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。此数字受 [core]parallelism Airflow 配置选项的限制,之后将进一步说明此内容。

    在 Cloud Composer 2 环境中,默认值为 系统会自动计算[celery]worker_concurrency

    • 对于 Airflow 版本 2.3.3 及更高版本,设置了 [celery]worker_concurrency 32 * worker_CPU 和 8 * worker_memory 中的最小值。

    • 对于 Airflow 2.2.5 或更低版本,[celery]worker_concurrency 为 设置为 12 * 工作器数量CPU。

  • 活跃 DAG 运行次数上限

    [core]max_active_runs_per_dag Airflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。

    如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。

  • 每个 DAG 的有效任务数上限

    [core]max_active_tasks_per_dag Airflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。它是 DAG 级别的参数。

    如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。

    解决方案:增加 [core]max_active_tasks_per_dag

  • 最大并行数量和池大小

    [core]parallelism Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。

    这是整个 Airflow 设置的全局参数。

    任务会排入队列并在池中执行。Cloud Composer 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值([core]parallelism 配置选项和 [celery]worker_concurrency 配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。

    您可以在 Airflow 界面中配置池大小(菜单 > 管理 > )。将池大小调整为您的环境中预期的最大并行数量。

    通常,[core]parallelism 设置为工作器数量上限的乘积 和 [celery]worker_concurrency

由于 DAG 处理器超时,调度器无法调度 DAG

如需详细了解此问题,请参阅排查 DAG 问题

在达到 dagrun_timeout 后将任务标记为失败

如果 DAG 运行作业未在 dagrun_timeout(DAG 参数)内完成,调度程序会将未完成的任务(正在运行、已调度和已加入队列的任务)标记为失败。

解决方案:

Airflow 数据库承受负载压力的症状

有时,您可能会在 Airflow 调度器日志中看到 以下警告日志条目:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Airflow 工作器日志中可能还会出现类似症状:

对于 MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

对于 PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

此类错误或警告可能是 Airflow 数据库遭到 打开的连接数或查询数 同时执行(由调度器或其他 Airflow 组件) 例如 worker、触发器和 Web 服务器。

可能的解决方案:

Web 服务器显示“调度程序似乎未运行”警告

调度程序会定期向 Airflow 数据库报告其心跳。Airflow Web 服务器会根据这些信息确定 调度程序是否处于活动状态。

有时,如果调度器负载过重,则可能就无法 报告检测信号时间间隔 [scheduler]scheduler-heartbeat-sec

在这种情况下,Airflow Web 服务器可能会显示以下警告:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

可能的解决方案:

针对在回填 DAG 期间遇到的问题的解决方法

有时,您可能需要重新运行已执行的 DAG。您可以使用 Airflow 命令行工具按如下方式执行此操作:

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

如需仅针对特定 DAG 重新运行失败的任务,还可以使用 --rerun_failed_tasks 参数。

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

如需仅针对特定 DAG 重新运行失败的任务,还可以使用 --rerun-failed-tasks 参数。

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • START_DATE 替换为 start_date DAG 参数的值,位于 YYYY-MM-DD 格式。
  • END_DATE,其中包含 end_date DAG 参数的值,格式为 YYYY-MM-DD
  • DAG_NAME 替换为 DAG 名称。

回填操作有时可能会产生死锁情况, 无法回填,因为任务已锁定。例如:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

在某些情况下,您可以使用以下权宜解决方法来解决死锁问题:

后续步骤