排查 Airflow 调度器问题

Cloud Composer 1 | Cloud Composer 2

本页面提供了有关 Airflow 调度器常见问题的问题排查步骤和信息。

确定问题的来源

如需开始进行问题排查,请确定问题是出现在 DAG 解析时间还是在执行时处理任务的时候。如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异

检查 DAG 解析时间

如需验证问题是否发生在 DAG 解析时间,请执行以下步骤。

控制台

  1. 在 Google Cloud Console 中,转到环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页。

  4. Monitoring 标签页中,查看 DAG 运行部分中的所有 DAG 文件的总解析时间图表,并找出可能的问题。

    “ Composer 监控”标签页中的“ DAG 运行”部分显示了您环境中的 DAG 的运行状况指标

gcloud

使用带有 -r 标志的 list_dags 命令可查看所有 DAG 的解析时间。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    list_dags -- -r

替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

该命令的输出类似于以下内容:

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

查找 DagBag 解析时间值。如果值较大,则可能表示某个 DAG 未以最佳方式实施。从输出表中,您可以确定哪些 DAG 的解析时间较长。

监控正在运行和已加入队列的任务

如需检查是否有任务卡在队列中,请执行以下步骤。

  1. 在 Google Cloud Console 中,转到环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境的名称。 环境详情页面会打开。

  3. 转到监控标签页。

  4. 监控标签页中,查看 DAG 运行部分中的正在运行和已加入队列的任务图表,并确定可能的问题。

DAG 解析时排查问题

以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。

受限线程数量

允许 DAG 处理器管理器(负责处理 DAG 文件的调度程序的一部分)只使用有限数量的线程可能会影响您的 DAG 解析时间。

如需解决此问题,请对 airflow.cfg 配置文件应用以下更改:

  • 对于 Airflow 1.10.12 及更低版本,请使用 max_threads 参数

    [scheduler]
    max_threads = <NUMBER_OF_CORES_IN_MACHINE - 1>
    
  • 对于 Airflow 1.10.14 及更高版本,请使用 parsing_processes 参数

    [scheduler]
    parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>
    

NUMBER_OF_CORES_IN_MACHINE 替换为工作器节点中核心的数量。

任务的数量和时间分布

已知 Airflow 在安排大量小任务时会出现问题。在这种情况下,您应该选择使用较少的整合任务。

同时调度大量 DAG 或任务也可能是问题的来源。如需避免此问题,请随着时间推移更平均地分配您的任务。

在正在运行和已加入队列的任务中排查问题

以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。

任务队列过长

在某些情况下,对于调度程序而言,任务队列可能太长了。如需了解如何优化工作器和 celcel 参数,请参阅将 Cloud Composer 环境与您的业务一起扩缩

受限集群资源

本部分仅适用于 Cloud Composer 1。

如果环境的 GKE 集群太小,无法处理所有 DAG 和任务,您可能会遇到性能问题。在这种情况下,请尝试以下解决方案之一:

  • 使用性能更高的机器类型创建一个新环境,并将 DAG 迁移到该环境中。
  • 创建更多 Cloud Composer 环境并在它们之间拆分 DAG。
  • 按照升级 GKE 节点的机器类型中的说明更改 GKE 节点的机器类型。由于此过程容易出错,因此这是最不建议的选项。
  • 升级在您的环境中运行 Airflow 数据库的 Cloud SQL 实例的机器类型,例如,使用 gcloud composer environments update 命令。Airflow 数据库性能不佳可能是调度程序运行缓慢的原因。

避免在维护窗口内进行任务调度

您可以为您的环境定义特定的维护期。在这些时间段内,会出现 Cloud SQL 和 GKE 的维护事件。

让 Airflow 调度器忽略不必要的文件

您可以通过跳过 DAGs 文件夹中的不必要文件来提高 Airflow 调度器的性能。Airflow 调度器会忽略 .airflowignore 文件中指定的文件和文件夹。

如需让 Airflow 调度器忽略不必要的文件,请执行以下操作:

  1. 创建 .airflowignore 文件。
  2. 在此文件中,列出应忽略的文件和文件夹。
  3. 将此文件上传到您环境存储分区中的 /dags 文件夹。

如需详细了解 .airflowignore 文件格式,请参阅 Airflow 文档

