Airflow 调度器问题排查

Cloud Composer 1 | Cloud Composer 2

本页面提供了 Airflow 调度器常见问题的问题排查步骤和相关信息。

确定问题的来源

如需开始进行问题排查,请确定问题是出现在 DAG 解析时间还是在执行时处理任务的时候。如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异

检查 DAG 处理器日志

如果您有复杂的 DAG,则由调度器运行的 DAG 处理器可能不会解析所有 DAG。这可能会导致许多问题,造成以下症状。

症状

  • 如果 DAG 处理器在解析 DAG 时遇到问题,则可能会导致下列问题组合使用。如果 DAG 是动态生成的,则与静态 DAG 相比,这些问题可能更具影响力。

  • DAG 在 Airflow 界面和 DAG 界面中不可见。

  • 未安排 DAG 执行。

  • DAG 处理器日志中有错误,例如:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Airflow 调度器遇到问题,导致调度器重启。

  • 已安排执行的 Airflow 任务会被取消,并且对于无法解析的 DAG 的 DAG 运行可能会被标记为 failed。例如:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

解决方案:

  • 增加与 DAG 解析相关的参数:

  • 更正或移除给 DAG 处理器带来问题的 DAG。

检查 DAG 解析时间

如需验证问题是否发生在 DAG 解析时间,请执行以下步骤。

控制台

在 Google Cloud 控制台中,您可以使用 Monitoring 页面和日志标签页检查 DAG 解析时间。

使用 Cloud Composer Monitoring 页面检查 DAG 解析时间:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。监控页面随即打开。

  3. Monitoring 标签页中,查看 DAG 运行部分中的所有 DAG 文件的总解析时间图表,并找出可能的问题。

    Composer Monitoring 标签页中的 DAG 运行部分会显示您环境中 DAG 的运行状况指标

使用 Cloud Composer 日志标签页检查 DAG 解析时间:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。监控页面随即打开。

  3. 转到日志标签页,然后在所有日志导航树中选择 DAG 处理器管理器部分。

  4. 查看 dag-processor-manager 日志并找出可能的问题。

    DAG 处理器日志将显示 DAG 解析时间

gcloud

使用 dags report 命令查看所有 DAG 的解析时间。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

该命令的输出类似于以下内容:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

查找表中列出的每个 DAG 的 duration 值。 值较高可能表示您的某个 DAG 未以最佳方式实现。从输出表中,您可以确定哪些 DAG 的解析时间较长。

监控正在运行和已加入队列的任务

如需检查是否有任务卡在队列中,请执行以下步骤。

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境的名称。 环境详情页面会打开。

  3. 转到监控标签页。

  4. 监控标签页中,查看 DAG 运行部分中的 Airflow 任务图表,并找出可能的问题。Airflow 任务是 Airflow 中处于排队状态的任务,它们可以转到 Celery 或 Kubernetes Executor 代理队列。Celery 已加入队列的任务是指已进入 Celery 代理队列的任务实例。

DAG 解析时排查问题

以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。

受限线程数量

允许 DAG 处理器管理器(处理 DAG 文件的调度器的一部分)仅使用有限数量的线程可能会影响 DAG 解析时间。

如需解决此问题,请替换以下 Airflow 配置选项:

  • 对于 Airflow 1.10.12 及更低版本,请替换 max_threads 参数

    Notes
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE 替换为工作器节点机器中的核心数
  • 对于 Airflow 1.10.14 及更高版本,请替换 parsing_processes 参数

    Notes
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE 替换为工作器节点机器中的核心数

任务的数量和时间分布

已知 Airflow 在安排大量小任务时会出现问题。在这种情况下,您应该选择使用较少的整合任务。

同时调度大量 DAG 或任务也可能是问题的来源。如需避免此问题,请随着时间推移更平均地分配您的任务。

在正在运行和已加入队列的任务中排查问题

以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。

任务队列过长

在某些情况下,任务队列对调度程序来说可能过长。如需了解如何优化工作器和 celcel 参数,请参阅将 Cloud Composer 环境与您的业务一起扩缩

使用 Airflow 调度器的 TimeTable 功能

从 Airflow 2.2 开始,您可以使用名为 TimeTable 的新功能为 DAG 定义时间表。

您可以使用以下方法之一定义时间表:

受限集群资源

本部分仅适用于 Cloud Composer 1。

