Cloud Composer 1 | Cloud Composer 2
本页面提供了 Airflow 调度器常见问题的问题排查步骤和相关信息。
确定问题的来源
如需开始进行问题排查,请确定问题是出现在 DAG 解析时间还是在执行时处理任务的时候。如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异。
检查 DAG 处理器日志
如果您有复杂的 DAG,则由调度器运行的 DAG 处理器可能不会解析所有 DAG。这可能会导致许多问题,造成以下症状。
症状:
如果 DAG 处理器在解析 DAG 时遇到问题,则可能会导致下列问题组合使用。如果 DAG 是动态生成的,则与静态 DAG 相比,这些问题可能更具影响力。
DAG 在 Airflow 界面和 DAG 界面中不可见。
未安排 DAG 执行。
DAG 处理器日志中有错误,例如:
dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
或
dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py exited with return code 1.
Airflow 调度器遇到问题,导致调度器重启。
已安排执行的 Airflow 任务会被取消,并且对于无法解析的 DAG 的 DAG 运行可能会被标记为
failed
。例如:airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1 manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag 'dag-example'. Marking it as removed.
解决方案:
增加与 DAG 解析相关的参数:
将 dagbag-import-timeout 增加到至少 120 秒(如果需要,也可更长时间)。
将 dag-file-processor-timeout 增加到至少 180 秒(如果需要,也可更长时间)。此值必须大于
dagbag-import-timeout
。
更正或移除给 DAG 处理器带来问题的 DAG。
检查 DAG 解析时间
如需验证问题是否发生在 DAG 解析时间,请执行以下步骤。
控制台
在 Google Cloud 控制台中,您可以使用 Monitoring 页面和日志标签页检查 DAG 解析时间。
使用 Cloud Composer Monitoring 页面检查 DAG 解析时间:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。监控页面随即打开。
在 Monitoring 标签页中,查看 DAG 运行部分中的所有 DAG 文件的总解析时间图表,并找出可能的问题。
使用 Cloud Composer 日志标签页检查 DAG 解析时间:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。监控页面随即打开。
转到日志标签页,然后在所有日志导航树中选择 DAG 处理器管理器部分。
查看
dag-processor-manager
日志并找出可能的问题。
gcloud - Airflow 1
使用带有 -r
标志的 list_dags
命令可查看所有 DAG 的解析时间。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
list_dags -- -r
您需要在其中:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。
该命令的输出类似于以下内容:
-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py | 0.6477 | 1 | 2 | ['dag_1']
/dag_2.py | 0.018652 | 1 | 2 | ['dag_2']
/dag_3.py | 0.004024 | 1 | 6 | ['dag_3']
/dag_4.py | 0.003476 | 1 | 2 | ['dag_4']
/dag_5.py | 0.002666 | 1 | 1 | ['dag_5']
-----------+----------+---------+----------+-----------------------
查找 DagBag 解析时间值。如果值较大,则可能表示某个 DAG 未以最佳方式实施。从输出表中,您可以确定哪些 DAG 的解析时间较长。
gcloud - Airflow 2
使用 dags report
命令查看所有 DAG 的解析时间。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags report
您需要在其中:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。
该命令的输出类似于以下内容:
Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file | duration | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py | 0:00:00.038334 | 2 | 10 | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1 | 1 | airflow_monitoring
查找表中列出的每个 DAG 的 duration 值。 值较高可能表示您的某个 DAG 未以最佳方式实现。从输出表中,您可以确定哪些 DAG 的解析时间较长。
监控正在运行和已加入队列的任务
如需检查是否有任务卡在队列中,请执行以下步骤。
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境的名称。 环境详情页面会打开。
转到监控标签页。
在监控标签页中,查看 DAG 运行部分中的 Airflow 任务图表,并找出可能的问题。Airflow 任务是 Airflow 中处于排队状态的任务,它们可以转到 Celery 或 Kubernetes Executor 代理队列。Celery 已加入队列的任务是指已进入 Celery 代理队列的任务实例。
DAG 解析时排查问题
以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。
Cloud Composer 1 和 Airflow 1 中的 DAG 解析和调度
Airflow 2 中 DAG 解析效率得到显著提高。如果您遇到与 DAG 解析和调度相关的性能问题,请考虑迁移到 Airflow 2。
在 Cloud Composer 1 中,该调度器会与其他 Cloud Composer 组件一起在集群节点上运行。因此,各个集群节点的负载可能高于或低于其他节点。调度器的性能(DAG 解析和调度)可能会因运行该调度器的节点而异。除此之外,运行调度程序的单个节点可能会因升级或维护操作而发生变化。此限制已在 Cloud Composer 2 中得到解决。在 Cloud Composer 2 中,您可以为调度器分配 CPU 和内存资源,且调度器的性能不取决于集群节点的负载。
受限线程数量
允许 DAG 处理器管理器(处理 DAG 文件的调度器的一部分)仅使用有限数量的线程可能会影响 DAG 解析时间。
如需解决此问题,请替换以下 Airflow 配置选项:
对于 Airflow 1.10.12 及更低版本,请替换
max_threads
参数:段 键 值 Notes scheduler
max_threads
NUMBER_OF_CORES_IN_MACHINE - 1
将 NUMBER_OF_CORES_IN_MACHINE
替换为工作器节点机器中的核心数
。对于 Airflow 1.10.14 及更高版本,请替换
parsing_processes
参数:段 键 值 Notes scheduler
parsing_processes
NUMBER_OF_CORES_IN_MACHINE - 1
将 NUMBER_OF_CORES_IN_MACHINE
替换为工作器节点机器中的核心数
。
任务的数量和时间分布
已知 Airflow 在安排大量小任务时会出现问题。在这种情况下,您应该选择使用较少的整合任务。
同时调度大量 DAG 或任务也可能是问题的来源。如需避免此问题,请随着时间推移更平均地分配您的任务。
在正在运行和已加入队列的任务中排查问题
以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。
任务队列过长
在某些情况下,任务队列对调度程序来说可能过长。如需了解如何优化工作器和 celcel 参数,请参阅将 Cloud Composer 环境与您的业务一起扩缩。
使用 Airflow 调度器的 TimeTable 功能
从 Airflow 2.2 开始,您可以使用名为 TimeTable 的新功能为 DAG 定义时间表。
您可以使用以下方法之一定义时间表:
受限集群资源
本部分仅适用于 Cloud Composer 1。
如果环境的 GKE 集群太小,无法处理所有 DAG 和任务,您可能会遇到性能问题。在这种情况下,请尝试以下解决方案之一:
- 使用性能更高的机器类型创建一个新环境,并将 DAG 迁移到该环境中。
- 创建更多 Cloud Composer 环境并在它们之间拆分 DAG。
- 按照升级 GKE 节点的机器类型中的说明更改 GKE 节点的机器类型。由于此过程容易出错,因此这是最不建议的选项。
- 升级在您的环境中运行 Airflow 数据库的 Cloud SQL 实例的机器类型,例如,使用
gcloud composer environments update
命令。Airflow 数据库性能低可能是导致调度器运行缓慢的原因。
避免在维护窗口内进行任务调度
您可以为环境定义特定的维护期。在这些时间段内,会出现 Cloud SQL 和 GKE 的维护事件。
让 Airflow 调度器忽略不必要的文件
您可以通过跳过 DAGs 文件夹中的不必要文件来提高 Airflow 调度器的性能。Airflow 调度器会忽略 .airflowignore
文件中指定的文件和文件夹。
如需让 Airflow 调度器忽略不必要的文件,请执行以下操作:
- 创建
.airflowignore
文件。 - 在此文件中,列出应忽略的文件和文件夹。
- 将此文件上传到环境存储桶中的
/dags
文件夹。
如需详细了解 .airflowignore
文件格式,请参阅 Airflow 文档。
Airflow 调度器进程暂停 DAG
Airflow 用户会暂停 DAG 以避免执行。这样可以缩短 Airflow 工作器处理周期。
Airflow 调度器会继续解析已暂停的 DAG。如果您确实想要提高 Airflow 调度器性能,请使用 .airflowignore
或从 DAGs 文件夹中删除已暂停的 DAG。
在 DAG 中使用“wait_for_downstream”
如果您将 DAG 中的参数 wait_for_downstream
设置为 True
,那么如果您希望某个任务成功,则该任务的所有直接下行任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。如需了解详情,请参阅 Airflow 文档。
排队时间过长的任务将被取消并重新安排
如果 Airflow 任务在队列中保留的时间过长,则调度程序将再次重新安排该任务以执行(在 2.3.1 之前的 Airflow 版本中,如果符合重试条件,该任务也会被标记为失败并重试)。
要观察这种情况的症状,一种方法是查看包含排队任务数量的图表(“Cloud Composer 界面中的“监控”标签页),如果此图表中的峰值在大约两个小时内没有下降,则任务很可能会重新调度(没有日志),然后显示调度器日志中的“Adopted tasks are still pending ...”日志条目。在这种情况下,您可能会在 Airflow 任务日志中看到“Log file is not found...”消息,因为相应任务未执行。
通常,此行为符合预期,预定任务的下一个实例将根据时间表执行。如果您在 Cloud Composer 环境中观察到大量此类情况,则可能意味着您的环境中没有足够的 Airflow 工作器来处理所有计划任务。
解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加工作器数量或 worker_concurrency。您还可以调整并行数量或池,以防止排队任务超出您的容量。
有时,过时的任务可能会阻止特定 DAG 的执行
在常规情况下,Airflow 调度器应能够处理队列中存在过时任务并且由于某种原因无法正确执行这些任务的情况(例如,过时任务所属的 DAG 已被删除)。
如果调度器未清除这些过时任务,您可能需要手动删除它们。例如,您可以在 Airflow 界面中执行此操作。导航到(菜单 > 浏览器 > 任务实例),查找属于过时 DAG 的已加入队列的任务并将其删除。
如需解决此问题,请将您的环境升级到 Cloud Composer 2.1.12 或更高版本。
Cloud Composer 对 [scheduler]min_file_process_interval
参数的方法
Cloud Composer 改变了 Airflow 调度器使用 [scheduler]min_file_process_interval
的方式。
Airflow 1
如果 Cloud Composer 使用 Airflow 1,则用户可以将 [scheduler]min_file_process_interval
的值设置为 0 到 600 秒之间。高于 600 秒的值会产生与将 [scheduler]min_file_process_interval
设置为 600 秒时相同的结果。
Airflow 2
在 Airflow 2 中,[scheduler]min_file_process_interval
只能用于 1.19.9、2.0.26 或更高版本
早于 1.19.9 和 2.0.26 的 Cloud Composer 版本
在这些版本中,
[scheduler]min_file_process_interval
会被忽略。Cloud Composer 版本 1.19.9、2.0.26 或更高版本
Airflow 调度器会在所有 DAG 调度达到一定次数后重启,并且
[scheduler]num_runs
参数用于控制调度器执行该操作的次数。当调度器达到[scheduler]num_runs
调度循环时,就会重启。调度器是一个无状态组件,此类重启是一种自动修复机制,用于修复调度器可能遇到的任何问题。如果未指定,则系统会应用[scheduler]num_runs
的默认值,即 5000。[scheduler]min_file_process_interval
可用于配置 DAG 解析的频率,但此参数不能超过调度程序在安排 DAG 时执行[scheduler]num_runs
循环所需的时间。
扩缩 Airflow 配置
Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。如需设置这些配置选项,请根据您的环境替换它们的值。
-
参数
[celery]worker_concurrency
控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Cloud Composer 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。此数量受[core]parallelism
Airflow 配置选项的限制,下文对此进行了详细介绍。在 Cloud Composer 2 环境中,系统会自动计算
[celery]worker_concurrency
的默认值对于 Airflow 2.3.3 及更高版本,
[celery]worker_concurrency
设置为 32、12 * worker_CPU 和 8 * worker_memory 中的最小值。对于 Airflow 2.2.5 或更低版本,
[celery]worker_concurrency
设置为 12 * 工作器 CPU 数量。
-
[core]max_active_runs_per_dag
Airflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。
-
[core]max_active_tasks_per_dag
Airflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。它是 DAG 级别的参数。如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。
解决方法:增加
[core]max_active_tasks_per_dag
。 最大并行数量和池大小
[core]parallelism
Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。这是整个 Airflow 设置的全局参数。
任务会排入队列并在池中执行。Cloud Composer 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值(
[core]parallelism
配置选项和[celery]worker_concurrency
配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。您可以在 Airflow 界面中配置池大小(菜单 > 管理 > 池)。将池大小调整为您的环境中预期的最大并行数量。
通常,
[core]parallelism
是工作器数量上限与 [celery]worker_concurrency 的乘积。
由于 DAG 处理器超时,调度器未调度 DAG
如需详细了解此问题,请参阅 DAG 问题排查。
达到 dagrun_timeout
后将任务标记为失败
如果 DAG 运行未在 dagrun_timeout
(DAG 参数)中完成,则调度器会将未完成(正在运行、已调度和已加入队列)的任务标记为失败。
解决方案:
扩展
dagrun_timeout
以满足超时要求。
Airflow 数据库承受负载压力的症状
有时,您可能会在 Airflow 调度器日志中看到以下警告日志条目:
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
在 Airflow 工作器日志中也可能观察到类似症状:
对于 MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
对于 PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
此类错误或警告可能表现为 Airflow 数据库被调度器或其他 Airflow 组件(如工作器、触发器和 Web 服务器)执行,打开连接的数量或同时执行的查询数量过多。
可能的解决方案:
对 Airflow 数据库进行纵向扩容:
- (Cloud Composer 1) 更改存储您环境的 Airflow 数据库的 Cloud SQL 实例的机器类型。
- (Cloud Composer 2) 调整环境大小。
减少调度器数量。在大多数情况下,一个或两个调度器就足以解析和调度 Airflow 任务;除非有正当理由,否则不建议配置两个以上的调度器。
避免在 Airflow DAG 中使用全局变量:Cloud Composer 环境变量和 Airflow 变量。
将 [scheduler]scheduler-heartbeat-sec 设置为较高的值,例如设置为 15 秒或更长时间。
将 [scheduler]job-heartbeat-sec 设置为更高的值,例如 30 秒或更长时间。
将 [scheduler]scheduler_health_check_threshold 设置为等于
[scheduler]job-heartbeat-sec
乘以4
得出的值。
Web 服务器显示“The scheduler does not look to be running”(调度程序似乎正在运行)警告
调度器会定期向 Airflow 数据库报告其检测信号。Airflow Web 服务器会根据此信息确定调度器是否处于活跃状态。
有时,如果调度器处于高负载状态,则可能无法每次 [scheduler]scheduler-heartbeat-sec 报告其检测信号。
在这种情况下,Airflow Web 服务器可能会显示以下警告:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
可能的解决方案:
为调度器增加资源的 CPU 和内存。
优化您的 DAG,使其解析和调度速度更快,且不会占用过多调度器资源。
避免在 Airflow DAG 中使用全局变量:Cloud Composer 环境变量和 Airflow 变量。
增加 [scheduler]scheduler-health-check-threshold 的值,以使 Web 服务器等待更长时间后再报告调度器不可用。
回填 DAG 期间遇到问题的解决方法
有时,您可能希望重新运行已执行的 DAG。您可以通过以下方式使用 Airflow 命令行工具执行此操作:
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
如需仅针对特定 DAG 重新运行失败的任务,也请使用 --rerun_failed_tasks
参数。
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
如需仅针对特定 DAG 重新运行失败的任务,也请使用 --rerun-failed-tasks
参数。
您需要在其中:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。- 将
START_DATE
替换为start_date
DAG 参数的值,采用YYYY-MM-DD
格式。 - 将
END_DATE
替换为end_date
DAG 参数的值,采用YYYY-MM-DD
格式。 DAG_NAME
替换为 DAG 名称。
回填操作有时可能会产生死锁情况,即由于任务锁定了而无法进行回填。例如:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
在某些情况下,您可以使用以下解决方法来解决死锁问题:
通过将
[core]schedule-after-task-execution
替换为False
来停用迷你调度器。针对较小的日期范围运行回填。例如,设置
START_DATE
和END_DATE
可指定仅包含 1 天的时间段。