Airflow 调度器进程暂停 DAG

Airflow 用户会暂停 DAG 以避免执行。这样可以节省 Airflow 工作器的处理周期。

Airflow 调度器会继续解析已暂停的 DAG。如果您确实想要提高 Airflow 调度器性能,请使用 .airflowignore 或从 DAGs 文件夹删除已暂停的 DAG。

在 DAG 中使用“wait_for_downstream”

如果您将 DAG 中的参数 wait_for_downstream 设置为 True,那么如果您希望某个任务成功,则该任务的所有直接下行任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。如需了解详情,请参阅 Airflow 文档

排队时间过长的任务将被取消并重新安排

如果 Airflow 任务在队列中保留的时间过长,则调度程序会将其标记为 failed/up_for_retry 并再次重新调度以执行。观察此情况的症状的一种方法是查看包含排队任务数量的图表(Cloud Composer 界面中的“监控”标签页),如果此图表中峰值在大约 10 分钟内未下降,则很有可能会发生任务失败(无日志)后跟“已采用的任务仍记录的日志...”的情况。在这种情况下,您可能会在 Airflow 任务日志中看到“Log file not found...”消息,因为该任务未执行。

通常,此任务失败是正常的,已安排的任务的下一个实例将根据计划执行。如果您在 Cloud Composer 环境中观察到许多此类情况,则可能意味着您的环境中没有足够的 Airflow 工作器来处理所有安排的任务。

解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加工作器数量或 Worker_concurrency。您还可以调整并行性或池,以防止将任务排入队列容量超过您的容量。

有时,过时的任务可能会阻止特定 DAG 的执行

通常情况下,Airflow 调度器应该能够处理队列中存在过时任务的情况,并且由于某种原因,它无法正确执行这些任务(例如,过时任务所属的 DAG 已被删除)。

如果调度程序未完全清除这些过时的任务,您可能需要手动将其删除。例如,您可以在 Airflow 界面中执行此操作 - 您可以导航到 (Menu > Browser > Task Instance),找到属于过时 DAG 的排队任务并将其删除。

Cloud Composer 方法 min_file_process_interval 参数

Cloud Composer 更改了 Airflow 调度器使用 min_file_process_interval 的方式。

对于使用 Airflow 1 的 Cloud Composer,用户可以将 min_file_process_interval 的值设置为 0 到 600 秒。高于 600 秒的值将产生与将 min_file_process_interval 设置为 600 秒相同的结果。

对于使用 Airflow 2 的 Cloud Composer,用户可以将 min_file_process_interval 的值设置为 0 到 1200 秒之间的值。高于 1200 秒的值将产生与 min_file_process_interval 设置为 1200 秒相同的结果。

扩缩 Airflow 配置

Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。要设置这些配置选项,请根据您的环境替换它们的值。

  • 工作器并发

    参数 [celery]worker_concurrency控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Cloud Composer 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。此数字受到 [core]parallelism Airflow 配置选项的限制,详见下文。

  • 活跃的 DAG 运行次数上限

    [core]max_active_runs_per_dag Airflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。

    如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。

  • 每个 DAG 的活跃任务数上限

    [core]max_active_tasks_per_dag Airflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。它是 DAG 级别的参数。

    如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。

    解决方案:增加 [core]max_active_tasks_per_dag

  • 最大并行数量和池大小

    [core]parallelism Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。

    这是整个 Airflow 设置的全局参数。

    任务会排入队列并在池中执行。Cloud Composer 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值([core]parallelism 配置选项和 [celery]worker_concurrency 配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。

    您可以在 Airflow 界面中配置池大小(菜单 > 管理 > )。将池大小调整为您的环境中预期的最大并行数量。

由于 DAG 处理器超时,调度程序未安排 DAG

如需详细了解此问题,请参阅排查 DAG 问题

到达 dagrun_timeout 后将任务标记为失败

如果 DAG 运行未在 dagrun_timeout(一个 DAG 参数)内完成,则调度程序会将未完成(正在运行、已安排和已加入队列)的任务标记为失败。

解决方案:

Airflow 数据库出现负载压力时的症状

有时,在 Airflow 调度器日志中,您可能会看到以下警告日志条目

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

此类错误或警告可能是 Airflow 元数据数据库被操作过多的症状。

可能的解决方案:

后续步骤