如果环境的 GKE 集群太小,无法处理所有 DAG 和任务,您可能会遇到性能问题。在这种情况下,请尝试以下解决方案之一:

  • 使用性能更高的机器类型创建一个新环境,并将 DAG 迁移到该环境中。
  • 创建更多 Cloud Composer 环境并在它们之间拆分 DAG。
  • 按照升级 GKE 节点的机器类型中的说明更改 GKE 节点的机器类型。由于此过程容易出错,因此这是最不建议的选项。
  • 升级在您的环境中运行 Airflow 数据库的 Cloud SQL 实例的机器类型,例如,使用 gcloud composer environments update 命令。Airflow 数据库性能低可能是导致调度器运行缓慢的原因。

避免在维护窗口内进行任务调度

您可以为环境定义特定的维护期。在这些时间段内,会出现 Cloud SQL 和 GKE 的维护事件。

让 Airflow 调度器忽略不必要的文件

您可以通过跳过 DAGs 文件夹中的不必要文件来提高 Airflow 调度器的性能。Airflow 调度器会忽略 .airflowignore 文件中指定的文件和文件夹。

如需让 Airflow 调度器忽略不必要的文件,请执行以下操作:

  1. 创建 .airflowignore 文件。
  2. 在此文件中,列出应忽略的文件和文件夹。
  3. 将此文件上传到环境存储桶中的 /dags 文件夹。

如需详细了解 .airflowignore 文件格式,请参阅 Airflow 文档

Airflow 调度器进程暂停 DAG

Airflow 用户会暂停 DAG 以避免执行。这样可以缩短 Airflow 工作器处理周期。

Airflow 调度器会继续解析已暂停的 DAG。如果您确实想要提高 Airflow 调度器性能,请使用 .airflowignore 或从 DAGs 文件夹中删除已暂停的 DAG。

在 DAG 中使用“wait_for_downstream”

如果您将 DAG 中的参数 wait_for_downstream 设置为 True,那么如果您希望某个任务成功,则该任务的所有直接下行任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。如需了解详情,请参阅 Airflow 文档

排队时间过长的任务将被取消并重新安排

如果 Airflow 任务在队列中保留的时间过长,则调度程序将再次重新安排该任务以执行(在 2.3.1 之前的 Airflow 版本中,如果符合重试条件,该任务也会被标记为失败并重试)。

要观察这种情况的症状,一种方法是查看包含排队任务数量的图表(“Cloud Composer 界面中的“监控”标签页),如果此图表中的峰值在大约两个小时内没有下降,则任务很可能会重新调度(没有日志),然后显示调度器日志中的“Adopted tasks are still pending ...”日志条目。在这种情况下,您可能会在 Airflow 任务日志中看到“Log file is not found...”消息,因为相应任务未执行。

通常,此行为符合预期,预定任务的下一个实例将根据时间表执行。如果您在 Cloud Composer 环境中观察到大量此类情况,则可能意味着您的环境中没有足够的 Airflow 工作器来处理所有计划任务。

解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加工作器数量或 worker_concurrency。您还可以调整并行数量或池,以防止排队任务超出您的容量。

有时,过时的任务可能会阻止特定 DAG 的执行

在常规情况下,Airflow 调度器应能够处理队列中存在过时任务并且由于某种原因无法正确执行这些任务的情况(例如,过时任务所属的 DAG 已被删除)。

如果调度器未清除这些过时任务,您可能需要手动删除它们。例如,您可以在 Airflow 界面中执行此操作。导航到(菜单 > 浏览器 > 任务实例),查找属于过时 DAG 的已加入队列的任务并将其删除。

如需解决此问题,请将您的环境升级到 Cloud Composer 2.1.12 或更高版本。

Cloud Composer 对 [scheduler]min_file_process_interval 参数的方法

Cloud Composer 改变了 Airflow 调度器使用 [scheduler]min_file_process_interval 的方式。

Airflow 1

如果 Cloud Composer 使用 Airflow 1,则用户可以将 [scheduler]min_file_process_interval 的值设置为 0 到 600 秒之间。高于 600 秒的值会产生与将 [scheduler]min_file_process_interval 设置为 600 秒时相同的结果。

Airflow 2

在 Airflow 2 中,[scheduler]min_file_process_interval 只能用于 1.19.9、2.0.26 或更高版本

  • 早于 1.19.9 和 2.0.26 的 Cloud Composer 版本

    在这些版本中,[scheduler]min_file_process_interval 会被忽略。

  • Cloud Composer 版本 1.19.9、2.0.26 或更高版本

    Airflow 调度器会在所有 DAG 调度达到一定次数后重启,并且 [scheduler]num_runs 参数用于控制调度器执行该操作的次数。当调度器达到 [scheduler]num_runs 调度循环时,就会重启。调度器是一个无状态组件,此类重启是一种自动修复机制,用于修复调度器可能遇到的任何问题。如果未指定,则系统会应用 [scheduler]num_runs 的默认值,即 5000。

    [scheduler]min_file_process_interval 可用于配置 DAG 解析的频率,但此参数不能超过调度程序在安排 DAG 时执行 [scheduler]num_runs 循环所需的时间。

扩缩 Airflow 配置

Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。如需设置这些配置选项,请根据您的环境替换它们的值。

  • 工作器并发

    参数 [celery]worker_concurrency控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Cloud Composer 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。此数量受 [core]parallelism Airflow 配置选项的限制,下文对此进行了详细介绍。

    在 Cloud Composer 2 环境中,系统会自动计算 [celery]worker_concurrency 的默认值

    • 对于 Airflow 2.3.3 及更高版本,[celery]worker_concurrency 设置为 32、12 * worker_CPU 和 8 * worker_memory 中的最小值。

    • 对于 Airflow 2.2.5 或更低版本,[celery]worker_concurrency 设置为 12 * 工作器 CPU 数量。

  • 活跃 DAG 运行次数上限

    [core]max_active_runs_per_dag Airflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。

    如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。

  • 每个 DAG 活跃任务数上限

    [core]max_active_tasks_per_dag Airflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。它是 DAG 级别的参数。

    如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。

    解决方法:增加 [core]max_active_tasks_per_dag

  • 最大并行数量和池大小

    [core]parallelism Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。

    这是整个 Airflow 设置的全局参数。

    任务会排入队列并在池中执行。Cloud Composer 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值([core]parallelism 配置选项和 [celery]worker_concurrency 配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。

    您可以在 Airflow 界面中配置池大小(菜单 > 管理 > )。将池大小调整为您的环境中预期的最大并行数量。

    通常,[core]parallelism 是工作器数量上限与 [celery]worker_concurrency 的乘积。

由于 DAG 处理器超时,调度器未调度 DAG

如需详细了解此问题,请参阅 DAG 问题排查

达到 dagrun_timeout 后将任务标记为失败

如果 DAG 运行未在 dagrun_timeout(DAG 参数)中完成,则调度器会将未完成(正在运行、已调度和已加入队列)的任务标记为失败。

解决方案:

Airflow 数据库承受负载压力的症状

有时,您可能会在 Airflow 调度器日志中看到以下警告日志条目:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

在 Airflow 工作器日志中也可能观察到类似症状:

对于 MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

对于 PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

此类错误或警告可能表现为 Airflow 数据库被调度器或其他 Airflow 组件(如工作器、触发器和 Web 服务器)执行,打开连接的数量或同时执行的查询数量过多。

可能的解决方案:

Web 服务器显示“The scheduler does not look to be running”(调度程序似乎正在运行)警告

调度器会定期向 Airflow 数据库报告其检测信号。Airflow Web 服务器会根据此信息确定调度器是否处于活跃状态。

有时,如果调度器处于高负载状态,则可能无法每次 [scheduler]scheduler-heartbeat-sec 报告其检测信号。

在这种情况下,Airflow Web 服务器可能会显示以下警告:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

可能的解决方案:

回填 DAG 期间遇到问题的解决方法

有时,您可能希望重新运行已执行的 DAG。您可以通过以下方式使用 Airflow 命令行工具执行此操作:

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

如需仅针对特定 DAG 重新运行失败的任务,也请使用 --rerun_failed_tasks 参数。

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

如需仅针对特定 DAG 重新运行失败的任务,也请使用 --rerun-failed-tasks 参数。

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • START_DATE 替换为 start_date DAG 参数的值,采用 YYYY-MM-DD 格式。
  • END_DATE 替换为 end_date DAG 参数的值,采用 YYYY-MM-DD 格式。
  • DAG_NAME 替换为 DAG 名称。

回填操作有时可能会产生死锁情况,即由于任务锁定了而无法进行回填。例如:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

在某些情况下,您可以使用以下解决方法来解决死锁问题:

  • 通过将 [core]schedule-after-task-execution 替换False 来停用迷你调度器。

  • 针对较小的日期范围运行回填。例如,设置 START_DATEEND_DATE 可指定仅包含 1 天的时间段。

后续步骤